You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/03/16 06:29:34 UTC

[incubator-druid] branch master updated: Deprecate NoneShardSpec and drop support for automatic segment merge (#6883)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 892d1d3  Deprecate NoneShardSpec and drop support for automatic segment merge (#6883)
892d1d3 is described below

commit 892d1d35d6cc00487d583f05f1cc138782180ef9
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Mar 15 23:29:25 2019 -0700

    Deprecate NoneShardSpec and drop support for automatic segment merge (#6883)
    
    * Deprecate noneShardSpec
    
    * clean up noneShardSpec constructor
    
    * revert unnecessary change
    
    * Deprecate mergeTask
    
    * add more doc
    
    * remove convert from indexMerger
    
    * Remove mergeTask
    
    * remove HadoopDruidConverterConfig
    
    * fix build
    
    * fix build
    
    * fix teamcity
    
    * fix teamcity
    
    * fix ServerModule
    
    * fix compilation
    
    * fix compilation
---
 .../apache/druid/guice/ConditionalMultibind.java   |  27 +-
 .../org/apache/druid/timeline/DataSegment.java     |   6 +-
 .../partition/HashBasedNumberedShardSpec.java      |   0
 .../druid/timeline/partition/LinearShardSpec.java  |   0
 .../druid/timeline/partition/NoneShardSpec.java    |   7 +-
 .../timeline/partition/NumberedShardSpec.java      |   0
 .../apache/druid/timeline/partition/ShardSpec.java |   8 +-
 .../partition/SingleDimensionShardSpec.java        |  53 +-
 docs/content/configuration/index.md                |   5 +-
 docs/content/ingestion/hadoop.md                   |   2 +-
 docs/content/ingestion/native_tasks.md             |   9 +-
 docs/content/tutorials/tutorial-batch.md           |   3 +-
 docs/content/tutorials/tutorial-compaction.md      |   3 +-
 docs/content/tutorials/tutorial-rollup.md          |   3 +-
 docs/content/tutorials/tutorial-transform-spec.md  |   3 +-
 .../quickstart/tutorial/compaction-init-index.json |   3 +-
 .../tutorial/compaction-keep-granularity.json      |   3 +-
 examples/quickstart/tutorial/deletion-index.json   |   3 +-
 examples/quickstart/tutorial/retention-index.json  |   3 +-
 examples/quickstart/tutorial/rollup-index.json     |   3 +-
 examples/quickstart/tutorial/transform-index.json  |   3 +-
 .../quickstart/tutorial/updates-append-index.json  |   3 +-
 .../quickstart/tutorial/updates-append-index2.json |   3 +-
 .../quickstart/tutorial/updates-init-index.json    |   3 +-
 .../tutorial/updates-overwrite-index.json          |   3 +-
 examples/quickstart/tutorial/wikipedia-index.json  |   3 +-
 .../google/GoogleDataSegmentPusherTest.java        |   2 +-
 .../indexer/DetermineHashedPartitionsJob.java      |  31 +-
 .../druid/indexer/DeterminePartitionsJob.java      |  57 +-
 .../HadoopDruidDetermineConfigurationJob.java      |  40 +-
 .../java/org/apache/druid/indexer/JobHelper.java   |  63 +-
 .../druid/indexer/updater/HadoopConverterJob.java  | 678 ---------------------
 .../updater/HadoopDruidConverterConfig.java        | 192 ------
 .../indexer/updater/HadoopConverterJobTest.java    | 428 -------------
 .../updater/HadoopDruidConverterConfigTest.java    |  64 --
 indexing-hadoop/src/test/resources/log4j2.xml      |  35 ++
 .../index/RealtimeAppenderatorTuningConfig.java    |   4 +-
 .../druid/indexing/common/task/AppendTask.java     | 165 -----
 .../druid/indexing/common/task/IndexTask.java      | 101 +--
 .../druid/indexing/common/task/MergeTask.java      | 124 ----
 .../druid/indexing/common/task/MergeTaskBase.java  | 336 ----------
 .../common/task/SameIntervalMergeTask.java         | 181 ------
 .../apache/druid/indexing/common/task/Task.java    |   3 -
 .../parallel/ParallelIndexSupervisorTask.java      |   1 -
 .../batch/parallel/ParallelIndexTuningConfig.java  |   3 -
 .../apache/druid/indexing/common/TestUtils.java    |   8 -
 .../common/task/CompactionTaskRunTest.java         |   2 +-
 .../indexing/common/task/CompactionTaskTest.java   |   6 -
 .../druid/indexing/common/task/HadoopTaskTest.java |   4 +-
 .../druid/indexing/common/task/IndexTaskTest.java  |  98 +--
 .../indexing/common/task/MergeTaskBaseTest.java    |  91 ---
 .../common/task/SameIntervalMergeTaskTest.java     | 267 --------
 .../druid/indexing/common/task/TaskSerdeTest.java  | 162 -----
 .../ParallelIndexSupervisorTaskKillTest.java       |   1 -
 .../ParallelIndexSupervisorTaskResourceTest.java   |   1 -
 .../ParallelIndexSupervisorTaskSerdeTest.java      |   1 -
 .../parallel/ParallelIndexSupervisorTaskTest.java  |   3 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java |   3 -
 .../apache/druid/jackson/DefaultObjectMapper.java  |   1 -
 .../org/apache/druid/jackson/SegmentsModule.java   |  45 --
 .../java/org/apache/druid/segment/IndexMerger.java |   8 -
 .../org/apache/druid/segment/IndexMergerV9.java    |   3 +-
 .../apache/druid/segment/ProgressIndicator.java    |   4 +-
 .../druid/client/indexing/ClientAppendQuery.java   |  73 ---
 .../druid/client/indexing/ClientMergeQuery.java    |  85 ---
 .../apache/druid/client/indexing/ClientQuery.java  |   2 -
 .../client/indexing/HttpIndexingServiceClient.java |  21 -
 .../client/indexing/IndexingServiceClient.java     |   2 -
 .../java/org/apache/druid/guice/ServerModule.java  |  28 +-
 .../segment/indexing/RealtimeTuningConfig.java     |   4 +-
 .../helper/DruidCoordinatorSegmentMerger.java      | 331 ----------
 .../client/indexing/ClientAppendQueryTest.java     |  72 ---
 .../client/indexing/ClientMergeQueryTest.java      |  77 ---
 .../client/indexing/NoopIndexingServiceClient.java |   6 -
 .../segment/indexing/RealtimeTuningConfigTest.java |   6 +-
 .../org/apache/druid/server/ServerTestHelper.java  |   8 -
 .../coordinator/DruidCoordinatorConfigTest.java    |   1 -
 .../DruidCoordinatorSegmentMergerTest.java         | 482 ---------------
 .../druid/server/http/ServersResourceTest.java     |   4 +-
 .../java/org/apache/druid/cli/CliCoordinator.java  |  21 +-
 80 files changed, 227 insertions(+), 4374 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/guice/ConditionalMultibind.java b/core/src/main/java/org/apache/druid/guice/ConditionalMultibind.java
index 7cba822..88657c3 100644
--- a/core/src/main/java/org/apache/druid/guice/ConditionalMultibind.java
+++ b/core/src/main/java/org/apache/druid/guice/ConditionalMultibind.java
@@ -184,11 +184,7 @@ public class ConditionalMultibind<T>
       Class<? extends T> target
   )
   {
-    final String value = properties.getProperty(property);
-    if (value == null) {
-      return this;
-    }
-    if (condition.apply(value)) {
+    if (matchCondition(property, condition)) {
       multibinder.addBinding().to(target);
     }
     return this;
@@ -209,11 +205,7 @@ public class ConditionalMultibind<T>
       T target
   )
   {
-    final String value = properties.getProperty(property);
-    if (value == null) {
-      return this;
-    }
-    if (condition.apply(value)) {
+    if (matchCondition(property, condition)) {
       multibinder.addBinding().toInstance(target);
     }
     return this;
@@ -235,13 +227,18 @@ public class ConditionalMultibind<T>
       TypeLiteral<T> target
   )
   {
-    final String value = properties.getProperty(property);
-    if (value == null) {
-      return this;
-    }
-    if (condition.apply(value)) {
+    if (matchCondition(property, condition)) {
       multibinder.addBinding().to(target);
     }
     return this;
   }
+
+  public boolean matchCondition(String property, Predicate<String> condition)
+  {
+    final String value = properties.getProperty(property);
+    if (value == null) {
+      return false;
+    }
+    return condition.apply(value);
+  }
 }
diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java
index ee75ec1..97fd14a 100644
--- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java
+++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java
@@ -38,7 +38,7 @@ import org.apache.druid.jackson.CommaListJoinDeserializer;
 import org.apache.druid.jackson.CommaListJoinSerializer;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -146,7 +146,7 @@ public class DataSegment implements Comparable<DataSegment>
     // dataSource
     this.dimensions = prepareDimensionsOrMetrics(dimensions, DIMENSIONS_INTERNER);
     this.metrics = prepareDimensionsOrMetrics(metrics, METRICS_INTERNER);
-    this.shardSpec = (shardSpec == null) ? NoneShardSpec.instance() : shardSpec;
+    this.shardSpec = (shardSpec == null) ? new NumberedShardSpec(0, 1) : shardSpec;
     this.binaryVersion = binaryVersion;
     this.size = size;
   }
@@ -373,7 +373,7 @@ public class DataSegment implements Comparable<DataSegment>
       this.loadSpec = ImmutableMap.of();
       this.dimensions = ImmutableList.of();
       this.metrics = ImmutableList.of();
-      this.shardSpec = NoneShardSpec.instance();
+      this.shardSpec = new NumberedShardSpec(0, 1);
       this.size = -1;
     }
 
diff --git a/server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
similarity index 100%
rename from server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
rename to core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
diff --git a/server/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java
similarity index 100%
rename from server/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java
rename to core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
index 006a063..7fb2aab 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java
@@ -30,6 +30,7 @@ import java.util.Map;
 
 /**
  */
+@Deprecated
 public class NoneShardSpec implements ShardSpec
 {
   private static final NoneShardSpec INSTANCE = new NoneShardSpec();
@@ -40,11 +41,7 @@ public class NoneShardSpec implements ShardSpec
     return INSTANCE;
   }
 
-  /**
-   * @deprecated use {@link #instance()} instead
-   */
-  @Deprecated
-  public NoneShardSpec()
+  private NoneShardSpec()
   {
     // empty
   }
diff --git a/server/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java
similarity index 100%
rename from server/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java
rename to core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
index b555cda..10a6d6c 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
@@ -33,8 +33,12 @@ import java.util.Map;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes({
-                  @JsonSubTypes.Type(name = "none", value = NoneShardSpec.class),
-              })
+    @JsonSubTypes.Type(name = "none", value = NoneShardSpec.class),
+    @JsonSubTypes.Type(name = "single", value = SingleDimensionShardSpec.class),
+    @JsonSubTypes.Type(name = "linear", value = LinearShardSpec.class),
+    @JsonSubTypes.Type(name = "numbered", value = NumberedShardSpec.class),
+    @JsonSubTypes.Type(name = "hashed", value = HashBasedNumberedShardSpec.class)
+})
 public interface ShardSpec
 {
   <T> PartitionChunk<T> createChunk(T obj);
diff --git a/server/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
similarity index 86%
rename from server/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
rename to core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
index 044f163..f2b8441 100644
--- a/server/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
@@ -19,13 +19,16 @@
 
 package org.apache.druid.timeline.partition;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.ISE;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -36,24 +39,22 @@ import java.util.Map;
  */
 public class SingleDimensionShardSpec implements ShardSpec
 {
-  private String dimension;
-  private String start;
-  private String end;
-  private int partitionNum;
-
-  public SingleDimensionShardSpec()
-  {
-    this(null, null, null, -1);
-  }
-
+  private final String dimension;
+  @Nullable
+  private final String start;
+  @Nullable
+  private final String end;
+  private final int partitionNum;
+
+  @JsonCreator
   public SingleDimensionShardSpec(
-      String dimension,
-      String start,
-      String end,
-      int partitionNum
+      @JsonProperty("dimension") String dimension,
+      @JsonProperty("start") @Nullable String start,
+      @JsonProperty("end") @Nullable String end,
+      @JsonProperty("partitionNum") int partitionNum
   )
   {
-    this.dimension = dimension;
+    this.dimension = Preconditions.checkNotNull(dimension, "dimension");
     this.start = start;
     this.end = end;
     this.partitionNum = partitionNum;
@@ -65,33 +66,20 @@ public class SingleDimensionShardSpec implements ShardSpec
     return dimension;
   }
 
-  public void setDimension(String dimension)
-  {
-    this.dimension = dimension;
-  }
-
+  @Nullable
   @JsonProperty("start")
   public String getStart()
   {
     return start;
   }
 
-  public void setStart(String start)
-  {
-    this.start = start;
-  }
-
+  @Nullable
   @JsonProperty("end")
   public String getEnd()
   {
     return end;
   }
 
-  public void setEnd(String end)
-  {
-    this.end = end;
-  }
-
   @Override
   @JsonProperty("partitionNum")
   public int getPartitionNum()
@@ -143,11 +131,6 @@ public class SingleDimensionShardSpec implements ShardSpec
     return !rangeSet.subRangeSet(getRange()).isEmpty();
   }
 
-  public void setPartitionNum(int partitionNum)
-  {
-    this.partitionNum = partitionNum;
-  }
-
   @Override
   public <T> PartitionChunk<T> createChunk(T obj)
   {
diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 14cbbfc..43067d6 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -576,7 +576,7 @@ If you are running the indexing service in remote mode, the task logs must be st
 |`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file|
 
 You can also configure the Overlord to automatically retain the task logs in log directory and entries in task-related metadata storage tables only for last x milliseconds by configuring following additional properties.
-Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid processes and backing store nodes might result in un-intended behavior.  
+Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid processes and backing store nodes might result in un-intended behavior.
 
 |Property|Description|Default|
 |--------|-----------|-------|
@@ -719,7 +719,6 @@ These Coordinator static configurations can be defined in the `coordinator/runti
 |`druid.coordinator.period`|The run period for the Coordinator. The Coordinator’s operates by maintaining the current state of the world in memory and periodically looking at the set of segments available and segments being served to make decisions about whether any changes need to be made to the data topology. This property sets the delay between each of these runs.|PT60S|
 |`druid.coordinator.period.indexingPeriod`|How often to send compact/merge/conversion tasks to the indexing service. It's recommended to be longer than `druid.manager.segments.pollDuration`|PT1800S (30 mins)|
 |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZK interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S|
-|`druid.coordinator.merge.on`|Boolean flag for whether or not the Coordinator should try and merge small segments into a more optimal segment size.|false|
 |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M|
 |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earlist running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configurati [...]
 |`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all segments except for the last `durationToRetain` period. Whitelist or All can be set via dynamic configuration `killAllDataSourc [...]
@@ -1457,7 +1456,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti
 
 This section describes caching configuration that is common to Broker, Historical, and MiddleManager/Peon processes.
  
-Caching can optionally be enabled on the Broker, Historical, and MiddleManager/Peon processses. See [Broker](#broker-caching), 
+Caching can optionally be enabled on the Broker, Historical, and MiddleManager/Peon processses. See [Broker](#broker-caching),
 [Historical](#Historical-caching), and [Peon](#peon-caching) configuration options for how to enable it for different processes.
 
 Druid uses a local in-memory cache by default, unless a diffrent type of cache is specified.
diff --git a/docs/content/ingestion/hadoop.md b/docs/content/ingestion/hadoop.md
index c824fd0..8882c59 100644
--- a/docs/content/ingestion/hadoop.md
+++ b/docs/content/ingestion/hadoop.md
@@ -194,7 +194,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)|
 |indexSpec|Object|Tune how data is indexed. See below for more information.|no|
 |numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
-|forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|no (default = false)|
+|forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitioning-specification). This option can be useful when you need to append more data to existing dataSource.|no (default = false)|
 |useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)|
 |logParseExceptions|Boolean|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|false|no|
 |maxParseExceptions|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overrides `ignoreInvalidRows` if `maxParseExceptions` is defined.|unlimited|no|
diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md
index 4ecaccf..f61b395 100644
--- a/docs/content/ingestion/native_tasks.md
+++ b/docs/content/ingestion/native_tasks.md
@@ -170,7 +170,7 @@ that range if there's some stray data with unexpected timestamps.
 |--------|-----------|-------|---------|
 |type|The task type, this should always be `index_parallel`.|none|yes|
 |firehose|Specify a [Firehose](../ingestion/firehose.html) here.|none|yes|
-|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs (which can be forced by setting 'forceExtendableShardSpecs' in the tuning config).|false|no|
+|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs.|false|no|
 
 #### TuningConfig
 
@@ -186,7 +186,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
 |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
 |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
-|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
 |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
 |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
 |segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentWriteOutMediumFactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no|
@@ -377,7 +376,6 @@ An example of the result is
           "longEncoding": "longs"
         },
         "maxPendingPersists": 0,
-        "forceExtendableShardSpecs": false,
         "reportParseExceptions": false,
         "pushTimeout": 0,
         "segmentWriteOutMediumFactory": null,
@@ -541,7 +539,7 @@ that range if there's some stray data with unexpected timestamps.
 |--------|-----------|-------|---------|
 |type|The task type, this should always be "index".|none|yes|
 |firehose|Specify a [Firehose](../ingestion/firehose.html) here.|none|yes|
-|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs (which can be forced by setting 'forceExtendableShardSpecs' in the tuning config).|false|no|
+|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs.|false|no|
 
 #### TuningConfig
 
@@ -558,7 +556,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no|
 |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
 |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
-|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
 |forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.html#roll-up-modes). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments. Note that the result segments would be hash-partitioned. You can set `forceExtendableShard [...]
 |reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no|
 |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
@@ -617,4 +614,4 @@ the index task immediately pushes all segments created until that moment, cleans
 continues to ingest remaining data.
 
 To enable bulk pushing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot
-be used with either `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig.
+be used with `appendToExisting` of IOConfig.
diff --git a/docs/content/tutorials/tutorial-batch.md b/docs/content/tutorials/tutorial-batch.md
index a18d434..b679573 100644
--- a/docs/content/tutorials/tutorial-batch.md
+++ b/docs/content/tutorials/tutorial-batch.md
@@ -99,8 +99,7 @@ which has been configured to read the `quickstart/tutorial/wikiticker-2015-09-12
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/docs/content/tutorials/tutorial-compaction.md b/docs/content/tutorials/tutorial-compaction.md
index e51b3c6..9f0e1a8 100644
--- a/docs/content/tutorials/tutorial-compaction.md
+++ b/docs/content/tutorials/tutorial-compaction.md
@@ -85,8 +85,7 @@ We have included a compaction task spec for this tutorial datasource at `quickst
   "tuningConfig" : {
     "type" : "index",
     "maxRowsPerSegment" : 5000000,
-    "maxRowsInMemory" : 25000,
-    "forceExtendableShardSpecs" : true
+    "maxRowsInMemory" : 25000
   }
 }
 ```
diff --git a/docs/content/tutorials/tutorial-rollup.md b/docs/content/tutorials/tutorial-rollup.md
index 013fa0e..56c02cf 100644
--- a/docs/content/tutorials/tutorial-rollup.md
+++ b/docs/content/tutorials/tutorial-rollup.md
@@ -100,8 +100,7 @@ We'll ingest this data using the following ingestion task spec, located at `quic
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/docs/content/tutorials/tutorial-transform-spec.md b/docs/content/tutorials/tutorial-transform-spec.md
index 60d7bf8..45d62a0 100644
--- a/docs/content/tutorials/tutorial-transform-spec.md
+++ b/docs/content/tutorials/tutorial-transform-spec.md
@@ -115,8 +115,7 @@ We will ingest the sample data using the following spec, which demonstrates the
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/compaction-init-index.json b/examples/quickstart/tutorial/compaction-init-index.json
index d74f5c0..b6b59b6 100644
--- a/examples/quickstart/tutorial/compaction-init-index.json
+++ b/examples/quickstart/tutorial/compaction-init-index.json
@@ -56,8 +56,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "maxRowsPerSegment" : 1000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsPerSegment" : 1000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/compaction-keep-granularity.json b/examples/quickstart/tutorial/compaction-keep-granularity.json
index a7ff422..6721e7b 100644
--- a/examples/quickstart/tutorial/compaction-keep-granularity.json
+++ b/examples/quickstart/tutorial/compaction-keep-granularity.json
@@ -5,7 +5,6 @@
   "tuningConfig" : {
     "type" : "index",
     "maxRowsPerSegment" : 5000000,
-    "maxRowsInMemory" : 25000,
-    "forceExtendableShardSpecs" : true
+    "maxRowsInMemory" : 25000
   }
 }
