You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by za...@apache.org on 2021/06/01 23:15:18 UTC

[druid] branch master updated: Fix Index hadoop failing with index.zip is not a valid DFS filename (#11316)

This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 27f1b6c  Fix Index hadoop failing with index.zip is not a valid DFS filename (#11316)
27f1b6c is described below

commit 27f1b6cbf377fb9698076d0eefb3f72545efb6ed
Author: zachjsh <za...@gmail.com>
AuthorDate: Tue Jun 1 19:14:50 2021 -0400

    Fix Index hadoop failing with index.zip is not a valid DFS filename (#11316)
    
    * Fix bug
    
    * Simplify class loading
    
    * Fix example configs for integration tests
    
    * Small classloader cleanup
    
    Co-authored-by: jon-wei <jo...@imply.io>
---
 .../java/org/apache/druid/indexer/JobHelper.java   |  4 +--
 .../indexing/common/task/HadoopIndexTask.java      | 35 ++++++++--------------
 .../override-examples/hadoop/s3_to_hdfs            |  2 +-
 .../override-examples/hadoop/s3_to_s3              |  2 +-
 4 files changed, 16 insertions(+), 27 deletions(-)

diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
index c680d27..7e0ab4c 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
@@ -476,8 +476,8 @@ public class JobHelper
 
     return new DataSegmentAndIndexZipFilePath(
         finalSegment,
-        tmpPath.toUri().getPath(),
-        finalIndexZipFilePath.toUri().getPath()
+        tmpPath.toUri().toString(),
+        finalIndexZipFilePath.toUri().toString()
     );
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 37ffb4c..a163905 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -450,16 +450,11 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
         List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths = buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths();
         if (dataSegmentAndIndexZipFilePaths != null) {
           indexGeneratorJobSuccess = true;
-          try {
-            Thread.currentThread().setContextClassLoader(oldLoader);
-            renameSegmentIndexFilesJob(
-                toolbox.getJsonMapper().writeValueAsString(indexerSchema),
-                toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
-            );
-          }
-          finally {
-            Thread.currentThread().setContextClassLoader(loader);
-          }
+          renameSegmentIndexFilesJob(
+              toolbox.getJsonMapper().writeValueAsString(indexerSchema),
+              toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
+          );
+
           ArrayList<DataSegment> segments = new ArrayList<>(dataSegmentAndIndexZipFilePaths.stream()
                                                                                            .map(
                                                                                                DataSegmentAndIndexZipFilePath::getSegment)
@@ -545,22 +540,20 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     }
   }
 
+  /**
+   * Must be called only while the Hadoop classloader is the current thread's context classloader
+   */
   private void renameSegmentIndexFilesJob(
       String hadoopIngestionSpecStr,
       String dataSegmentAndIndexZipFilePathListStr
   )
   {
-    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+    final ClassLoader loader = Thread.currentThread().getContextClassLoader();
     try {
-      ClassLoader loader = HadoopTask.buildClassLoader(
-          getHadoopDependencyCoordinates(),
-          taskConfig.getDefaultHadoopCoordinates()
-      );
-
-      Object renameSegmentIndexFilesRunner = getForeignClassloaderObject(
-          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner",
-          loader
+      final Class<?> clazz = loader.loadClass(
+          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner"
       );
+      Object renameSegmentIndexFilesRunner = clazz.newInstance();
 
       String[] renameSegmentIndexFilesJobInput = new String[]{
           hadoopIngestionSpecStr,
@@ -573,7 +566,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
           renameSegmentIndexFilesJobInput.getClass()
       );
 
-      Thread.currentThread().setContextClassLoader(loader);
       renameSegmentIndexFiles.invoke(
           renameSegmentIndexFilesRunner,
           new Object[]{renameSegmentIndexFilesJobInput}
@@ -582,9 +574,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     catch (Exception e) {
       throw new RuntimeException(e);
     }
-    finally {
-      Thread.currentThread().setContextClassLoader(oldLoader);
-    }
   }
 
   private void indexerGeneratorCleanupJob(
diff --git a/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_hdfs b/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_hdfs
index cd70973..b16500a 100644
--- a/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_hdfs
+++ b/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_hdfs
@@ -31,4 +31,4 @@ AWS_REGION=<OVERRIDE_THIS>
 
 druid_extensions_loadList=["druid-s3-extensions","druid-hdfs-storage"]
 
-druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
+druid_indexer_task_defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
diff --git a/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_s3 b/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_s3
index 4ad6896..60dd856 100644
--- a/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_s3
+++ b/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_s3
@@ -32,4 +32,4 @@ AWS_REGION=<OVERRIDE_THIS>
 
 druid_extensions_loadList=["druid-s3-extensions","druid-hdfs-storage"]
 
-druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
+druid_indexer_task_defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org