You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/26 21:46:16 UTC
[incubator-pinot] 01/01: Fixing hadoop plugins jar copy issue
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch fixing_hadoop_copy_plugin_dir
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4e21e1fb072cec417e01473dc1e8e2b3a6f86b08
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Jan 26 13:45:33 2021 -0800
Fixing hadoop plugins jar copy issue
---
.../batch/hadoop/HadoopSegmentGenerationJobRunner.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index 59beadc..7efa405 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.ingestion.batch.hadoop;
+import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
import static org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY_NAME;
@@ -167,6 +168,8 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
outputDirFS.mkdir(stagingInputDir.toUri());
Path stagingSegmentTarUri = new Path(stagingDirURI.toString(), SEGMENT_TAR_DIR);
outputDirFS.mkdir(stagingSegmentTarUri.toUri());
+ Path stagingPluginDir = new Path(stagingDirURI.toString(), PINOT_PLUGINS_DIR);
+ outputDirFS.mkdir(stagingPluginDir.toUri());
//Get list of files to process
String[] files = inputDirFS.listFiles(inputDirURI, true);
@@ -240,7 +243,7 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
// To ensure that Pinot plugins are loaded on each worker, this method
// tars the entire plugins directory and adds the tarball to the distributed cache.
// Each mapper job then untars the plugin tarball and sets system properties accordingly.
- packPluginsToDistributedCache(job);
+ packPluginsToDistributedCache(job, outputDirFS, stagingPluginDir);
// Add dependency jars
if (_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) {
@@ -285,7 +288,8 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
return HadoopSegmentCreationMapper.class;
}
- protected void packPluginsToDistributedCache(Job job) {
+ protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, Path stagingPluginDir)
+ throws Exception {
File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir());
if (pluginsRootDir.exists()) {
File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
@@ -295,7 +299,9 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
LOGGER.error("Failed to tar plugins directory", e);
throw new RuntimeException(e);
}
- job.addCacheArchive(pluginsTarGzFile.toURI());
+ final URI stagingPluginDirURI = stagingPluginDir.toUri();
+ outputDirFS.copyFromLocalFile(pluginsTarGzFile, stagingPluginDirURI);
+ job.addCacheArchive(stagingPluginDirURI);
String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME);
if (pluginsIncludes != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org