diff --git a/examples/quickstart/tutorial/deletion-index.json b/examples/quickstart/tutorial/deletion-index.json
index d84dc21..0faf468 100644
--- a/examples/quickstart/tutorial/deletion-index.json
+++ b/examples/quickstart/tutorial/deletion-index.json
@@ -57,8 +57,7 @@
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/retention-index.json b/examples/quickstart/tutorial/retention-index.json
index 9aee62a..95416e2 100644
--- a/examples/quickstart/tutorial/retention-index.json
+++ b/examples/quickstart/tutorial/retention-index.json
@@ -57,8 +57,7 @@
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/rollup-index.json b/examples/quickstart/tutorial/rollup-index.json
index b4d96aa..2c1426e 100644
--- a/examples/quickstart/tutorial/rollup-index.json
+++ b/examples/quickstart/tutorial/rollup-index.json
@@ -44,8 +44,7 @@
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/transform-index.json b/examples/quickstart/tutorial/transform-index.json
index ea8ead7..8d40b19 100644
--- a/examples/quickstart/tutorial/transform-index.json
+++ b/examples/quickstart/tutorial/transform-index.json
@@ -66,8 +66,7 @@
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/updates-append-index.json b/examples/quickstart/tutorial/updates-append-index.json
index b8de06a..75d2335 100644
--- a/examples/quickstart/tutorial/updates-append-index.json
+++ b/examples/quickstart/tutorial/updates-append-index.json
@@ -52,8 +52,7 @@
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/updates-append-index2.json b/examples/quickstart/tutorial/updates-append-index2.json
index a5c49c4..247192a 100644
--- a/examples/quickstart/tutorial/updates-append-index2.json
+++ b/examples/quickstart/tutorial/updates-append-index2.json
@@ -42,8 +42,7 @@
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/updates-init-index.json b/examples/quickstart/tutorial/updates-init-index.json
index 3620ff1..71c449b 100644
--- a/examples/quickstart/tutorial/updates-init-index.json
+++ b/examples/quickstart/tutorial/updates-init-index.json
@@ -42,8 +42,7 @@
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/updates-overwrite-index.json b/examples/quickstart/tutorial/updates-overwrite-index.json
index 95f2e2d..451750e 100644
--- a/examples/quickstart/tutorial/updates-overwrite-index.json
+++ b/examples/quickstart/tutorial/updates-overwrite-index.json
@@ -42,8 +42,7 @@
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/examples/quickstart/tutorial/wikipedia-index.json b/examples/quickstart/tutorial/wikipedia-index.json
index 888e08e..785fbda 100644
--- a/examples/quickstart/tutorial/wikipedia-index.json
+++ b/examples/quickstart/tutorial/wikipedia-index.json
@@ -57,8 +57,7 @@
     "tuningConfig" : {
       "type" : "index",
       "maxRowsPerSegment" : 5000000,
-      "maxRowsInMemory" : 25000,
-      "forceExtendableShardSpecs" : true
+      "maxRowsInMemory" : 25000
     }
   }
 }
diff --git a/extensions-contrib/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPusherTest.java b/extensions-contrib/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPusherTest.java
index 817f288..c5ba422 100644
--- a/extensions-contrib/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPusherTest.java
+++ b/extensions-contrib/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPusherTest.java
@@ -75,7 +75,7 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
         new HashMap<>(),
         new ArrayList<>(),
         new ArrayList<>(),
-        new NoneShardSpec(),
+        NoneShardSpec.instance(),
         0,
         size
     );
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index b4b0d8e..d30899c 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -36,7 +36,6 @@ import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
-import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -184,23 +183,19 @@ public class DetermineHashedPartitionsJob implements Jobby
           log.info("Creating [%,d] shards", numberOfShards);
 
           List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
