Posted to commits@druid.apache.org by jo...@apache.org on 2021/04/22 22:34:03 UTC

[druid] branch master updated: Revert "Adjust HadoopIndexTask temp segment renaming to avoid potential race conditions (#11075)" (#11151)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 49a9c3f  Revert "Adjust HadoopIndexTask temp segment renaming to avoid potential race conditions (#11075)" (#11151)
49a9c3f is described below

commit 49a9c3ffb7b2da3401696d583bc2cd52e83f77bf
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Thu Apr 22 15:33:27 2021 -0700

    Revert "Adjust HadoopIndexTask temp segment renaming to avoid potential race conditions (#11075)" (#11151)
    
    This reverts commit a2892d9c40793027ba8a8977c85b3de4a949e11c.
---
 indexing-hadoop/pom.xml                            |  15 -
 .../indexer/DataSegmentAndIndexZipFilePath.java    |  97 ----
 .../org/apache/druid/indexer/FileSystemHelper.java |  38 --
 .../HadoopDruidDetermineConfigurationJob.java      |   7 +-
 .../druid/indexer/HadoopDruidIndexerJob.java       |  13 +-
 .../apache/druid/indexer/IndexGeneratorJob.java    |  20 +-
 .../java/org/apache/druid/indexer/JobHelper.java   | 105 ++---
 .../druid/indexer/MetadataStorageUpdaterJob.java   |   4 +-
 .../druid/indexer/BatchDeltaIngestionTest.java     |  10 +-
 .../DataSegmentAndIndexZipFilePathTest.java        | 185 --------
 .../druid/indexer/HadoopDruidIndexerJobTest.java   |  76 ----
 .../druid/indexer/IndexGeneratorJobTest.java       |  16 +-
 .../druid/indexer/JobHelperPowerMockTest.java      | 216 ---------
 .../indexer/MetadataStorageUpdaterJobTest.java     |  82 ----
 .../indexing/common/task/HadoopIndexTask.java      | 494 ++++++---------------
 integration-tests/pom.xml                          |   4 -
 .../apache/druid/cli/CliInternalHadoopIndexer.java |   7 +-
 17 files changed, 212 insertions(+), 1177 deletions(-)

diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 8eacc7e..9557ab5 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -202,21 +202,6 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-module-junit4</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-api-easymock</artifactId>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java
deleted file mode 100644
index e12f7fb..0000000
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.timeline.DataSegment;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * holds a {@link DataSegment} with the temporary file path where the corresponding index zip file is currently stored
- * and the final path where the index zip file should eventually be moved to.
- * see {@link JobHelper#renameIndexFilesForSegments(HadoopIngestionSpec, List)}
- */
-public class DataSegmentAndIndexZipFilePath
-{
-  private final DataSegment segment;
-  private final String tmpIndexZipFilePath;
-  private final String finalIndexZipFilePath;
-
-  @JsonCreator
-  public DataSegmentAndIndexZipFilePath(
-      @JsonProperty("segment") DataSegment segment,
-      @JsonProperty("tmpIndexZipFilePath") String tmpIndexZipFilePath,
-      @JsonProperty("finalIndexZipFilePath") String finalIndexZipFilePath
-  )
-  {
-    this.segment = segment;
-    this.tmpIndexZipFilePath = tmpIndexZipFilePath;
-    this.finalIndexZipFilePath = finalIndexZipFilePath;
-  }
-
-  @JsonProperty
-  public DataSegment getSegment()
-  {
-    return segment;
-  }
-
-  @JsonProperty
-  public String getTmpIndexZipFilePath()
-  {
-    return tmpIndexZipFilePath;
-  }
-
-  @JsonProperty
-  public String getFinalIndexZipFilePath()
-  {
-    return finalIndexZipFilePath;
-  }
-
-  @Override
-  public boolean equals(Object o)
-  {
-    if (o instanceof DataSegmentAndIndexZipFilePath) {
-      DataSegmentAndIndexZipFilePath that = (DataSegmentAndIndexZipFilePath) o;
-      return segment.equals(((DataSegmentAndIndexZipFilePath) o).getSegment())
-          && tmpIndexZipFilePath.equals(that.getTmpIndexZipFilePath())
-          && finalIndexZipFilePath.equals(that.getFinalIndexZipFilePath());
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return Objects.hash(segment.getId(), tmpIndexZipFilePath);
-  }
-
-  @Override
-  public String toString()
-  {
-    return "DataSegmentAndIndexZipFilePath{" +
-           "segment=" + segment +
-           ", tmpIndexZipFilePath=" + tmpIndexZipFilePath +
-           ", finalIndexZipFilePath=" + finalIndexZipFilePath +
-           '}';
-  }
-}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java
deleted file mode 100644
index 96fde6b..0000000
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * This class exists for testing purposes, see {@link JobHelperPowerMockTest}. Using the
- * raw {@link FileSystem} class resulted in errors with java assist.
- */
-public class FileSystemHelper
-{
-  public static FileSystem get(URI uri, Configuration conf) throws IOException
-  {
-    return FileSystem.get(uri, conf);
-  }
-}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index ea37db1..8b5b4b6 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -59,12 +59,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
     if (config.isDeterminingPartitions()) {
       job = createPartitionJob(config);
       config.setHadoopJobIdFileName(hadoopJobIdFile);
-      boolean jobSucceeded = JobHelper.runSingleJob(job);
-      JobHelper.maybeDeleteIntermediatePath(
-          jobSucceeded,
-          config.getSchema()
-      );
-      return jobSucceeded;
+      return JobHelper.runSingleJob(job, config);
     } else {
       final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
       final int shardsPerInterval;
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
index 58977ad..25683f3 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -39,7 +40,7 @@ public class HadoopDruidIndexerJob implements Jobby
   @Nullable
   private IndexGeneratorJob indexJob;
   @Nullable
-  private volatile List<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePaths = null;
+  private volatile List<DataSegment> publishedSegments = null;
   @Nullable
   private String hadoopJobIdFile;
 
@@ -90,14 +91,14 @@ public class HadoopDruidIndexerJob implements Jobby
           @Override
           public boolean run()
           {
-            publishedSegmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
+            publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
             return true;
           }
         }
     );
 
     config.setHadoopJobIdFileName(hadoopJobIdFile);
-    return JobHelper.runJobs(jobs);
+    return JobHelper.runJobs(jobs, config);
   }
 
   @Override
@@ -121,12 +122,12 @@ public class HadoopDruidIndexerJob implements Jobby
     return indexJob.getErrorMessage();
   }
 
-  public List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths()
+  public List<DataSegment> getPublishedSegments()
   {
-    if (publishedSegmentAndIndexZipFilePaths == null) {
+    if (publishedSegments == null) {
       throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
     }
-    return publishedSegmentAndIndexZipFilePaths;
+    return publishedSegments;
   }
 
   public void setHadoopJobIdFile(String hadoopJobIdFile)
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index 9124b9b..a12e765 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -102,14 +102,14 @@ public class IndexGeneratorJob implements Jobby
 {
   private static final Logger log = new Logger(IndexGeneratorJob.class);
 
-  public static List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths(HadoopDruidIndexerConfig config)
+  public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
   {
     final Configuration conf = JobHelper.injectSystemProperties(new Configuration(), config);
     config.addJobProperties(conf);
 
     final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER;
 
-    ImmutableList.Builder<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePathsBuilder = ImmutableList.builder();
+    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
 
     final Path descriptorInfoDir = config.makeDescriptorInfoDir();
 
@@ -117,9 +117,9 @@ public class IndexGeneratorJob implements Jobby
       FileSystem fs = descriptorInfoDir.getFileSystem(conf);
 
       for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
-        final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegmentAndIndexZipFilePath.class);
-        publishedSegmentAndIndexZipFilePathsBuilder.add(segmentAndIndexZipFilePath);
-        log.info("Adding segment %s to the list of published segments", segmentAndIndexZipFilePath.getSegment().getId());
+        final DataSegment segment = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegment.class);
+        publishedSegmentsBuilder.add(segment);
+        log.info("Adding segment %s to the list of published segments", segment.getId());
       }
     }
     catch (FileNotFoundException e) {
@@ -133,9 +133,9 @@ public class IndexGeneratorJob implements Jobby
     catch (IOException e) {
       throw new RuntimeException(e);
     }
-    List<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePaths = publishedSegmentAndIndexZipFilePathsBuilder.build();
+    List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
 
-    return publishedSegmentAndIndexZipFilePaths;
+    return publishedSegments;
   }
 
   private final HadoopDruidIndexerConfig config;
@@ -809,7 +809,7 @@ public class IndexGeneratorJob implements Jobby
             0
         );
 
-        final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = JobHelper.serializeOutIndex(
+        final DataSegment segment = JobHelper.serializeOutIndex(
             segmentTemplate,
             context.getConfiguration(),
             context,
@@ -831,7 +831,7 @@ public class IndexGeneratorJob implements Jobby
             HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
         );
 
-        Path descriptorPath = config.makeDescriptorInfoPath(segmentAndIndexZipFilePath.getSegment());
+        Path descriptorPath = config.makeDescriptorInfoPath(segment);
         descriptorPath = JobHelper.prependFSIfNullScheme(
             FileSystem.get(
                 descriptorPath.toUri(),
@@ -842,7 +842,7 @@ public class IndexGeneratorJob implements Jobby
         log.info("Writing descriptor to path[%s]", descriptorPath);
         JobHelper.writeSegmentDescriptor(
             config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
-            segmentAndIndexZipFilePath,
+            segment,
             descriptorPath,
             context
         );
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
index b8c29b8..7d99d03 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
@@ -386,13 +386,29 @@ public class JobHelper
     }
   }
 
-  public static boolean runSingleJob(Jobby job)
+  public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
   {
     boolean succeeded = job.run();
+
+    if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) {
+      if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
+        Path workingPath = config.makeIntermediatePath();
+        log.info("Deleting path[%s]", workingPath);
+        try {
+          Configuration conf = injectSystemProperties(new Configuration(), config);
+          config.addJobProperties(conf);
+          workingPath.getFileSystem(conf).delete(workingPath, true);
+        }
+        catch (IOException e) {
+          log.error(e, "Failed to cleanup path[%s]", workingPath);
+        }
+      }
+    }
+
     return succeeded;
   }
 
-  public static boolean runJobs(List<Jobby> jobs)
+  public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config)
   {
     boolean succeeded = true;
     for (Jobby job : jobs) {
@@ -402,33 +418,25 @@ public class JobHelper
       }
     }
 
-    return succeeded;
-  }
-
-  public static void maybeDeleteIntermediatePath(
-      boolean jobSucceeded,
-      HadoopIngestionSpec indexerSchema)
-  {
-    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema);
-    final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config);
-    config.addJobProperties(configuration);
-    JobHelper.injectDruidProperties(configuration, config);
     if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) {
-      if (jobSucceeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
+      if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
         Path workingPath = config.makeIntermediatePath();
         log.info("Deleting path[%s]", workingPath);
         try {
-          config.addJobProperties(configuration);
-          workingPath.getFileSystem(configuration).delete(workingPath, true);
+          Configuration conf = injectSystemProperties(new Configuration(), config);
+          config.addJobProperties(conf);
+          workingPath.getFileSystem(conf).delete(workingPath, true);
         }
         catch (IOException e) {
           log.error(e, "Failed to cleanup path[%s]", workingPath);
         }
       }
     }
+
+    return succeeded;
   }
 
-  public static DataSegmentAndIndexZipFilePath serializeOutIndex(
+  public static DataSegment serializeOutIndex(
       final DataSegment segmentTemplate,
       final Configuration configuration,
       final Progressable progressable,
@@ -474,16 +482,20 @@ public class JobHelper
         .withSize(size.get())
         .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));
 
-    return new DataSegmentAndIndexZipFilePath(
-        finalSegment,
-        tmpPath.toUri().getPath(),
-        finalIndexZipFilePath.toUri().getPath()
-    );
+    if (!renameIndexFiles(outputFS, tmpPath, finalIndexZipFilePath)) {
+      throw new IOE(
+          "Unable to rename [%s] to [%s]",
+          tmpPath.toUri().toString(),
+          finalIndexZipFilePath.toUri().toString()
+      );
+    }
+
+    return finalSegment;
   }
 
   public static void writeSegmentDescriptor(
       final FileSystem outputFS,
-      final DataSegmentAndIndexZipFilePath segmentAndPath,
+      final DataSegment segment,
       final Path descriptorPath,
       final Progressable progressable
   )
@@ -499,12 +511,9 @@ public class JobHelper
             try {
               progressable.progress();
               if (outputFS.exists(descriptorPath)) {
-                // If the descriptor path already exists, don't overwrite, and risk clobbering it.
-                // If it already exists, it means that the segment data is already written to the
-                // tmp path, and the existing descriptor written should give us the information we
-                // need to rename the segment index to final path and publish it in the top level task.
-                log.info("descriptor path [%s] already exists, not overwriting", descriptorPath);
-                return -1;
+                if (!outputFS.delete(descriptorPath, false)) {
+                  throw new IOE("Failed to delete descriptor at [%s]", descriptorPath);
+                }
               }
               try (final OutputStream descriptorOut = outputFS.create(
                   descriptorPath,
@@ -512,7 +521,7 @@ public class JobHelper
                   DEFAULT_FS_BUFFER_SIZE,
                   progressable
               )) {
-                HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segmentAndPath);
+                HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segment);
               }
             }
             catch (RuntimeException | IOException ex) {
@@ -623,39 +632,7 @@ public class JobHelper
   }
 
   /**
-   * Renames the index files for the segments. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename
-   * which will not overwrite. Note: segments should be renamed in the index task, not in a hadoop job, as race
-   * conditions between job retries can cause the final segment index file path to get clobbered.
-   *
-   * @param indexerSchema  the hadoop ingestion spec
-   * @param segmentAndIndexZipFilePaths the list of segments with their currently stored tmp path and the final path
-   *                                    that they should be renamed to.
-   */
-  public static void renameIndexFilesForSegments(
-      HadoopIngestionSpec indexerSchema,
-      List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths
-  ) throws IOException
-  {
-    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema);
-    final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config);
-    config.addJobProperties(configuration);
-    JobHelper.injectDruidProperties(configuration, config);
-    for (DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath : segmentAndIndexZipFilePaths) {
-      Path tmpPath = new Path(segmentAndIndexZipFilePath.getTmpIndexZipFilePath());
-      Path finalIndexZipFilePath = new Path(segmentAndIndexZipFilePath.getFinalIndexZipFilePath());
-      final FileSystem outputFS = FileSystemHelper.get(finalIndexZipFilePath.toUri(), configuration);
-      if (!renameIndexFile(outputFS, tmpPath, finalIndexZipFilePath)) {
-        throw new IOE(
-            "Unable to rename [%s] to [%s]",
-            tmpPath.toUri().toString(),
-            finalIndexZipFilePath.toUri().toString()
-        );
-      }
-    }
-  }
-
-  /**
-   * Rename the file. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename
+   * Rename the files. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename
    * which will not overwrite
    *
    * @param outputFS              The output fs
@@ -664,7 +641,7 @@ public class JobHelper
    *
    * @return False if a rename failed, true otherwise (rename success or no rename needed)
    */
-  private static boolean renameIndexFile(
+  private static boolean renameIndexFiles(
       final FileSystem outputFS,
       final Path indexZipFilePath,
       final Path finalIndexZipFilePath
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java
index bdadc95..b7eb60b 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexer;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  */
@@ -43,8 +42,7 @@ public class MetadataStorageUpdaterJob implements Jobby
   @Override
   public boolean run()
   {
-    final List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
-    final List<DataSegment> segments = segmentAndIndexZipFilePaths.stream().map(s -> s.getSegment()).collect(Collectors.toList());
+    final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config);
     final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable();
     handler.publishSegments(segmentTable, segments, HadoopDruidIndexerConfig.JSON_MAPPER);
 
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
index 913c648..404d5ed 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
@@ -372,15 +372,7 @@ public class BatchDeltaIngestionTest
   ) throws Exception
   {
     IndexGeneratorJob job = new IndexGeneratorJob(config);
-    Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job)));
-
-    List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths =
-        IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
-    JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths);
-
-    JobHelper.maybeDeleteIntermediatePath(true, config.getSchema());
-    File workingPath = new File(config.makeIntermediatePath().toUri().getPath());
-    Assert.assertFalse(workingPath.exists());
+    Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config));
 
     File segmentFolder = new File(
         StringUtils.format(
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
deleted file mode 100644
index 3dcd203..0000000
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer;
-
-import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.partition.NumberedShardSpec;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class DataSegmentAndIndexZipFilePathTest
-{
-  private static final SegmentId SEGMENT_ID = SegmentId.dummy("data-source", 1);
-  private static final SegmentId OTHER_SEGMENT_ID = SegmentId.dummy("data-source2", 1);
-  private static final DataSegment SEGMENT = new DataSegment(
-      SEGMENT_ID,
-      null,
-      null,
-      null,
-      new NumberedShardSpec(1, 10),
-      null,
-      0,
-      0
-  );
-  private static final DataSegment OTHER_SEGMENT = new DataSegment(
-      OTHER_SEGMENT_ID,
-      null,
-      null,
-      null,
-      new NumberedShardSpec(1, 10),
-      null,
-      0,
-      0
-  );
-
-  private DataSegmentAndIndexZipFilePath target;
-
-  @Test
-  public void test_equals_otherNull_notEqual()
-  {
-    String tmpPath = "tmpPath";
-    String finalPath = "finalPath";
-    target = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        tmpPath,
-        finalPath
-    );
-    Assert.assertNotEquals(target, null);
-  }
-
-  @Test
-  public void test_equals_differentSegmentId_notEqual()
-  {
-    String tmpPath = "tmpPath";
-    String finalPath = "finalPath";
-    target = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        tmpPath,
-        finalPath
-    );
-
-    DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
-        OTHER_SEGMENT,
-        tmpPath,
-        finalPath
-    );
-    Assert.assertNotEquals(target, other);
-  }
-
-  @Test
-  public void test_equals_differentTmpPath_notEqual()
-  {
-    String tmpPath = "tmpPath";
-    String otherTmpPath = "otherTmpPath";
-    String finalPath = "finalPath";
-    target = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        tmpPath,
-        finalPath
-    );
-
-    DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        otherTmpPath,
-        finalPath
-    );
-    Assert.assertNotEquals(target, other);
-  }
-
-  @Test
-  public void test_equals_differentFinalPath_notEqual()
-  {
-    String tmpPath = "tmpPath";
-    String finalPath = "finalPath";
-    String otherFinalPath = "otherFinalPath";
-    target = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        tmpPath,
-        finalPath
-    );
-
-    DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        tmpPath,
-        otherFinalPath
-    );
-    Assert.assertNotEquals(target, other);
-  }
-
-  @Test
-  public void test_equals_allFieldsEqualValue_equal()
-  {
-    String tmpPath = "tmpPath";
-    String finalPath = "finalPath";
-    target = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        tmpPath,
-        finalPath
-    );
-
-    DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        tmpPath,
-        finalPath
-    );
-    Assert.assertEquals(target, other);
-  }
-
-  @Test
-  public void test_equals_sameObject_equal()
-  {
-    String tmpPath = "tmpPath";
-    String finalPath = "finalPath";
-    target = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        tmpPath,
-        finalPath
-    );
-
-    Assert.assertEquals(target, target);
-  }
-
-  @Test
-  public void test_serde() throws IOException
-  {
-    String tmpPath = "tmpPath";
-    String finalPath = "finalPath";
-    target = new DataSegmentAndIndexZipFilePath(
-        SEGMENT,
-        tmpPath,
-        finalPath
-    );
-
-    final InjectableValues.Std injectableValues = new InjectableValues.Std();
-    injectableValues.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT);
-    final ObjectMapper mapper = new DefaultObjectMapper();
-    mapper.setInjectableValues(injectableValues);
-    final String json = mapper.writeValueAsString(target);
-    final DataSegmentAndIndexZipFilePath fromJson =
-        mapper.readValue(json, DataSegmentAndIndexZipFilePath.class);
-    Assert.assertEquals(target, fromJson);
-  }
-}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java
deleted file mode 100644
index 8231b7c..0000000
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer;
-
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.List;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({
-    JobHelper.class,
-    IndexGeneratorJob.class
-})
-@PowerMockIgnore({"javax.net.ssl.*"})
-public class HadoopDruidIndexerJobTest
-{
-  private HadoopDruidIndexerConfig config;
-  private MetadataStorageUpdaterJobHandler handler;
-  private HadoopDruidIndexerJob target;
-
-  @Test
-  public void test_run()
-  {
-    config = PowerMock.createMock(HadoopDruidIndexerConfig.class);
-    handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class);
-    PowerMock.mockStaticNice(JobHelper.class);
-    PowerMock.mockStaticNice(IndexGeneratorJob.class);
-    config.verify();
-    EasyMock.expectLastCall();
-    EasyMock.expect(config.isUpdaterJobSpecSet()).andReturn(false).anyTimes();
-    config.setHadoopJobIdFileName(EasyMock.anyString());
-    EasyMock.expectLastCall();
-    JobHelper.ensurePaths(config);
-    EasyMock.expectLastCall();
-    Capture<List<Jobby>> capturedJobs = Capture.newInstance();
-    EasyMock.expect(JobHelper.runJobs(EasyMock.capture(capturedJobs))).andReturn(true);
-    EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(EasyMock.anyObject())).andReturn(null);
-
-
-    PowerMock.replayAll();
-
-    target = new HadoopDruidIndexerJob(config, handler);
-    target.run();
-
-    List<Jobby> jobs = capturedJobs.getValue();
-    Assert.assertEquals(2, jobs.size());
-    jobs.stream().filter(job -> !(job instanceof IndexGeneratorJob)).forEach(job -> Assert.assertTrue(job.run()));
-
-    PowerMock.verifyAll();
-  }
-}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
index 24ff122..97967bd 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
@@ -621,21 +621,13 @@ public class IndexGeneratorJobTest
 
   private void verifyJob(IndexGeneratorJob job) throws IOException
   {
-    Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job)));
+    Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config));
 
     final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
     IndexGeneratorJob
-        .getPublishedSegmentAndIndexZipFilePaths(config)
-        .forEach(segmentAndIndexZipFilePath -> intervalToSegments.computeIfAbsent(segmentAndIndexZipFilePath.getSegment().getInterval(), k -> new ArrayList<>())
-                                              .add(segmentAndIndexZipFilePath.getSegment()));
-
-    List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths =
-        IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
-    JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths);
-
-    JobHelper.maybeDeleteIntermediatePath(true, config.getSchema());
-    File workingPath = new File(config.makeIntermediatePath().toUri().getPath());
-    Assert.assertTrue(workingPath.exists());
+        .getPublishedSegments(config)
+        .forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>())
+                                              .add(segment));
 
     final Map<Interval, List<File>> intervalToIndexFiles = new HashMap<>();
     int segmentNum = 0;
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java
deleted file mode 100644
index 48f653a..0000000
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.java.util.common.IOE;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.easymock.EasyMock;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({
-    FileSystemHelper.class,
-    HadoopDruidIndexerConfig.class
-})
-@PowerMockIgnore({"javax.net.ssl.*"})
-public class JobHelperPowerMockTest
-{
-  private static final String TMP_PATH = "/tmp/index.zip.0";
-  private static final String FINAL_PATH = "/final/index.zip.0";
-
-  private HadoopDruidIndexerConfig indexerConfig;
-
-  @Test
-  public void test_renameIndexFilesForSegments_emptySegments() throws IOException
-  {
-    HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
-    List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = ImmutableList.of();
-
-    PowerMock.replayAll();
-
-    JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths);
-
-    PowerMock.verifyAll();
-  }
-
-  @Test
-  public void test_renameIndexFilesForSegments_segmentIndexFileRenamedSuccessfully()
-      throws IOException
-  {
-    HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
-    mockFileSystem(true);
-    DataSegment segment = PowerMock.createMock(DataSegment.class);
-
-    List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = ImmutableList.of(
-        new DataSegmentAndIndexZipFilePath(
-            segment,
-            TMP_PATH,
-            FINAL_PATH
-        )
-    );
-    PowerMock.replayAll();
-
-    JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths);
-
-    PowerMock.verifyAll();
-  }
-
-  @Test (expected = IOE.class)
-  public void test_renameIndexFilesForSegments_segmentIndexFileRenamedFailed_throwsException()
-      throws IOException
-  {
-    HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
-    mockFileSystem(false);
-    DataSegment segment = PowerMock.createMock(DataSegment.class);
-    List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = ImmutableList.of(
-        new DataSegmentAndIndexZipFilePath(
-            segment,
-            TMP_PATH,
-            FINAL_PATH
-        )
-    );
-
-    PowerMock.replayAll();
-
-    JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths);
-
-    PowerMock.verifyAll();
-  }
-
-  @Test
-  public void test_maybeDeleteIntermediatePath_leaveIntermediate_doesNotDeleteIntermediatePath()
-  {
-    HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
-    HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
-    EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(true);
-    EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig);
-
-    PowerMock.replayAll();
-
-    JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec);
-
-    PowerMock.verifyAll();
-  }
-
-  @Test
-  public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexerJobSucceeded_deleteIntermediatePath()
-      throws IOException
-  {
-    HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
-    HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
-    Path workingPath = PowerMock.createMock(Path.class);
-    FileSystem workingPathFs = PowerMock.createMock(FileSystem.class);
-    EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false);
-    EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig);
-    EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true);
-    EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs);
-    EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath);
-
-    PowerMock.replayAll();
-
-    JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec);
-
-    PowerMock.verifyAll();
-  }
-
-  @Test
-  public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexJobFailedAndCleanupOnFailure_deleteIntermediatePath()
-      throws IOException
-  {
-    HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
-    HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
-    Path workingPath = PowerMock.createMock(Path.class);
-    FileSystem workingPathFs = PowerMock.createMock(FileSystem.class);
-    EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false);
-    EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true);
-    EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes();
-    EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true);
-    EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs);
-    EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath);
-
-    PowerMock.replayAll();
-
-    JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec);
-
-    PowerMock.verifyAll();
-  }
-
-  @Test
-  public void test_maybeDeleteIntermediatePath_deleteThrowsException_noExceptionPropogated()
-      throws IOException
-  {
-    HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
-    HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
-    Path workingPath = PowerMock.createMock(Path.class);
-    FileSystem workingPathFs = PowerMock.createMock(FileSystem.class);
-    EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false);
-    EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true);
-    EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes();
-    EasyMock.expect(workingPathFs.delete(workingPath, true)).andThrow(new IOException("Delete Exception"));
-    EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs);
-    EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath);
-
-    PowerMock.replayAll();
-
-    JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec);
-
-    PowerMock.verifyAll();
-  }
-
-  private HadoopIngestionSpec mockIngestionSpec()
-  {
-    indexerConfig = PowerMock.createMock(HadoopDruidIndexerConfig.class);
-    HadoopIngestionSpec ingestionSpec = PowerMock.createMock(HadoopIngestionSpec.class);
-    PowerMock.mockStaticNice(HadoopDruidIndexerConfig.class);
-    EasyMock.expect(indexerConfig.getAllowedProperties()).andReturn(ImmutableMap.of()).anyTimes();
-    indexerConfig.addJobProperties(EasyMock.anyObject(Configuration.class));
-    EasyMock.expectLastCall().anyTimes();
-    EasyMock.expect(HadoopDruidIndexerConfig.fromSpec(ingestionSpec)).andReturn(indexerConfig);
-    EasyMock.expect(indexerConfig.getSchema()).andReturn(ingestionSpec).anyTimes();
-    return ingestionSpec;
-  }
-
-  private void mockFileSystem(boolean renameSuccess) throws IOException
-  {
-    PowerMock.mockStaticNice(FileSystemHelper.class);
-    FileSystem fileSystem = PowerMock.createMock(FileSystem.class);
-    EasyMock.expect(FileSystemHelper.get(
-        EasyMock.anyObject(URI.class),
-        EasyMock.anyObject(Configuration.class)
-    )).andReturn(fileSystem);
-    EasyMock.expect(fileSystem.exists(EasyMock.anyObject(Path.class))).andReturn(false);
-    EasyMock.expect(fileSystem.rename(EasyMock.anyObject(Path.class), EasyMock.anyObject(Path.class)))
-            .andReturn(renameSuccess);
-  }
-}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java
deleted file mode 100644
index 0b86763..0000000
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
-import org.easymock.EasyMock;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({
-    IndexGeneratorJob.class
-})
-@PowerMockIgnore({"javax.net.ssl.*"})
-public class MetadataStorageUpdaterJobTest
-{
-  private static final List<DataSegmentAndIndexZipFilePath> DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS = ImmutableList.of(
-      new DataSegmentAndIndexZipFilePath(null, null, null)
-  );
-  private static final String SEGMENT_TABLE = "segments";
-  private HadoopIngestionSpec spec;
-  private HadoopIOConfig ioConfig;
-  private MetadataStorageUpdaterJobSpec metadataUpdateSpec;
-  private HadoopDruidIndexerConfig config;
-  private MetadataStorageUpdaterJobHandler handler;
-  private MetadataStorageUpdaterJob target;
-
-  @Test
-  public void test_run()
-  {
-    metadataUpdateSpec = PowerMock.createMock(MetadataStorageUpdaterJobSpec.class);
-    ioConfig = PowerMock.createMock(HadoopIOConfig.class);
-    spec = PowerMock.createMock(HadoopIngestionSpec.class);
-    config = PowerMock.createMock(HadoopDruidIndexerConfig.class);
-    handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class);
-    PowerMock.mockStaticNice(IndexGeneratorJob.class);
-
-    EasyMock.expect(metadataUpdateSpec.getSegmentTable()).andReturn(SEGMENT_TABLE);
-    EasyMock.expect(ioConfig.getMetadataUpdateSpec()).andReturn(metadataUpdateSpec);
-    EasyMock.expect(spec.getIOConfig()).andReturn(ioConfig);
-    EasyMock.expect(config.getSchema()).andReturn(spec);
-    EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config))
-            .andReturn(DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS);
-    handler.publishSegments(
-        SEGMENT_TABLE,
-        DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS.stream().map(s -> s.getSegment()).collect(
-        Collectors.toList()), HadoopDruidIndexerConfig.JSON_MAPPER);
-    EasyMock.expectLastCall();
-    target = new MetadataStorageUpdaterJob(config, handler);
-
-    PowerMock.replayAll();
-
-    target.run();
-
-    PowerMock.verifyAll();
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 37ffb4c..b66ae47 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -31,8 +30,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.druid.indexer.DataSegmentAndIndexZipFilePath;
 import org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob;
 import org.apache.druid.indexer.HadoopDruidIndexerConfig;
 import org.apache.druid.indexer.HadoopDruidIndexerJob;
@@ -86,7 +83,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class HadoopIndexTask extends HadoopTask implements ChatHandler
 {
@@ -311,197 +307,170 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   @SuppressWarnings("unchecked")
   private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
   {
-    boolean indexGeneratorJobAttempted = false;
-    boolean indexGeneratorJobSuccess = false;
-    HadoopIngestionSpec indexerSchema = null;
-    try {
-      registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
-      String hadoopJobIdFile = getHadoopJobIdFileName();
-      final ClassLoader loader = buildClassLoader(toolbox);
-      boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
-
-      HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
-          spec,
-          jsonMapper,
-          new OverlordActionBasedUsedSegmentsRetriever(toolbox)
-      );
+    registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
+    String hadoopJobIdFile = getHadoopJobIdFileName();
+    final ClassLoader loader = buildClassLoader(toolbox);
+    boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
+
+    HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
+        spec,
+        jsonMapper,
+        new OverlordActionBasedUsedSegmentsRetriever(toolbox)
+    );
 
-      Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject(
-          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner",
-          loader
-      );
-      determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner);
+    Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject(
+        "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner",
+        loader
+    );
+    determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner);
 
-      String[] determinePartitionsInput = new String[]{
-          toolbox.getJsonMapper().writeValueAsString(spec),
-          toolbox.getConfig().getHadoopWorkingPath(),
-          toolbox.getSegmentPusher().getPathForHadoop(),
-          hadoopJobIdFile
-      };
+    String[] determinePartitionsInput = new String[]{
+        toolbox.getJsonMapper().writeValueAsString(spec),
+        toolbox.getConfig().getHadoopWorkingPath(),
+        toolbox.getSegmentPusher().getPathForHadoop(),
+        hadoopJobIdFile
+    };
 
-      final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
-      Class<?> determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass();
-      Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod(
-          "runTask",
-          determinePartitionsInput.getClass()
-      );
-      try {
-        Thread.currentThread().setContextClassLoader(loader);
+    HadoopIngestionSpec indexerSchema;
+    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+    Class<?> determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass();
+    Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod(
+        "runTask",
+        determinePartitionsInput.getClass()
+    );
+    try {
+      Thread.currentThread().setContextClassLoader(loader);
 
-        ingestionState = IngestionState.DETERMINE_PARTITIONS;
+      ingestionState = IngestionState.DETERMINE_PARTITIONS;
 
-        final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke(
-            determinePartitionsInnerProcessingRunner,
-            new Object[]{determinePartitionsInput}
-        );
+      final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke(
+          determinePartitionsInnerProcessingRunner,
+          new Object[]{determinePartitionsInput}
+      );
 
 
-        determineConfigStatus = toolbox
-            .getJsonMapper()
-            .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class);
+      determineConfigStatus = toolbox
+          .getJsonMapper()
+          .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class);
 
-        indexerSchema = determineConfigStatus.getSchema();
-        if (indexerSchema == null) {
-          errorMsg = determineConfigStatus.getErrorMsg();
-          toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
-          return TaskStatus.failure(
-              getId(),
-              errorMsg
-          );
-        }
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-      finally {
-        Thread.currentThread().setContextClassLoader(oldLoader);
-      }
-
-      // We should have a lock from before we started running only if interval was specified
-      String version;
-      if (determineIntervals) {
-        Interval interval = JodaUtils.umbrellaInterval(
-            JodaUtils.condenseIntervals(
-                indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
-            )
-        );
-        final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
-        // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
-        final TaskLock lock = Preconditions.checkNotNull(
-            toolbox.getTaskActionClient().submit(
-                new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
-            ),
-            "Cannot acquire a lock for interval[%s]", interval
+      indexerSchema = determineConfigStatus.getSchema();
+      if (indexerSchema == null) {
+        errorMsg = determineConfigStatus.getErrorMsg();
+        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+        return TaskStatus.failure(
+            getId(),
+            errorMsg
         );
-        version = lock.getVersion();
-      } else {
-        Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
-        final TaskLock myLock = Iterables.getOnlyElement(locks);
-        version = myLock.getVersion();
       }
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(oldLoader);
+    }
 
-      final String specVersion = indexerSchema.getTuningConfig().getVersion();
-      if (indexerSchema.getTuningConfig().isUseExplicitVersion()) {
-        if (specVersion.compareTo(version) < 0) {
-          version = specVersion;
-        } else {
-          log.error(
-              "Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
-              specVersion,
-              version
-          );
-          toolbox.getTaskReportFileWriter().write(getId(), null);
-          return TaskStatus.failure(getId());
-        }
+    // We should have a lock from before we started running only if interval was specified
+    String version;
+    if (determineIntervals) {
+      Interval interval = JodaUtils.umbrellaInterval(
+          JodaUtils.condenseIntervals(
+              indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
+          )
+      );
+      final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
+      // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
+      final TaskLock lock = Preconditions.checkNotNull(
+          toolbox.getTaskActionClient().submit(
+              new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
+          ),
+          "Cannot acquire a lock for interval[%s]", interval
+      );
+      version = lock.getVersion();
+    } else {
+      Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
+      final TaskLock myLock = Iterables.getOnlyElement(locks);
+      version = myLock.getVersion();
+    }
+
+    final String specVersion = indexerSchema.getTuningConfig().getVersion();
+    if (indexerSchema.getTuningConfig().isUseExplicitVersion()) {
+      if (specVersion.compareTo(version) < 0) {
+        version = specVersion;
+      } else {
+        log.error(
+            "Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
+            specVersion,
+            version
+        );
+        toolbox.getTaskReportFileWriter().write(getId(), null);
+        return TaskStatus.failure(getId());
       }
+    }
 
-      log.info("Setting version to: %s", version);
+    log.info("Setting version to: %s", version);
 
-      Object innerProcessingRunner = getForeignClassloaderObject(
-          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner",
-          loader
-      );
-      buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner);
+    Object innerProcessingRunner = getForeignClassloaderObject(
+        "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner",
+        loader
+    );
+    buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner);
 
-      String[] buildSegmentsInput = new String[]{
-          toolbox.getJsonMapper().writeValueAsString(indexerSchema),
-          version,
-          hadoopJobIdFile
-      };
+    String[] buildSegmentsInput = new String[]{
+        toolbox.getJsonMapper().writeValueAsString(indexerSchema),
+        version,
+        hadoopJobIdFile
+    };
 
-      Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
-      Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass());
+    Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
+    Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass());
 
-      try {
-        Thread.currentThread().setContextClassLoader(loader);
+    try {
+      Thread.currentThread().setContextClassLoader(loader);
 
-        ingestionState = IngestionState.BUILD_SEGMENTS;
-        indexGeneratorJobAttempted = true;
-        final String jobStatusString = (String) innerProcessingRunTask.invoke(
-            innerProcessingRunner,
-            new Object[]{buildSegmentsInput}
-        );
+      ingestionState = IngestionState.BUILD_SEGMENTS;
+      final String jobStatusString = (String) innerProcessingRunTask.invoke(
+          innerProcessingRunner,
+          new Object[]{buildSegmentsInput}
+      );
 
-        buildSegmentsStatus = toolbox.getJsonMapper().readValue(
-            jobStatusString,
-            HadoopIndexGeneratorInnerProcessingStatus.class
-        );
+      buildSegmentsStatus = toolbox.getJsonMapper().readValue(
+          jobStatusString,
+          HadoopIndexGeneratorInnerProcessingStatus.class
+      );
 
-        List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths = buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths();
-        if (dataSegmentAndIndexZipFilePaths != null) {
-          indexGeneratorJobSuccess = true;
-          try {
-            Thread.currentThread().setContextClassLoader(oldLoader);
-            renameSegmentIndexFilesJob(
-                toolbox.getJsonMapper().writeValueAsString(indexerSchema),
-                toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
-            );
-          }
-          finally {
-            Thread.currentThread().setContextClassLoader(loader);
-          }
-          ArrayList<DataSegment> segments = new ArrayList<>(dataSegmentAndIndexZipFilePaths.stream()
-                                                                                           .map(
-                                                                                               DataSegmentAndIndexZipFilePath::getSegment)
-                                                                                           .collect(Collectors.toList()));
-          toolbox.publishSegments(segments);
-
-          // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value
-          // for awaitSegmentAvailabilityTimeoutMillis
-          if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) {
-            ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
-            segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability(
-                toolbox,
-                segments,
-                spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
-            );
-          }
-
-          ingestionState = IngestionState.COMPLETED;
-          toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
-          return TaskStatus.success(getId());
-        } else {
-          errorMsg = buildSegmentsStatus.getErrorMsg();
-          toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
-          return TaskStatus.failure(
-              getId(),
-              errorMsg
+      if (buildSegmentsStatus.getDataSegments() != null) {
+        toolbox.publishSegments(buildSegmentsStatus.getDataSegments());
+
+        // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value
+        // for awaitSegmentAvailabilityTimeoutMillis
+        if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) {
+          ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
+          ArrayList<DataSegment> segmentsToWaitFor = new ArrayList<>(buildSegmentsStatus.getDataSegments());
+          segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability(
+              toolbox,
+              segmentsToWaitFor,
+              spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
           );
         }
+
+        ingestionState = IngestionState.COMPLETED;
+        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+        return TaskStatus.success(getId());
+      } else {
+        errorMsg = buildSegmentsStatus.getErrorMsg();
+        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+        return TaskStatus.failure(
+            getId(),
+            errorMsg
+        );
       }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-      finally {
-        Thread.currentThread().setContextClassLoader(oldLoader);
-      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
     }
     finally {
-      indexerGeneratorCleanupJob(
-          indexGeneratorJobAttempted,
-          indexGeneratorJobSuccess,
-          indexerSchema == null ? null : toolbox.getJsonMapper().writeValueAsString(indexerSchema)
-      );
+      Thread.currentThread().setContextClassLoader(oldLoader);
     }
   }
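
    A note on the pattern in the hunk above: the task creates its inner-processing runner inside an
    isolated Hadoop classloader, looks up runTask(String[]) reflectively, and swaps the thread's
    context classloader around the invocation, restoring it in a finally block. The following is a
    minimal, self-contained sketch of that pattern only; FakeRunner and the URLClassLoader
    construction are hypothetical stand-ins, not the Druid classes touched by this patch.

        import java.lang.reflect.Method;
        import java.net.URL;
        import java.net.URLClassLoader;

        public class ContextClassLoaderSwapExample
        {
          // Hypothetical stand-in for the inner-processing runner loaded in the isolated classloader.
          public static class FakeRunner
          {
            public String runTask(String[] args)
            {
              return "ran with " + args.length + " argument(s)";
            }
          }

          public static void main(String[] args) throws Exception
          {
            // The real task builds this loader from Hadoop dependency coordinates; an empty
            // URLClassLoader that delegates to the current loader stands in here.
            ClassLoader isolatedLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderSwapExample.class.getClassLoader());

            Object runner = Class.forName(FakeRunner.class.getName(), true, isolatedLoader)
                                 .getConstructor()
                                 .newInstance();

            String[] taskInput = new String[]{"spec.json", "version-string"};

            // Look up runTask(String[]) reflectively, as the task does for the foreign-classloader object.
            Method runTask = runner.getClass().getMethod("runTask", taskInput.getClass());

            ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
            try {
              // Swap the context classloader for the duration of the call...
              Thread.currentThread().setContextClassLoader(isolatedLoader);
              String result = (String) runTask.invoke(runner, new Object[]{taskInput});
              System.out.println(result);
            }
            finally {
              // ...and always restore the original loader, mirroring the finally blocks in the hunk.
              Thread.currentThread().setContextClassLoader(oldLoader);
            }
          }
        }

    The try/finally restore is the important part: other code running later on the same thread may
    depend on the original context classloader.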
 
@@ -545,96 +514,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     }
   }
 
-  private void renameSegmentIndexFilesJob(
-      String hadoopIngestionSpecStr,
-      String dataSegmentAndIndexZipFilePathListStr
-  )
-  {
-    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      ClassLoader loader = HadoopTask.buildClassLoader(
-          getHadoopDependencyCoordinates(),
-          taskConfig.getDefaultHadoopCoordinates()
-      );
-
-      Object renameSegmentIndexFilesRunner = getForeignClassloaderObject(
-          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner",
-          loader
-      );
-
-      String[] renameSegmentIndexFilesJobInput = new String[]{
-          hadoopIngestionSpecStr,
-          dataSegmentAndIndexZipFilePathListStr
-      };
-
-      Class<?> buildRenameSegmentIndexFilesJobRunnerClass = renameSegmentIndexFilesRunner.getClass();
-      Method renameSegmentIndexFiles = buildRenameSegmentIndexFilesJobRunnerClass.getMethod(
-          "runTask",
-          renameSegmentIndexFilesJobInput.getClass()
-      );
-
-      Thread.currentThread().setContextClassLoader(loader);
-      renameSegmentIndexFiles.invoke(
-          renameSegmentIndexFilesRunner,
-          new Object[]{renameSegmentIndexFilesJobInput}
-      );
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    finally {
-      Thread.currentThread().setContextClassLoader(oldLoader);
-    }
-  }
-
-  private void indexerGeneratorCleanupJob(
-      boolean indexGeneratorJobAttempted,
-      boolean indexGeneratorJobSuccess,
-      String hadoopIngestionSpecStr
-  )
-  {
-    if (!indexGeneratorJobAttempted) {
-      log.info("No need for cleanup as index generator job did not even run");
-      return;
-    }
-
-    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      ClassLoader loader = HadoopTask.buildClassLoader(
-          getHadoopDependencyCoordinates(),
-          taskConfig.getDefaultHadoopCoordinates()
-      );
-
-      Object indexerGeneratorCleanupRunner = getForeignClassloaderObject(
-          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexerGeneratorCleanupRunner",
-          loader
-      );
-
-      String[] indexerGeneratorCleanupJobInput = new String[]{
-          indexGeneratorJobSuccess ? "true" : "false",
-          hadoopIngestionSpecStr,
-      };
-
-      Class<?> buildIndexerGeneratorCleanupRunnerClass = indexerGeneratorCleanupRunner.getClass();
-      Method indexerGeneratorCleanup = buildIndexerGeneratorCleanupRunnerClass.getMethod(
-          "runTask",
-          indexerGeneratorCleanupJobInput.getClass()
-      );
-
-      Thread.currentThread().setContextClassLoader(loader);
-      indexerGeneratorCleanup.invoke(
-          indexerGeneratorCleanupRunner,
-          new Object[]{indexerGeneratorCleanupJobInput}
-      );
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to cleanup after index generator job");
-    }
-    finally {
-      Thread.currentThread().setContextClassLoader(oldLoader);
-    }
-  }
-
   @GET
   @Path("/rowStats")
   @Produces(MediaType.APPLICATION_JSON)
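
    The annotations in the trailing context above are standard JAX-RS wiring for the task's
    /rowStats endpoint. A minimal, hypothetical resource showing the same GET + Path + Produces
    combination is sketched below; the class name, path prefix, and payload are invented for
    illustration and are not Druid's implementation.

        import javax.ws.rs.GET;
        import javax.ws.rs.Path;
        import javax.ws.rs.Produces;
        import javax.ws.rs.core.MediaType;
        import javax.ws.rs.core.Response;
        import java.util.Collections;

        // Hypothetical resource demonstrating a JSON-producing GET sub-path.
        @Path("/example/task")
        public class RowStatsResourceExample
        {
          @GET
          @Path("/rowStats")
          @Produces(MediaType.APPLICATION_JSON)
          public Response getRowStats()
          {
            // A real handler would return live ingestion metrics; a fixed map stands in here.
            return Response.ok(Collections.singletonMap("processed", 0L)).build();
          }
        }
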
@@ -843,7 +722,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
         if (job.run()) {
           return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(
               new HadoopIndexGeneratorInnerProcessingStatus(
-                  job.getPublishedSegmentAndIndexZipFilePaths(),
+                  job.getPublishedSegments(),
                   job.getStats(),
                   null
               )
@@ -911,111 +790,28 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     }
   }
 
-  @SuppressWarnings("unused")
-  public static class HadoopRenameSegmentIndexFilesRunner
-  {
-    TypeReference<List<DataSegmentAndIndexZipFilePath>> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH =
-        new TypeReference<List<DataSegmentAndIndexZipFilePath>>()
-        {
-        };
-
-    public void runTask(String[] args) throws Exception
-    {
-      if (args.length != 2) {
-        log.warn("HadoopRenameSegmentIndexFilesRunner called with improper number of arguments");
-      }
-      String hadoopIngestionSpecStr = args[0];
-      String dataSegmentAndIndexZipFilePathListStr = args[1];
-
-      HadoopIngestionSpec indexerSchema;
-      List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
-      try {
-        indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
-            hadoopIngestionSpecStr,
-            HadoopIngestionSpec.class
-        );
-        dataSegmentAndIndexZipFilePaths = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
-            dataSegmentAndIndexZipFilePathListStr,
-            LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH
-        );
-      }
-      catch (Exception e) {
-        log.warn(
-            e,
-            "HadoopRenameSegmentIndexFilesRunner: Error occurred while trying to read input parameters into data objects"
-        );
-        throw e;
-      }
-      JobHelper.renameIndexFilesForSegments(
-          indexerSchema,
-          dataSegmentAndIndexZipFilePaths
-      );
-    }
-  }
-
-  @SuppressWarnings("unused")
-  public static class HadoopIndexerGeneratorCleanupRunner
-  {
-    TypeReference<List<DataSegmentAndIndexZipFilePath>> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH =
-        new TypeReference<List<DataSegmentAndIndexZipFilePath>>()
-        {
-        };
-
-    public void runTask(String[] args) throws Exception
-    {
-      if (args.length != 2) {
-        log.warn("HadoopIndexerGeneratorCleanupRunner called with improper number of arguments");
-      }
-
-      String indexGeneratorJobSucceededStr = args[0];
-      String hadoopIngestionSpecStr = args[1];
-
-      HadoopIngestionSpec indexerSchema;
-      boolean indexGeneratorJobSucceeded;
-      List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
-      try {
-        indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
-            hadoopIngestionSpecStr,
-            HadoopIngestionSpec.class
-        );
-        indexGeneratorJobSucceeded = BooleanUtils.toBoolean(indexGeneratorJobSucceededStr);
-      }
-      catch (Exception e) {
-        log.warn(
-            e,
-            "HadoopIndexerGeneratorCleanupRunner: Error occurred while trying to read input parameters into data objects"
-        );
-        throw e;
-      }
-      JobHelper.maybeDeleteIntermediatePath(
-          indexGeneratorJobSucceeded,
-          indexerSchema
-      );
-    }
-  }
-
   public static class HadoopIndexGeneratorInnerProcessingStatus
   {
-    private final List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
+    private final List<DataSegment> dataSegments;
     private final Map<String, Object> metrics;
     private final String errorMsg;
 
     @JsonCreator
     public HadoopIndexGeneratorInnerProcessingStatus(
-        @JsonProperty("dataSegmentAndIndexZipFilePaths") List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths,
+        @JsonProperty("dataSegments") List<DataSegment> dataSegments,
         @JsonProperty("metrics") Map<String, Object> metrics,
         @JsonProperty("errorMsg") String errorMsg
     )
     {
-      this.dataSegmentAndIndexZipFilePaths = dataSegmentAndIndexZipFilePaths;
+      this.dataSegments = dataSegments;
       this.metrics = metrics;
       this.errorMsg = errorMsg;
     }
 
     @JsonProperty
-    public List<DataSegmentAndIndexZipFilePath> getDataSegmentAndIndexZipFilePaths()
+    public List<DataSegment> getDataSegments()
     {
-      return dataSegmentAndIndexZipFilePaths;
+      return dataSegments;
     }
 
     @JsonProperty
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 8ad9cec..2fb9adc 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -102,10 +102,6 @@
                     <groupId>javax.servlet</groupId>
                     <artifactId>servlet-api</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>com.squareup.okhttp</groupId>
-                    <artifactId>okhttp</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
diff --git a/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
index b227aba..4235abb 100644
--- a/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
@@ -117,12 +117,9 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
       );
 
       List<Jobby> jobs = new ArrayList<>();
-      HadoopDruidIndexerJob indexerJob = new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class));
       jobs.add(new HadoopDruidDetermineConfigurationJob(config));
-      jobs.add(indexerJob);
-      boolean jobsSucceeded = JobHelper.runJobs(jobs);
-      JobHelper.renameIndexFilesForSegments(config.getSchema(), indexerJob.getPublishedSegmentAndIndexZipFilePaths());
-      JobHelper.maybeDeleteIntermediatePath(jobsSucceeded, config.getSchema());
+      jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class)));
+      JobHelper.runJobs(jobs, config);
 
     }
     catch (Exception e) {