-          if (numberOfShards == 1) {
-            actualSpecs.add(new HadoopyShardSpec(NoneShardSpec.instance(), shardCount++));
-          } else {
-            for (int i = 0; i < numberOfShards; ++i) {
-              actualSpecs.add(
-                  new HadoopyShardSpec(
-                      new HashBasedNumberedShardSpec(
-                          i,
-                          numberOfShards,
-                          null,
-                          HadoopDruidIndexerConfig.JSON_MAPPER
-                      ),
-                      shardCount++
-                  )
-              );
-              log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
-            }
+          for (int i = 0; i < numberOfShards; ++i) {
+            actualSpecs.add(
+                new HadoopyShardSpec(
+                    new HashBasedNumberedShardSpec(
+                        i,
+                        numberOfShards,
+                        null,
+                        HadoopDruidIndexerConfig.JSON_MAPPER
+                    ),
+                    shardCount++
+                )
+            );
+            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
           }
 
           shardSpecs.put(bucket.getMillis(), actualSpecs);
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index 7dad5c6..dc0cf4e 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -41,7 +41,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.nary.BinaryFn;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
 import org.apache.hadoop.conf.Configurable;
@@ -697,37 +696,33 @@ public class DeterminePartitionsJob implements Jobby
             // One more shard to go
             final ShardSpec shardSpec;
 
-            if (currentDimPartitions.partitions.isEmpty()) {
-              shardSpec = NoneShardSpec.instance();
+            if (currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) {
+              // Combine with previous shard
+              final DimPartition previousDimPartition = currentDimPartitions.partitions.remove(
+                  currentDimPartitions.partitions.size() - 1
+              );
+
+              final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec;
+
+              shardSpec = new SingleDimensionShardSpec(
+                  currentDimPartitions.dim,
+                  previousShardSpec.getStart(),
+                  null,
+                  previousShardSpec.getPartitionNum()
+              );
+
+              log.info("Removing possible shard: %s", previousShardSpec);
+
+              currentDimPartition.rows += previousDimPartition.rows;
+              currentDimPartition.cardinality += previousDimPartition.cardinality;
             } else {
-              if (currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) {
-                // Combine with previous shard
-                final DimPartition previousDimPartition = currentDimPartitions.partitions.remove(
-                    currentDimPartitions.partitions.size() - 1
-                );
-
-                final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec;
-
-                shardSpec = new SingleDimensionShardSpec(
-                    currentDimPartitions.dim,
-                    previousShardSpec.getStart(),
-                    null,
-                    previousShardSpec.getPartitionNum()
-                );
-
-                log.info("Removing possible shard: %s", previousShardSpec);
-
-                currentDimPartition.rows += previousDimPartition.rows;
-                currentDimPartition.cardinality += previousDimPartition.cardinality;
-              } else {
-                // Create new shard
-                shardSpec = new SingleDimensionShardSpec(
-                    currentDimPartitions.dim,
-                    currentDimPartitionStart,
-                    null,
-                    currentDimPartitions.partitions.size()
-                );
-              }
+              // Create new shard
+              shardSpec = new SingleDimensionShardSpec(
+                  currentDimPartitions.dim,
+                  currentDimPartitionStart,
+                  null,
+                  currentDimPartitions.partitions.size()
+              );
             }
 
             log.info(
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 10551ba..534afcb 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -23,11 +23,9 @@ import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
-import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -64,28 +62,24 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
       int shardCount = 0;
       for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
         DateTime bucket = segmentGranularity.getStart();
-        if (shardsPerInterval > 0) {
-          List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
-          for (int i = 0; i < shardsPerInterval; i++) {
-            specs.add(
-                new HadoopyShardSpec(
-                    new HashBasedNumberedShardSpec(
-                        i,
-                        shardsPerInterval,
-                        config.getPartitionsSpec().getPartitionDimensions(),
-                        HadoopDruidIndexerConfig.JSON_MAPPER
-                    ),
-                    shardCount++
-                )
-            );
-          }
-          shardSpecs.put(bucket.getMillis(), specs);
-          log.info("DateTime[%s], spec[%s]", bucket, specs);
-        } else {
-          final HadoopyShardSpec spec = new HadoopyShardSpec(NoneShardSpec.instance(), shardCount++);
-          shardSpecs.put(bucket.getMillis(), Collections.singletonList(spec));
-          log.info("DateTime[%s], spec[%s]", bucket, spec);
+        // negative shardsPerInterval means a single shard
+        final int realShardsPerInterval = shardsPerInterval < 0 ? 1 : shardsPerInterval;
+        List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(realShardsPerInterval);
+        for (int i = 0; i < realShardsPerInterval; i++) {
+          specs.add(
+              new HadoopyShardSpec(
+                  new HashBasedNumberedShardSpec(
+                      i,
+                      realShardsPerInterval,
+                      config.getPartitionsSpec().getPartitionDimensions(),
+                      HadoopDruidIndexerConfig.JSON_MAPPER
+                  ),
+                  shardCount++
+              )
+          );
         }
+        shardSpecs.put(bucket.getMillis(), specs);
+        log.info("DateTime[%s], spec[%s]", bucket, specs);
       }
       config.setShardSpecs(shardSpecs);
       return true;
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
index 1981099..19303ff 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.io.Files;
-import org.apache.druid.indexer.updater.HadoopDruidConverterConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IAE;
@@ -32,7 +31,6 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.segment.ProgressIndicator;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.timeline.DataSegment;
@@ -47,7 +45,6 @@ import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
@@ -793,7 +790,7 @@ public class JobHelper
       try {
         throw new IAE(
             "Cannot figure out loadSpec %s",
-            HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec)
+            HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(loadSpec)
         );
       }
       catch (JsonProcessingException e) {
@@ -803,64 +800,6 @@ public class JobHelper
     return segmentLocURI;
   }
 
-  public static ProgressIndicator progressIndicatorForContext(
-      final TaskAttemptContext context
-  )
-  {
-    return new ProgressIndicator()
-    {
-
-      @Override
-      public void progress()
-      {
-        context.progress();
-      }
-
-      @Override
-      public void start()
-      {
-        context.progress();
-        context.setStatus("STARTED");
-      }
-
-      @Override
-      public void stop()
-      {
-        context.progress();
-        context.setStatus("STOPPED");
-      }
-
-      @Override
-      public void startSection(String section)
-      {
-        context.progress();
-        context.setStatus(StringUtils.format("STARTED [%s]", section));
-      }
-
-      @Override
-      public void stopSection(String section)
-      {
-        context.progress();
-        context.setStatus(StringUtils.format("STOPPED [%s]", section));
-      }
-    };
-  }
-
-  public static boolean deleteWithRetry(final FileSystem fs, final Path path, final boolean recursive)
-  {
-    try {
-      return RetryUtils.retry(
-          () -> fs.delete(path, recursive),
-          shouldRetryPredicate(),
-          NUM_RETRIES
-      );
-    }
-    catch (Exception e) {
-      log.error(e, "Failed to cleanup path[%s]", path);
-      throw new RuntimeException(e);
-    }
-  }
-
   public static String getJobTrackerAddress(Configuration config)
   {
     String jobTrackerAddress = config.get("mapred.job.tracker");
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/updater/HadoopConverterJob.java
deleted file mode 100644
index a44fc80..0000000
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/updater/HadoopConverterJob.java
+++ /dev/null
@@ -1,678 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer.updater;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-import org.apache.commons.io.FileUtils;
-import org.apache.druid.indexer.JobHelper;
-import org.apache.druid.indexer.hadoop.DatasourceInputSplit;
-import org.apache.druid.indexer.hadoop.WindowedDataSegment;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobPriority;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskReport;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.util.Progressable;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HadoopConverterJob
-{
-  private static final Logger log = new Logger(HadoopConverterJob.class);
-  private static final String COUNTER_GROUP = "Hadoop Druid Converter";
-  private static final String COUNTER_LOADED = "Loaded Bytes";
-  private static final String COUNTER_WRITTEN = "Written Bytes";
-
-  private static void setJobName(JobConf jobConf, List<DataSegment> segments)
-  {
-    if (segments.size() == 1) {
-      final DataSegment segment = segments.get(0);
-      jobConf.setJobName(
-          StringUtils.format(
-              "druid-convert-%s-%s-%s",
-              segment.getDataSource(),
-              segment.getInterval(),
-              segment.getVersion()
-          )
-      );
-    } else {
-      final Set<String> dataSources = Sets.newHashSet(
-          Iterables.transform(
-              segments,
-              new Function<DataSegment, String>()
-              {
-                @Override
-                public String apply(DataSegment input)
-                {
-                  return input.getDataSource();
-                }
-              }
-          )
-      );
-      final Set<String> versions = Sets.newHashSet(
-          Iterables.transform(
-              segments,
-              new Function<DataSegment, String>()
-              {
-                @Override
-                public String apply(DataSegment input)
-                {
-                  return input.getVersion();
-                }
-              }
-          )
-      );
-      jobConf.setJobName(
-          StringUtils.format(
-              "druid-convert-%s-%s",
-              Arrays.toString(dataSources.toArray()),
-              Arrays.toString(versions.toArray())
-          )
-      );
-    }
-  }
-
-  public static Path getJobPath(JobID jobID, Path workingDirectory)
-  {
-    return new Path(workingDirectory, jobID.toString());
-  }
-
-  public static Path getTaskPath(JobID jobID, TaskAttemptID taskAttemptID, Path workingDirectory)
-  {
-    return new Path(getJobPath(jobID, workingDirectory), taskAttemptID.toString());
-  }
-
-  public static Path getJobClassPathDir(String jobName, Path workingDirectory)
-  {
-    return new Path(workingDirectory, StringUtils.removeChar(jobName, ':'));
-  }
-
-  public static void cleanup(Job job) throws IOException
-  {
-    final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory());
-    final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
-    RuntimeException e = null;
-    try {
-      JobHelper.deleteWithRetry(fs, jobDir, true);
-    }
-    catch (RuntimeException ex) {
-      e = ex;
-    }
-    try {
-      JobHelper.deleteWithRetry(fs, getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true);
-    }
-    catch (RuntimeException ex) {
-      if (e == null) {
-        e = ex;
-      } else {
-        e.addSuppressed(ex);
-      }
-    }
-    if (e != null) {
-      throw e;
-    }
-  }
-
-
-  public static HadoopDruidConverterConfig converterConfigFromConfiguration(Configuration configuration)
-      throws IOException
-  {
-    final String property = Preconditions.checkNotNull(
-        configuration.get(HadoopDruidConverterConfig.CONFIG_PROPERTY),
-        HadoopDruidConverterConfig.CONFIG_PROPERTY
-    );
-    return HadoopDruidConverterConfig.fromString(property);
-  }
-
-  public static void converterConfigIntoConfiguration(
-      HadoopDruidConverterConfig priorConfig,
-      List<DataSegment> segments,
-      Configuration configuration
-  )
-  {
-    final HadoopDruidConverterConfig config = new HadoopDruidConverterConfig(
-        priorConfig.getDataSource(),
-        priorConfig.getInterval(),
-        priorConfig.getIndexSpec(),
-        segments,
-        priorConfig.isValidate(),
-        priorConfig.getDistributedSuccessCache(),
-        priorConfig.getHadoopProperties(),
-        priorConfig.getJobPriority(),
-        priorConfig.getSegmentOutputPath()
-    );
-    try {
-      configuration.set(
-          HadoopDruidConverterConfig.CONFIG_PROPERTY,
-          HadoopDruidConverterConfig.jsonMapper.writeValueAsString(config)
-      );
-    }
-    catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private final HadoopDruidConverterConfig converterConfig;
-  private long loadedBytes = 0L;
-  private long writtenBytes = 0L;
-
-  public HadoopConverterJob(
-      HadoopDruidConverterConfig converterConfig
-  )
-  {
-    this.converterConfig = converterConfig;
-  }
-
-  public List<DataSegment> run() throws IOException
-  {
-    final JobConf jobConf = new JobConf();
-    jobConf.setKeepFailedTaskFiles(false);
-    for (Map.Entry<String, String> entry : converterConfig.getHadoopProperties().entrySet()) {
-      jobConf.set(entry.getKey(), entry.getValue(), "converterConfig.getHadoopProperties()");
-    }
-    final List<DataSegment> segments = converterConfig.getSegments();
-    if (segments.isEmpty()) {
-      throw new IAE(
-          "No segments found for datasource [%s]",
-          converterConfig.getDataSource()
-      );
-    }
-    converterConfigIntoConfiguration(converterConfig, segments, jobConf);
-
-    jobConf.setNumReduceTasks(0); // Map only. Number of map tasks determined by input format
-    jobConf.setWorkingDirectory(new Path(converterConfig.getDistributedSuccessCache()));
-
-    setJobName(jobConf, segments);
-
-    if (converterConfig.getJobPriority() != null) {
-      jobConf.setJobPriority(JobPriority.valueOf(converterConfig.getJobPriority()));
-    }
-
-    final Job job = Job.getInstance(jobConf);
-
-    job.setInputFormatClass(ConfigInputFormat.class);
-    job.setMapperClass(ConvertingMapper.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(Text.class);
-    job.setMapSpeculativeExecution(false);
-    job.setOutputFormatClass(ConvertingOutputFormat.class);
-
-    JobHelper.setupClasspath(
-        JobHelper.distributedClassPath(jobConf.getWorkingDirectory()),
-        JobHelper.distributedClassPath(getJobClassPathDir(job.getJobName(), jobConf.getWorkingDirectory())),
-        job
-    );
-
-    Throwable throwable = null;
-    try {
-      job.submit();
-      log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
-      final boolean success = job.waitForCompletion(true);
-      if (!success) {
-        final TaskReport[] reports = job.getTaskReports(TaskType.MAP);
-        if (reports != null) {
-          for (final TaskReport report : reports) {
-            log.error("Error in task [%s] : %s", report.getTaskId(), Arrays.toString(report.getDiagnostics()));
-          }
-        }
-        return null;
-      }
-      try {
-        loadedBytes = job.getCounters().findCounter(COUNTER_GROUP, COUNTER_LOADED).getValue();
-        writtenBytes = job.getCounters().findCounter(COUNTER_GROUP, COUNTER_WRITTEN).getValue();
-      }
-      catch (IOException ex) {
-        log.error(ex, "Could not fetch counters");
-      }
-      final JobID jobID = job.getJobID();
-
-      final Path jobDir = getJobPath(jobID, job.getWorkingDirectory());
-      final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
-      final RemoteIterator<LocatedFileStatus> it = fs.listFiles(jobDir, true);
-      final List<Path> goodPaths = new ArrayList<>();
-      while (it.hasNext()) {
-        final LocatedFileStatus locatedFileStatus = it.next();
-        if (locatedFileStatus.isFile()) {
-          final Path myPath = locatedFileStatus.getPath();
-          if (ConvertingOutputFormat.DATA_SUCCESS_KEY.equals(myPath.getName())) {
-            goodPaths.add(new Path(myPath.getParent(), ConvertingOutputFormat.DATA_FILE_KEY));
-          }
-        }
-      }
-      if (goodPaths.isEmpty()) {
-        log.warn("No good data found at [%s]", jobDir);
-        return null;
-      }
-      final List<DataSegment> returnList = ImmutableList.copyOf(
-          Lists.transform(
-              goodPaths,
-              new Function<Path, DataSegment>()
-              {
-                @Nullable
-                @Override
-                public DataSegment apply(final Path input)
-                {
-                  try {
-                    if (!fs.exists(input)) {
-                      throw new ISE(
-                          "Somehow [%s] was found but [%s] is missing at [%s]",
-                          ConvertingOutputFormat.DATA_SUCCESS_KEY,
-                          ConvertingOutputFormat.DATA_FILE_KEY,
-                          jobDir
-                      );
-                    }
-                  }
-                  catch (final IOException e) {
-                    throw new RuntimeException(e);
-                  }
-                  try (final InputStream stream = fs.open(input)) {
-                    return HadoopDruidConverterConfig.jsonMapper.readValue(stream, DataSegment.class);
-                  }
-                  catch (final IOException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
-              }
-          )
-      );
-      if (returnList.size() == segments.size()) {
-        return returnList;
-      } else {
-        throw new ISE(
-            "Tasks reported success but result length did not match! Expected %d found %d at path [%s]",
-            segments.size(),
-            returnList.size(),
-            jobDir
-        );
-      }
-    }
-    catch (InterruptedException | ClassNotFoundException e) {
-      RuntimeException r = new RuntimeException(e);
-      throwable = r;
-      throw r;
-    }
-    catch (Throwable t) {
-      throwable = t;
-      throw t;
-    }
-    finally {
-      try {
-        cleanup(job);
-      }
-      catch (IOException e) {
-        if (throwable != null) {
-          throwable.addSuppressed(e);
-        } else {
-          log.error(e, "Could not clean up job [%s]", job.getJobID());
-        }
-      }
-    }
-  }
-
-  public long getLoadedBytes()
-  {
-    return loadedBytes;
-  }
-
-  public long getWrittenBytes()
-  {
-    return writtenBytes;
-  }
-
-  public static class ConvertingOutputFormat extends OutputFormat<Text, Text>
-  {
-    protected static final String DATA_FILE_KEY = "result";
-    protected static final String DATA_SUCCESS_KEY = "_SUCCESS";
-    protected static final String PUBLISHED_SEGMENT_KEY = "org.apache.druid.indexer.updater.converter.publishedSegment";
-    private static final Logger log = new Logger(ConvertingOutputFormat.class);
-
-    @Override
-    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
-    {
-      return new RecordWriter<Text, Text>()
-      {
-        @Override
-        public void write(Text key, Text value)
-        {
-          // NOOP
-        }
-
-        @Override
-        public void close(TaskAttemptContext context)
-        {
-          // NOOP
-        }
-      };
-    }
-
-    @Override
-    public void checkOutputSpecs(JobContext context)
-    {
-      // NOOP
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(final TaskAttemptContext context)
-    {
-      return new OutputCommitter()
-      {
-        @Override
-        public void setupJob(JobContext jobContext)
-        {
-          // NOOP
-        }
-
-        @Override
-        public void setupTask(TaskAttemptContext taskContext)
-        {
-          // NOOP
-        }
-
-        @Override
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-          return taskContext.getConfiguration().get(PUBLISHED_SEGMENT_KEY) != null;
-        }
-
-        @Override
-        public void commitTask(final TaskAttemptContext taskContext) throws IOException
-        {
-          final Progressable commitProgressable = new Progressable()
-          {
-            @Override
-            public void progress()
-            {
-              taskContext.progress();
-            }
-          };
-          final String finalSegmentString = taskContext.getConfiguration().get(PUBLISHED_SEGMENT_KEY);
-          if (finalSegmentString == null) {
-            throw new IOException("Could not read final segment");
-          }
-          final DataSegment newSegment = HadoopDruidConverterConfig.jsonMapper.readValue(
-              finalSegmentString,
-              DataSegment.class
-          );
-          log.info("Committing new segment [%s]", newSegment);
-          taskContext.progress();
-
-          final FileSystem fs = taskContext.getWorkingDirectory().getFileSystem(taskContext.getConfiguration());
-          final Path taskAttemptDir = getTaskPath(
-              context.getJobID(),
-              context.getTaskAttemptID(),
-              taskContext.getWorkingDirectory()
-          );
-          final Path taskAttemptFile = new Path(taskAttemptDir, DATA_FILE_KEY);
-          final Path taskAttemptSuccess = new Path(taskAttemptDir, DATA_SUCCESS_KEY);
-          try (final OutputStream outputStream = fs.create(taskAttemptFile, false, 1 << 10, commitProgressable)) {
-            outputStream.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsBytes(newSegment));
-          }
-
-          fs.create(taskAttemptSuccess, false).close();
-
-          taskContext.progress();
-          taskContext.setStatus("Committed");
-        }
-
-        @Override
-        public void abortTask(TaskAttemptContext taskContext)
-        {
-          log.warn("Aborting task. Nothing to clean up.");
-        }
-      };
-    }
-  }
-
-
-  public static class ConvertingMapper extends Mapper<String, String, Text, Text>
-  {
-    private static final Logger log = new Logger(ConvertingMapper.class);
-    private static final String TMP_FILE_LOC_KEY = "org.apache.druid.indexer.updater.converter.reducer.tmpDir";
-
-    @Override
-    protected void map(String key, String value, final Context context) throws IOException, InterruptedException
-    {
-      final InputSplit split = context.getInputSplit();
-      if (!(split instanceof DatasourceInputSplit)) {
-        throw new IAE(
-            "Unexpected split type. Expected [%s] was [%s]",
-            DatasourceInputSplit.class.getCanonicalName(),
-            split.getClass().getCanonicalName()
-        );
-      }
-
-      final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
-      final File tmpDir = Paths.get(tmpDirLoc).toFile();
-
-      final DataSegment segment = Iterables.getOnlyElement(((DatasourceInputSplit) split).getSegments()).getSegment();
-
-      final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration());
-
-      context.setStatus("DOWNLOADING");
-      context.progress();
-      final Path inPath = new Path(JobHelper.getURIFromSegment(segment));
-      final File inDir = new File(tmpDir, "in");
-
-      if (inDir.exists() && !inDir.delete()) {
-        log.warn("Could not delete [%s]", inDir);
-      }
-
-      if (!inDir.mkdir() && (!inDir.exists() || inDir.isDirectory())) {
-        log.warn("Unable to make directory");
-      }
-
-      final long inSize = JobHelper.unzipNoGuava(inPath, context.getConfiguration(), inDir, context, null);
-      log.debug("Loaded %d bytes into [%s] for converting", inSize, inDir.getAbsolutePath());
-      context.getCounter(COUNTER_GROUP, COUNTER_LOADED).increment(inSize);
-
-      context.setStatus("CONVERTING");
-      context.progress();
-      final File outDir = new File(tmpDir, "out");
-      FileUtils.forceMkdir(outDir);
-      try {
-        HadoopDruidConverterConfig.INDEX_MERGER.convert(
-            inDir,
-            outDir,
-            config.getIndexSpec(),
-            JobHelper.progressIndicatorForContext(context),
-            null
-        );
-      }
-      catch (Exception e) {
-        log.error(e, "Conversion failed.");
-        throw e;
-      }
-      if (config.isValidate()) {
-        context.setStatus("Validating");
-        HadoopDruidConverterConfig.INDEX_IO.validateTwoSegments(inDir, outDir);
-      }
-      context.progress();
-      context.setStatus("Starting PUSH");
-      final Path baseOutputPath = new Path(config.getSegmentOutputPath());
-      final FileSystem outputFS = baseOutputPath.getFileSystem(context.getConfiguration());
-      final DataSegment finalSegmentTemplate = segment.withVersion(
-          segment.getVersion()
-          + "_converted"
-      );
-      final DataSegment finalSegment = JobHelper.serializeOutIndex(
-          finalSegmentTemplate,
-          context.getConfiguration(),
-          context,
-          outDir,
-          JobHelper.makeFileNamePath(
-              baseOutputPath,
-              outputFS,
-              finalSegmentTemplate,
-              JobHelper.INDEX_ZIP,
-              config.DATA_SEGMENT_PUSHER
-          ),
-          JobHelper.makeTmpPath(
-              baseOutputPath,
-              outputFS,
-              finalSegmentTemplate,
-              context.getTaskAttemptID(),
-              config.DATA_SEGMENT_PUSHER
-          ),
-          config.DATA_SEGMENT_PUSHER
-      );
-      context.progress();
-      context.setStatus("Finished PUSH");
-      final String finalSegmentString = HadoopDruidConverterConfig.jsonMapper.writeValueAsString(finalSegment);
-      context.getConfiguration().set(ConvertingOutputFormat.PUBLISHED_SEGMENT_KEY, finalSegmentString);
-      context.write(new Text("dataSegment"), new Text(finalSegmentString));
-
-      context.getCounter(COUNTER_GROUP, COUNTER_WRITTEN).increment(finalSegment.getSize());
-      context.progress();
-      context.setStatus("Ready To Commit");
-    }
-
-    @Override
-    protected void setup(Context context)
-    {
-      final File tmpFile = Files.createTempDir();
-      context.getConfiguration().set(TMP_FILE_LOC_KEY, tmpFile.getAbsolutePath());
-    }
-
-    @Override
-    protected void cleanup(
-        Context context
-    ) throws IOException
-    {
-      final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
-      final File tmpDir = Paths.get(tmpDirLoc).toFile();
-      FileUtils.deleteDirectory(tmpDir);
-      context.progress();
-      context.setStatus("Clean");
-    }
-  }
-
-  public static class ConfigInputFormat extends InputFormat<String, String>
-  {
-    @Override
-    public List<InputSplit> getSplits(final JobContext jobContext) throws IOException
-    {
-      final HadoopDruidConverterConfig config = converterConfigFromConfiguration(jobContext.getConfiguration());
-      final List<DataSegment> segments = config.getSegments();
-      if (segments == null) {
-        throw new IOException("Bad config, missing segments");
-      }
-      return Lists.transform(
-          segments,
-          new Function<DataSegment, InputSplit>()
-          {
-            @Nullable
-            @Override
-            public InputSplit apply(DataSegment input)
-            {
-              return new DatasourceInputSplit(ImmutableList.of(WindowedDataSegment.of(input)), null);
-            }
-          }
-      );
-    }
-
-    @Override
-    public RecordReader<String, String> createRecordReader(
-        final InputSplit inputSplit,
-        final TaskAttemptContext taskAttemptContext
-    )
-    {
-      return new RecordReader<String, String>()
-      {
-        boolean readAnything = false;
-
-        @Override
-        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-        {
-          // NOOP
-        }
-
-        @Override
-        public boolean nextKeyValue()
-        {
-          return !readAnything;
-        }
-
-        @Override
-        public String getCurrentKey()
-        {
-          return "key";
-        }
-
-        @Override
-        public String getCurrentValue()
-        {
-          readAnything = true;
-          return "fakeValue";
-        }
-
-        @Override
-        public float getProgress()
-        {
-          return readAnything ? 0.0F : 1.0F;
-        }
-
-        @Override
-        public void close()
-        {
-          // NOOP
-        }
-      };
-    }
-  }
-}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/updater/HadoopDruidConverterConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/updater/HadoopDruidConverterConfig.java
deleted file mode 100644
index 67c53ad..0000000
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/updater/HadoopDruidConverterConfig.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer.updater;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Binder;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.Module;
-import org.apache.druid.guice.GuiceInjectors;
-import org.apache.druid.guice.JsonConfigProvider;
-import org.apache.druid.guice.annotations.Self;
-import org.apache.druid.initialization.Initialization;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.IndexMerger;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.server.DruidNode;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-
-public class HadoopDruidConverterConfig
-{
-  public static final String CONFIG_PROPERTY = "org.apache.druid.indexer.updater.converter";
-  public static final ObjectMapper jsonMapper;
-  public static final IndexIO INDEX_IO;
-  public static final IndexMerger INDEX_MERGER;
-  public static final DataSegmentPusher DATA_SEGMENT_PUSHER;
-
-  private static final Injector injector = Initialization.makeInjectorWithModules(
-      GuiceInjectors.makeStartupInjector(),
-      ImmutableList.<Module>of(
-          new Module()
-          {
-            @Override
-            public void configure(Binder binder)
-            {
-              JsonConfigProvider.bindInstance(
-                  binder,
-                  Key.get(DruidNode.class, Self.class),
-                  new DruidNode("hadoop-converter", null, false, null, null, true, false)
-              );
-            }
-          }
-      )
-  );
-
-  static {
-    jsonMapper = injector.getInstance(ObjectMapper.class);
-    jsonMapper.registerSubtypes(HadoopDruidConverterConfig.class);
-    INDEX_IO = injector.getInstance(IndexIO.class);
-    INDEX_MERGER = injector.getInstance(IndexMerger.class);
-    DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class);
-  }
-
-  public static HadoopDruidConverterConfig fromString(final String string) throws IOException
-  {
-    return fromMap(jsonMapper.readValue(string, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT));
-  }
-
-  public static HadoopDruidConverterConfig fromMap(final Map<String, Object> map)
-  {
-    return jsonMapper.convertValue(map, HadoopDruidConverterConfig.class);
-  }
-
-  @JsonProperty
-  private final String dataSource;
-  @JsonProperty
-  private final Interval interval;
-  @JsonProperty
-  private final IndexSpec indexSpec;
-  @JsonProperty
-  private final List<DataSegment> segments;
-  @JsonProperty
-  private final boolean validate;
-  @JsonProperty
-  private final URI distributedSuccessCache;
-  @JsonProperty
-  private final Map<String, String> hadoopProperties;
-  @JsonProperty
-  private final String jobPriority;
-  @JsonProperty
-  private final String segmentOutputPath;
-
-  @JsonCreator
-  public HadoopDruidConverterConfig(
-      @JsonProperty("dataSource") final String dataSource,
-      @JsonProperty("interval") final Interval interval,
-      @JsonProperty("indexSpec") final IndexSpec indexSpec,
-      @JsonProperty("segments") final List<DataSegment> segments,
-      @JsonProperty("validate") final Boolean validate,
-      @JsonProperty("distributedSuccessCache") URI distributedSuccessCache,
-      @JsonProperty("hadoopProperties") Map<String, String> hadoopProperties,
-      @JsonProperty("jobPriority") String jobPriority,
-      @JsonProperty("segmentOutputPath") String segmentOutputPath
-  )
-  {
-    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
-    this.interval = Preconditions.checkNotNull(interval, "interval");
-    this.indexSpec = Preconditions.checkNotNull(indexSpec, "indexSpec");
-    this.distributedSuccessCache = Preconditions.checkNotNull(distributedSuccessCache, "distributedSuccessCache");
-    this.segments = segments;
-    this.validate = validate == null ? false : validate;
-    this.hadoopProperties = hadoopProperties == null
-                            ? ImmutableMap.of()
-                            : ImmutableMap.copyOf(hadoopProperties);
-    this.jobPriority = jobPriority;
-    this.segmentOutputPath = Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath");
-  }
-
-  @JsonProperty
-  public boolean isValidate()
-  {
-    return validate;
-  }
-
-  @JsonProperty
-  public String getDataSource()
-  {
-    return dataSource;
-  }
-
-  @JsonProperty
-  public Interval getInterval()
-  {
-    return interval;
-  }
-
-  @JsonProperty
-  public IndexSpec getIndexSpec()
-  {
-    return indexSpec;
-  }
-
-  @JsonProperty
-  public List<DataSegment> getSegments()
-  {
-    return segments;
-  }
-
-  @JsonProperty
-  public URI getDistributedSuccessCache()
-  {
-    return distributedSuccessCache;
-  }
-
-  @JsonProperty
-  public Map<String, String> getHadoopProperties()
-  {
-    return hadoopProperties;
-  }
-
-  @JsonProperty
-  public String getJobPriority()
-  {
-    return jobPriority;
-  }
-
-  @JsonProperty
-  public String getSegmentOutputPath()
-  {
-    return segmentOutputPath;
-  }
-}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/updater/HadoopConverterJobTest.java
deleted file mode 100644
index f81c02a..0000000
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/updater/HadoopConverterJobTest.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer.updater;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteSource;
-import com.google.common.io.Files;
-import org.apache.druid.client.ImmutableDruidDataSource;
-import org.apache.druid.data.input.impl.DelimitedParseSpec;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.StringInputRowParser;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob;
-import org.apache.druid.indexer.HadoopDruidIndexerConfig;
-import org.apache.druid.indexer.HadoopDruidIndexerJob;
-import org.apache.druid.indexer.HadoopIOConfig;
-import org.apache.druid.indexer.HadoopIngestionSpec;
-import org.apache.druid.indexer.HadoopTuningConfig;
-import org.apache.druid.indexer.JobHelper;
-import org.apache.druid.indexer.Jobby;
-import org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
-import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.metadata.MetadataSegmentManagerConfig;
-import org.apache.druid.metadata.MetadataStorageConnectorConfig;
-import org.apache.druid.metadata.MetadataStorageTablesConfig;
-import org.apache.druid.metadata.SQLMetadataSegmentManager;
-import org.apache.druid.metadata.TestDerbyConnector;
-import org.apache.druid.metadata.storage.derby.DerbyConnector;
-import org.apache.druid.query.Query;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
-import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.TestIndex;
-import org.apache.druid.segment.data.CompressionFactory;
-import org.apache.druid.segment.data.CompressionStrategy;
-import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.exceptions.CallbackFailedException;
-import org.skife.jdbi.v2.tweak.HandleCallback;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-public class HadoopConverterJobTest
-{
-  @Rule
-  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-  @Rule
-  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
-  private String storageLocProperty = null;
-  private File tmpSegmentDir = null;
-
-  private static final String DATASOURCE = "testDatasource";
-  private static final String STORAGE_PROPERTY_KEY = "druid.storage.storageDirectory";
-
-  private Supplier<MetadataStorageTablesConfig> metadataStorageTablesConfigSupplier;
-  private DerbyConnector connector;
-
-  private final Interval interval = Intervals.of("2011-01-01T00:00:00.000Z/2011-05-01T00:00:00.000Z");
-
-  @After
-  public void tearDown()
-  {
-    if (storageLocProperty == null) {
-      System.clearProperty(STORAGE_PROPERTY_KEY);
-    } else {
-      System.setProperty(STORAGE_PROPERTY_KEY, storageLocProperty);
-    }
-    tmpSegmentDir = null;
-  }
-
-  @Before
-  public void setUp() throws Exception
-  {
-    final MetadataStorageUpdaterJobSpec metadataStorageUpdaterJobSpec = new MetadataStorageUpdaterJobSpec()
-    {
-      @Override
-      public String getSegmentTable()
-      {
-        return derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
-      }
-
-      @Override
-      public MetadataStorageConnectorConfig get()
-      {
-        return derbyConnectorRule.getMetadataConnectorConfig();
-      }
-    };
-    final File scratchFileDir = temporaryFolder.newFolder();
-    storageLocProperty = System.getProperty(STORAGE_PROPERTY_KEY);
-    tmpSegmentDir = temporaryFolder.newFolder();
-    System.setProperty(STORAGE_PROPERTY_KEY, tmpSegmentDir.getAbsolutePath());
-
-    final URL url = Preconditions.checkNotNull(Query.class.getClassLoader().getResource("druid.sample.tsv"));
-    final File tmpInputFile = temporaryFolder.newFile();
-    FileUtils.retryCopy(
-        new ByteSource()
-        {
-          @Override
-          public InputStream openStream() throws IOException
-          {
-            return url.openStream();
-          }
-        },
-        tmpInputFile,
-        FileUtils.IS_EXCEPTION,
-        3
-    );
-    final HadoopDruidIndexerConfig hadoopDruidIndexerConfig = new HadoopDruidIndexerConfig(
-        new HadoopIngestionSpec(
-            new DataSchema(
-                DATASOURCE,
-                HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
-                    new StringInputRowParser(
-                        new DelimitedParseSpec(
-                            new TimestampSpec("ts", "iso", null),
-                            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TestIndex.DIMENSIONS)), null, null),
-                            "\t",
-                            "\u0001",
-                            Arrays.asList(TestIndex.COLUMNS),
-                            false,
-                            0
-                        ),
-                        null
-                    ),
-                    Map.class
-                ),
-                new AggregatorFactory[]{
-                    new DoubleSumAggregatorFactory(TestIndex.DOUBLE_METRICS[0], TestIndex.DOUBLE_METRICS[0]),
-                    new HyperUniquesAggregatorFactory("quality_uniques", "quality")
-                },
-                new UniformGranularitySpec(
-                    Granularities.MONTH,
-                    Granularities.DAY,
-                    ImmutableList.of(interval)
-                ),
-                null,
-                HadoopDruidIndexerConfig.JSON_MAPPER
-            ),
-            new HadoopIOConfig(
-                ImmutableMap.of(
-                    "type", "static",
-                    "paths", tmpInputFile.getAbsolutePath()
-                ),
-                metadataStorageUpdaterJobSpec,
-                tmpSegmentDir.getAbsolutePath()
-            ),
-            new HadoopTuningConfig(
-                scratchFileDir.getAbsolutePath(),
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                false,
-                false,
-                false,
-                false,
-                null,
-                false,
-                false,
-                null,
-                null,
-                null,
-                false,
-                false,
-                null,
-                null,
-                null
-            )
-        )
-    );
-    metadataStorageTablesConfigSupplier = derbyConnectorRule.metadataTablesConfigSupplier();
-    connector = derbyConnectorRule.getConnector();
-
-    try {
-      connector.getDBI().withHandle(
-          new HandleCallback<Void>()
-          {
-            @Override
-            public Void withHandle(Handle handle)
-            {
-              handle.execute("DROP TABLE druid_segments");
-              return null;
-            }
-          }
-      );
-    }
-    catch (CallbackFailedException e) {
-      // Who cares
-    }
-    List<Jobby> jobs = ImmutableList.of(
-        new Jobby()
-        {
-          @Override
-          public boolean run()
-          {
-            connector.createSegmentTable(metadataStorageUpdaterJobSpec.getSegmentTable());
-            return true;
-          }
-        },
-        new HadoopDruidDetermineConfigurationJob(hadoopDruidIndexerConfig),
-        new HadoopDruidIndexerJob(
-            hadoopDruidIndexerConfig,
-            new SQLMetadataStorageUpdaterJobHandler(connector)
-        )
-    );
-    Assert.assertTrue(JobHelper.runJobs(jobs, hadoopDruidIndexerConfig));
-  }
-
-  private List<DataSegment> getDataSegments(
-      SQLMetadataSegmentManager manager
-  ) throws InterruptedException
-  {
-    manager.start();
-    while (!manager.isStarted()) {
-      Thread.sleep(10);
-    }
-    manager.poll();
-    final ImmutableDruidDataSource druidDataSource = manager.getDataSource(DATASOURCE);
-    manager.stop();
-    return Lists.newArrayList(druidDataSource.getSegments());
-  }
-
-  @Test
-  public void testSimpleJob() throws IOException, InterruptedException
-  {
-
-    final SQLMetadataSegmentManager manager = new SQLMetadataSegmentManager(
-        HadoopDruidConverterConfig.jsonMapper,
-        new Supplier<MetadataSegmentManagerConfig>()
-        {
-          @Override
-          public MetadataSegmentManagerConfig get()
-          {
-            return new MetadataSegmentManagerConfig();
-          }
-        },
-        metadataStorageTablesConfigSupplier,
-        connector
-    );
-
-    final List<DataSegment> oldSemgments = getDataSegments(manager);
-    final File tmpDir = temporaryFolder.newFolder();
-    final HadoopConverterJob converterJob = new HadoopConverterJob(
-        new HadoopDruidConverterConfig(
-            DATASOURCE,
-            interval,
-            new IndexSpec(new RoaringBitmapSerdeFactory(null),
-                          CompressionStrategy.UNCOMPRESSED,
-                          CompressionStrategy.UNCOMPRESSED,
-                          CompressionFactory.LongEncodingStrategy.LONGS),
-            oldSemgments,
-            true,
-            tmpDir.toURI(),
-            ImmutableMap.of(),
-            null,
-            tmpSegmentDir.toURI().toString()
-        )
-    );
-
-    final List<DataSegment> segments = Lists.newArrayList(converterJob.run());
-    Assert.assertNotNull("bad result", segments);
-    Assert.assertEquals("wrong segment count", 4, segments.size());
-    Assert.assertTrue(converterJob.getLoadedBytes() > 0);
-    Assert.assertTrue(converterJob.getWrittenBytes() > 0);
-    Assert.assertTrue(converterJob.getWrittenBytes() > converterJob.getLoadedBytes());
-
-    Assert.assertEquals(oldSemgments.size(), segments.size());
-
-    final DataSegment segment = segments.get(0);
-    Assert.assertTrue(interval.contains(segment.getInterval()));
-    Assert.assertTrue(segment.getVersion().endsWith("_converted"));
-    Assert.assertTrue(segment.getLoadSpec().get("path").toString().contains("_converted"));
-
-    for (File file : tmpDir.listFiles()) {
-      Assert.assertFalse(file.isDirectory());
-      Assert.assertTrue(file.isFile());
-    }
-
-
-    final Comparator<DataSegment> segmentComparator = new Comparator<DataSegment>()
-    {
-      @Override
-      public int compare(DataSegment o1, DataSegment o2)
-      {
-        return o1.getId().compareTo(o2.getId());
-      }
-    };
-    Collections.sort(
-        oldSemgments,
-        segmentComparator
-    );
-    Collections.sort(
-        segments,
-        segmentComparator
-    );
-
-    for (int i = 0; i < oldSemgments.size(); ++i) {
-      final DataSegment oldSegment = oldSemgments.get(i);
-      final DataSegment newSegment = segments.get(i);
-      Assert.assertEquals(oldSegment.getDataSource(), newSegment.getDataSource());
-      Assert.assertEquals(oldSegment.getInterval(), newSegment.getInterval());
-      Assert.assertEquals(
-          Sets.newHashSet(oldSegment.getMetrics()),
-          Sets.newHashSet(newSegment.getMetrics())
-      );
-      Assert.assertEquals(
-          Sets.newHashSet(oldSegment.getDimensions()),
-          Sets.newHashSet(newSegment.getDimensions())
-      );
-      Assert.assertEquals(oldSegment.getVersion() + "_converted", newSegment.getVersion());
-      Assert.assertTrue(oldSegment.getSize() < newSegment.getSize());
-      Assert.assertEquals(oldSegment.getBinaryVersion(), newSegment.getBinaryVersion());
-    }
-  }
-
-  private static void corrupt(
-      DataSegment segment
-  ) throws IOException
-  {
-    final Map<String, Object> localLoadSpec = segment.getLoadSpec();
-    final Path segmentPath = Paths.get(localLoadSpec.get("path").toString());
-
-    final MappedByteBuffer buffer = Files.map(segmentPath.toFile(), FileChannel.MapMode.READ_WRITE);
-    while (buffer.hasRemaining()) {
-      buffer.put((byte) 0xFF);
-    }
-  }
-
-  @Test
-  @Ignore // This takes a long time due to retries
-  public void testHadoopFailure() throws IOException, InterruptedException
-  {
-    final SQLMetadataSegmentManager manager = new SQLMetadataSegmentManager(
-        HadoopDruidConverterConfig.jsonMapper,
-        new Supplier<MetadataSegmentManagerConfig>()
-        {
-          @Override
-          public MetadataSegmentManagerConfig get()
-          {
-            return new MetadataSegmentManagerConfig();
-          }
-        },
-        metadataStorageTablesConfigSupplier,
-        connector
-    );
-
-    final List<DataSegment> oldSemgments = getDataSegments(manager);
-    final File tmpDir = temporaryFolder.newFolder();
-    final HadoopConverterJob converterJob = new HadoopConverterJob(
-        new HadoopDruidConverterConfig(
-            DATASOURCE,
-            interval,
-            new IndexSpec(new RoaringBitmapSerdeFactory(null),
-                          CompressionStrategy.UNCOMPRESSED,
-                          CompressionStrategy.UNCOMPRESSED,
-                          CompressionFactory.LongEncodingStrategy.LONGS),
-            oldSemgments,
-            true,
-            tmpDir.toURI(),
-            ImmutableMap.of(),
-            null,
-            tmpSegmentDir.toURI().toString()
-        )
-    );
-
-    corrupt(oldSemgments.get(0));
-
-    final List<DataSegment> result = converterJob.run();
-    Assert.assertNull("result should be null", result);
-
-    final List<DataSegment> segments = getDataSegments(manager);
-
-    Assert.assertEquals(oldSemgments.size(), segments.size());
-
-    Assert.assertEquals(oldSemgments, segments);
-  }
-}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/updater/HadoopDruidConverterConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/updater/HadoopDruidConverterConfigTest.java
deleted file mode 100644
index 2bd8513..0000000
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/updater/HadoopDruidConverterConfigTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer.updater;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.segment.IndexSpec;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-import java.net.URI;
-
-public class HadoopDruidConverterConfigTest
-{
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-  @Test
-  public void simpleSerDe() throws IOException
-  {
-    final HadoopDruidConverterConfig config = new HadoopDruidConverterConfig(
-        "datasource",
-        Intervals.of("2000/2010"),
-        new IndexSpec(),
-        ImmutableList.of(),
-        true,
-        URI.create("file:/dev/null"),
-        ImmutableMap.of(),
-        "HIGH",
-        temporaryFolder.newFolder().getAbsolutePath()
-    );
-    final ObjectMapper mapper = new DefaultObjectMapper();
-    mapper.registerSubtypes(HadoopDruidConverterConfig.class);
-    final byte[] value = mapper.writeValueAsBytes(config);
-    final HadoopDruidConverterConfig config2 = mapper.readValue(
-        value,
-        HadoopDruidConverterConfig.class
-    );
-    Assert.assertEquals(mapper.writeValueAsString(config), mapper.writeValueAsString(config2));
-  }
-}
diff --git a/indexing-hadoop/src/test/resources/log4j2.xml b/indexing-hadoop/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..bca6c69
--- /dev/null
+++ b/indexing-hadoop/src/test/resources/log4j2.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+    <Logger level="debug" name="org.apache.druid" additivity="false">
+      <AppenderRef ref="Console"/>
+    </Logger>
+  </Loggers>
+</Configuration>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index c055202..9d2bed8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -28,7 +28,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Period;
 
@@ -42,7 +42,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
   private static final int defaultMaxRowsPerSegment = 5_000_000;
   private static final Period defaultIntermediatePersistPeriod = new Period("PT10M");
   private static final int defaultMaxPendingPersists = 0;
-  private static final ShardSpec defaultShardSpec = NoneShardSpec.instance();
+  private static final ShardSpec defaultShardSpec = new NumberedShardSpec(0, 1);
   private static final IndexSpec defaultIndexSpec = new IndexSpec();
   private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
   private static final long defaultPublishAndHandoffTimeout = 0;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java
deleted file mode 100644
index 9232e86..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.IndexMerger;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.IndexableAdapter;
-import org.apache.druid.segment.QueryableIndexIndexableAdapter;
-import org.apache.druid.segment.RowFilteringIndexAdapter;
-import org.apache.druid.segment.RowPointer;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
-import org.apache.druid.timeline.partition.PartitionChunk;
-import org.joda.time.Interval;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class AppendTask extends MergeTaskBase
-{
-  private final IndexSpec indexSpec;
-  private final List<AggregatorFactory> aggregators;
-
-  @JsonCreator
-  public AppendTask(
-      @JsonProperty("id") String id,
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("segments") List<DataSegment> segments,
-      @JsonProperty("aggregations") List<AggregatorFactory> aggregators,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
-      @JsonProperty("buildV9Directly") Boolean buildV9Directly,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("context") Map<String, Object> context
-  )
-  {
-    super(id, dataSource, segments, segmentWriteOutMediumFactory, context);
-    this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
-    this.aggregators = aggregators;
-  }
-
-  @Override
-  public File merge(final TaskToolbox toolbox, final Map<DataSegment, File> segments, final File outDir)
-      throws Exception
-  {
-    VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segments.keySet());
-
-    final Iterable<SegmentToMergeHolder> segmentsToMerge = Iterables.concat(
-        Iterables.transform(
-            timeline.lookup(Intervals.of("1000-01-01/3000-01-01")),
-            new Function<TimelineObjectHolder<String, DataSegment>, Iterable<SegmentToMergeHolder>>()
-            {
-              @Override
-              public Iterable<SegmentToMergeHolder> apply(final TimelineObjectHolder<String, DataSegment> input)
-              {
-                return Iterables.transform(
-                    input.getObject(),
-                    new Function<PartitionChunk<DataSegment>, SegmentToMergeHolder>()
-                    {
-                      @Nullable
-                      @Override
-                      public SegmentToMergeHolder apply(PartitionChunk<DataSegment> chunkInput)
-                      {
-                        DataSegment segment = chunkInput.getObject();
-                        return new SegmentToMergeHolder(
-                            input.getInterval(),
-                            Preconditions.checkNotNull(
-                                segments.get(segment),
-                                "File for segment %s", segment.getId()
-                            )
-                        );
-                      }
-                    }
-                );
-              }
-            }
-        )
-    );
-
-    List<IndexableAdapter> adapters = new ArrayList<>();
-    for (final SegmentToMergeHolder holder : segmentsToMerge) {
-      adapters.add(
-          new RowFilteringIndexAdapter(
-              new QueryableIndexIndexableAdapter(toolbox.getIndexIO().loadIndex(holder.getFile())),
-              (RowPointer rowPointer) -> holder.getInterval().contains(rowPointer.getTimestamp())
-          )
-      );
-    }
-
-    IndexMerger indexMerger = toolbox.getIndexMergerV9();
-    return indexMerger.append(
-        adapters,
-        aggregators == null ? null : aggregators.toArray(new AggregatorFactory[0]),
-        outDir,
-        indexSpec,
-        getSegmentWriteOutMediumFactory()
-    );
-  }
-
-  @Override
-  public String getType()
-  {
-    return "append";
-  }
-
-  @JsonProperty("aggregations")
-  public List<AggregatorFactory> getAggregators()
-  {
-    return aggregators;
-  }
-
-  private static class SegmentToMergeHolder
-  {
-    private final Interval interval;
-    private final File file;
-
-    private SegmentToMergeHolder(Interval interval, File file)
-    {
-      this.interval = interval;
-      this.file = file;
-    }
-
-    public Interval getInterval()
-    {
-      return interval;
-    }
-
-    public File getFile()
-    {
-      return file;
-    }
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 5263402..2f52b69 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -88,7 +88,6 @@ import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
-import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.utils.CircularBuffer;
@@ -122,7 +121,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -559,11 +557,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
     return tuningConfig.isForceGuaranteedRollup();
   }
 
-  private static boolean isExtendableShardSpecs(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig)
-  {
-    return tuningConfig.isForceExtendableShardSpecs() || ioConfig.isAppendToExisting();
-  }
-
   /**
    * Determines intervals and shardSpecs for input data.  This method first checks that it must determine intervals and
    * shardSpecs by itself.  Intervals must be determined if they are not specified in {@link GranularitySpec}.
@@ -637,16 +630,16 @@ public class IndexTask extends AbstractTask implements ChatHandler
     if (isGuaranteedRollup(ioConfig, tuningConfig)) {
       // Overwrite mode, guaranteed rollup: shardSpecs must be known in advance.
       final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
-      final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
-          numShards,
-          tuningConfig.getPartitionDimensions(),
-          jsonMapper
-      );
 
       for (Interval interval : intervals) {
         final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
                                                             .mapToObj(
-                                                                shardId -> shardSpecCreateFn.apply(shardId, numShards)
+                                                                shardId -> new HashBasedNumberedShardSpec(
+                                                                    shardId,
+                                                                    numShards,
+                                                                    tuningConfig.partitionDimensions,
+                                                                    jsonMapper
+                                                                )
                                                             )
                                                             .collect(Collectors.toList());
         shardSpecs.put(interval, intervalShardSpecs);
@@ -705,15 +698,14 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
       if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
         // Overwrite mode, guaranteed rollup: shardSpecs must be known in advance.
-        final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
-            numShards,
-            tuningConfig.getPartitionDimensions(),
-            jsonMapper
-        );
-
         final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
                                                             .mapToObj(
-                                                                shardId -> shardSpecCreateFn.apply(shardId, numShards)
+                                                                shardId -> new HashBasedNumberedShardSpec(
+                                                                    shardId,
+                                                                    numShards,
+                                                                    tuningConfig.partitionDimensions,
+                                                                    jsonMapper
+                                                                )
                                                             ).collect(Collectors.toList());
 
         intervalToShardSpecs.put(interval, intervalShardSpecs);
@@ -824,26 +816,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
     return hllCollectors;
   }
 
-  private static BiFunction<Integer, Integer, ShardSpec> getShardSpecCreateFunction(
-      Integer numShards,
-      List<String> partitionDimensions,
-      ObjectMapper jsonMapper
-  )
-  {
-    Preconditions.checkNotNull(numShards, "numShards");
-
-    if (numShards == 1) {
-      return (shardId, totalNumShards) -> NoneShardSpec.instance();
-    } else {
-      return (shardId, totalNumShards) -> new HashBasedNumberedShardSpec(
-          shardId,
-          totalNumShards,
-          partitionDimensions,
-          jsonMapper
-      );
-    }
-  }
-
   /**
    * This method reads input data row by row and adds the read row to a proper segment using {@link BaseAppenderatorDriver}.
    * If there is no segment for the row, a new one is created.  Segments can be published in the middle of reading inputs
@@ -899,21 +871,10 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
       for (Map.Entry<Interval, List<ShardSpec>> entry : shardSpecs.getMap().entrySet()) {
         for (ShardSpec shardSpec : entry.getValue()) {
-          final ShardSpec shardSpecForPublishing;
-
-          if (isExtendableShardSpecs(ioConfig, tuningConfig)) {
-            shardSpecForPublishing = new NumberedShardSpec(
-                shardSpec.getPartitionNum(),
-                entry.getValue().size()
-            );
-          } else {
-            shardSpecForPublishing = shardSpec;
-          }
-
           final String version = findVersion(versions, entry.getKey());
           lookup.put(
               Appenderators.getSequenceName(entry.getKey(), version, shardSpec),
-              new SegmentIdWithShardSpec(getDataSource(), entry.getKey(), version, shardSpecForPublishing)
+              new SegmentIdWithShardSpec(getDataSource(), entry.getKey(), version, shardSpec)
           );
         }
       }
@@ -1300,7 +1261,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
     private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
     private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
-    private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false;
     private static final boolean DEFAULT_GUARANTEE_ROLLUP = false;
     private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
     private static final long DEFAULT_PUSH_TIMEOUT = 0;
@@ -1319,16 +1279,10 @@ public class IndexTask extends AbstractTask implements ChatHandler
     private final int maxPendingPersists;
 
     /**
-     * This flag is to force to always use an extendableShardSpec (like {@link NumberedShardSpec} even if
-     * {@link #forceGuaranteedRollup} is set.
-     */
-    private final boolean forceExtendableShardSpecs;
-
-    /**
      * This flag is to force _perfect rollup mode_. {@link IndexTask} will scan the whole input data twice to 1) figure
      * out proper shard specs for each segment and 2) generate segments. Note that perfect rollup mode basically assumes
-     * that no more data will be appended in the future. As a result, in perfect rollup mode, {@link NoneShardSpec} and
-     * {@link HashBasedNumberedShardSpec} are used for a single shard and two or shards, respectively.
+     * that no more data will be appended in the future. As a result, in perfect rollup mode,
+     * {@link HashBasedNumberedShardSpec} is used for shards.
      */
     private final boolean forceGuaranteedRollup;
     private final boolean reportParseExceptions;
@@ -1352,17 +1306,16 @@ public class IndexTask extends AbstractTask implements ChatHandler
         @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
         @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
         @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
-        @JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED
+        @JsonProperty("rowFlushBoundary") @Deprecated @Nullable Integer rowFlushBoundary_forBackCompatibility,
         @JsonProperty("numShards") @Nullable Integer numShards,
         @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
         @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
         @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
         // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
         @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
-        @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs,
         @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup,
-        @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
-        @JsonProperty("publishTimeout") @Nullable Long publishTimeout, // deprecated
+        @JsonProperty("reportParseExceptions") @Deprecated @Nullable Boolean reportParseExceptions,
+        @JsonProperty("publishTimeout") @Deprecated @Nullable Long publishTimeout,
         @JsonProperty("pushTimeout") @Nullable Long pushTimeout,
         @JsonProperty("segmentWriteOutMediumFactory") @Nullable
             SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@@ -1380,7 +1333,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
           partitionDimensions,
           indexSpec,
           maxPendingPersists,
-          forceExtendableShardSpecs,
           forceGuaranteedRollup,
           reportParseExceptions,
           pushTimeout != null ? pushTimeout : publishTimeout,
@@ -1399,7 +1351,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
     private IndexTuningConfig()
     {
-      this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+      this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
     }
 
     private IndexTuningConfig(
@@ -1411,7 +1363,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
         @Nullable List<String> partitionDimensions,
         @Nullable IndexSpec indexSpec,
         @Nullable Integer maxPendingPersists,
-        @Nullable Boolean forceExtendableShardSpecs,
         @Nullable Boolean forceGuaranteedRollup,
         @Nullable Boolean reportParseExceptions,
         @Nullable Long pushTimeout,
@@ -1439,9 +1390,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
       this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions;
       this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
       this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
-      this.forceExtendableShardSpecs = forceExtendableShardSpecs == null
-                                       ? DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS
-                                       : forceExtendableShardSpecs;
       this.forceGuaranteedRollup = forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup;
       this.reportParseExceptions = reportParseExceptions == null
                                    ? DEFAULT_REPORT_PARSE_EXCEPTIONS
@@ -1478,7 +1426,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
           partitionDimensions,
           indexSpec,
           maxPendingPersists,
-          forceExtendableShardSpecs,
           forceGuaranteedRollup,
           reportParseExceptions,
           pushTimeout,
@@ -1501,7 +1448,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
           partitionDimensions,
           indexSpec,
           maxPendingPersists,
-          forceExtendableShardSpecs,
           forceGuaranteedRollup,
           reportParseExceptions,
           pushTimeout,
@@ -1594,12 +1540,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
     }
 
     @JsonProperty
-    public boolean isForceExtendableShardSpecs()
-    {
-      return forceExtendableShardSpecs;
-    }
-
-    @JsonProperty
     public boolean isForceGuaranteedRollup()
     {
       return forceGuaranteedRollup;
@@ -1663,7 +1603,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
       return maxRowsInMemory == that.maxRowsInMemory &&
              Objects.equals(maxTotalRows, that.maxTotalRows) &&
              maxPendingPersists == that.maxPendingPersists &&
-             forceExtendableShardSpecs == that.forceExtendableShardSpecs &&
              forceGuaranteedRollup == that.forceGuaranteedRollup &&
              reportParseExceptions == that.reportParseExceptions &&
              pushTimeout == that.pushTimeout &&
@@ -1688,7 +1627,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
           indexSpec,
           basePersistDirectory,
           maxPendingPersists,
-          forceExtendableShardSpecs,
           forceGuaranteedRollup,
           reportParseExceptions,
           pushTimeout,
@@ -1711,7 +1649,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
              ", indexSpec=" + indexSpec +
              ", basePersistDirectory=" + basePersistDirectory +
              ", maxPendingPersists=" + maxPendingPersists +
-             ", forceExtendableShardSpecs=" + forceExtendableShardSpecs +
              ", forceGuaranteedRollup=" + forceGuaranteedRollup +
              ", reportParseExceptions=" + reportParseExceptions +
              ", pushTimeout=" + pushTimeout +
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTask.java
deleted file mode 100644
index 11cd78f..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTask.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.IndexMerger;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.timeline.DataSegment;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class MergeTask extends MergeTaskBase
-{
-  @JsonIgnore
-  private final List<AggregatorFactory> aggregators;
-  private final Boolean rollup;
-  private final IndexSpec indexSpec;
-
-  @JsonCreator
-  public MergeTask(
-      @JsonProperty("id") String id,
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("segments") List<DataSegment> segments,
-      @JsonProperty("aggregations") List<AggregatorFactory> aggregators,
-      @JsonProperty("rollup") Boolean rollup,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
-      @JsonProperty("buildV9Directly") Boolean buildV9Directly,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("context") Map<String, Object> context
-  )
-  {
-    super(id, dataSource, segments, segmentWriteOutMediumFactory, context);
-    this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
-    this.rollup = rollup == null ? Boolean.TRUE : rollup;
-    this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
-  }
-
-  @Override
-  public File merge(final TaskToolbox toolbox, final Map<DataSegment, File> segments, final File outDir)
-      throws Exception
-  {
-    IndexMerger indexMerger = toolbox.getIndexMergerV9();
-    return indexMerger.mergeQueryableIndex(
-        Lists.transform(
-            ImmutableList.copyOf(segments.values()),
-            new Function<File, QueryableIndex>()
-            {
-              @Override
-              public QueryableIndex apply(@Nullable File input)
-              {
-                try {
-                  return toolbox.getIndexIO().loadIndex(input);
-                }
-                catch (Exception e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            }
-        ),
-        rollup,
-        aggregators.toArray(new AggregatorFactory[0]),
-        outDir,
-        indexSpec,
-        getSegmentWriteOutMediumFactory()
-    );
-  }
-
-  @Override
-  public String getType()
-  {
-    return "merge";
-  }
-
-  @JsonProperty
-  public Boolean getRollup()
-  {
-    return rollup;
-  }
-
-  @JsonProperty
-  public IndexSpec getIndexSpec()
-  {
-    return indexSpec;
-  }
-
-  @JsonProperty("aggregations")
-  public List<AggregatorFactory> getAggregators()
-  {
-    return aggregators;
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java
deleted file mode 100644
index 2718be0..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MergeTaskBase.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
-import com.google.common.hash.Hashing;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.TaskLock;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
-import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
-/**
- */
-public abstract class MergeTaskBase extends AbstractFixedIntervalTask
-{
-  private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class);
-
-  @JsonIgnore
-  private final List<DataSegment> segments;
-  @JsonIgnore
-  @Nullable
-  private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
-
-  protected MergeTaskBase(
-      final String id,
-      final String dataSource,
-      final List<DataSegment> segments,
-      final @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      Map<String, Object> context
-  )
-  {
-    super(
-        // _not_ the version, just something uniqueish
-        id != null ? id : StringUtils.format(
-            "merge_%s_%s", computeProcessingID(dataSource, segments), DateTimes.nowUtc().toString()
-        ),
-        dataSource,
-        computeMergedInterval(segments),
-        context
-    );
-
-    // Verify segment list is nonempty
-    Preconditions.checkArgument(segments.size() > 0, "segments nonempty");
-    // Verify segments are all in the correct datasource
-    Preconditions.checkArgument(
-        Iterables.size(
-            Iterables.filter(
-                segments,
-                new Predicate<DataSegment>()
-                {
-                  @Override
-                  public boolean apply(@Nullable DataSegment segment)
-                  {
-                    return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource);
-                  }
-                }
-            )
-        ) == 0, "segments in the wrong datasource"
-    );
-    verifyInputSegments(segments);
-
-    this.segments = segments;
-    this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
-  }
-
-  protected void verifyInputSegments(List<DataSegment> segments)
-  {
-    // Verify segments are all unsharded
-    Preconditions.checkArgument(
-        Iterables.size(
-            Iterables.filter(
-                segments,
-                new Predicate<DataSegment>()
-                {
-                  @Override
-                  public boolean apply(@Nullable DataSegment segment)
-                  {
-                    return segment == null || !(segment.getShardSpec() instanceof NoneShardSpec);
-                  }
-                }
-            )
-        ) == 0, "segments without NoneShardSpec"
-    );
-  }
-
-  @Override
-  public int getPriority()
-  {
-    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
-  }
-
-  @Override
-  public TaskStatus run(TaskToolbox toolbox) throws Exception
-  {
-    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient()));
-    final ServiceEmitter emitter = toolbox.getEmitter();
-    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
-    final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
-    final File mergeDir = toolbox.getMergeDir();
-
-    try {
-      final long startTime = System.currentTimeMillis();
-
-      log.info("Starting merge of id[%s], segments: %s", getId(), Lists.transform(segments, DataSegment::getId));
-
-      // download segments to merge
-      final Map<DataSegment, File> gettedSegments = toolbox.fetchSegments(segments);
-
-      // merge files together
-      final File fileToUpload = merge(toolbox, gettedSegments, mergeDir);
-
-      emitter.emit(builder.build("merger/numMerged", segments.size()));
-      emitter.emit(builder.build("merger/mergeTime", System.currentTimeMillis() - startTime));
-
-      log.info(
-          "[%s] : Merged %d segments in %,d millis",
-          mergedSegment.getDataSource(),
-          segments.size(),
-          System.currentTimeMillis() - startTime
-      );
-
-      long uploadStart = System.currentTimeMillis();
-
-      // Upload file
-
-      final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment, false);
-
-      emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
-      emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
-
-      toolbox.publishSegments(ImmutableList.of(uploadedSegment));
-
-      return TaskStatus.success(getId());
-    }
-    catch (Exception e) {
-      log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource())
-         .addData("interval", mergedSegment.getInterval())
-         .emit();
-
-      return TaskStatus.failure(getId());
-    }
-  }
-
-  /**
-   * Checks pre-existing segments in "context" to confirm that this merge query is valid. Specifically, confirm that
-   * we are operating on every segment that overlaps the chosen interval.
-   */
-  @Override
-  public boolean isReady(TaskActionClient taskActionClient) throws Exception
-  {
-    // Try to acquire lock
-    if (!super.isReady(taskActionClient)) {
-      return false;
-    } else {
-
-      final Set<SegmentId> current = taskActionClient
-          .submit(new SegmentListUsedAction(getDataSource(), getInterval(), null))
-          .stream()
-          .map(DataSegment::getId)
-          .collect(Collectors.toSet());
-
-      final Set<SegmentId> requested = segments.stream().map(DataSegment::getId).collect(Collectors.toSet());
-
-      final Set<SegmentId> missingFromRequested = Sets.difference(current, requested);
-      if (!missingFromRequested.isEmpty()) {
-        throw new ISE(
-            "Merge is invalid: current segment(s) are not in the requested set: %s",
-            Joiner.on(", ").join(missingFromRequested)
-        );
-      }
-
-      final Set<SegmentId> missingFromCurrent = Sets.difference(requested, current);
-      if (!missingFromCurrent.isEmpty()) {
-        throw new ISE(
-            "Merge is invalid: requested segment(s) are not in the current set: %s",
-            Joiner.on(", ").join(missingFromCurrent)
-        );
-      }
-
-      return true;
-    }
-  }
-
-  protected abstract File merge(TaskToolbox taskToolbox, Map<DataSegment, File> segments, File outDir)
-      throws Exception;
-
-  @JsonProperty
-  public List<DataSegment> getSegments()
-  {
-    return segments;
-  }
-
-  @JsonProperty
-  @Nullable
-  public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
-  {
-    return segmentWriteOutMediumFactory;
-  }
-
-  @Override
-  public String toString()
-  {
-    return Objects.toStringHelper(this)
-                  .add("id", getId())
-                  .add("dataSource", getDataSource())
-                  .add("interval", getInterval())
-                  .add("segments", segments)
-                  .add("segmentWriteOutMediumFactory", segmentWriteOutMediumFactory)
-                  .toString();
-  }
-
-  private static String computeProcessingID(final String dataSource, final List<DataSegment> segments)
-  {
-    final String segmentIDs = Joiner.on("_").join(
-        Iterables.transform(
-            Ordering.natural().sortedCopy(segments), new Function<DataSegment, String>()
-            {
-              @Override
-              public String apply(DataSegment x)
-              {
-                return StringUtils.format(
-                    "%s_%s_%s_%s",
-                    x.getInterval().getStart(),
-                    x.getInterval().getEnd(),
-                    x.getVersion(),
-                    x.getShardSpec().getPartitionNum()
-                );
-              }
-            }
-        )
-    );
-
-    return StringUtils.format(
-        "%s_%s",
-        dataSource,
-        Hashing.sha1().hashString(segmentIDs, StandardCharsets.UTF_8).toString()
-    );
-  }
-
-  private static Interval computeMergedInterval(final List<DataSegment> segments)
-  {
-    Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0");
-
-    DateTime start = null;
-    DateTime end = null;
-
-    for (final DataSegment segment : segments) {
-      if (start == null || segment.getInterval().getStart().isBefore(start)) {
-        start = segment.getInterval().getStart();
-      }
-
-      if (end == null || segment.getInterval().getEnd().isAfter(end)) {
-        end = segment.getInterval().getEnd();
-      }
-    }
-
-    return new Interval(start, end);
-  }
-
-  private static DataSegment computeMergedSegment(
-      final String dataSource,
-      final String version,
-      final List<DataSegment> segments
-  )
-  {
-    final Interval mergedInterval = computeMergedInterval(segments);
-    final Set<String> mergedDimensions = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
-    final Set<String> mergedMetrics = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
-
-    for (DataSegment segment : segments) {
-      mergedDimensions.addAll(segment.getDimensions());
-      mergedMetrics.addAll(segment.getMetrics());
-    }
-
-    return DataSegment.builder()
-                      .dataSource(dataSource)
-                      .interval(mergedInterval)
-                      .version(version)
-                      .binaryVersion(IndexIO.CURRENT_VERSION_ID)
-                      .shardSpec(NoneShardSpec.instance())
-                      .dimensions(Lists.newArrayList(mergedDimensions))
-                      .metrics(Lists.newArrayList(mergedMetrics))
-                      .build();
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SameIntervalMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SameIntervalMergeTask.java
deleted file mode 100644
index e1c0f2b..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SameIntervalMergeTask.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class SameIntervalMergeTask extends AbstractFixedIntervalTask
-{
-  private static final String TYPE = "same_interval_merge";
-  @JsonIgnore
-  private final List<AggregatorFactory> aggregators;
-  private final Boolean rollup;
-  private final IndexSpec indexSpec;
-  @Nullable
-  private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
-
-  public SameIntervalMergeTask(
-      @JsonProperty("id") String id,
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval,
-      @JsonProperty("aggregations") List<AggregatorFactory> aggregators,
-      @JsonProperty("rollup") Boolean rollup,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
-      @SuppressWarnings("unused")
-      @JsonProperty("buildV9Directly") Boolean buildV9Directly,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("context") Map<String, Object> context
-  )
-  {
-    super(
-        makeId(id, TYPE, dataSource, interval),
-        dataSource,
-        interval,
-        context
-    );
-    this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
-    this.rollup = rollup == null ? Boolean.TRUE : rollup;
-    this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
-    this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
-  }
-
-  @JsonProperty("aggregations")
-  public List<AggregatorFactory> getAggregators()
-  {
-    return aggregators;
-  }
-
-  @JsonProperty
-  public Boolean getRollup()
-  {
-    return rollup;
-  }
-
-  @JsonProperty
-  public IndexSpec getIndexSpec()
-  {
-    return indexSpec;
-  }
-
-  /**
-   * Always returns true, doesn't affect the version being built.
-   */
-  @Deprecated
-  @JsonProperty
-  public Boolean getBuildV9Directly()
-  {
-    return true;
-  }
-
-  public static String makeId(String id, final String typeName, String dataSource, Interval interval)
-  {
-    return id != null ? id : joinId(
-        typeName,
-        dataSource,
-        interval.getStart(),
-        interval.getEnd(),
-        DateTimes.nowUtc().toString()
-    );
-  }
-
-  @Override
-  public String getType()
-  {
-    return TYPE;
-  }
-
-  @Override
-  public TaskStatus run(TaskToolbox toolbox) throws Exception
-  {
-    final List<DataSegment> segments = toolbox.getTaskActionClient().submit(
-        new SegmentListUsedAction(
-            getDataSource(),
-            getInterval(),
-            null
-        )
-    );
-    SubTask mergeTask = new SubTask(
-        getId(),
-        getDataSource(),
-        segments,
-        aggregators,
-        rollup,
-        indexSpec,
-        segmentWriteOutMediumFactory,
-        getContext()
-    );
-    final TaskStatus status = mergeTask.run(toolbox);
-    if (!status.isSuccess()) {
-      return TaskStatus.fromCode(getId(), status.getStatusCode());
-    }
-    return success();
-  }
-
-  public static class SubTask extends MergeTask
-  {
-    private SubTask(
-        String baseId,
-        String dataSource,
-        List<DataSegment> segments,
-        List<AggregatorFactory> aggregators,
-        Boolean rollup,
-        IndexSpec indexSpec,
-        @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-        Map<String, Object> context
-    )
-    {
-      super(
-          "sub_" + baseId,
-          dataSource,
-          segments,
-          aggregators,
-          rollup,
-          indexSpec,
-          true,
-          segmentWriteOutMediumFactory,
-          context
-      );
-    }
-
-    @Override
-    protected void verifyInputSegments(List<DataSegment> segments)
-    {
-      // do nothing
-    }
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 4523fc2..c32f27d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -46,8 +46,6 @@ import java.util.Map;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "append", value = AppendTask.class),
-    @JsonSubTypes.Type(name = "merge", value = MergeTask.class),
     @JsonSubTypes.Type(name = "kill", value = KillTask.class),
     @JsonSubTypes.Type(name = "move", value = MoveTask.class),
     @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
@@ -59,7 +57,6 @@ import java.util.Map;
     @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
     @JsonSubTypes.Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class),
     @JsonSubTypes.Type(name = "noop", value = NoopTask.class),
-    @JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class),
     @JsonSubTypes.Type(name = "compact", value = CompactionTask.class)
 })
 public interface Task
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index a9a8066..ecf76ee 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -335,7 +335,6 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
         tuningConfig.getIndexSpec(),
         tuningConfig.getMaxPendingPersists(),
         true,
-        tuningConfig.isForceExtendableShardSpecs(),
         false,
         tuningConfig.isReportParseExceptions(),
         null,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index c0e9370..2038bf0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -71,7 +71,6 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
         null,
         null,
         null,
-        null,
         null
     );
   }
@@ -86,7 +85,6 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
       @JsonProperty("numShards") @Nullable Integer numShards,
       @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
       @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
-      @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs,
       @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup,
       @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
       @JsonProperty("pushTimeout") @Nullable Long pushTimeout,
@@ -113,7 +111,6 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
         indexSpec,
         maxPendingPersists,
         null,
-        forceExtendableShardSpecs,
         forceGuaranteedRollup,
         reportParseExceptions,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
index b9f36b2..1b0430a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
@@ -20,14 +20,12 @@
 package org.apache.druid.indexing.common;
 
 import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
-import org.apache.druid.guice.ServerModule;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -47,7 +45,6 @@ import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -77,11 +74,6 @@ public class TestUtils
     );
     indexMergerV9 = new IndexMergerV9(jsonMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
 
-    final List<? extends Module> list = new ServerModule().getJacksonModules();
-    for (Module module : list) {
-      jsonMapper.registerModule(module);
-    }
-
     this.rowIngestionMetersFactory = new DropwizardRowIngestionMetersFactory();
 
     jsonMapper.setInjectableValues(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 6ad0ec4..ea109c8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -344,7 +344,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
                 Granularities.MINUTE,
                 null
             ),
-            IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, null, false, false, true),
+            IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, null, false, true),
             false
         ),
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 283dee8..cbd52f5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -298,7 +298,6 @@ public class CompactionTaskTest
         ),
         5000,
         true,
-        false,
         true,
         false,
         null,
@@ -502,7 +501,6 @@ public class CompactionTaskTest
         ),
         5000,
         true,
-        false,
         true,
         false,
         null,
@@ -578,7 +576,6 @@ public class CompactionTaskTest
         ),
         5000,
         true,
-        false,
         true,
         false,
         null,
@@ -654,7 +651,6 @@ public class CompactionTaskTest
         ),
         5000,
         true,
-        false,
         true,
         false,
         null,
@@ -976,7 +972,6 @@ public class CompactionTaskTest
         ),
         5000,
         true,
-        false,
         true,
         false,
         null,
@@ -1237,7 +1232,6 @@ public class CompactionTaskTest
             ),
             5000,
             true,
-            false,
             true,
             false,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
index 7e2b2ef..c77f063 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
@@ -21,8 +21,8 @@ package org.apache.druid.indexing.common.task;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.HadoopDruidIndexerConfig;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexer.updater.HadoopDruidConverterConfig;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
@@ -98,7 +98,7 @@ public class HadoopTaskTest
     Assert.assertFalse(classLoader instanceof ApplicationClassLoader);
     Assert.assertTrue(classLoader instanceof URLClassLoader);
 
-    final ClassLoader appLoader = HadoopDruidConverterConfig.class.getClassLoader();
+    final ClassLoader appLoader = HadoopDruidIndexerConfig.class.getClassLoader();
     Assert.assertNotEquals(StringUtils.format("ClassLoader [%s] is not isolated!", classLoader), appLoader, classLoader);
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 60a3a87..97279f9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -95,7 +95,6 @@ import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
-import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Interval;
@@ -241,7 +240,7 @@ public class IndexTaskTest
             tmpDir,
             null,
             null,
-            createTuningConfigWithMaxRowsPerSegment(2, false, true),
+            createTuningConfigWithMaxRowsPerSegment(2, true),
             false
         ),
         null,
@@ -268,52 +267,6 @@ public class IndexTaskTest
   }
 
   @Test
-  public void testForceExtendableShardSpecs() throws Exception
-  {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
-      writer.write("2014-01-01T00:00:10Z,a,1\n");
-      writer.write("2014-01-01T01:00:20Z,b,1\n");
-      writer.write("2014-01-01T02:00:30Z,c,1\n");
-    }
-
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
-        createIngestionSpec(
-            tmpDir,
-            null,
-            null,
-            createTuningConfigWithMaxRowsPerSegment(2, true, true),
-            false
-        ),
-        null,
-        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
-        null,
-        rowIngestionMetersFactory
-    );
-
-    Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
-
-    final List<DataSegment> segments = runTask(indexTask).rhs;
-
-    Assert.assertEquals(2, segments.size());
-
-    Assert.assertEquals("test", segments.get(0).getDataSource());
-    Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
-    Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
-    Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
-
-    Assert.assertEquals("test", segments.get(1).getDataSource());
-    Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval());
-    Assert.assertEquals(NumberedShardSpec.class, segments.get(1).getShardSpec().getClass());
-    Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
-  }
-
-  @Test
   public void testTransformSpec() throws Exception
   {
     File tmpDir = temporaryFolder.newFolder();
@@ -339,7 +292,7 @@ public class IndexTaskTest
                 )
             ),
             null,
-            createTuningConfigWithMaxRowsPerSegment(2, true, false),
+            createTuningConfigWithMaxRowsPerSegment(2, false),
             false
         ),
         null,
@@ -383,7 +336,7 @@ public class IndexTaskTest
                 Granularities.MINUTE,
                 Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
-            createTuningConfigWithMaxRowsPerSegment(10, false, true),
+            createTuningConfigWithMaxRowsPerSegment(10, true),
             false
         ),
         null,
@@ -420,7 +373,7 @@ public class IndexTaskTest
                 Granularities.HOUR,
                 Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
             ),
-            createTuningConfigWithMaxRowsPerSegment(50, false, true),
+            createTuningConfigWithMaxRowsPerSegment(50, true),
             false
         ),
         null,
@@ -453,7 +406,7 @@ public class IndexTaskTest
             tmpDir,
             null,
             null,
-            createTuningConfigWithNumShards(1, null, false, true),
+            createTuningConfigWithNumShards(1, null, true),
             false
         ),
         null,
@@ -468,7 +421,7 @@ public class IndexTaskTest
 
     Assert.assertEquals("test", segments.get(0).getDataSource());
     Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
-    Assert.assertEquals(NoneShardSpec.class, segments.get(0).getShardSpec().getClass());
+    Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
     Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
   }
 
@@ -491,7 +444,7 @@ public class IndexTaskTest
             tmpDir,
             null,
             null,
-            createTuningConfigWithNumShards(2, ImmutableList.of("dim"), false, true),
+            createTuningConfigWithNumShards(2, ImmutableList.of("dim"), true),
             false
         ),
         null,
@@ -566,7 +519,7 @@ public class IndexTaskTest
             tmpDir,
             null,
             null,
-            createTuningConfigWithMaxRowsPerSegment(2, false, false),
+            createTuningConfigWithMaxRowsPerSegment(2, false),
             true
         ),
         null,
@@ -616,7 +569,7 @@ public class IndexTaskTest
                 Granularities.MINUTE,
                 null
             ),
-            createTuningConfigWithMaxRowsPerSegment(2, false, true),
+            createTuningConfigWithMaxRowsPerSegment(2, true),
             false
         ),
         null,
@@ -631,17 +584,17 @@ public class IndexTaskTest
 
     Assert.assertEquals("test", segments.get(0).getDataSource());
     Assert.assertEquals(Intervals.of("2014-01-01T00/PT1H"), segments.get(0).getInterval());
-    Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class));
+    Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
     Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
 
     Assert.assertEquals("test", segments.get(1).getDataSource());
     Assert.assertEquals(Intervals.of("2014-01-01T01/PT1H"), segments.get(1).getInterval());
-    Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NoneShardSpec.class));
+    Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass());
     Assert.assertEquals(0, segments.get(1).getShardSpec().getPartitionNum());
 
     Assert.assertEquals("test", segments.get(2).getDataSource());
     Assert.assertEquals(Intervals.of("2014-01-01T02/PT1H"), segments.get(2).getInterval());
-    Assert.assertTrue(segments.get(2).getShardSpec().getClass().equals(NoneShardSpec.class));
+    Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(2).getShardSpec().getClass());
     Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum());
   }
 
@@ -679,7 +632,7 @@ public class IndexTaskTest
                 0
             ),
             null,
-            createTuningConfigWithMaxRowsPerSegment(2, false, true),
+            createTuningConfigWithMaxRowsPerSegment(2, true),
             false
         ),
         null,
@@ -731,7 +684,7 @@ public class IndexTaskTest
                 0
             ),
             null,
-            createTuningConfigWithMaxRowsPerSegment(2, false, true),
+            createTuningConfigWithMaxRowsPerSegment(2, true),
             false
         ),
         null,
@@ -778,7 +731,7 @@ public class IndexTaskTest
                 Granularities.MINUTE,
                 null
             ),
-            createTuningConfig(2, 2, null, 2L, null, null, false, false, true),
+            createTuningConfig(2, 2, null, 2L, null, null, false, true),
             false
         ),
         null,
@@ -823,7 +776,7 @@ public class IndexTaskTest
                 true,
                 null
             ),
-            createTuningConfig(3, 2, null, 2L, null, null, false, true, true),
+            createTuningConfig(3, 2, null, 2L, null, null, true, true),
             false
         ),
         null,
@@ -867,7 +820,7 @@ public class IndexTaskTest
                 true,
                 null
             ),
-            createTuningConfig(3, 2, null, 2L, null, null, false, false, true),
+            createTuningConfig(3, 2, null, 2L, null, null, false, true),
             false
         ),
         null,
@@ -940,7 +893,7 @@ public class IndexTaskTest
             0
         ),
         null,
-        createTuningConfig(2, null, null, null, null, null, false, false, false), // ignore parse exception,
+        createTuningConfig(2, null, null, null, null, null, false, false), // ignore parse exception,
         false
     );
 
@@ -993,7 +946,7 @@ public class IndexTaskTest
             0
         ),
         null,
-        createTuningConfig(2, null, null, null, null, null, false, false, true), // report parse exception
+        createTuningConfig(2, null, null, null, null, null, false, true), // report parse exception
         false
     );
 
@@ -1052,7 +1005,6 @@ public class IndexTaskTest
         indexSpec,
         null,
         true,
-        false,
         true,
         false,
         null,
@@ -1177,7 +1129,6 @@ public class IndexTaskTest
         true,
         false,
         false,
-        false,
         null,
         null,
         null,
@@ -1291,7 +1242,6 @@ public class IndexTaskTest
         indexSpec,
         null,
         true,
-        false,
         true,
         false,
         null,
@@ -1424,7 +1374,7 @@ public class IndexTaskTest
             0
         ),
         null,
-        createTuningConfig(2, 1, null, null, null, null, false, true, true), // report parse exception
+        createTuningConfig(2, 1, null, null, null, null, true, true), // report parse exception
         false
     );
 
@@ -1494,7 +1444,7 @@ public class IndexTaskTest
             0
         ),
         null,
-        createTuningConfig(2, null, null, null, null, null, false, false, true), // report parse exception
+        createTuningConfig(2, null, null, null, null, null, false, true), // report parse exception
         false
     );
 
@@ -1701,7 +1651,6 @@ public class IndexTaskTest
 
   private static IndexTuningConfig createTuningConfigWithMaxRowsPerSegment(
       int maxRowsPerSegment,
-      boolean forceExtendableShardSpecs,
       boolean forceGuaranteedRollup
   )
   {
@@ -1712,7 +1661,6 @@ public class IndexTaskTest
         null,
         null,
         null,
-        forceExtendableShardSpecs,
         forceGuaranteedRollup,
         true
     );
@@ -1721,7 +1669,6 @@ public class IndexTaskTest
   private static IndexTuningConfig createTuningConfigWithNumShards(
       int numShards,
       @Nullable List<String> partitionDimensions,
-      boolean forceExtendableShardSpecs,
       boolean forceGuaranteedRollup
   )
   {
@@ -1732,7 +1679,6 @@ public class IndexTaskTest
         null,
         numShards,
         partitionDimensions,
-        forceExtendableShardSpecs,
         forceGuaranteedRollup,
         true
     );
@@ -1745,7 +1691,6 @@ public class IndexTaskTest
       @Nullable Long maxTotalRows,
       @Nullable Integer numShards,
       @Nullable List<String> partitionDimensions,
-      boolean forceExtendableShardSpecs,
       boolean forceGuaranteedRollup,
       boolean reportParseException
   )
@@ -1762,7 +1707,6 @@ public class IndexTaskTest
         indexSpec,
         null,
         true,
-        forceExtendableShardSpecs,
         forceGuaranteedRollup,
         reportParseException,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MergeTaskBaseTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MergeTaskBaseTest.java
deleted file mode 100644
index ef14ba9..0000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MergeTaskBaseTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.timeline.DataSegment;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-
-public class MergeTaskBaseTest
-{
-  private final DataSegment.Builder segmentBuilder = DataSegment.builder()
-                                                                .dataSource("foo")
-                                                                .version("V1");
-
-  final List<DataSegment> segments = ImmutableList.<DataSegment>builder()
-          .add(segmentBuilder.interval(Intervals.of("2012-01-04/2012-01-06")).build())
-          .add(segmentBuilder.interval(Intervals.of("2012-01-05/2012-01-07")).build())
-          .add(segmentBuilder.interval(Intervals.of("2012-01-03/2012-01-05")).build())
-          .build();
-
-  final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments, null, null)
-  {
-    @Override
-    protected File merge(TaskToolbox toolbox, Map<DataSegment, File> segments, File outDir)
-    {
-      return null;
-    }
-
-    @Override
-    public String getType()
-    {
-      return "test";
-    }
-  };
-
-  @Test
-  public void testDataSource()
-  {
-    Assert.assertEquals("foo", testMergeTaskBase.getDataSource());
-  }
-
-  @Test
-  public void testInterval()
-  {
-    Assert.assertEquals(Intervals.of("2012-01-03/2012-01-07"), testMergeTaskBase.getInterval());
-  }
-
-  @Test
-  public void testID()
-  {
-    final String desiredPrefix =
-        "merge_foo_" +
-        Hashing.sha1().hashString(
-            "2012-01-03T00:00:00.000Z_2012-01-05T00:00:00.000Z_V1_0" +
-            "_2012-01-04T00:00:00.000Z_2012-01-06T00:00:00.000Z_V1_0" +
-            "_2012-01-05T00:00:00.000Z_2012-01-07T00:00:00.000Z_V1_0",
-            StandardCharsets.UTF_8
-        ) +
-        "_";
-    Assert.assertEquals(
-        desiredPrefix,
-        testMergeTaskBase.getId().substring(0, desiredPrefix.length())
-    );
-  }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SameIntervalMergeTaskTest.java
deleted file mode 100644
index 6284503..0000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SameIntervalMergeTaskTest.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.indexing.common.TaskLock;
-import org.apache.druid.indexing.common.TaskLockType;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.actions.LockListAction;
-import org.apache.druid.indexing.common.actions.LockTryAcquireAction;
-import org.apache.druid.indexing.common.actions.SegmentInsertAction;
-import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
-import org.apache.druid.indexing.common.actions.TaskAction;
-import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.IndexMergerV9;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.Segment;
-import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.loading.SegmentLoader;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class SameIntervalMergeTaskTest
-{
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
-  public TaskLock taskLock;
-  private final CountDownLatch isRedayCountDown = new CountDownLatch(1);
-  private final CountDownLatch publishCountDown = new CountDownLatch(1);
-  private final IndexSpec indexSpec;
-  private final ObjectMapper jsonMapper;
-  private IndexIO indexIO;
-
-  public SameIntervalMergeTaskTest()
-  {
-    indexSpec = new IndexSpec();
-    TestUtils testUtils = new TestUtils();
-    jsonMapper = testUtils.getTestObjectMapper();
-    indexIO = testUtils.getTestIndexIO();
-  }
-
-  @Test
-  public void testRun() throws Exception
-  {
-    final List<AggregatorFactory> aggregators = ImmutableList.of(new CountAggregatorFactory("cnt"));
-    final SameIntervalMergeTask task = new SameIntervalMergeTask(
-        null,
-        "foo",
-        Intervals.of("2010-01-01/P1D"),
-        aggregators,
-        true,
-        indexSpec,
-        true,
-        null,
-        null
-    );
-
-    String newVersion = "newVersion";
-    final List<DataSegment> segments = runTask(task, newVersion);
-
-    // the lock is acquired
-    Assert.assertEquals(0, isRedayCountDown.getCount());
-    // the merged segment is published
-    Assert.assertEquals(0, publishCountDown.getCount());
-    // the merged segment is the only element
-    Assert.assertEquals(1, segments.size());
-
-    DataSegment mergeSegment = segments.get(0);
-    Assert.assertEquals("foo", mergeSegment.getDataSource());
-    Assert.assertEquals(newVersion, mergeSegment.getVersion());
-    // the merged segment's interval is within the requested interval
-    Assert.assertTrue(Intervals.of("2010-01-01/P1D").contains(mergeSegment.getInterval()));
-    // the merged segment should be NoneShardSpec
-    Assert.assertTrue(mergeSegment.getShardSpec() instanceof NoneShardSpec);
-  }
-
-  private List<DataSegment> runTask(final SameIntervalMergeTask mergeTask, final String version) throws Exception
-  {
-    boolean isReady = mergeTask.isReady(new TaskActionClient()
-    {
-      @Override
-      public <RetType> RetType submit(TaskAction<RetType> taskAction)
-      {
-        if (taskAction instanceof LockTryAcquireAction) {
-          // the lock of this interval is required
-          Assert.assertEquals(mergeTask.getInterval(), ((LockTryAcquireAction) taskAction).getInterval());
-          isRedayCountDown.countDown();
-          taskLock = new TaskLock(
-              TaskLockType.EXCLUSIVE,
-              mergeTask.getGroupId(),
-              mergeTask.getDataSource(),
-              mergeTask.getInterval(),
-              version,
-              Tasks.DEFAULT_TASK_PRIORITY
-          );
-          return (RetType) taskLock;
-        }
-        return null;
-      }
-    });
-    // ensure LockTryAcquireAction is submitted
-    Assert.assertTrue(isReady);
-    final List<DataSegment> segments = new ArrayList<>();
-
-    mergeTask.run(
-        new TaskToolbox(
-            null,
-            new TaskActionClient()
-            {
-              @Override
-              public <RetType> RetType submit(TaskAction<RetType> taskAction)
-              {
-                if (taskAction instanceof LockListAction) {
-                  Assert.assertNotNull("taskLock should be acquired before list", taskLock);
-                  return (RetType) Collections.singletonList(taskLock);
-                }
-                if (taskAction instanceof SegmentListUsedAction) {
-                  List<DataSegment> segments = ImmutableList.of(
-                      DataSegment.builder()
-                                 .dataSource(mergeTask.getDataSource())
-                                 .interval(Intervals.of("2010-01-01/PT1H"))
-                                 .version("oldVersion")
-                                 .shardSpec(new LinearShardSpec(0))
-                                 .build(),
-                      DataSegment.builder()
-                                 .dataSource(mergeTask.getDataSource())
-                                 .interval(Intervals.of("2010-01-01/PT1H"))
-                                 .version("oldVersion")
-                                 .shardSpec(new LinearShardSpec(0))
-                                 .build(),
-                      DataSegment.builder()
-                                 .dataSource(mergeTask.getDataSource())
-                                 .interval(Intervals.of("2010-01-01/PT2H"))
-                                 .version("oldVersion")
-                                 .shardSpec(new LinearShardSpec(0))
-                                 .build()
-                  );
-                  return (RetType) segments;
-                }
-                if (taskAction instanceof SegmentInsertAction) {
-                  publishCountDown.countDown();
-                  return null;
-                }
-
-                return null;
-              }
-            },
-            new NoopServiceEmitter(), new DataSegmentPusher()
-            {
-              @Deprecated
-              @Override
-              public String getPathForHadoop(String dataSource)
-              {
-                return getPathForHadoop();
-              }
-
-              @Override
-              public String getPathForHadoop()
-              {
-                return null;
-              }
-
-              @Override
-              public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
-              {
-                // the merged segment is pushed to storage
-                segments.add(segment);
-                return segment;
-              }
-              @Override
-              public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
-              {
-                return null;
-              }
-
-            },
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            new SegmentLoader()
-            {
-              @Override
-              public boolean isSegmentLoaded(DataSegment segment)
-              {
-                return false;
-              }
-
-              @Override
-              public Segment getSegment(DataSegment segment)
-              {
-                return null;
-              }
-
-              @Override
-              public File getSegmentFiles(DataSegment segment)
-              {
-                // dummy file to represent the downloaded segment's dir
-                return new File("" + segment.getShardSpec().getPartitionNum());
-              }
-
-              @Override
-              public void cleanup(DataSegment segment)
-              {
-              }
-            },
-            jsonMapper,
-            temporaryFolder.newFolder(),
-            indexIO,
-            null,
-            null,
-            null,
-            EasyMock.createMock(IndexMergerV9.class),
-            null,
-            null,
-            null,
-            null,
-            new NoopTestTaskFileWriter()
-        )
-    );
-
-    return segments;
-  }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index fc1b794..1d15b68 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import org.apache.druid.client.indexing.ClientAppendQuery;
 import org.apache.druid.client.indexing.ClientKillQuery;
-import org.apache.druid.client.indexing.ClientMergeQuery;
 import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexer.HadoopIOConfig;
 import org.apache.druid.indexer.HadoopIngestionSpec;
@@ -37,7 +35,6 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -47,7 +44,6 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import org.apache.druid.server.security.AuthTestUtils;
-import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.Period;
@@ -57,7 +53,6 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.File;
-import java.util.List;
 
 public class TaskSerdeTest
 {
@@ -98,7 +93,6 @@ public class TaskSerdeTest
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertFalse(tuningConfig.isForceExtendableShardSpecs());
     Assert.assertFalse(tuningConfig.isReportParseExceptions());
     Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec());
     Assert.assertEquals(new Period(Integer.MAX_VALUE), tuningConfig.getIntermediatePersistPeriod());
@@ -210,7 +204,6 @@ public class TaskSerdeTest
                 indexSpec,
                 3,
                 true,
-                true,
                 false,
                 null,
                 null,
@@ -258,10 +251,6 @@ public class TaskSerdeTest
     Assert.assertEquals(taskTuningConfig.getMaxRowsInMemory(), task2TuningConfig.getMaxRowsInMemory());
     Assert.assertEquals(taskTuningConfig.getNumShards(), task2TuningConfig.getNumShards());
     Assert.assertEquals(taskTuningConfig.getMaxRowsPerSegment(), task2TuningConfig.getMaxRowsPerSegment());
-    Assert.assertEquals(
-        taskTuningConfig.isForceExtendableShardSpecs(),
-        task2TuningConfig.isForceExtendableShardSpecs()
-    );
     Assert.assertEquals(taskTuningConfig.isReportParseExceptions(), task2TuningConfig.isReportParseExceptions());
   }
 
@@ -297,7 +286,6 @@ public class TaskSerdeTest
                 indexSpec,
                 3,
                 true,
-                true,
                 false,
                 null,
                 null,
@@ -337,99 +325,6 @@ public class TaskSerdeTest
   }
 
   @Test
-  public void testMergeTaskSerde() throws Exception
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder()
-                   .dataSource("foo")
-                   .interval(Intervals.of("2010-01-01/P1D"))
-                   .version("1234")
-                   .build()
-    );
-    final List<AggregatorFactory> aggregators = ImmutableList.of(new CountAggregatorFactory("cnt"));
-    final MergeTask task = new MergeTask(
-        null,
-        "foo",
-        segments,
-        aggregators,
-        true,
-        indexSpec,
-        true,
-        null,
-        null
-    );
-
-    final String json = jsonMapper.writeValueAsString(task);
-
-    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
-    final MergeTask task2 = (MergeTask) jsonMapper.readValue(json, Task.class);
-
-    Assert.assertEquals("foo", task.getDataSource());
-    Assert.assertEquals(Intervals.of("2010-01-01/P1D"), task.getInterval());
-
-    Assert.assertEquals(task.getId(), task2.getId());
-    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
-    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
-    Assert.assertEquals(task.getInterval(), task2.getInterval());
-    Assert.assertEquals(task.getSegments(), task2.getSegments());
-    Assert.assertEquals(
-        task.getAggregators().get(0).getName(),
-        task2.getAggregators().get(0).getName()
-    );
-
-    final MergeTask task3 = (MergeTask) jsonMapper.readValue(
-        jsonMapper.writeValueAsString(
-            new ClientMergeQuery(
-                "foo",
-                segments,
-                aggregators
-            )
-        ), Task.class
-    );
-
-    Assert.assertEquals("foo", task3.getDataSource());
-    Assert.assertEquals(Intervals.of("2010-01-01/P1D"), task3.getInterval());
-    Assert.assertEquals(segments, task3.getSegments());
-    Assert.assertEquals(aggregators, task3.getAggregators());
-  }
-
-  @Test
-  public void testSameIntervalMergeTaskSerde() throws Exception
-  {
-    final List<AggregatorFactory> aggregators = ImmutableList.of(new CountAggregatorFactory("cnt"));
-    final SameIntervalMergeTask task = new SameIntervalMergeTask(
-        null,
-        "foo",
-        Intervals.of("2010-01-01/P1D"),
-        aggregators,
-        true,
-        indexSpec,
-        true,
-        null,
-        null
-    );
-
-    final String json = jsonMapper.writeValueAsString(task);
-
-    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
-    final SameIntervalMergeTask task2 = (SameIntervalMergeTask) jsonMapper.readValue(json, Task.class);
-
-    Assert.assertEquals("foo", task.getDataSource());
-    Assert.assertEquals(Intervals.of("2010-01-01/P1D"), task.getInterval());
-
-    Assert.assertEquals(task.getId(), task2.getId());
-    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
-    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
-    Assert.assertEquals(task.getInterval(), task2.getInterval());
-    Assert.assertEquals(task.getRollup(), task2.getRollup());
-    Assert.assertEquals(task.getIndexSpec(), task2.getIndexSpec());
-    Assert.assertEquals(
-        task.getAggregators().get(0).getName(),
-        task2.getAggregators().get(0).getName()
-    );
-  }
-
-  @Test
   public void testKillTaskSerde() throws Exception
   {
     final KillTask task = new KillTask(
@@ -546,63 +441,6 @@ public class TaskSerdeTest
   }
 
   @Test
-  public void testAppendTaskSerde() throws Exception
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder()
-                   .dataSource("foo")
-                   .interval(Intervals.of("2010-01-01/P1D"))
-                   .version("1234")
-                   .build(),
-        DataSegment.builder()
-                   .dataSource("foo")
-                   .interval(Intervals.of("2010-01-02/P1D"))
-                   .version("5678")
-                   .build()
-    );
-    final AppendTask task = new AppendTask(
-        null,
-        "foo",
-        segments,
-        ImmutableList.of(
-            new CountAggregatorFactory("cnt")
-        ),
-        indexSpec,
-        true,
-        null,
-        null
-    );
-
-    final String json = jsonMapper.writeValueAsString(task);
-
-    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
-    final AppendTask task2 = (AppendTask) jsonMapper.readValue(json, Task.class);
-
-    Assert.assertEquals("foo", task.getDataSource());
-    Assert.assertEquals(Intervals.of("2010-01-01/P2D"), task.getInterval());
-
-    Assert.assertEquals(task.getId(), task2.getId());
-    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
-    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
-    Assert.assertEquals(task.getInterval(), task2.getInterval());
-    Assert.assertEquals(task.getSegments(), task2.getSegments());
-
-    final AppendTask task3 = (AppendTask) jsonMapper.readValue(
-        jsonMapper.writeValueAsString(
-            new ClientAppendQuery(
-                "foo",
-                segments
-            )
-        ), Task.class
-    );
-
-    Assert.assertEquals("foo", task3.getDataSource());
-    Assert.assertEquals(Intervals.of("2010-01-01/P2D"), task3.getInterval());
-    Assert.assertEquals(task3.getSegments(), segments);
-    Assert.assertEquals(task.getAggregators(), task2.getAggregators());
-  }
-
-  @Test
   public void testArchiveTaskSerde() throws Exception
   {
     final ArchiveTask task = new ArchiveTask(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index c7c8d0b..6f7c3b9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -188,7 +188,6 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
             null,
             null,
             null,
-            null,
             numTotalSubTasks,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 04aa5a7..704c5fa 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -430,7 +430,6 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
             null,
             null,
             null,
-            null,
             NUM_SUB_TASKS,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index 0fa747f..dd90963 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -138,7 +138,6 @@ public class ParallelIndexSupervisorTaskSerdeTest
             null,
             null,
             null,
-            null,
             2,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index efc1fc4..cfd02ac 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -250,7 +250,6 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
             null,
             null,
             null,
-            null,
             1,
             null,
             null,
@@ -291,7 +290,6 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
             null,
             null,
             null,
-            null,
             2,
             null,
             null,
@@ -311,6 +309,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
   )
   {
     // set up ingestion spec
+    //noinspection unchecked
     final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(
         new DataSchema(
             "dataSource",
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 4e1c66d..6f2b992 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -692,7 +692,6 @@ public class TaskLifecycleTest
                 indexSpec,
                 3,
                 true,
-                true,
                 false,
                 null,
                 null,
@@ -774,7 +773,6 @@ public class TaskLifecycleTest
                 indexSpec,
                 3,
                 true,
-                true,
                 false,
                 null,
                 null,
@@ -1170,7 +1168,6 @@ public class TaskLifecycleTest
                 null,
                 null,
                 null,
-                null,
                 null
             )
         ),
diff --git a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java
index 85af41c..235d467 100644
--- a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java
+++ b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java
@@ -47,7 +47,6 @@ public class DefaultObjectMapper extends ObjectMapper
     registerModule(new GuavaModule());
     registerModule(new GranularityModule());
     registerModule(new AggregatorsModule());
-    registerModule(new SegmentsModule());
     registerModule(new StringComparatorModule());
     registerModule(new SegmentizerModule());
 
diff --git a/processing/src/main/java/org/apache/druid/jackson/SegmentsModule.java b/processing/src/main/java/org/apache/druid/jackson/SegmentsModule.java
deleted file mode 100644
index 0f3cd21..0000000
--- a/processing/src/main/java/org/apache/druid/jackson/SegmentsModule.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.jackson;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.apache.druid.timeline.partition.ShardSpec;
-
-/**
- */
-public class SegmentsModule extends SimpleModule
-{
-  public SegmentsModule()
-  {
-    super("SegmentsModule");
-
-    setMixInAnnotation(ShardSpec.class, ShardSpecMixin.class);
-  }
-
-  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-  @JsonSubTypes({
-      @JsonSubTypes.Type(name = "none", value = NoneShardSpec.class),
-  })
-  public interface ShardSpecMixin
-  {}
-}
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
index 2826b9d..ac57618 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
@@ -220,14 +220,6 @@ public interface IndexMerger
   // Faster than IndexMaker
   File convert(File inDir, File outDir, IndexSpec indexSpec) throws IOException;
 
-  File convert(
-      File inDir,
-      File outDir,
-      IndexSpec indexSpec,
-      ProgressIndicator progress,
-      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
-  ) throws IOException;
-
   File append(
       List<IndexableAdapter> indexes,
       AggregatorFactory[] aggregators,
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 135f8cb..950eb5f 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -932,8 +932,7 @@ public class IndexMergerV9 implements IndexMerger
     return convert(inDir, outDir, indexSpec, new BaseProgressIndicator(), defaultSegmentWriteOutMediumFactory);
   }
 
-  @Override
-  public File convert(
+  private File convert(
       final File inDir,
       final File outDir,
       final IndexSpec indexSpec,
diff --git a/processing/src/main/java/org/apache/druid/segment/ProgressIndicator.java b/processing/src/main/java/org/apache/druid/segment/ProgressIndicator.java
index 3d72360..caacfa7 100644
--- a/processing/src/main/java/org/apache/druid/segment/ProgressIndicator.java
+++ b/processing/src/main/java/org/apache/druid/segment/ProgressIndicator.java
@@ -29,7 +29,7 @@ public interface ProgressIndicator
 
   void stop();
 
-  void startSection(String section);
+  void startSection(@SuppressWarnings("unused") String section);
 
-  void stopSection(String section);
+  void stopSection(@SuppressWarnings("unused") String section);
 }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
deleted file mode 100644
index c5497b8..0000000
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client.indexing;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.timeline.DataSegment;
-
-import java.util.List;
-
-/**
- */
-public class ClientAppendQuery implements ClientQuery
-{
-  private final String dataSource;
-  private final List<DataSegment> segments;
-
-  @JsonCreator
-  public ClientAppendQuery(
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("segments") List<DataSegment> segments
-  )
-  {
-    this.dataSource = dataSource;
-    this.segments = segments;
-  }
-
-  @JsonProperty
-  @Override
-  public String getType()
-  {
-    return "append";
-  }
-
-  @JsonProperty
-  @Override
-  public String getDataSource()
-  {
-    return dataSource;
-  }
-
-  @JsonProperty
-  public List<DataSegment> getSegments()
-  {
-    return segments;
-  }
-
-  @Override
-  public String toString()
-  {
-    return "ClientAppendQuery{" +
-           "dataSource='" + dataSource + '\'' +
-           ", segments=" + segments +
-           '}';
-  }
-}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
deleted file mode 100644
index b61c6e6..0000000
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client.indexing;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.timeline.DataSegment;
-
-import java.util.List;
-
-/**
- */
-public class ClientMergeQuery implements ClientQuery
-{
-  private final String dataSource;
-  private final List<DataSegment> segments;
-  private final List<AggregatorFactory> aggregators;
-
-  @JsonCreator
-  public ClientMergeQuery(
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("segments") List<DataSegment> segments,
-      @JsonProperty("aggregations") List<AggregatorFactory> aggregators
-  )
-  {
-    this.dataSource = dataSource;
-    this.segments = segments;
-    this.aggregators = aggregators;
-
-  }
-
-  @JsonProperty
-  @Override
-  public String getType()
-  {
-    return "merge";
-  }
-
-  @JsonProperty
-  @Override
-  public String getDataSource()
-  {
-    return dataSource;
-  }
-
-  @JsonProperty
-  public List<DataSegment> getSegments()
-  {
-    return segments;
-  }
-
-  @JsonProperty("aggregations")
-  public List<AggregatorFactory> getAggregators()
-  {
-    return aggregators;
-  }
-
-  @Override
-  public String toString()
-  {
-    return "ClientMergeQuery{" +
-           "dataSource='" + dataSource + '\'' +
-           ", segments=" + segments +
-           ", aggregators=" + aggregators +
-           '}';
-  }
-}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
index 6dbd631..aaa8b5c 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
@@ -28,8 +28,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
-    @Type(name = "append", value = ClientAppendQuery.class),
-    @Type(name = "merge", value = ClientMergeQuery.class),
     @Type(name = "kill", value = ClientKillQuery.class),
     @Type(name = "compact", value = ClientCompactQuery.class)
 })
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 9baa2d1..ac4349b 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -26,7 +26,6 @@ import com.google.inject.Inject;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
@@ -42,7 +41,6 @@ import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -62,25 +60,6 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   }
 
   @Override
-  public void mergeSegments(List<DataSegment> segments)
-  {
-    final Iterator<DataSegment> segmentsIter = segments.iterator();
-    if (!segmentsIter.hasNext()) {
-      return;
-    }
-
-    final String dataSource = segmentsIter.next().getDataSource();
-    while (segmentsIter.hasNext()) {
-      DataSegment next = segmentsIter.next();
-      if (!dataSource.equals(next.getDataSource())) {
-        throw new IAE("Cannot merge segments of different dataSources[%s] and [%s]", dataSource, next.getDataSource());
-      }
-    }
-
-    runTask(new ClientAppendQuery(dataSource, segments));
-  }
-
-  @Override
   public void killSegments(String dataSource, Interval interval)
   {
     runTask(new ClientKillQuery(dataSource, interval));
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 905b810..df2baf6 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -34,8 +34,6 @@ public interface IndexingServiceClient
 
   int killPendingSegments(String dataSource, DateTime end);
 
-  void mergeSegments(List<DataSegment> segments);
-
   String compactSegments(
       List<DataSegment> segments,
       boolean keepSegmentGranularity,
diff --git a/server/src/main/java/org/apache/druid/guice/ServerModule.java b/server/src/main/java/org/apache/druid/guice/ServerModule.java
index 07068ed..503e71a 100644
--- a/server/src/main/java/org/apache/druid/guice/ServerModule.java
+++ b/server/src/main/java/org/apache/druid/guice/ServerModule.java
@@ -19,29 +19,19 @@
 
 package org.apache.druid.guice;
 
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
+import com.google.inject.Module;
 import com.google.inject.Provides;
 import org.apache.druid.guice.annotations.Self;
-import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.ZkPathsConfig;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.apache.druid.timeline.partition.NumberedShardSpec;
-import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
-
-import java.util.Collections;
-import java.util.List;
 
 /**
  */
-public class ServerModule implements DruidModule
+public class ServerModule implements Module
 {
   public static final String ZK_PATHS_PROPERTY_BASE = "druid.zk.paths";
 
@@ -57,18 +47,4 @@ public class ServerModule implements DruidModule
   {
     return ScheduledExecutors.createFactory(lifecycle);
   }
-
-  @Override
-  public List<? extends Module> getJacksonModules()
-  {
-    return Collections.singletonList(
-        new SimpleModule()
-            .registerSubtypes(
-                new NamedType(SingleDimensionShardSpec.class, "single"),
-                new NamedType(LinearShardSpec.class, "linear"),
-                new NamedType(NumberedShardSpec.class, "numbered"),
-                new NamedType(HashBasedNumberedShardSpec.class, "hashed")
-            )
-    );
-  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index 42c8af9..43be6e0 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -31,7 +31,7 @@ import org.apache.druid.segment.realtime.plumber.RejectionPolicyFactory;
 import org.apache.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
 import org.apache.druid.segment.realtime.plumber.VersioningPolicy;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Period;
 
@@ -48,7 +48,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
   private static final VersioningPolicy defaultVersioningPolicy = new IntervalStartVersioningPolicy();
   private static final RejectionPolicyFactory defaultRejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
   private static final int defaultMaxPendingPersists = 0;
-  private static final ShardSpec defaultShardSpec = NoneShardSpec.instance();
+  private static final ShardSpec defaultShardSpec = new NumberedShardSpec(0, 1);
   private static final IndexSpec defaultIndexSpec = new IndexSpec();
   private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
   private static final long defaultHandoffConditionTimeout = 0;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java
deleted file mode 100644
index c554d40..0000000
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.helper;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multiset;
-import com.google.common.collect.Ordering;
-import com.google.inject.Inject;
-import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.guava.FunctionalIterable;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
-import org.apache.druid.server.coordinator.CoordinatorStats;
-import org.apache.druid.server.coordinator.DatasourceWhitelist;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.apache.druid.timeline.partition.PartitionChunk;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- */
-public class DruidCoordinatorSegmentMerger implements DruidCoordinatorHelper
-{
-  private static final Logger log = new Logger(DruidCoordinatorSegmentMerger.class);
-
-  private final IndexingServiceClient indexingServiceClient;
-  private final AtomicReference<DatasourceWhitelist> whiteListRef;
-
-  @Inject
-  public DruidCoordinatorSegmentMerger(
-      IndexingServiceClient indexingServiceClient,
-      JacksonConfigManager configManager
-  )
-  {
-    this.indexingServiceClient = indexingServiceClient;
-    this.whiteListRef = configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class);
-  }
-
-  @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
-  {
-    DatasourceWhitelist whitelist = whiteListRef.get();
-
-    CoordinatorStats stats = new CoordinatorStats();
-    Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = new HashMap<>();
-
-    // Find serviced segments by using a timeline
-    for (DataSegment dataSegment : params.getAvailableSegments()) {
-      if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) {
-        VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(dataSegment.getDataSource());
-        if (timeline == null) {
-          timeline = new VersionedIntervalTimeline<String, DataSegment>(Ordering.natural());
-          dataSources.put(dataSegment.getDataSource(), timeline);
-        }
-        timeline.add(
-            dataSegment.getInterval(),
-            dataSegment.getVersion(),
-            dataSegment.getShardSpec().createChunk(dataSegment)
-        );
-      }
-    }
-
-    // Find segments to merge
-    for (final Map.Entry<String, VersionedIntervalTimeline<String, DataSegment>> entry : dataSources.entrySet()) {
-      // Get serviced segments from the timeline
-      VersionedIntervalTimeline<String, DataSegment> timeline = entry.getValue();
-      List<TimelineObjectHolder<String, DataSegment>> timelineObjects =
-          timeline.lookup(new Interval(DateTimes.EPOCH, DateTimes.of("3000-01-01")));
-
-      // Accumulate timelineObjects greedily until we reach our limits, then backtrack to the maximum complete set
-      SegmentsToMerge segmentsToMerge = new SegmentsToMerge();
-
-      for (int i = 0; i < timelineObjects.size(); i++) {
-        if (!segmentsToMerge.add(timelineObjects.get(i))
-            || segmentsToMerge.getByteCount() > params.getCoordinatorDynamicConfig().getMergeBytesLimit()
-            || segmentsToMerge.getSegmentCount() >= params.getCoordinatorDynamicConfig().getMergeSegmentsLimit()) {
-          i -= segmentsToMerge.backtrack(params.getCoordinatorDynamicConfig().getMergeBytesLimit());
-
-          if (segmentsToMerge.getSegmentCount() > 1) {
-            stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
-          }
-
-          if (segmentsToMerge.getSegmentCount() == 0) {
-            // Backtracked all the way to zero. Increment by one so we continue to make progress.
-            i++;
-          }
-
-          segmentsToMerge = new SegmentsToMerge();
-        }
-      }
-
-      // Finish any timelineObjects to merge that may have not hit threshold
-      segmentsToMerge.backtrack(params.getCoordinatorDynamicConfig().getMergeBytesLimit());
-      if (segmentsToMerge.getSegmentCount() > 1) {
-        stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
-      }
-    }
-
-    log.info("Issued merge requests for %s segments", stats.getGlobalStat("mergedCount"));
-
-    params.getEmitter().emit(
-        new ServiceMetricEvent.Builder().build(
-            "coordinator/merge/count", stats.getGlobalStat("mergedCount")
-        )
-    );
-
-    return params.buildFromExisting()
-                 .withCoordinatorStats(stats)
-                 .build();
-  }
-
-  /**
-   * Issue merge request for some list of segments.
-   *
-   * @return number of segments merged
-   */
-  private int mergeSegments(SegmentsToMerge segmentsToMerge, String dataSource)
-  {
-    final List<DataSegment> segments = segmentsToMerge.getSegments();
-    final List<SegmentId> segmentNames = Lists.transform(segments, DataSegment::getId);
-
-    log.info("[%s] Found %d segments to merge %s", dataSource, segments.size(), segmentNames);
-
-    try {
-      indexingServiceClient.mergeSegments(segments);
-    }
-    catch (Exception e) {
-      log.error(e, "[%s] Merging error for segments %s", dataSource, segmentNames);
-    }
-
-    return segments.size();
-  }
-
-  private static class SegmentsToMerge
-  {
-    // set of already-included segments (to help us keep track of bytes accumulated)
-    private final Multiset<DataSegment> segments;
-
-    // (timeline object, union interval of underlying segments up to this point in the list)
-    private final List<Pair<TimelineObjectHolder<String, DataSegment>, Interval>> timelineObjects;
-
-    private long byteCount;
-
-    private SegmentsToMerge()
-    {
-      this.timelineObjects = new ArrayList<>();
-      this.segments = HashMultiset.create();
-      this.byteCount = 0;
-    }
-
-    public List<DataSegment> getSegments()
-    {
-      return ImmutableSet.copyOf(
-          FunctionalIterable.create(timelineObjects).transformCat(
-              new Function<Pair<TimelineObjectHolder<String, DataSegment>, Interval>, Iterable<DataSegment>>()
-              {
-                @Override
-                public Iterable<DataSegment> apply(Pair<TimelineObjectHolder<String, DataSegment>, Interval> input)
-                {
-                  return Iterables.transform(
-                      input.lhs.getObject(),
-                      new Function<PartitionChunk<DataSegment>, DataSegment>()
-                      {
-                        @Override
-                        public DataSegment apply(PartitionChunk<DataSegment> input)
-                        {
-                          return input.getObject();
-                        }
-                      }
-                  );
-                }
-              }
-          )
-      ).asList();
-    }
-
-    public boolean add(TimelineObjectHolder<String, DataSegment> timelineObject)
-    {
-      final Interval timelineObjectInterval = timelineObject.getInterval();
-
-      if (timelineObjects.size() > 0) {
-        Preconditions.checkArgument(
-            timelineObjectInterval.getStart().getMillis() >=
-            timelineObjects.get(timelineObjects.size() - 1).lhs.getInterval().getEnd().getMillis(),
-            "timeline objects must be provided in order"
-        );
-      }
-
-      PartitionChunk<DataSegment> firstChunk = Iterables.getFirst(timelineObject.getObject(), null);
-      if (firstChunk == null) {
-        throw new ISE("Unable to find an underlying interval");
-      }
-      Interval underlyingInterval = firstChunk.getObject().getInterval();
-
-      for (final PartitionChunk<DataSegment> segment : timelineObject.getObject()) {
-        if (!(segment.getObject().getShardSpec() instanceof NoneShardSpec)) {
-          return false;
-        }
-
-        segments.add(segment.getObject());
-        if (segments.count(segment.getObject()) == 1) {
-          byteCount += segment.getObject().getSize();
-        }
-      }
-
-      // Compute new underlying merged interval
-      final Interval mergedUnderlyingInterval = getMergedUnderlyingInterval();
-      if (mergedUnderlyingInterval == null) {
-        timelineObjects.add(Pair.of(timelineObject, underlyingInterval));
-      } else {
-        final DateTime start = underlyingInterval.getStart().isBefore(mergedUnderlyingInterval.getStart())
-                               ? underlyingInterval.getStart()
-                               : mergedUnderlyingInterval.getStart();
-
-        final DateTime end = underlyingInterval.getEnd().isAfter(mergedUnderlyingInterval.getEnd())
-                             ? underlyingInterval.getEnd()
-                             : mergedUnderlyingInterval.getEnd();
-
-        timelineObjects.add(Pair.of(timelineObject, new Interval(start, end)));
-      }
-
-      return true;
-    }
-
-    public Interval getMergedTimelineInterval()
-    {
-      if (timelineObjects.isEmpty()) {
-        return null;
-      } else {
-        return new Interval(
-            timelineObjects.get(0).lhs.getInterval().getStart(),
-            timelineObjects.get(timelineObjects.size() - 1).lhs.getInterval().getEnd()
-        );
-      }
-    }
-
-    public Interval getMergedUnderlyingInterval()
-    {
-      if (timelineObjects.isEmpty()) {
-        return null;
-      } else {
-        return timelineObjects.get(timelineObjects.size() - 1).rhs;
-      }
-    }
-
-    public long getByteCount()
-    {
-      return byteCount;
-    }
-
-    public int getSegmentCount()
-    {
-      return timelineObjects.size();
-    }
-
-    /**
-     * Does this set of segments fully cover union(all segment intervals)?
-     *
-     * @return true if this set is complete
-     */
-    public boolean isComplete()
-    {
-      return timelineObjects.size() == 0 || getMergedTimelineInterval().equals(getMergedUnderlyingInterval());
-    }
-
-    /**
-     * Remove timelineObjects from this holder until we have a complete set with total size <= maxSize.
-     *
-     * @return number of timeline object holders removed
-     */
-    public int backtrack(long maxSize)
-    {
-      Preconditions.checkArgument(maxSize >= 0, "maxSize >= 0");
-
-      int removed = 0;
-      while (!isComplete() || byteCount > maxSize) {
-        removed++;
-
-        final TimelineObjectHolder<String, DataSegment> removedHolder = timelineObjects.remove(
-            timelineObjects.size()
-            - 1
-        ).lhs;
-        for (final PartitionChunk<DataSegment> segment : removedHolder.getObject()) {
-          segments.remove(segment.getObject());
-          if (segments.count(segment.getObject()) == 0) {
-            byteCount -= segment.getObject().getSize();
-          }
-        }
-      }
-
-      return removed;
-    }
-  }
-}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientAppendQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientAppendQueryTest.java
deleted file mode 100644
index 1ff1772..0000000
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientAppendQueryTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client.indexing;
-
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-
-public class ClientAppendQueryTest
-{
-  private ClientAppendQuery clientAppendQuery;
-  private static final String DATA_SOURCE = "data_source";
-  private final DateTime start = DateTimes.nowUtc();
-  private List<DataSegment> segments = Collections.singletonList(
-      new DataSegment(DATA_SOURCE, new Interval(start, start.plus(1)), start.toString(), null, null, null, null, 0, 0)
-  );
-
-  @Before
-  public void setUp()
-  {
-    clientAppendQuery = new ClientAppendQuery(DATA_SOURCE, segments);
-  }
-
-  @Test
-  public void testGetType()
-  {
-    Assert.assertEquals("append", clientAppendQuery.getType());
-  }
-
-  @Test
-  public void testGetDataSource()
-  {
-    Assert.assertEquals(DATA_SOURCE, clientAppendQuery.getDataSource());
-  }
-
-  @Test
-  public void testGetSegments()
-  {
-    Assert.assertEquals(segments, clientAppendQuery.getSegments());
-  }
-
-  @Test
-  public void testToString()
-  {
-    Assert.assertTrue(clientAppendQuery.toString().contains(DATA_SOURCE));
-    Assert.assertTrue(clientAppendQuery.toString().contains(segments.toString()));
-  }
-}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientMergeQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientMergeQueryTest.java
deleted file mode 100644
index 7983f5e..0000000
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientMergeQueryTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client.indexing;
-
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class ClientMergeQueryTest
-{
-  private static final String DATA_SOURCE = "data_source";
-  public static final DateTime START = DateTimes.nowUtc();
-  private static final Interval INTERVAL = new Interval(START, START.plus(1));
-  private static final DataSegment DATA_SEGMENT =
-      new DataSegment(DATA_SOURCE, INTERVAL, START.toString(), null, null, null, null, 0, 0);
-  private static final List<DataSegment> SEGMENT_LIST = Collections.singletonList(DATA_SEGMENT);
-  private static final List<AggregatorFactory> AGGREGATOR_LIST = new ArrayList<>();
-  private static final ClientMergeQuery CLIENT_MERGE_QUERY =
-      new ClientMergeQuery(DATA_SOURCE, SEGMENT_LIST, AGGREGATOR_LIST);
-
-  @Test
-  public void testGetType()
-  {
-    Assert.assertEquals("merge", CLIENT_MERGE_QUERY.getType());
-  }
-
-  @Test
-  public void testGetDataSource()
-  {
-    Assert.assertEquals(DATA_SOURCE, CLIENT_MERGE_QUERY.getDataSource());
-  }
-
-  @Test
-  public void testGetSegments()
-  {
-    Assert.assertEquals(SEGMENT_LIST, CLIENT_MERGE_QUERY.getSegments());
-  }
-
-  @Test
-  public void testGetAggregators()
-  {
-    Assert.assertEquals(AGGREGATOR_LIST, CLIENT_MERGE_QUERY.getAggregators());
-  }
-
-  @Test
-  public void testToString()
-  {
-    Assert.assertTrue(CLIENT_MERGE_QUERY.toString().contains(DATA_SOURCE));
-    Assert.assertTrue(CLIENT_MERGE_QUERY.toString().contains(SEGMENT_LIST.toString()));
-    Assert.assertTrue(CLIENT_MERGE_QUERY.toString().contains(AGGREGATOR_LIST.toString()));
-  }
-}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 91e1a8a..78a9164 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -43,12 +43,6 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
   }
 
   @Override
-  public void mergeSegments(List<DataSegment> segments)
-  {
-    
-  }
-
-  @Override
   public String compactSegments(
       List<DataSegment> segments,
       boolean keepSegmentGranularity,
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
index e9023e8..cf57072 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
@@ -22,7 +22,7 @@ package org.apache.druid.segment.indexing;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
@@ -88,7 +88,7 @@ public class RealtimeTuningConfigTest
     Assert.assertEquals(0, config.getAlertTimeout());
     Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
     Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
-    Assert.assertEquals(NoneShardSpec.instance(), config.getShardSpec());
+    Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec());
     Assert.assertEquals(0, config.getMaxPendingPersists());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(0, config.getMergeThreadPriority());
@@ -130,7 +130,7 @@ public class RealtimeTuningConfigTest
     Assert.assertEquals(70, config.getAlertTimeout());
     Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
     Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
-    Assert.assertEquals(NoneShardSpec.instance(), config.getShardSpec());
+    Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec());
     Assert.assertEquals(100, config.getMaxPendingPersists());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMergeThreadPriority());
diff --git a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java b/server/src/test/java/org/apache/druid/server/ServerTestHelper.java
index 87c6ae0..7713c7a 100644
--- a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java
+++ b/server/src/test/java/org/apache/druid/server/ServerTestHelper.java
@@ -20,23 +20,15 @@
 package org.apache.druid.server;
 
 import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.guice.ServerModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.timeline.DataSegment;
 
-import java.util.List;
-
 public class ServerTestHelper
 {
   public static final ObjectMapper MAPPER = new DefaultObjectMapper();
 
   static {
-    final List<? extends Module> list = new ServerModule().getJacksonModules();
-    for (Module module : list) {
-      MAPPER.registerModule(module);
-    }
     MAPPER.setInjectableValues(
         new InjectableValues.Std()
             .addValue(ObjectMapper.class.getName(), MAPPER)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index 14dc152..e203562 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -54,7 +54,6 @@ public class DruidCoordinatorConfigTest
     props.setProperty("druid.coordinator.startDelay", "PT1s");
     props.setProperty("druid.coordinator.period", "PT1s");
     props.setProperty("druid.coordinator.period.indexingPeriod", "PT1s");
-    props.setProperty("druid.coordinator.merge.on", "true");
     props.setProperty("druid.coordinator.kill.on", "true");
     props.setProperty("druid.coordinator.kill.period", "PT1s");
     props.setProperty("druid.coordinator.kill.durationToRetain", "PT1s");
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java
deleted file mode 100644
index 2f71041..0000000
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.client.indexing.NoopIndexingServiceClient;
-import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class DruidCoordinatorSegmentMergerTest
-{
-  private static final long mergeBytesLimit = 100;
-  private static final int mergeSegmentsLimit = 8;
-
-  @Test
-  public void testNoMerges()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("2").size(80).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(), merge(segments)
-    );
-  }
-
-  @Test
-  public void testMergeAtStart()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(20).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("2").size(20).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("2").size(90).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(0), segments.get(1))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testMergeAtEnd()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("2").size(20).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(2), segments.get(3))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testMergeInMiddle()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("2").size(10).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("2").size(20).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(1), segments.get(2))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testMergeNoncontiguous()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(10).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("2").size(10).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("2").size(10).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testMergeSeriesByteLimited()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(40).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P1D")).version("2").size(40).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("2").size(40).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("2").size(40).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("2").size(40).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(40).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(0), segments.get(1)),
-            ImmutableList.of(segments.get(2), segments.get(3)),
-            ImmutableList.of(segments.get(4), segments.get(5))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testMergeSeriesSegmentLimited()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-07/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-08/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-09/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-10/P1D")).version("2").size(1).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(
-                segments.get(0),
-                segments.get(1),
-                segments.get(2),
-                segments.get(3),
-                segments.get(4),
-                segments.get(5),
-                segments.get(6),
-                segments.get(7)
-            ),
-            ImmutableList.of(segments.get(8), segments.get(9))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMergeWithBacktracking()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(20).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P1D")).version("2").size(20).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P4D")).version("2").size(20).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("3").size(20).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("4").size(20).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("3").size(20).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-07/P1D")).version("2").size(20).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(0), segments.get(1)),
-            ImmutableList.of(segments.get(2), segments.get(3), segments.get(4), segments.get(5), segments.get(6))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMergeWithGapsAlignedStart()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P8D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("3").size(8).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("3").size(8).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-09/P1D")).version("3").size(8).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(1), segments.get(0), segments.get(2))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMergeWithGapsNonalignedStart()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P8D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P1D")).version("3").size(8).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("3").size(8).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-09/P1D")).version("3").size(8).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMerge1()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P4D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("3").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("1").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("3").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(80).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMerge2()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(15).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P4D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("3").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("4").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("3").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(80).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(2), segments.get(3), segments.get(4))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMerge3()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P4D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("3").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("1").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("3").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(80).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(1), segments.get(2), segments.get(4))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMerge4()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P4D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("3").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("4").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("3").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(80).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(1), segments.get(2), segments.get(3), segments.get(4))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMerge5()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P4D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("3").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("1").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("3").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(80).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMerge6()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P4D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("3").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("4").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("3").size(25).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(80).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(2), segments.get(3), segments.get(4))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMerge7()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P4D")).version("2").size(120).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("3").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("4").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("3").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(80).build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(2), segments.get(3), segments.get(4), segments.get(5))
-        ), merge(segments)
-    );
-  }
-
-  @Test
-  public void testOverlappingMerge8()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-01/P1D")).version("2").size(80).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-02/P4D")).version("2").size(120).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-03/P1D")).version("3").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-04/P1D")).version("1").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-05/P1D")).version("3").size(1).build(),
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2012-01-06/P1D")).version("2").size(80).build()
-    );
-
-    Assert.assertEquals(ImmutableList.of(ImmutableList.of(segments.get(4), segments.get(5))), merge(segments));
-  }
-
-  @Test
-  public void testMergeLinearShardSpecs()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder()
-                   .dataSource("foo")
-                   .interval(Intervals.of("2012-01-01/P1D"))
-                   .version("1")
-                   .shardSpec(new LinearShardSpec(1))
-                   .build(),
-        DataSegment.builder()
-                   .dataSource("foo")
-                   .interval(Intervals.of("2012-01-02/P1D"))
-                   .version("1")
-                   .shardSpec(new LinearShardSpec(7))
-                   .build(),
-        DataSegment.builder().dataSource("foo")
-                   .interval(Intervals.of("2012-01-03/P1D"))
-                   .version("1")
-                   .shardSpec(new LinearShardSpec(1500))
-                   .build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(),
-        merge(segments)
-    );
-  }
-
-  @Test
-  public void testMergeMixedShardSpecs()
-  {
-    final List<DataSegment> segments = ImmutableList.of(
-        DataSegment.builder()
-                   .dataSource("foo")
-                   .interval(Intervals.of("2012-01-01/P1D"))
-                   .version("1")
-                   .build(),
-        DataSegment.builder()
-                   .dataSource("foo")
-                   .interval(Intervals.of("2012-01-02/P1D"))
-                   .version("1")
-                   .build(),
-        DataSegment.builder().dataSource("foo")
-                   .interval(Intervals.of("2012-01-03/P1D"))
-                   .version("1")
-                   .shardSpec(new LinearShardSpec(1500))
-                   .build(),
-        DataSegment.builder().dataSource("foo")
-                   .interval(Intervals.of("2012-01-04/P1D"))
-                   .version("1")
-                   .build(),
-        DataSegment.builder().dataSource("foo")
-                   .interval(Intervals.of("2012-01-05/P1D"))
-                   .version("1")
-                   .build()
-    );
-
-    Assert.assertEquals(
-        ImmutableList.of(
-            ImmutableList.of(segments.get(0), segments.get(1)),
-            ImmutableList.of(segments.get(3), segments.get(4))
-        ),
-        merge(segments)
-    );
-  }
-
-  /**
-   * Runs DruidCoordinatorSegmentMerger on a particular set of segments and returns the list of requested merges.
-   */
-  private static List<List<DataSegment>> merge(final Collection<DataSegment> segments)
-  {
-    final JacksonConfigManager configManager = EasyMock.createMock(JacksonConfigManager.class);
-    EasyMock.expect(configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class))
-            .andReturn(new AtomicReference<DatasourceWhitelist>(null)).anyTimes();
-    EasyMock.replay(configManager);
-
-    final List<List<DataSegment>> retVal = new ArrayList<>();
-    final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
-    {
-      @Override
-      public void mergeSegments(List<DataSegment> segmentsToMerge)
-      {
-        retVal.add(segmentsToMerge);
-      }
-    };
-
-    final DruidCoordinatorSegmentMerger merger = new DruidCoordinatorSegmentMerger(
-        indexingServiceClient,
-        configManager
-    );
-    final DruidCoordinatorRuntimeParams params =
-        DruidCoordinatorRuntimeParams.newBuilder()
-                                     .withAvailableSegments(ImmutableSet.copyOf(segments))
-                                     .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMergeBytesLimit(
-                                         mergeBytesLimit).withMergeSegmentsLimit(mergeSegmentsLimit).build())
-                                     .withEmitter(EasyMock.createMock(ServiceEmitter.class))
-                                     .build();
-    merger.run(params);
-    return retVal;
-  }
-}
diff --git a/server/src/test/java/org/apache/druid/server/http/ServersResourceTest.java b/server/src/test/java/org/apache/druid/server/http/ServersResourceTest.java
index da87e3b..f96d2a3 100644
--- a/server/src/test/java/org/apache/druid/server/http/ServersResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/ServersResourceTest.java
@@ -73,7 +73,7 @@ public class ServersResourceTest
                       + "\"priority\":0,"
                       + "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":"
                       + "{\"dataSource\":\"dataSource\",\"interval\":\"2016-03-22T14:00:00.000Z/2016-03-22T15:00:00.000Z\",\"version\":\"v0\",\"loadSpec\":{},\"dimensions\":\"\",\"metrics\":\"\","
-                      + "\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}},"
+                      + "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}},"
                       + "\"currSize\":1}]";
     Assert.assertEquals(expected, result);
   }
@@ -99,7 +99,7 @@ public class ServersResourceTest
                       + "\"priority\":0,"
                       + "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":"
                       + "{\"dataSource\":\"dataSource\",\"interval\":\"2016-03-22T14:00:00.000Z/2016-03-22T15:00:00.000Z\",\"version\":\"v0\",\"loadSpec\":{},\"dimensions\":\"\",\"metrics\":\"\","
-                      + "\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}},"
+                      + "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}},"
                       + "\"currSize\":1}";
     Assert.assertEquals(expected, result);
   }
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 3ba06cf..4097e95 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -66,7 +66,6 @@ import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.helper.DruidCoordinatorHelper;
 import org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller;
-import org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
 import org.apache.druid.server.http.ClusterResource;
 import org.apache.druid.server.http.CoordinatorCompactionConfigsResource;
 import org.apache.druid.server.http.CoordinatorDynamicConfigsResource;
@@ -205,16 +204,24 @@ public class CliCoordinator extends ServerRunnable
             LifecycleModule.register(binder, Server.class);
             LifecycleModule.register(binder, DataSourcesResource.class);
 
-            ConditionalMultibind.create(
+            final ConditionalMultibind<DruidCoordinatorHelper> conditionalMultibind = ConditionalMultibind.create(
                 properties,
                 binder,
                 DruidCoordinatorHelper.class,
                 CoordinatorIndexingServiceHelper.class
-            ).addConditionBinding(
-                "druid.coordinator.merge.on",
-                Predicates.equalTo("true"),
-                DruidCoordinatorSegmentMerger.class
-            ).addConditionBinding(
+            );
+
+            if (conditionalMultibind.matchCondition("druid.coordinator.merge.on", Predicates.equalTo("true"))) {
+              throw new UnsupportedOperationException(
+                  "'druid.coordinator.merge.on' is not supported anymore. "
+                  + "Please consider using Coordinator's automatic compaction instead. "
+                  + "See http://druid.io/docs/latest/operations/segment-optimization.html and "
+                  + "http://druid.io/docs/latest/operations/api-reference.html#compaction-configuration for more "
+                  + "details about compaction."
+              );
+            }
+
+            conditionalMultibind.addConditionBinding(
                 "druid.coordinator.kill.on",
                 Predicates.equalTo("true"),
                 DruidCoordinatorSegmentKiller.class


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org