Posted to commits@druid.apache.org by gi...@apache.org on 2021/03/25 17:32:53 UTC

[druid] branch master updated: DruidInputSource: Fix issues in column projection, timestamp handling. (#10267)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bf20f9e  DruidInputSource: Fix issues in column projection, timestamp handling. (#10267)
bf20f9e is described below

commit bf20f9e9798417c9293a195690b6adcb48f44d3f
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Thu Mar 25 10:32:21 2021 -0700

    DruidInputSource: Fix issues in column projection, timestamp handling. (#10267)
    
    * DruidInputSource: Fix issues in column projection, timestamp handling.
    
    DruidInputSource, DruidSegmentReader changes:
    
    1) Remove "dimensions" and "metrics". They are not necessary, because we
       can compute which columns we need to read based on what is going to
       be used by the timestamp, transform, dimensions, and metrics.
    2) Start using ColumnsFilter (see below) to decide which columns we need
       to read.
    3) Actually respect the "timestampSpec". Previously, it was ignored, and
       the timestamp of the returned InputRows was set to the `__time` column
       of the input datasource.
    
    (1) and (2) together fix a bug in which the DruidInputSource would not
    properly read columns that are used as inputs to a transformSpec.
    
    (3) fixes a bug where the timestampSpec would be ignored if you attempted
    to set the column to something other than `__time`.
    
    (1) and (3) are breaking changes.
    
    Web console changes:
    
    1) Remove "Dimensions" and "Metrics" from the Druid input source.
    2) Set timestampSpec to `{"column": "__time", "format": "millis"}` for
       compatibility with the new behavior.
    
    Other changes:
    
    1) Add ColumnsFilter, a new class that allows input readers to determine
       which columns they need to read. Currently, it's only used by the
       DruidInputSource, but it could be used by other columnar input sources
       in the future.
    2) Add a ColumnsFilter to InputRowSchema.
    3) Remove the metric names from InputRowSchema (they were unused).
    4) Add InputRowSchemas.fromDataSchema method that computes the proper
       ColumnsFilter for the given timestamp, dimensions, transform, and
       metrics (sketched after this list).
    5) Add "getRequiredColumns" method to TransformSpec to support the above.
    
    * Various fixups.
    
    * Uncomment incorrectly commented lines.
    
    * Move TransformSpecTest to the proper module.
    
    * Add druid.indexer.task.ignoreTimestampSpecForDruidInputSource setting.
    
    * Fix.
    
    * Fix build.
    
    * Checkstyle.
    
    * Misc fixes.
    
    * Fix test.
    
    * Move config.
    
    * Fix imports.
    
    * Fixup.
    
    * Fix ShuffleResourceTest.
    
    * Add import.
    
    * Smarter exclusions.
    
    * Fixes based on tests.
    
    Also, add TIME_COLUMN constant in the web console.
    
    * Adjustments for tests.
    
    * Reorder test data.
    
    * Update docs.
    
    * Update docs to say Druid 0.22.0 instead of 0.21.0.
    
    * Fix test.
    
    * Fix ITAutoCompactionTest.
    
    * Changes from review & from merging.
---
 .../org/apache/druid/data/input/ColumnsFilter.java | 183 +++++++
 .../apache/druid/data/input/InputRowSchema.java    |  25 +-
 .../org/apache/druid/data/input/InputSource.java   |   6 +-
 .../apache/druid/data/input/MapBasedInputRow.java  |  24 +
 .../FirehoseFactoryToInputSourceAdaptorTest.java   |   3 +-
 .../druid/data/input/impl/ColumnsFilterTest.java   |  92 ++++
 .../druid/data/input/impl/CsvReaderTest.java       |   6 +-
 .../druid/data/input/impl/DelimitedReaderTest.java |   4 +-
 .../input/impl/InputEntityIteratingReaderTest.java |   4 +-
 .../druid/data/input/impl/JsonLineReaderTest.java  |  11 +-
 .../druid/data/input/impl/JsonReaderTest.java      |  22 +-
 docs/configuration/index.md                        |   2 +
 docs/ingestion/native-batch.md                     | 100 ++--
 .../data/input/aliyun/OssInputSourceTest.java      |   8 +-
 .../druid/data/input/avro/AvroOCFReaderTest.java   |   4 +-
 .../google/GoogleCloudStorageInputSourceTest.java  |   5 +-
 .../inputsource/hdfs/HdfsInputSourceTest.java      |   3 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   3 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   3 +-
 .../apache/druid/data/input/orc/OrcReaderTest.java |   3 +-
 .../input/parquet/CompatParquetReaderTest.java     |  14 +-
 .../input/parquet/DecimalParquetReaderTest.java    |   7 +-
 .../parquet/FlattenSpecParquetReaderTest.java      |  18 +-
 .../parquet/ParquetReaderResourceLeakTest.java     |   4 +-
 .../input/parquet/TimestampsParquetReaderTest.java |  10 +-
 .../data/input/parquet/WikiParquetReaderTest.java  |   4 +-
 .../druid/data/input/s3/S3InputSourceTest.java     |   5 +-
 .../indexing/common/ReingestionTimelineUtils.java  |   4 +
 .../druid/indexing/common/config/TaskConfig.java   |  13 +-
 .../common/task/AbstractBatchIndexTask.java        |  13 +-
 .../druid/indexing/common/task/CompactionTask.java |   9 +-
 .../indexing/common/task/InputSourceProcessor.java |   1 +
 .../firehose/IngestSegmentFirehoseFactory.java     |   4 +
 .../druid/indexing/input/DruidInputSource.java     | 124 ++++-
 .../indexing/input/DruidSegmentInputFormat.java    |  16 +-
 .../druid/indexing/input/DruidSegmentReader.java   | 212 +++++---
 .../druid/indexing/input/InputRowSchemas.java      | 130 +++++
 .../overlord/sampler/InputSourceSampler.java       |  17 +-
 .../SeekableStreamIndexTaskRunner.java             |  11 +-
 .../druid/indexing/common/TaskToolboxTest.java     |   2 +-
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |  13 +-
 .../common/task/CompactionTaskRunTest.java         |   3 +-
 .../indexing/common/task/CompactionTaskTest.java   |  13 +-
 .../druid/indexing/common/task/HadoopTaskTest.java |   3 +-
 .../indexing/common/task/IngestionTestBase.java    |   3 +-
 .../common/task/RealtimeIndexTaskTest.java         |  13 +-
 .../AbstractParallelIndexSupervisorTaskTest.java   |   8 +-
 .../druid/indexing/input/DruidInputSourceTest.java | 224 ++++++++
 .../indexing/input/DruidSegmentReaderTest.java     | 568 ++++++++++++++++++++-
 .../druid/indexing/input/InputRowSchemasTest.java  | 105 ++++
 .../overlord/SingleTaskBackgroundRunnerTest.java   |   3 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java |   2 +-
 .../RecordSupplierInputSourceTest.java             |   3 +-
 .../seekablestream/StreamChunkParserTest.java      |   9 +-
 .../indexing/worker/WorkerTaskManagerTest.java     |   3 +-
 .../indexing/worker/WorkerTaskMonitorTest.java     |   3 +-
 .../IntermediaryDataManagerAutoCleanupTest.java    |   3 +-
 ...ermediaryDataManagerManualAddAndDeleteTest.java |   3 +-
 .../shuffle/ShuffleDataSegmentPusherTest.java      |   3 +-
 .../worker/shuffle/ShuffleResourceTest.java        |   3 +-
 .../docker/environment-configs/common              |   4 +
 .../coordinator/duty/ITAutoCompactionTest.java     |   4 +-
 ...dia_parallel_druid_input_source_index_task.json |   3 +-
 .../wikipedia_reindex_druid_input_source_task.json |   2 +-
 ...ex_druid_input_source_task_with_transforms.json |   2 +-
 .../segment/transform/ExpressionTransform.java     |  16 +-
 .../apache/druid/segment/transform/Transform.java  |   7 +
 .../druid/segment/transform/TransformSpec.java     |  15 +
 .../segment/transform}/TransformSpecTest.java      |  32 +-
 .../apache/druid/segment/indexing/DataSchema.java  |  67 ++-
 .../druid/metadata/input/SqlInputSourceTest.java   |   4 +-
 .../druid/segment/indexing/DataSchemaTest.java     |   6 +-
 web-console/e2e-tests/reindexing.spec.ts           |  52 +-
 web-console/src/druid-models/ingestion-spec.tsx    |  26 -
 web-console/src/druid-models/timestamp-spec.tsx    |  11 +-
 web-console/src/utils/sampler.ts                   |  18 +-
 .../src/views/load-data-view/load-data-view.tsx    |  21 +-
 website/.spelling                                  |   2 +
 78 files changed, 2000 insertions(+), 409 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/ColumnsFilter.java b/core/src/main/java/org/apache/druid/data/input/ColumnsFilter.java
new file mode 100644
index 0000000..b01001f
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/ColumnsFilter.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Used by some {@link InputSourceReader} implementations in order to know what columns will need to be read out
+ * of the {@link InputRow} objects they create.
+ *
+ * This is meant to be useful as an optimization: if we're reading from a columnar data format, then when a column
+ * isn't going to be needed, we shouldn't read it.
+ *
+ * @see InputSource#reader accepts objects of this class
+ */
+public abstract class ColumnsFilter
+{
+  /**
+   * Accepts all columns.
+   */
+  public static ColumnsFilter all()
+  {
+    return new ExclusionBased(Collections.emptySet());
+  }
+
+  /**
+   * Accepts a specific list of columns.
+   */
+  public static ColumnsFilter inclusionBased(final Set<String> inclusions)
+  {
+    return new InclusionBased(inclusions);
+  }
+
+
+  /**
+   * Accepts all columns, except those on a specific list.
+   */
+  public static ColumnsFilter exclusionBased(final Set<String> exclusions)
+  {
+    return new ExclusionBased(exclusions);
+  }
+
+  /**
+   * Check if a column should be included or not.
+   */
+  public abstract boolean apply(String column);
+
+  /**
+   * Returns a new filter with a particular column added. The returned filter will return true from {@link #apply}
+   * on this column.
+   */
+  public abstract ColumnsFilter plus(String column);
+
+  public static class InclusionBased extends ColumnsFilter
+  {
+    private final Set<String> inclusions;
+
+    private InclusionBased(Set<String> inclusions)
+    {
+      this.inclusions = inclusions;
+    }
+
+    @Override
+    public boolean apply(String column)
+    {
+      return inclusions.contains(column);
+    }
+
+    @Override
+    public ColumnsFilter plus(String column)
+    {
+      if (inclusions.contains(column)) {
+        return this;
+      } else {
+        final Set<String> copy = new HashSet<>(inclusions);
+        copy.add(column);
+        return new InclusionBased(copy);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      InclusionBased that = (InclusionBased) o;
+      return Objects.equals(inclusions, that.inclusions);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(inclusions);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "ColumnsFilter.InclusionBased{" +
+             "inclusions=" + inclusions +
+             '}';
+    }
+  }
+
+  public static class ExclusionBased extends ColumnsFilter
+  {
+    private final Set<String> exclusions;
+
+    public ExclusionBased(Set<String> exclusions)
+    {
+      this.exclusions = exclusions;
+    }
+
+    @Override
+    public boolean apply(String column)
+    {
+      return !exclusions.contains(column);
+    }
+
+    @Override
+    public ColumnsFilter plus(String column)
+    {
+      if (!exclusions.contains(column)) {
+        return this;
+      } else {
+        final Set<String> copy = new HashSet<>(exclusions);
+        copy.remove(column);
+        return new ExclusionBased(copy);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ExclusionBased that = (ExclusionBased) o;
+      return Objects.equals(exclusions, that.exclusions);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(exclusions);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "ColumnsFilter.ExclusionBased{" +
+             "exclusions=" + exclusions +
+             '}';
+    }
+  }
+}
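
To summarize the behavior of the class added above, here is a minimal
usage sketch (the example class and main method are illustrative, not
part of the commit):

    import org.apache.druid.data.input.ColumnsFilter;

    import java.util.Collections;

    public class ColumnsFilterExample
    {
      public static void main(String[] args)
      {
        // all() is exclusion-based with an empty set: accepts every column.
        ColumnsFilter all = ColumnsFilter.all();
        System.out.println(all.apply("anything"));                // true

        // Inclusion-based: only listed columns pass.
        ColumnsFilter inc = ColumnsFilter.inclusionBased(Collections.singleton("page"));
        System.out.println(inc.apply("page"));                    // true
        System.out.println(inc.apply("user"));                    // false

        // plus() guarantees apply() returns true for the added column:
        // it adds to an inclusion set but removes from an exclusion set.
        ColumnsFilter exc = ColumnsFilter.exclusionBased(Collections.singleton("user"));
        System.out.println(exc.apply("user"));                    // false
        System.out.println(exc.plus("user").apply("user"));       // true
      }
    }
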
diff --git a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
index c908187..227bd3a 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
@@ -22,8 +22,6 @@ package org.apache.druid.data.input;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 
-import java.util.List;
-
 /**
  * Schema of {@link InputRow}.
  */
@@ -31,13 +29,17 @@ public class InputRowSchema
 {
   private final TimestampSpec timestampSpec;
   private final DimensionsSpec dimensionsSpec;
-  private final List<String> metricNames;
+  private final ColumnsFilter columnsFilter;
 
-  public InputRowSchema(TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec, List<String> metricNames)
+  public InputRowSchema(
+      final TimestampSpec timestampSpec,
+      final DimensionsSpec dimensionsSpec,
+      final ColumnsFilter columnsFilter
+  )
   {
     this.timestampSpec = timestampSpec;
     this.dimensionsSpec = dimensionsSpec;
-    this.metricNames = metricNames;
+    this.columnsFilter = columnsFilter;
   }
 
   public TimestampSpec getTimestampSpec()
@@ -50,8 +52,17 @@ public class InputRowSchema
     return dimensionsSpec;
   }
 
-  public List<String> getMetricNames()
+  /**
+   * A {@link ColumnsFilter} that can filter down the list of columns that must be read after flattening.
+   *
+   * Logically, Druid applies ingestion spec components in a particular order: first flattenSpec (if any), then
+   * timestampSpec, then transformSpec, and finally dimensionsSpec and metricsSpec.
+   *
+   * If a flattenSpec is provided, this method returns a filter that should be applied after flattening. So, it will
+   * be based on what needs to pass between the flattenSpec and everything beyond it.
+   */
+  public ColumnsFilter getColumnsFilter()
   {
-    return metricNames;
+    return columnsFilter;
   }
 }
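
Call sites migrate from the old metric-names argument to a ColumnsFilter,
as the test updates further below show. For example (the spec values here
are placeholders):

    // Before: new InputRowSchema(timestampSpec, dimensionsSpec, ImmutableList.of("count"));
    // After:
    InputRowSchema schema = new InputRowSchema(
        new TimestampSpec("ts", "auto", null),
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "user"))),
        ColumnsFilter.all()
    );
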
diff --git a/core/src/main/java/org/apache/druid/data/input/InputSource.java b/core/src/main/java/org/apache/druid/data/input/InputSource.java
index ba0224e..0a3cda2 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputSource.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputSource.java
@@ -78,5 +78,9 @@ public interface InputSource
    * @param inputFormat        to parse data. It can be null if {@link #needsFormat()} = true
    * @param temporaryDirectory to store temp data. It will be cleaned up automatically once the task is finished.
    */
-  InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory);
+  InputSourceReader reader(
+      InputRowSchema inputRowSchema,
+      @Nullable InputFormat inputFormat,
+      File temporaryDirectory
+  );
 }
diff --git a/core/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java b/core/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java
index 59ab8a5..e9117f2 100644
--- a/core/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java
+++ b/core/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java
@@ -25,8 +25,10 @@ import org.joda.time.DateTime;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
+ *
  */
 @PublicApi
 public class MapBasedInputRow extends MapBasedRow implements InputRow
@@ -60,6 +62,28 @@ public class MapBasedInputRow extends MapBasedRow implements InputRow
   }
 
   @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    MapBasedInputRow that = (MapBasedInputRow) o;
+    return Objects.equals(dimensions, that.dimensions);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), dimensions);
+  }
+
+  @Override
   public String toString()
   {
     return "MapBasedInputRow{" +
diff --git a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
index bd8cbe1..3def282 100644
--- a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java
@@ -37,7 +37,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Stream;
@@ -71,7 +70,7 @@ public class FirehoseFactoryToInputSourceAdaptorTest extends InitializedNullHand
         new InputRowSchema(
             inputRowParser.getParseSpec().getTimestampSpec(),
             inputRowParser.getParseSpec().getDimensionsSpec(),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         null,
         null
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/ColumnsFilterTest.java b/core/src/test/java/org/apache/druid/data/input/impl/ColumnsFilterTest.java
new file mode 100644
index 0000000..00faf4e
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/ColumnsFilterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ColumnsFilterTest
+{
+  private static final List<String> COLUMNS = ImmutableList.of("a", "b", "c");
+
+  @Test
+  public void testAll()
+  {
+    Assert.assertEquals(
+        ImmutableList.of("a", "b", "c"),
+        apply(ColumnsFilter.all(), COLUMNS)
+    );
+  }
+
+  @Test
+  public void testInclusionBased()
+  {
+    Assert.assertEquals(
+        ImmutableList.of("b"),
+        apply(ColumnsFilter.inclusionBased(ImmutableSet.of("b")), COLUMNS)
+    );
+  }
+
+  @Test
+  public void testInclusionBasedPlus()
+  {
+    Assert.assertEquals(
+        ColumnsFilter.inclusionBased(ImmutableSet.of("a", "b", "c")),
+        ColumnsFilter.inclusionBased(ImmutableSet.of("b", "c")).plus("a").plus("c")
+    );
+  }
+
+  @Test
+  public void testExclusionBased()
+  {
+    Assert.assertEquals(
+        ImmutableList.of("a", "c"),
+        apply(ColumnsFilter.exclusionBased(ImmutableSet.of("b")), COLUMNS)
+    );
+  }
+
+  @Test
+  public void testExclusionBasedPlus()
+  {
+    Assert.assertEquals(
+        ColumnsFilter.exclusionBased(ImmutableSet.of("b")),
+        ColumnsFilter.exclusionBased(ImmutableSet.of("b", "c")).plus("a").plus("c")
+    );
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(ColumnsFilter.InclusionBased.class).usingGetClass().verify();
+    EqualsVerifier.forClass(ColumnsFilter.ExclusionBased.class).usingGetClass().verify();
+  }
+
+  private List<String> apply(ColumnsFilter columnsFilter, List<String> columns)
+  {
+    return columns.stream().filter(columnsFilter::apply).collect(Collectors.toList());
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java
index ec94237..c1faa27 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -37,7 +38,6 @@ import org.junit.Test;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -47,7 +47,7 @@ public class CsvReaderTest
   private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
       new TimestampSpec("ts", "auto", null),
       new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))),
-      Collections.emptyList()
+      ColumnsFilter.all()
   );
 
   @BeforeClass
@@ -229,7 +229,7 @@ public class CsvReaderTest
         new InputRowSchema(
             new TimestampSpec("Timestamp", "auto", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java
index e590ed5..c98d8ff 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input.impl;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -35,7 +36,6 @@ import org.junit.Test;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -44,7 +44,7 @@ public class DelimitedReaderTest
   private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
       new TimestampSpec("ts", "auto", null),
       new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))),
-      Collections.emptyList()
+      ColumnsFilter.all()
   );
 
   @BeforeClass
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
index e202d15..37b35f1 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.java.util.common.DateTimes;
@@ -37,7 +38,6 @@ import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 public class InputEntityIteratingReaderTest
@@ -64,7 +64,7 @@ public class InputEntityIteratingReaderTest
             new DimensionsSpec(
                 DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "name", "score"))
             ),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         new CsvInputFormat(
             ImmutableList.of("time", "name", "score"),
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java
index 3a400b5..c61ad49 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -66,7 +67,7 @@ public class JsonLineReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -116,7 +117,7 @@ public class JsonLineReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -158,7 +159,7 @@ public class JsonLineReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -200,7 +201,7 @@ public class JsonLineReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -242,7 +243,7 @@ public class JsonLineReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
index f554034..7ab52a0 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -38,7 +39,6 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
-import java.util.Collections;
 
 public class JsonReaderTest
 {
@@ -75,7 +75,7 @@ public class JsonReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -141,7 +141,7 @@ public class JsonReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -194,7 +194,8 @@ public class JsonReaderTest
 
     final ByteEntity source = new ByteEntity(
         StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
-                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}" //baz property is illegal
+                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}"
+                           //baz property is illegal
                            + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}")
     );
 
@@ -202,7 +203,7 @@ public class JsonReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -255,7 +256,7 @@ public class JsonReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -314,7 +315,8 @@ public class JsonReaderTest
     //2nd row is ill-formed
     final ByteEntity source = new ByteEntity(
         StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
-                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}\n" //value of baz is invalid
+                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}\n"
+                           //value of baz is invalid
                            + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n")
     );
 
@@ -322,7 +324,7 @@ public class JsonReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -377,7 +379,7 @@ public class JsonReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
@@ -431,7 +433,7 @@ public class JsonReaderTest
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         source,
         null
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 18c23da..5a91a77 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1286,6 +1286,7 @@ Additional peon configs include:
 |`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
 |`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
 |`druid.indexer.task.restoreTasksOnRestart`|If true, MiddleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
+|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
 |`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests served by a task's chat handler. Set to 0 to disable limiting.|0|
 
 If the peon is running in remote mode, there must be an Overlord up and running. Peons in remote mode can set the following configurations:
@@ -1350,6 +1351,7 @@ then the value from the configuration below is used:
 |`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer restart for restorable tasks to gracefully exit.|PT5M|
 |`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
 |`druid.indexer.task.restoreTasksOnRestart`|If true, the Indexer will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
+|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
 |`druid.peon.taskActionClient.retry.minWait`|The minimum retry time to communicate with Overlord.|PT5S|
 |`druid.peon.taskActionClient.retry.maxWait`|The maximum retry time to communicate with Overlord.|PT1M|
 |`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of retries to communicate with Overlord.|60|
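
The compatibility flag documented above is an ordinary runtime property;
as a minimal sketch, it would be set in the peon or Indexer
runtime.properties like so:

    druid.indexer.task.ignoreTimestampSpecForDruidInputSource=true
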
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 5032ed1..dece5bf 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1292,60 +1292,82 @@ no `inputFormat` field needs to be specified in the ingestion spec when using th
 |type|This should be "druid".|yes|
 |dataSource|A String defining the Druid datasource to fetch rows from|yes|
 |interval|A String representing an ISO-8601 interval, which defines the time range to fetch the data over.|yes|
-|dimensions|A list of Strings containing the names of dimension columns to select from the Druid datasource. If the list is empty, no dimensions are returned. If null, all dimensions are returned. |no|
-|metrics|The list of Strings containing the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned.|no|
 |filter| See [Filters](../querying/filters.md). Only rows that match the filter, if specified, will be returned.|no|
 
-A minimal example DruidInputSource spec is shown below:
+The Druid input source can be used for a variety of purposes, including:
 
-```json
-...
-    "ioConfig": {
-      "type": "index_parallel",
-      "inputSource": {
-        "type": "druid",
-        "dataSource": "wikipedia",
-        "interval": "2013-01-01/2013-01-02"
-      }
-      ...
-    },
-...
-```
+- Creating new datasources that are rolled-up copies of existing datasources.
+- Changing the [partitioning or sorting](index.md#partitioning) of a datasource to improve performance.
+- Updating or removing rows using a [`transformSpec`](index.md#transformspec).
 
-The spec above will read all existing dimension and metric columns from
-the `wikipedia` datasource, including all rows with a timestamp (the `__time` column)
-within the interval `2013-01-01/2013-01-02`.
+When using the Druid input source, the timestamp column shows up as a numeric field named `__time` set to the number
+of milliseconds since the epoch (January 1, 1970 00:00:00 UTC). It is common to use this in the timestampSpec if you
+want the output timestamp to be equivalent to the input timestamp. In this case, set the timestamp column to `__time`
+and the format to `auto` or `millis`.
 
-A spec that applies a filter and reads a subset of the original datasource's columns is shown below.
+It is OK for the input and output datasources to be the same. In this case, newly generated data will overwrite the
+previous data for the intervals specified in the `granularitySpec`. Generally, if you are going to do this, it is a
+good idea to test out your reindexing by writing to a separate datasource before overwriting your main one.
+Alternatively, if your goals can be satisfied by [compaction](compaction.md), consider that instead as a simpler
+approach.
+
+An example task spec is shown below. It reads from a hypothetical raw datasource `wikipedia_raw` and creates a new
+rolled-up datasource `wikipedia_rollup` by grouping on hour, "countryName", and "page".
 
 ```json
-...
+{
+  "type": "index_parallel",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "wikipedia_rollup",
+      "timestampSpec": {
+        "column": "__time",
+        "format": "millis"
+      },
+      "dimensionsSpec": {
+        "dimensions": [
+          "countryName",
+          "page"
+        ]
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "cnt"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "queryGranularity": "HOUR",
+        "segmentGranularity": "DAY",
+        "intervals": ["2016-06-27/P1D"],
+        "rollup": true
+      }
+    },
     "ioConfig": {
       "type": "index_parallel",
       "inputSource": {
         "type": "druid",
-        "dataSource": "wikipedia",
-        "interval": "2013-01-01/2013-01-02",
-        "dimensions": [
-          "page",
-          "user"
-        ],
-        "metrics": [
-          "added"
-        ],
-        "filter": {
-          "type": "selector",
-          "dimension": "page",
-          "value": "Druid"
-        }
+        "dataSource": "wikipedia_raw",
+        "interval": "2016-06-27/P1D"
       }
-      ...
     },
-...
+    "tuningConfig": {
+      "type": "index_parallel",
+      "partitionsSpec": {
+        "type": "hashed"
+      },
+      "forceGuaranteedRollup": true,
+      "maxNumConcurrentSubTasks": 1
+    }
+  }
+}
 ```
 
-This spec above will only return the `page`, `user` dimensions and `added` metric.
-Only rows where `page` = `Druid` will be returned.
+> Note: Older versions (0.21 and earlier) did not respect the timestampSpec when using the Druid input source. If you
+> have ingestion specs that rely on this and cannot rewrite them, set
+> [`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`](../configuration/index.md#indexer-general-configuration)
+> to `true` to enable a compatibility mode where the timestampSpec is ignored.
 
 ### SQL Input Source
 
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
index d38bc66..b7e9540 100644
--- a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
@@ -39,6 +39,7 @@ import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Provides;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
@@ -111,7 +112,8 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
   private static final OssClientConfig CLOUD_CONFIG_PROPERTIES = new OssClientConfig(
       "test.oss-cn.aliyun.com",
       new DefaultPasswordProvider("myKey"),
-      new DefaultPasswordProvider("mySecret"));
+      new DefaultPasswordProvider("mySecret")
+  );
 
   private static final List<CloudObjectLocation> EXPECTED_LOCATION =
       ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
@@ -454,7 +456,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
     InputRowSchema someSchema = new InputRowSchema(
         new TimestampSpec("time", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
-        ImmutableList.of("count")
+        ColumnsFilter.all()
     );
 
     InputSourceReader reader = inputSource.reader(
@@ -497,7 +499,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
     InputRowSchema someSchema = new InputRowSchema(
         new TimestampSpec("time", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
-        ImmutableList.of("count")
+        ColumnsFilter.all()
     );
 
     InputSourceReader reader = inputSource.reader(
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
index da586d8..4841483 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFReaderTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.druid.data.input.AvroHadoopInputRowParserTest;
 import org.apache.druid.data.input.AvroStreamInputRowParserTest;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -199,10 +200,9 @@ public class AvroOCFReaderTest
     final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
     final DimensionsSpec dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
         "eventType")));
-    final List<String> metricNames = ImmutableList.of("someLong");
 
     final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null);
-    final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, metricNames);
+    final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all());
     final FileEntity entity = new FileEntity(someAvroFile);
     return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
   }
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
index e68a9a4..4b42efe 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
@@ -31,6 +31,7 @@ import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Provides;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
@@ -226,7 +227,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
     InputRowSchema someSchema = new InputRowSchema(
         new TimestampSpec("time", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
-        ImmutableList.of("count")
+        ColumnsFilter.all()
     );
 
     InputSourceReader reader = inputSource.reader(
@@ -269,7 +270,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
     InputRowSchema someSchema = new InputRowSchema(
         new TimestampSpec("time", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
-        ImmutableList.of("count")
+        ColumnsFilter.all()
     );
 
     InputSourceReader reader = inputSource.reader(
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index a61a0c6..96a99ad 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -76,7 +77,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
   private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
       new TimestampSpec(null, null, null),
       DimensionsSpec.EMPTY,
-      Collections.emptyList()
+      ColumnsFilter.all()
   );
   private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
       Arrays.asList(TimestampSpec.DEFAULT_COLUMN, COLUMN),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 992bb19..3d9dc1c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2781,7 +2781,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
         true,
         null,
         null,
-        null
+        null,
+        false
     );
     final TestDerbyConnector derbyConnector = derby.getConnector();
     derbyConnector.createDataSourceTable();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 302a4a0..2fe4d33 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2867,7 +2867,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
         true,
         null,
         null,
-        null
+        null,
+        false
     );
     final TestDerbyConnector derbyConnector = derby.getConnector();
     derbyConnector.createDataSourceTable();
diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
index bef9b64..9726c0e 100644
--- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
+++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.orc;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
@@ -259,7 +260,7 @@ public class OrcReaderTest
       String dataFile
   ) throws IOException
   {
-    final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, Collections.emptyList());
+    final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all());
     final FileEntity entity = new FileEntity(new File(dataFile));
     return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
   }
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java
index f8b586b..6017321 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetReaderTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.parquet;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -34,7 +35,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -49,7 +49,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("ts", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("field"))),
-        ImmutableList.of()
+        ColumnsFilter.all()
     );
     InputEntityReader reader = createReader(
         file,
@@ -114,7 +114,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        ImmutableList.of("metric1")
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.ROOT, "col", "col"),
@@ -200,7 +200,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractByLogicalMap", "$.intToStringColumn.1"),
@@ -315,7 +315,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("repeatedInt"))),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.ROOT, "repeatedInt", "repeatedInt")
@@ -353,7 +353,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec", "extracted1", "extracted2"))),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.PATH, "extracted1", "$.myComplex[0].id"),
@@ -395,7 +395,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedOptional", "$.optionalMessage.someId"),
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java
index 50b9fe2..faa80e6 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetReaderTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.parquet;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -49,7 +50,7 @@ public class DecimalParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("fixed_len_dec"))),
-        ImmutableList.of("metric1")
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.ROOT, "fixed_len_dec", "fixed_len_dec"),
@@ -86,7 +87,7 @@ public class DecimalParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec"))),
-        ImmutableList.of("metric1")
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i32_dec"),
@@ -123,7 +124,7 @@ public class DecimalParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i64_dec"))),
-        ImmutableList.of("metric1")
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i64_dec"),
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java
index 5be38dd..7ff4306 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetReaderTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.parquet;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -33,7 +34,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -69,7 +69,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "listDim"))),
-        ImmutableList.of("metric1", "metric2")
+        ColumnsFilter.all()
     );
     JSONPathSpec flattenSpec = new JSONPathSpec(false, ImmutableList.of());
     InputEntityReader reader = createReader(
@@ -103,7 +103,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        ImmutableList.of("metric1", "metric2")
+        ColumnsFilter.all()
     );
     InputEntityReader reader = createReader(
         file,
@@ -136,7 +136,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "list"))),
-        ImmutableList.of("metric1", "metric2")
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
@@ -177,7 +177,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "listExtracted"))),
-        ImmutableList.of("metric1", "metric2")
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
@@ -217,7 +217,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1"))),
-        ImmutableList.of("metric1")
+        ColumnsFilter.all()
     );
     JSONPathSpec flattenSpec = new JSONPathSpec(false, ImmutableList.of());
     InputEntityReader reader = createReader(
@@ -253,7 +253,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        ImmutableList.of("metric1", "metric2")
+        ColumnsFilter.all()
     );
     InputEntityReader reader = createReader(
         file,
@@ -286,7 +286,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        ImmutableList.of("metric1", "metric2")
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
@@ -329,7 +329,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
         new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
index 251fa34..f8e56b3 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.parquet;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -39,7 +40,6 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collections;
 import java.util.Objects;
 
 public class ParquetReaderResourceLeakTest extends BaseParquetReaderTest
@@ -55,7 +55,7 @@ public class ParquetReaderResourceLeakTest extends BaseParquetReaderTest
         new DimensionsSpec(
             DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))
         ),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     FetchingFileEntity entity = new FetchingFileEntity(new File("example/wiki/wiki.parquet"));
     ParquetInputFormat parquet = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration());
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java
index 19f1544..c0189fe 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetReaderTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.parquet;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -31,7 +32,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -46,12 +46,12 @@ public class TimestampsParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schemaAsString = new InputRowSchema(
         new TimestampSpec("date_as_string", "Y-M-d", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     InputRowSchema schemaAsDate = new InputRowSchema(
         new TimestampSpec("date_as_date", null, null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     InputEntityReader readerAsString = createReader(
         file,
@@ -104,7 +104,7 @@ public class TimestampsParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("ts", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     InputEntityReader reader = createReader(file, schema, JSONPathSpec.DEFAULT);
 
@@ -130,7 +130,7 @@ public class TimestampsParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("time", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     InputEntityReader reader = createReader(
         file,
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java
index 75e5e91..4bc7bac 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetReaderTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.parquet;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -31,7 +32,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -45,7 +45,7 @@ public class WikiParquetReaderTest extends BaseParquetReaderTest
     InputRowSchema schema = new InputRowSchema(
         new TimestampSpec("timestamp", "iso", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))),
-        Collections.emptyList()
+        ColumnsFilter.all()
     );
     InputEntityReader reader = createReader("example/wiki/wiki.parquet", schema, JSONPathSpec.DEFAULT);
 
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 9efaf4f..41a5f8a 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -40,6 +40,7 @@ import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Provides;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
@@ -509,7 +510,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     InputRowSchema someSchema = new InputRowSchema(
         new TimestampSpec("time", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
-        ImmutableList.of("count")
+        ColumnsFilter.all()
     );
 
     InputSourceReader reader = inputSource.reader(
@@ -553,7 +554,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     InputRowSchema someSchema = new InputRowSchema(
         new TimestampSpec("time", "auto", null),
         new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
-        ImmutableList.of("count")
+        ColumnsFilter.all()
     );
 
     InputSourceReader reader = inputSource.reader(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java
index bd9d214..8714fa6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/ReingestionTimelineUtils.java
@@ -34,6 +34,10 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+/**
+ * @deprecated only used by {@link org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory}
+ */
+@Deprecated
 public class ReingestionTimelineUtils
 {
   /**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 7c22dad..bf887e5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -67,6 +67,9 @@ public class TaskConfig
   @JsonProperty
   private final List<StorageLocationConfig> shuffleDataLocations;
 
+  @JsonProperty
+  private final boolean ignoreTimestampSpecForDruidInputSource;
+
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
@@ -77,7 +80,8 @@ public class TaskConfig
       @JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart,
       @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
       @JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
-      @JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations
+      @JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
+      @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource
   )
   {
     this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@@ -102,6 +106,7 @@ public class TaskConfig
     } else {
       this.shuffleDataLocations = shuffleDataLocations;
     }
+    this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
   }
 
   @JsonProperty
@@ -178,6 +183,12 @@ public class TaskConfig
     return shuffleDataLocations;
   }
 
+  @JsonProperty
+  public boolean isIgnoreTimestampSpecForDruidInputSource()
+  {
+    return ignoreTimestampSpecForDruidInputSource;
+  }
+
   private String defaultDir(@Nullable String configParameter, final String defaultVal)
   {
     if (configParameter == null) {
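
The new flag above defaults to false, i.e. the corrected timestampSpec behavior. As a sketch, the matching runtime property (named in the warning message later in this patch) would be set like so to restore the legacy behavior of always reading the __time column:

    druid.indexer.task.ignoreTimestampSpecForDruidInputSource=true
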
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 6f7cdf6..72f1502 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -26,7 +26,6 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.indexer.TaskStatus;
@@ -42,6 +41,7 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.indexing.input.InputRowSchemas;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -49,7 +49,6 @@ import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -66,7 +65,6 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -176,16 +174,9 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
       ParseExceptionHandler parseExceptionHandler
   ) throws IOException
   {
-    final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
-                                            .map(AggregatorFactory::getName)
-                                            .collect(Collectors.toList());
     final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
         inputSource.reader(
-            new InputRowSchema(
-                dataSchema.getTimestampSpec(),
-                dataSchema.getDimensionsSpec(),
-                metricsNames
-            ),
+            InputRowSchemas.fromDataSchema(dataSchema),
             inputFormat,
             tmpDir
         )
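
Condensing the wiring above, with the same names used in the surrounding method: the read-time schema, including its ColumnsFilter, is now derived from the DataSchema, and the transformSpec then decorates the reader on top of the projected columns:

    InputSourceReader reader = dataSchema.getTransformSpec().decorate(
        inputSource.reader(InputRowSchemas.fromDataSchema(dataSchema), inputFormat, tmpDir)
    );
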
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 8234c35..6f69811 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -624,12 +624,13 @@ public class CompactionTask extends AbstractBatchIndexTask
             interval,
             null,
             null,
-            dataSchema.getDimensionsSpec().getDimensionNames(),
-            Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
+            null,
+            null,
             toolbox.getIndexIO(),
             coordinatorClient,
             segmentLoaderFactory,
-            retryPolicyFactory
+            retryPolicyFactory,
+            toolbox.getConfig()
         ),
         null,
         false
@@ -699,7 +700,7 @@ public class CompactionTask extends AbstractBatchIndexTask
     return new DataSchema(
         dataSource,
-        new TimestampSpec(null, null, null),
+        new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
         finalDimensionsSpec,
         finalMetricsSpec,
         uniformGranularitySpec,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
index 05ac79e..63ebba8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java
@@ -76,6 +76,7 @@ public class InputSourceProcessor
                                                         ? (DynamicPartitionsSpec) partitionsSpec
                                                         : null;
     final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+
     try (
         final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
             tmpDir,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 3ba936e..01b3d95 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -62,6 +62,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+/**
+ * @deprecated use {@link DruidInputSource} instead
+ */
+@Deprecated
 public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
 {
   private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index f10e40b..c9d0f4e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -22,10 +22,10 @@ package org.apache.druid.indexing.input;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.data.input.AbstractInputSource;
@@ -39,10 +39,11 @@ import org.apache.druid.data.input.SegmentsSplitHintSpec;
 import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.data.input.impl.SplittableInputSource;
-import org.apache.druid.indexing.common.ReingestionTimelineUtils;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.common.RetryPolicy;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
@@ -50,6 +51,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
@@ -70,16 +72,28 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Stream;
 
+/**
+ * An {@link org.apache.druid.data.input.InputSource} that allows reading from Druid segments.
+ *
+ * Used internally by {@link org.apache.druid.indexing.common.task.CompactionTask}, and can also be used directly.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
 public class DruidInputSource extends AbstractInputSource implements SplittableInputSource<List<WindowedSegmentId>>
 {
   private static final Logger LOG = new Logger(DruidInputSource.class);
 
   /**
+   * Timestamp formats that the standard __time column can be parsed with.
+   */
+  private static final List<String> STANDARD_TIME_COLUMN_FORMATS = ImmutableList.of("millis", "auto");
+
+  /**
    * A Comparator that orders {@link WindowedSegmentId} mainly by segmentId (which is important), and then by intervals
    * (which is arbitrary, and only here for totality of ordering).
    */
@@ -113,12 +127,21 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
   @Nullable
   private final List<WindowedSegmentId> segmentIds;
   private final DimFilter dimFilter;
-  private final List<String> dimensions;
-  private final List<String> metrics;
   private final IndexIO indexIO;
   private final CoordinatorClient coordinatorClient;
   private final SegmentLoaderFactory segmentLoaderFactory;
   private final RetryPolicyFactory retryPolicyFactory;
+  private final TaskConfig taskConfig;
+
+  /**
+   * Included for serde backwards-compatibility only. Not used.
+   */
+  private final List<String> dimensions;
+
+  /**
+   * Included for serde backwards-compatibility only. Not used.
+   */
+  private final List<String> metrics;
 
   @JsonCreator
   public DruidInputSource(
@@ -133,7 +156,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
       @JacksonInject IndexIO indexIO,
       @JacksonInject CoordinatorClient coordinatorClient,
       @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
-      @JacksonInject RetryPolicyFactory retryPolicyFactory
+      @JacksonInject RetryPolicyFactory retryPolicyFactory,
+      @JacksonInject TaskConfig taskConfig
   )
   {
     Preconditions.checkNotNull(dataSource, "dataSource");
@@ -150,6 +174,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
     this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
     this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
     this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
+    this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig");
   }
 
   @JsonProperty
@@ -167,7 +192,6 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
 
   @Nullable
   @JsonProperty("segments")
-  @JsonInclude(Include.NON_NULL)
   public List<WindowedSegmentId> getSegmentIds()
   {
     return segmentIds;
@@ -179,12 +203,18 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
     return dimFilter;
   }
 
+  /**
+   * Included for serde backwards-compatibility only. Not used.
+   */
   @JsonProperty
   public List<String> getDimensions()
   {
     return dimensions;
   }
 
+  /**
+   * Included for serde backwards-compatibility only. Not used.
+   */
   @JsonProperty
   public List<String> getMetrics()
   {
@@ -207,28 +237,38 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
               .from(partitionHolder)
               .transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()));
         }).iterator();
-    final List<String> effectiveDimensions = ReingestionTimelineUtils.getDimensionsToReingest(
-        dimensions,
-        inputRowSchema.getDimensionsSpec(),
-        timeline
-    );
 
-    List<String> effectiveMetrics;
-    if (metrics == null) {
-      effectiveMetrics = ReingestionTimelineUtils.getUniqueMetrics(timeline);
+    final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter);
+
+    final InputRowSchema inputRowSchemaToUse;
+
+    if (taskConfig.isIgnoreTimestampSpecForDruidInputSource()) {
+      // Legacy compatibility mode; see https://github.com/apache/druid/pull/10267.
+      LOG.warn("Ignoring the provided timestampSpec and reading the __time column instead. To use timestampSpecs with "
+               + "the 'druid' input source, set druid.indexer.task.ignoreTimestampSpecForDruidInputSource to false.");
+
+      inputRowSchemaToUse = new InputRowSchema(
+          new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, STANDARD_TIME_COLUMN_FORMATS.iterator().next(), null),
+          inputRowSchema.getDimensionsSpec(),
+          inputRowSchema.getColumnsFilter().plus(ColumnHolder.TIME_COLUMN_NAME)
+      );
     } else {
-      effectiveMetrics = metrics;
+      inputRowSchemaToUse = inputRowSchema;
     }
 
-    final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(
-        indexIO,
-        dimFilter,
-        effectiveDimensions,
-        effectiveMetrics
-    );
+    if (ColumnHolder.TIME_COLUMN_NAME.equals(inputRowSchemaToUse.getTimestampSpec().getTimestampColumn())
+        && !STANDARD_TIME_COLUMN_FORMATS.contains(inputRowSchemaToUse.getTimestampSpec().getTimestampFormat())) {
+      // Slight chance the user did this intentionally, but not likely. Log a warning.
+      LOG.warn(
+          "The provided timestampSpec refers to the %s column without using format %s. If you wanted to read the "
+          + "column as-is, switch formats.",
+          inputRowSchemaToUse.getTimestampSpec().getTimestampColumn(),
+          STANDARD_TIME_COLUMN_FORMATS
+      );
+    }
 
     return new InputEntityIteratingReader(
-        inputRowSchema,
+        inputRowSchemaToUse,
         inputFormat,
         entityIterator,
         temporaryDirectory
@@ -300,7 +340,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
         indexIO,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        taskConfig
     );
   }
 
@@ -310,6 +351,43 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
     return false;
   }
 
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DruidInputSource that = (DruidInputSource) o;
+    return Objects.equals(dataSource, that.dataSource)
+           && Objects.equals(interval, that.interval)
+           && Objects.equals(segmentIds, that.segmentIds)
+           && Objects.equals(dimFilter, that.dimFilter)
+           && Objects.equals(dimensions, that.dimensions)
+           && Objects.equals(metrics, that.metrics);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dataSource, interval, segmentIds, dimFilter, dimensions, metrics);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DruidInputSource{" +
+           "dataSource='" + dataSource + '\'' +
+           ", interval=" + interval +
+           ", segmentIds=" + segmentIds +
+           ", dimFilter=" + dimFilter +
+           (dimensions != null ? ", dimensions=" + dimensions : "") +
+           (metrics != null ? ", metrics=" + metrics : "") +
+           '}';
+  }
+
   public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
       CoordinatorClient coordinatorClient,
       RetryPolicyFactory retryPolicyFactory,
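
With "dimensions" and "metrics" reduced to serde-only fields, a typical 'druid' input source spec now carries only the fields that still matter. A hypothetical minimal spec (field names following the @JsonProperty accessors above; values illustrative):

    "inputSource": {
      "type": "druid",
      "dataSource": "wikipedia",
      "interval": "2020-01-01/2020-02-01"
    }
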
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
index 80f8772..4d02859 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
@@ -27,26 +27,19 @@ import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.IndexIO;
 
 import java.io.File;
-import java.util.List;
 
 public class DruidSegmentInputFormat implements InputFormat
 {
   private final IndexIO indexIO;
   private final DimFilter dimFilter;
-  private List<String> dimensions;
-  private List<String> metrics;
 
-  DruidSegmentInputFormat(
+  public DruidSegmentInputFormat(
       IndexIO indexIO,
-      DimFilter dimFilter,
-      List<String> dimensions,
-      List<String> metrics
+      DimFilter dimFilter
   )
   {
     this.indexIO = indexIO;
     this.dimFilter = dimFilter;
-    this.dimensions = dimensions;
-    this.metrics = metrics;
   }
 
   @Override
@@ -65,8 +58,9 @@ public class DruidSegmentInputFormat implements InputFormat
     return new DruidSegmentReader(
         source,
         indexIO,
-        dimensions,
-        metrics,
+        inputRowSchema.getTimestampSpec(),
+        inputRowSchema.getDimensionsSpec(),
+        inputRowSchema.getColumnsFilter(),
         dimFilter,
         temporaryDirectory
     );
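
The format object itself is now schema-agnostic: column and timestamp decisions arrive through the InputRowSchema supplied when the reader is created. A brief sketch of the simplified construction, assuming indexIO and dimFilter are in scope:

    // No dimension or metric lists at construction time anymore.
    InputFormat format = new DruidSegmentInputFormat(indexIO, dimFilter);
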
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
index 25ada0a..8e3bfe7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
@@ -21,12 +21,18 @@ package org.apache.druid.indexing.input;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntity.CleanableFile;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.IntermediateRowParsingReader;
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -35,56 +41,64 @@ import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.filter.Filters;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import org.apache.druid.utils.CollectionUtils;
-import org.joda.time.DateTime;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
 {
   private final DruidSegmentInputEntity source;
   private final IndexIO indexIO;
-  private final List<String> dimensions;
-  private final List<String> metrics;
+  private final ColumnsFilter columnsFilter;
+  private final InputRowSchema inputRowSchema;
   private final DimFilter dimFilter;
   private final File temporaryDirectory;
 
   DruidSegmentReader(
-      InputEntity source,
-      IndexIO indexIO,
-      List<String> dimensions,
-      List<String> metrics,
-      DimFilter dimFilter,
-      File temporaryDirectory
+      final InputEntity source,
+      final IndexIO indexIO,
+      final TimestampSpec timestampSpec,
+      final DimensionsSpec dimensionsSpec,
+      final ColumnsFilter columnsFilter,
+      final DimFilter dimFilter,
+      final File temporaryDirectory
   )
   {
     Preconditions.checkArgument(source instanceof DruidSegmentInputEntity);
     this.source = (DruidSegmentInputEntity) source;
     this.indexIO = indexIO;
-    this.dimensions = dimensions;
-    this.metrics = metrics;
+    this.columnsFilter = columnsFilter;
+    this.inputRowSchema = new InputRowSchema(
+        timestampSpec,
+        dimensionsSpec,
+        columnsFilter
+    );
     this.dimFilter = dimFilter;
     this.temporaryDirectory = temporaryDirectory;
   }
@@ -109,10 +123,23 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
         null
     );
 
+    // Retain order of columns from the original segments. Useful for preserving dimension order if we're in
+    // schemaless mode.
+    final Set<String> columnsToRead = Sets.newLinkedHashSet(
+        Iterables.filter(
+            Iterables.concat(
+                Collections.singleton(ColumnHolder.TIME_COLUMN_NAME),
+                storageAdapter.getAdapter().getAvailableDimensions(),
+                storageAdapter.getAdapter().getAvailableMetrics()
+            ),
+            columnsFilter::apply
+        )
+    );
+
     final Sequence<Map<String, Object>> sequence = Sequences.concat(
         Sequences.map(
             cursors,
-            this::cursorToSequence
+            cursor -> cursorToSequence(cursor, columnsToRead)
         )
     );
 
@@ -122,8 +149,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
   @Override
   protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
   {
-    final DateTime timestamp = (DateTime) intermediateRow.get(ColumnHolder.TIME_COLUMN_NAME);
-    return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow));
+    return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, intermediateRow));
   }
 
   @Override
@@ -137,14 +163,13 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
    * Map<String, Object> intermediate rows, selecting only the requested set of columns.
    *
    * @param cursor A cursor
+   *
    * @return A sequence of intermediate rows
    */
-  private Sequence<Map<String, Object>> cursorToSequence(
-      final Cursor cursor
-  )
+  private Sequence<Map<String, Object>> cursorToSequence(final Cursor cursor, final Set<String> columnsToRead)
   {
     return Sequences.simple(
-        () -> new IntermediateRowFromCursorIterator(cursor, dimensions, metrics)
+        () -> new IntermediateRowFromCursorIterator(cursor, columnsToRead)
     );
   }
 
@@ -152,8 +177,9 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
    * @param sequence    A sequence of intermediate rows generated from a sequence of
    *                    cursors in {@link #intermediateRowIterator()}
    * @param segmentFile The underlying segment file containing the row data
+   *
    * @return A CloseableIterator from a sequence of intermediate rows, closing the underlying segment file
-   *         when the iterator is closed.
+   * when the iterator is closed.
    */
   @VisibleForTesting
   static CloseableIterator<Map<String, Object>> makeCloseableIteratorFromSequenceAndSegmentFile(
@@ -191,45 +217,91 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
   }
 
   /**
+   * Reads columns for {@link IntermediateRowFromCursorIterator}.
+   */
+  private static class IntermediateRowColumnProcessorFactory implements ColumnProcessorFactory<Supplier<Object>>
+  {
+    private static final IntermediateRowColumnProcessorFactory INSTANCE = new IntermediateRowColumnProcessorFactory();
+
+    @Override
+    public ValueType defaultType()
+    {
+      return ValueType.STRING;
+    }
+
+    @Override
+    public Supplier<Object> makeDimensionProcessor(DimensionSelector selector, boolean multiValue)
+    {
+      return () -> {
+        final IndexedInts vals = selector.getRow();
+
+        int valsSize = vals.size();
+        if (valsSize == 1) {
+          return selector.lookupName(vals.get(0));
+        } else if (valsSize > 1) {
+          List<String> dimVals = new ArrayList<>(valsSize);
+          for (int i = 0; i < valsSize; ++i) {
+            dimVals.add(selector.lookupName(vals.get(i)));
+          }
+
+          return dimVals;
+        }
+
+        return null;
+      };
+    }
+
+    @Override
+    public Supplier<Object> makeFloatProcessor(BaseFloatColumnValueSelector selector)
+    {
+      return () -> selector.isNull() ? null : selector.getFloat();
+    }
+
+    @Override
+    public Supplier<Object> makeDoubleProcessor(BaseDoubleColumnValueSelector selector)
+    {
+      return () -> selector.isNull() ? null : selector.getDouble();
+    }
+
+    @Override
+    public Supplier<Object> makeLongProcessor(BaseLongColumnValueSelector selector)
+    {
+      return () -> selector.isNull() ? null : selector.getLong();
+    }
+
+    @Override
+    public Supplier<Object> makeComplexProcessor(BaseObjectColumnValueSelector<?> selector)
+    {
+      return selector::getObject;
+    }
+  }
+
+  /**
    * Given a {@link Cursor} and an ordered set of column names, this iterator
    * returns the rows of the cursor as Map<String, Object> intermediate rows.
    */
   private static class IntermediateRowFromCursorIterator implements Iterator<Map<String, Object>>
   {
     private final Cursor cursor;
-    private final BaseLongColumnValueSelector timestampColumnSelector;
-    private final Map<String, DimensionSelector> dimSelectors;
-    private final Map<String, BaseObjectColumnValueSelector> metSelectors;
+    private final Map<String, Supplier<Object>> columnReaders;
 
     public IntermediateRowFromCursorIterator(
-        Cursor cursor,
-        List<String> dimensionNames,
-        List<String> metricNames
+        final Cursor cursor,
+        final Set<String> columnsToRead
     )
     {
       this.cursor = cursor;
+      this.columnReaders = CollectionUtils.newLinkedHashMapWithExpectedSize(columnsToRead.size());
 
-      timestampColumnSelector = cursor
-          .getColumnSelectorFactory()
-          .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
-
-      dimSelectors = new HashMap<>();
-      for (String dim : dimensionNames) {
-        final DimensionSelector dimSelector = cursor
-            .getColumnSelectorFactory()
-            .makeDimensionSelector(new DefaultDimensionSpec(dim, dim));
-        // dimSelector is null if the dimension is not present
-        if (dimSelector != null) {
-          dimSelectors.put(dim, dimSelector);
-        }
-      }
-
-      metSelectors = new HashMap<>();
-      for (String metric : metricNames) {
-        final BaseObjectColumnValueSelector metricSelector = cursor
-            .getColumnSelectorFactory()
-            .makeColumnValueSelector(metric);
-        metSelectors.put(metric, metricSelector);
+      for (String column : columnsToRead) {
+        columnReaders.put(
+            column,
+            ColumnProcessors.makeProcessor(
+                column,
+                IntermediateRowColumnProcessorFactory.INSTANCE,
+                cursor.getColumnSelectorFactory()
+            )
+        );
       }
     }
 
@@ -245,46 +317,18 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
-      final Map<String, Object> theEvent =
-          CollectionUtils.newLinkedHashMapWithExpectedSize(dimSelectors.size() + metSelectors.size() + 1);
-
-      for (Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
-        final String dim = dimSelector.getKey();
-        final DimensionSelector selector = dimSelector.getValue();
-        final IndexedInts vals = selector.getRow();
-
-        int valsSize = vals.size();
-        if (valsSize == 1) {
-          final String dimVal = selector.lookupName(vals.get(0));
-          theEvent.put(dim, dimVal);
-        } else if (valsSize > 1) {
-          List<String> dimVals = new ArrayList<>(valsSize);
-          for (int i = 0; i < valsSize; ++i) {
-            dimVals.add(selector.lookupName(vals.get(i)));
-          }
-          theEvent.put(dim, dimVals);
-        }
-      }
+      final Map<String, Object> rowMap =
+          CollectionUtils.newLinkedHashMapWithExpectedSize(columnReaders.size());
 
-      for (Entry<String, BaseObjectColumnValueSelector> metSelector : metSelectors.entrySet()) {
-        final String metric = metSelector.getKey();
-        final BaseObjectColumnValueSelector selector = metSelector.getValue();
-        Object value = selector.getObject();
+      for (Entry<String, Supplier<Object>> entry : columnReaders.entrySet()) {
+        final Object value = entry.getValue().get();
         if (value != null) {
-          theEvent.put(metric, value);
+          rowMap.put(entry.getKey(), value);
         }
       }
 
-      // Timestamp is added last because we expect that the time column will always be a date time object.
-      // If it is added earlier, it can be overwritten by metrics or dimenstions with the same name.
-      //
-      // If a user names a metric or dimension `__time` it will be overwritten. This case should be rare since
-      // __time is reserved for the time column in druid segments.
-      final long timestamp = timestampColumnSelector.getLong();
-      theEvent.put(ColumnHolder.TIME_COLUMN_NAME, DateTimes.utc(timestamp));
-
       cursor.advance();
-      return theEvent;
+      return rowMap;
     }
   }
 }
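
A worked example of the order-preserving column selection above, with hypothetical segment contents: dimensions [page, user], metrics [count], and a ColumnsFilter that excludes "user":

    // columnsToRead considers __time first, then dimensions, then metrics,
    // keeping segment order and applying columnsFilter::apply:
    //   concat([__time], [page, user], [count]) -> [__time, page, count]
    // Each cursor row becomes an ordered intermediate map, e.g.
    //   {__time=1577836800000, page="Main_Page", count=1}
    // and parseInputRows defers timestamp parsing to the timestampSpec via
    //   MapInputRowParser.parse(inputRowSchema, intermediateRow);
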
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
new file mode 100644
index 0000000..f273be7
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.input;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.transform.Transform;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities that are helpful when implementing {@link org.apache.druid.data.input.InputEntityReader}.
+ */
+public class InputRowSchemas
+{
+  private InputRowSchemas()
+  {
+    // No instantiation.
+  }
+
+  /**
+   * Creates an {@link InputRowSchema} from a given {@link DataSchema}.
+   */
+  public static InputRowSchema fromDataSchema(final DataSchema dataSchema)
+  {
+    return new InputRowSchema(
+        dataSchema.getTimestampSpec(),
+        dataSchema.getDimensionsSpec(),
+        createColumnsFilter(
+            dataSchema.getTimestampSpec(),
+            dataSchema.getDimensionsSpec(),
+            dataSchema.getTransformSpec(),
+            dataSchema.getAggregators()
+        )
+    );
+  }
+
+  /**
+   * Build a {@link ColumnsFilter} that can filter down the list of columns that must be read after flattening.
+   *
+   * @see InputRowSchema#getColumnsFilter()
+   */
+  @VisibleForTesting
+  static ColumnsFilter createColumnsFilter(
+      final TimestampSpec timestampSpec,
+      final DimensionsSpec dimensionsSpec,
+      final TransformSpec transformSpec,
+      final AggregatorFactory[] aggregators
+  )
+  {
+    // We'll need to know what fields are generated from transforms, vs. expected from the raw data.
+    final Set<String> transformOutputNames =
+        transformSpec.getTransforms().stream().map(Transform::getName).collect(Collectors.toSet());
+
+    if (dimensionsSpec.hasCustomDimensions()) {
+      // We need an inclusion-based filter.
+      final Set<String> inclusions = new HashSet<>();
+
+      // Add timestamp column.
+      inclusions.add(timestampSpec.getTimestampColumn());
+
+      // Add all transform inputs.
+      inclusions.addAll(transformSpec.getRequiredColumns());
+
+      // Add all dimension inputs that are *not* transform outputs.
+      for (String column : dimensionsSpec.getDimensionNames()) {
+        if (!transformOutputNames.contains(column)) {
+          inclusions.add(column);
+        }
+      }
+
+      // Add all aggregator inputs that are *not* transform outputs.
+      for (AggregatorFactory aggregator : aggregators) {
+        for (String column : aggregator.requiredFields()) {
+          if (!transformOutputNames.contains(column)) {
+            inclusions.add(column);
+          }
+        }
+      }
+
+      return ColumnsFilter.inclusionBased(inclusions);
+    } else {
+      // Schemaless dimensions mode: we need an exclusion-based filter.
+      // Start from the list of dimension exclusions.
+      final Set<String> exclusions = new HashSet<>(dimensionsSpec.getDimensionExclusions());
+
+      // Remove (un-exclude) timestamp column.
+      exclusions.remove(timestampSpec.getTimestampColumn());
+
+      // Remove (un-exclude) all transform inputs.
+      exclusions.removeAll(transformSpec.getRequiredColumns());
+
+      // Remove (un-exclude) all aggregator inputs that are *not* transform outputs.
+      for (AggregatorFactory aggregator : aggregators) {
+        for (String column : aggregator.requiredFields()) {
+          if (!transformOutputNames.contains(column)) {
+            exclusions.remove(column);
+          }
+        }
+      }
+
+      return ColumnsFilter.exclusionBased(exclusions);
+    }
+  }
+}
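
A worked example for createColumnsFilter, using hypothetical spec pieces: timestamp column "ts"; dimensions ["country", "languageUpper"]; a transform producing "languageUpper" from input column "language"; and an aggregator reading "added". Because dimensions are explicitly listed, the inclusion path applies:

    // transformOutputNames = {languageUpper}
    // inclusions = {ts}         (timestamp column)
    //            + {language}   (transform inputs, via getRequiredColumns)
    //            + {country}    (dimensions that are not transform outputs)
    //            + {added}      (aggregator inputs that are not transform outputs)
    // result: ColumnsFilter.inclusionBased({ts, language, country, added})
    // "languageUpper" itself is never read from the input; it is generated.
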
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 4342460..8ff872d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -33,6 +33,7 @@ import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimedShutoffInputSourceReader;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.input.InputRowSchemas;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.io.Closer;
@@ -51,8 +52,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -175,10 +175,10 @@ public class InputSourceSampler
         columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
 
         for (Row row : index) {
-          Map<String, Object> parsed = new HashMap<>();
+          Map<String, Object> parsed = new LinkedHashMap<>();
 
-          columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
           parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
+          columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
 
           Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
           if (sortKey != null) {
@@ -215,14 +215,7 @@ public class InputSourceSampler
       File tempDir
   )
   {
-    final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
-                                            .map(AggregatorFactory::getName)
-                                            .collect(Collectors.toList());
-    final InputRowSchema inputRowSchema = new InputRowSchema(
-        dataSchema.getTimestampSpec(),
-        dataSchema.getDimensionsSpec(),
-        metricsNames
-    );
+    final InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(dataSchema);
 
     InputSourceReader reader = inputSource.reader(inputRowSchema, inputFormat, tempDir);
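
The sampler change above switches to a LinkedHashMap and inserts __time before the other columns, so sampled rows render with the time column leading. Illustratively:

    // Before: HashMap, arbitrary iteration order, e.g. {"page": ..., "__time": ...}
    // After:  LinkedHashMap with __time inserted first: {"__time": ..., "page": ...}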
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 3bbf7df..ea8a5ba 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -61,6 +61,7 @@ import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
 import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.RealtimeIndexTask;
+import org.apache.druid.indexing.input.InputRowSchemas;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -70,7 +71,6 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
@@ -106,7 +106,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -245,13 +244,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     this.task = task;
     this.ioConfig = task.getIOConfig();
     this.tuningConfig = task.getTuningConfig();
-    this.inputRowSchema = new InputRowSchema(
-        task.getDataSchema().getTimestampSpec(),
-        task.getDataSchema().getDimensionsSpec(),
-        Arrays.stream(task.getDataSchema().getAggregators())
-              .map(AggregatorFactory::getName)
-              .collect(Collectors.toList())
-    );
+    this.inputRowSchema = InputRowSchemas.fromDataSchema(task.getDataSchema());
     this.inputFormat = ioConfig.getInputFormat();
     this.parser = parser;
     this.authorizerMapper = authorizerMapper;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index 3dc9988..ef75df5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -102,7 +102,7 @@ public class TaskToolboxTest
     EasyMock.replay(task, mockHandoffNotifierFactory);
 
     taskToolbox = new TaskToolboxFactory(
-        new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null),
+        new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null, false),
         new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
         mockTaskActionClientFactory,
         mockEmitter,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 08ea6fa..f6b09cb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1505,7 +1505,18 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
     };
 
     taskLockbox = new TaskLockbox(taskStorage, mdc);
-    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
+    final TaskConfig taskConfig = new TaskConfig(
+        directory.getPath(),
+        null,
+        null,
+        50000,
+        null,
+        true,
+        null,
+        null,
+        null,
+        false
+    );
 
     final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
         taskLockbox,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index f3ee50c..c835397 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.CompactionTask.Builder;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
@@ -1188,7 +1189,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
     );
 
     return new TaskToolbox(
-        null,
+        new TaskConfig(null, null, null, null, null, false, null, null, null, false),
         null,
         createActionClient(task),
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 35b242a..4f386ab 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -1479,7 +1479,11 @@ public class CompactionTaskTest
       final DataSchema dataSchema = ingestionSchema.getDataSchema();
       Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
 
-      Assert.assertEquals(new TimestampSpec(null, null, null), dataSchema.getTimestampSpec());
+      Assert.assertEquals(
+          new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
+          dataSchema.getTimestampSpec()
+      );
+
       Assert.assertEquals(
           new HashSet<>(expectedDimensionsSpec.getDimensions()),
           new HashSet<>(dataSchema.getDimensionsSpec().getDimensions())
@@ -1511,11 +1515,6 @@ public class CompactionTaskTest
       Assert.assertEquals(expectedSegmentIntervals.get(i), druidInputSource.getInterval());
       Assert.assertNull(druidInputSource.getDimFilter());
 
-      Assert.assertEquals(
-          new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
-          new HashSet<>(druidInputSource.getDimensions())
-      );
-
       // assert tuningConfig
       Assert.assertEquals(expectedTuningConfig, ingestionSchema.getTuningConfig());
     }
@@ -1552,7 +1551,7 @@ public class CompactionTaskTest
     )
     {
       super(
-          null,
+          new TaskConfig(null, null, null, null, null, false, null, null, null, false),
           null,
           taskActionClient,
           null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
index 990888e..caaeea2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
@@ -116,7 +116,8 @@ public class HadoopTaskTest
         false,
         null,
         null,
-        null
+        null,
+        false
     )).once();
     EasyMock.replay(toolbox);
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 3d862a5..6a6aae2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.actions.TaskActionToolbox;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -311,7 +312,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
         );
 
         final TaskToolbox box = new TaskToolbox(
-            null,
+            new TaskConfig(null, null, null, null, null, false, null, null, null, false),
             new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
             taskActionClient,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 79c0099..5deecaf 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -888,7 +888,18 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
       final File directory
   )
   {
-    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
+    final TaskConfig taskConfig = new TaskConfig(
+        directory.getPath(),
+        null,
+        null,
+        50000,
+        null,
+        true,
+        null,
+        null,
+        null,
+        false
+    );
     final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc);
     try {
       taskStorage.insert(task, TaskStatus.running(task.getId()));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index f3cd4f5..86d7ad1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -202,7 +202,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
             false,
             null,
             null,
-            ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null))
+            ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
+            false
         ),
         null
     );
@@ -519,6 +520,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
 
   public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO)
   {
+    final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false);
+
     objectMapper.setInjectableValues(
         new InjectableValues.Std()
             .addValue(ExprMacroTable.class, LookupEnabledTestExprMacroTable.INSTANCE)
@@ -535,6 +538,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
             .addValue(CoordinatorClient.class, coordinatorClient)
             .addValue(SegmentLoaderFactory.class, new SegmentLoaderFactory(indexIO, objectMapper))
             .addValue(RetryPolicyFactory.class, new RetryPolicyFactory(new RetryPolicyConfig()))
+            .addValue(TaskConfig.class, taskConfig)
     );
     objectMapper.registerSubtypes(
         new NamedType(ParallelIndexSupervisorTask.class, ParallelIndexSupervisorTask.TYPE),
@@ -550,7 +554,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
   protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
   {
     return new TaskToolbox(
-        null,
+        new TaskConfig(null, null, null, null, null, false, null, null, null, false),
         new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
         actionClient,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
new file mode 100644
index 0000000..dcdc537
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.input;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.guice.IndexingServiceInputSourceModule;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.TestHelper;
+import org.easymock.EasyMock;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class DruidInputSourceTest
+{
+  private final IndexIO indexIO = EasyMock.createMock(IndexIO.class);
+  private final CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
+  private final SegmentLoaderFactory segmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
+  private final RetryPolicyFactory retryPolicyFactory = EasyMock.createMock(RetryPolicyFactory.class);
+  private final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
+
+  private ObjectMapper mapper = null;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Before
+  public void setUp()
+  {
+    mapper = TestHelper.makeJsonMapper();
+    mapper.registerModules(new IndexingServiceInputSourceModule().getJacksonModules());
+
+    final InjectableValues.Std injectableValues = (InjectableValues.Std) mapper.getInjectableValues();
+    injectableValues.addValue(IndexIO.class, indexIO);
+    injectableValues.addValue(CoordinatorClient.class, coordinatorClient);
+    injectableValues.addValue(SegmentLoaderFactory.class, segmentLoaderFactory);
+    injectableValues.addValue(RetryPolicyFactory.class, retryPolicyFactory);
+    injectableValues.addValue(TaskConfig.class, taskConfig);
+  }
+
+  @Test
+  public void testSerdeUsingIntervals() throws Exception
+  {
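+    // Round-trip serde: deserialize the JSON spec, check equality, and re-serialize back to the same JSON.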
+    final String json = "{"
+                        + "\"type\":\"druid\","
+                        + "\"dataSource\":\"foo\","
+                        + "\"interval\":\"2000-01-01T00:00:00.000Z/2001-01-01T00:00:00.000Z\""
+                        + "}";
+
+    final InputSource inputSource = mapper.readValue(json, InputSource.class);
+
+    Assert.assertThat(inputSource, CoreMatchers.instanceOf(DruidInputSource.class));
+    Assert.assertEquals(
+        new DruidInputSource(
+            "foo",
+            Intervals.of("2000/2001"),
+            null,
+            null,
+            null,
+            null,
+            indexIO,
+            coordinatorClient,
+            segmentLoaderFactory,
+            retryPolicyFactory,
+            taskConfig
+        ),
+        inputSource
+    );
+
+    Assert.assertEquals(json, mapper.writeValueAsString(inputSource));
+  }
+
+  @Test
+  public void testSerdeUsingIntervalsAndLegacyDimensionsMetrics() throws Exception
+  {
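+    // The legacy "dimensions" and "metrics" fields should still survive a serde round trip.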
+    final String json = "{"
+                        + "\"type\":\"druid\","
+                        + "\"dataSource\":\"foo\","
+                        + "\"interval\":\"2000-01-01T00:00:00.000Z/2001-01-01T00:00:00.000Z\","
+                        + "\"dimensions\":[\"a\"],"
+                        + "\"metrics\":[\"b\"]"
+                        + "}";
+
+    final InputSource inputSource = mapper.readValue(json, InputSource.class);
+
+    Assert.assertThat(inputSource, CoreMatchers.instanceOf(DruidInputSource.class));
+    Assert.assertEquals(
+        new DruidInputSource(
+            "foo",
+            Intervals.of("2000/2001"),
+            null,
+            null,
+            ImmutableList.of("a"),
+            ImmutableList.of("b"),
+            indexIO,
+            coordinatorClient,
+            segmentLoaderFactory,
+            retryPolicyFactory,
+            taskConfig
+        ),
+        inputSource
+    );
+
+    Assert.assertEquals(json, mapper.writeValueAsString(inputSource));
+  }
+
+  @Test
+  public void testSerdeUsingSegments() throws Exception
+  {
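+    // Segment-based specs name explicit segment ids along with the intervals to read from each.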
+    final String json = "{"
+                        + "\"type\":\"druid\","
+                        + "\"dataSource\":\"foo\","
+                        + "\"segments\":["
+                        + "{\"segmentId\":\"foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123\","
+                        + "\"intervals\":[\"2000-01-01T00:00:00.000Z/2000-01-01T12:00:00.000Z\"]}"
+                        + "]"
+                        + "}";
+
+    final InputSource inputSource = mapper.readValue(json, InputSource.class);
+
+    Assert.assertThat(inputSource, CoreMatchers.instanceOf(DruidInputSource.class));
+    Assert.assertEquals(
+        new DruidInputSource(
+            "foo",
+            null,
+            ImmutableList.of(
+                new WindowedSegmentId(
+                    "foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123",
+                    ImmutableList.of(Intervals.of("2000-01-01T00/2000-01-01T12"))
+                )
+            ),
+            null,
+            null,
+            null,
+            indexIO,
+            coordinatorClient,
+            segmentLoaderFactory,
+            retryPolicyFactory,
+            taskConfig
+        ),
+        inputSource
+    );
+
+    Assert.assertEquals(json, mapper.writeValueAsString(inputSource));
+  }
+
+  @Test
+  public void testSerdeUsingBothIntervalsAndSegments() throws Exception
+  {
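+    // Specifying both "interval" and "segments" is ambiguous and must fail to deserialize.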
+    final String json = "{"
+                        + "\"type\":\"druid\","
+                        + "\"dataSource\":\"foo\","
+                        + "\"interval\":\"2000-01-01T00:00:00.000Z/2001-01-01T00:00:00.000Z\","
+                        + "\"segments\":["
+                        + "  {\"segmentId\":\"foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123\","
+                        + "   \"intervals\":[\"2000-01-01T00:00:00.000Z/2000-01-01T12:00:00.000Z\"]}"
+                        + "]"
+                        + "}";
+
+    expectedException.expect(JsonProcessingException.class);
+    expectedException.expectMessage("Specify exactly one of 'interval' and 'segments'");
+
+    mapper.readValue(json, InputSource.class);
+  }
+
+  @Test
+  public void testSerdeUsingNeitherIntervalsNorSegments() throws Exception
+  {
+    final String json = "{"
+                        + "\"type\":\"druid\","
+                        + "\"dataSource\":\"foo\""
+                        + "}";
+
+    expectedException.expect(JsonProcessingException.class);
+    expectedException.expectMessage("Specify exactly one of 'interval' and 'segments'");
+
+    mapper.readValue(json, InputSource.class);
+  }
+
+  @Test
+  public void testSerdeUsingNoDataSource() throws Exception
+  {
+    final String json = "{"
+                        + "\"type\":\"druid\","
+                        + "\"interval\":\"2000-01-01T00:00:00.000Z/2001-01-01T00:00:00.000Z\""
+                        + "}";
+
+    expectedException.expect(JsonProcessingException.class);
+    expectedException.expectMessage("dataSource");
+
+    mapper.readValue(json, InputSource.class);
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
index b3f5142..9270f5f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
@@ -19,23 +19,528 @@
 
 package org.apache.druid.indexing.input;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.druid.common.config.NullHandlingTest;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntity.CleanableFile;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.hll.HyperLogLogHash;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
-public class DruidSegmentReaderTest
+public class DruidSegmentReaderTest extends NullHandlingTest
 {
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private File segmentDirectory;
+
+  private final IndexIO indexIO = TestHelper.getTestIndexIO();
+
+  @Before
+  public void setUp() throws IOException
+  {
+    // Write a segment with two rows in it, with columns: s (string), d (double), cnt (long), met_s (complex).
+    final IncrementalIndex<?> incrementalIndex =
+        IndexBuilder.create()
+                    .schema(
+                        new IncrementalIndexSchema.Builder()
+                            .withDimensionsSpec(
+                                new DimensionsSpec(
+                                    ImmutableList.of(
+                                        StringDimensionSchema.create("s"),
+                                        new DoubleDimensionSchema("d")
+                                    )
+                                )
+                            )
+                            .withMetrics(
+                                new CountAggregatorFactory("cnt"),
+                                new HyperUniquesAggregatorFactory("met_s", "s")
+                            )
+                            .withRollup(false)
+                            .build()
+                    )
+                    .rows(
+                        ImmutableList.of(
+                            new MapBasedInputRow(
+                                DateTimes.of("2000"),
+                                ImmutableList.of("s", "d"),
+                                ImmutableMap.<String, Object>builder()
+                                    .put("s", "foo")
+                                    .put("d", 1.23)
+                                    .build()
+                            ),
+                            new MapBasedInputRow(
+                                DateTimes.of("2000T01"),
+                                ImmutableList.of("s", "d"),
+                                ImmutableMap.<String, Object>builder()
+                                    .put("s", "bar")
+                                    .put("d", 4.56)
+                                    .build()
+                            )
+                        )
+                    )
+                    .buildIncrementalIndex();
+
+    segmentDirectory = temporaryFolder.newFolder();
+
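+    // Persist the in-memory index to disk so the reader can open it as a real segment.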
+    try {
+      TestHelper.getTestIndexMergerV9(
+          OnHeapMemorySegmentWriteOutMediumFactory.instance()
+      ).persist(
+          incrementalIndex,
+          segmentDirectory,
+          new IndexSpec(),
+          null
+      );
+    }
+    finally {
+      incrementalIndex.close();
+    }
+  }
+
+  @Test
+  public void testReader() throws IOException
+  {
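+    // ColumnsFilter.all() reads every stored column, so metrics appear in the rows alongside dimensions.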
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntity(Intervals.of("2000/P1D")),
+        indexIO,
+        new TimestampSpec("__time", "millis", DateTimes.of("1971")),
+        new DimensionsSpec(
+            ImmutableList.of(
+                StringDimensionSchema.create("s"),
+                new DoubleDimensionSchema("d")
+            )
+        ),
+        ColumnsFilter.all(),
+        null,
+        temporaryFolder.newFolder()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new MapBasedInputRow(
+                DateTimes.of("2000"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T").getMillis())
+                    .put("s", "foo")
+                    .put("d", 1.23d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("foo"))
+                    .build()
+            ),
+            new MapBasedInputRow(
+                DateTimes.of("2000T01"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T01").getMillis())
+                    .put("s", "bar")
+                    .put("d", 4.56d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("bar"))
+                    .build()
+            )
+        ),
+        readRows(reader)
+    );
+  }
+
+  @Test
+  public void testReaderAutoTimestampFormat() throws IOException
+  {
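+    // Same as testReader, except the "auto" timestamp format must detect the numeric millis in __time.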
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntity(Intervals.of("2000/P1D")),
+        indexIO,
+        new TimestampSpec("__time", "auto", DateTimes.of("1971")),
+        new DimensionsSpec(
+            ImmutableList.of(
+                StringDimensionSchema.create("s"),
+                new DoubleDimensionSchema("d")
+            )
+        ),
+        ColumnsFilter.all(),
+        null,
+        temporaryFolder.newFolder()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new MapBasedInputRow(
+                DateTimes.of("2000"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T").getMillis())
+                    .put("s", "foo")
+                    .put("d", 1.23d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("foo"))
+                    .build()
+            ),
+            new MapBasedInputRow(
+                DateTimes.of("2000T01"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T01").getMillis())
+                    .put("s", "bar")
+                    .put("d", 4.56d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("bar"))
+                    .build()
+            )
+        ),
+        readRows(reader)
+    );
+  }
+
+  @Test
+  public void testReaderWithDimensionExclusions() throws IOException
+  {
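+    // Excluded dimensions drop out of each row's dimension list, but ColumnsFilter.all() still reads their values.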
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntity(Intervals.of("2000/P1D")),
+        indexIO,
+        new TimestampSpec("__time", "millis", DateTimes.of("1971")),
+        new DimensionsSpec(
+            ImmutableList.of(),
+            ImmutableList.of("__time", "s", "cnt", "met_s"),
+            ImmutableList.of()
+        ),
+        ColumnsFilter.all(),
+        null,
+        temporaryFolder.newFolder()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new MapBasedInputRow(
+                DateTimes.of("2000"),
+                ImmutableList.of("d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T").getMillis())
+                    .put("s", "foo")
+                    .put("d", 1.23d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("foo"))
+                    .build()
+            ),
+            new MapBasedInputRow(
+                DateTimes.of("2000T01"),
+                ImmutableList.of("d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T01").getMillis())
+                    .put("s", "bar")
+                    .put("d", 4.56d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("bar"))
+                    .build()
+            )
+        ),
+        readRows(reader)
+    );
+  }
+
+  @Test
+  public void testReaderWithInclusiveColumnsFilter() throws IOException
+  {
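+    // The inclusion filter restricts reads to __time, s, and d; cnt and met_s never appear in the rows.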
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntity(Intervals.of("2000/P1D")),
+        indexIO,
+        new TimestampSpec("__time", "millis", DateTimes.of("1971")),
+        new DimensionsSpec(
+            ImmutableList.of(
+                StringDimensionSchema.create("s"),
+                new DoubleDimensionSchema("d")
+            )
+        ),
+        ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "s", "d")),
+        null,
+        temporaryFolder.newFolder()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new MapBasedInputRow(
+                DateTimes.of("2000"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T").getMillis())
+                    .put("s", "foo")
+                    .put("d", 1.23d)
+                    .build()
+            ),
+            new MapBasedInputRow(
+                DateTimes.of("2000T01"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T01").getMillis())
+                    .put("s", "bar")
+                    .put("d", 4.56d)
+                    .build()
+            )
+        ),
+        readRows(reader)
+    );
+  }
+
+  @Test
+  public void testReaderWithInclusiveColumnsFilterNoTimestamp() throws IOException
+  {
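+    // With __time excluded from the filter, the timestampSpec falls back to its missing-value default of 1971.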
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntity(Intervals.of("2000/P1D")),
+        indexIO,
+        new TimestampSpec("__time", "millis", DateTimes.of("1971")),
+        new DimensionsSpec(
+            ImmutableList.of(
+                StringDimensionSchema.create("s"),
+                new DoubleDimensionSchema("d")
+            )
+        ),
+        ColumnsFilter.inclusionBased(ImmutableSet.of("s", "d")),
+        null,
+        temporaryFolder.newFolder()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new MapBasedInputRow(
+                DateTimes.of("1971"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("s", "foo")
+                    .put("d", 1.23d)
+                    .build()
+            ),
+            new MapBasedInputRow(
+                DateTimes.of("1971"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("s", "bar")
+                    .put("d", 4.56d)
+                    .build()
+            )
+        ),
+        readRows(reader)
+    );
+  }
+
+  @Test
+  public void testReaderWithFilter() throws IOException
+  {
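+    // The dimFilter is applied during the read, so only the row matching d = 1.23 is returned.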
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntity(Intervals.of("2000/P1D")),
+        indexIO,
+        new TimestampSpec("__time", "millis", DateTimes.of("1971")),
+        new DimensionsSpec(
+            ImmutableList.of(
+                StringDimensionSchema.create("s"),
+                new DoubleDimensionSchema("d")
+            )
+        ),
+        ColumnsFilter.all(),
+        new SelectorDimFilter("d", "1.23", null),
+        temporaryFolder.newFolder()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new MapBasedInputRow(
+                DateTimes.of("2000"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T").getMillis())
+                    .put("s", "foo")
+                    .put("d", 1.23d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("foo"))
+                    .build()
+            )
+        ),
+        readRows(reader)
+    );
+  }
+
+  @Test
+  public void testReaderTimestampFromDouble() throws IOException
+  {
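+    // Reading the timestamp from column "d" as posix seconds truncates 1.23 and 4.56 to whole seconds.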
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntity(Intervals.of("2000/P1D")),
+        indexIO,
+        new TimestampSpec("d", "posix", null),
+        new DimensionsSpec(
+            ImmutableList.of(
+                StringDimensionSchema.create("s"),
+                new DoubleDimensionSchema("d")
+            )
+        ),
+        ColumnsFilter.all(),
+        null,
+        temporaryFolder.newFolder()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new MapBasedInputRow(
+                DateTimes.of("1970-01-01T00:00:01.000Z"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T").getMillis())
+                    .put("s", "foo")
+                    .put("d", 1.23d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("foo"))
+                    .build()
+            ),
+            new MapBasedInputRow(
+                DateTimes.of("1970-01-01T00:00:04.000Z"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T01").getMillis())
+                    .put("s", "bar")
+                    .put("d", 4.56d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("bar"))
+                    .build()
+            )
+        ),
+        readRows(reader)
+    );
+  }
+
+  @Test
+  public void testReaderTimestampAsPosixIncorrectly() throws IOException
+  {
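+    // Treating __time millis as posix seconds lands roughly 30,000 years in the future (year 31969).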
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntity(Intervals.of("2000/P1D")),
+        indexIO,
+        new TimestampSpec("__time", "posix", null),
+        new DimensionsSpec(
+            ImmutableList.of(
+                StringDimensionSchema.create("s"),
+                new DoubleDimensionSchema("d")
+            )
+        ),
+        ColumnsFilter.all(),
+        null,
+        temporaryFolder.newFolder()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new MapBasedInputRow(
+                DateTimes.of("31969-04-01T00:00:00.000Z"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T").getMillis())
+                    .put("s", "foo")
+                    .put("d", 1.23d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("foo"))
+                    .build()
+            ),
+            new MapBasedInputRow(
+                DateTimes.of("31969-05-12T16:00:00.000Z"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T01").getMillis())
+                    .put("s", "bar")
+                    .put("d", 4.56d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("bar"))
+                    .build()
+            )
+        ),
+        readRows(reader)
+    );
+  }
+
+  @Test
+  public void testReaderTimestampSpecDefault() throws IOException
+  {
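+    // A timestampSpec with no column assigns every row the missing-value default of 1971.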
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntity(Intervals.of("2000/P1D")),
+        indexIO,
+        new TimestampSpec(null, null, DateTimes.of("1971")),
+        new DimensionsSpec(
+            ImmutableList.of(
+                StringDimensionSchema.create("s"),
+                new DoubleDimensionSchema("d")
+            )
+        ),
+        ColumnsFilter.all(),
+        null,
+        temporaryFolder.newFolder()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new MapBasedInputRow(
+                DateTimes.of("1971"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T").getMillis())
+                    .put("s", "foo")
+                    .put("d", 1.23d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("foo"))
+                    .build()
+            ),
+            new MapBasedInputRow(
+                DateTimes.of("1971"),
+                ImmutableList.of("s", "d"),
+                ImmutableMap.<String, Object>builder()
+                    .put("__time", DateTimes.of("2000T01").getMillis())
+                    .put("s", "bar")
+                    .put("d", 4.56d)
+                    .put("cnt", 1L)
+                    .put("met_s", makeHLLC("bar"))
+                    .build()
+            )
+        ),
+        readRows(reader)
+    );
+  }
+
   @Test
   public void testMakeCloseableIteratorFromSequenceAndSegmentFileCloseYielderOnClose() throws IOException
   {
@@ -80,4 +585,65 @@ public class DruidSegmentReaderTest
     Assert.assertTrue("File is not closed", isFileClosed.booleanValue());
     Assert.assertTrue("Sequence is not closed", isSequenceClosed.booleanValue());
   }
+
+  private DruidSegmentInputEntity makeInputEntity(final Interval interval)
+  {
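+    // Stub SegmentLoader: only getSegmentFiles() is exercised, returning the directory persisted in setUp().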
+    return new DruidSegmentInputEntity(
+        new SegmentLoader()
+        {
+          @Override
+          public boolean isSegmentLoaded(DataSegment segment)
+          {
+            throw new UnsupportedOperationException("unused");
+          }
+
+          @Override
+          public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
+          {
+            throw new UnsupportedOperationException("unused");
+          }
+
+          @Override
+          public File getSegmentFiles(DataSegment segment)
+          {
+            return segmentDirectory;
+          }
+
+          @Override
+          public void cleanup(DataSegment segment)
+          {
+            throw new UnsupportedOperationException("unused");
+          }
+        },
+        DataSegment.builder()
+                   .dataSource("ds")
+                   .dimensions(ImmutableList.of("s", "d"))
+                   .metrics(ImmutableList.of("cnt", "met_s"))
+                   .interval(Intervals.of("2000/P1D"))
+                   .version("1")
+                   .size(0)
+                   .build(),
+        interval
+    );
+  }
+
+  private List<InputRow> readRows(final DruidSegmentReader reader) throws IOException
+  {
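+    // Drain the reader's intermediate rows and parse each one into InputRows.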
+    final List<InputRow> rows = new ArrayList<>();
+    try (final CloseableIterator<Map<String, Object>> iterator = reader.intermediateRowIterator()) {
+      while (iterator.hasNext()) {
+        rows.addAll(reader.parseInputRows(iterator.next()));
+      }
+    }
+    return rows;
+  }
+
+  private static HyperLogLogCollector makeHLLC(final String... values)
+  {
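+    // Build the HyperLogLogCollector expected for the "met_s" metric over the given values.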
+    final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
+    for (String value : values) {
+      collector.add(HyperLogLogHash.getDefault().hash(value));
+    }
+    return collector;
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java
new file mode 100644
index 0000000..7241a27
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.input;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.NullHandlingTest;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InputRowSchemasTest extends NullHandlingTest
+{
+  @Test
+  public void test_createColumnsFilter_normal()
+  {
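+    // Expected required columns: "ts" (timestampSpec), "foo" (dimension), "bar" (transform filter),
+    // "qux" (transform expression input), and "bob" (aggregator input).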
+    final ColumnsFilter columnsFilter = InputRowSchemas.createColumnsFilter(
+        new TimestampSpec("ts", "auto", null),
+        new DimensionsSpec(
+            ImmutableList.of(StringDimensionSchema.create("foo")),
+            ImmutableList.of(),
+            ImmutableList.of()
+        ),
+        new TransformSpec(
+            new SelectorDimFilter("bar", "x", null),
+            ImmutableList.of(
+                new ExpressionTransform("baz", "qux + 3", ExprMacroTable.nil())
+            )
+        ),
+        new AggregatorFactory[]{
+            new LongSumAggregatorFactory("billy", "bob")
+        }
+    );
+
+    Assert.assertEquals(
+        ColumnsFilter.inclusionBased(
+            ImmutableSet.of(
+                "ts",
+                "foo",
+                "bar",
+                "qux",
+                "bob"
+            )
+        ),
+        columnsFilter
+    );
+  }
+
+  @Test
+  public void test_createColumnsFilter_schemaless()
+  {
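+    // Schemaless dimensions yield an exclusion-based filter: of the declared exclusions, only "foo"
+    // remains excluded, since ts, bar, qux, and bob are required by other parts of the schema.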
+    final ColumnsFilter columnsFilter = InputRowSchemas.createColumnsFilter(
+        new TimestampSpec("ts", "auto", null),
+        new DimensionsSpec(
+            ImmutableList.of(),
+            ImmutableList.of("ts", "foo", "bar", "qux", "bob"),
+            ImmutableList.of()
+        ),
+        new TransformSpec(
+            new SelectorDimFilter("bar", "x", null),
+            ImmutableList.of(
+                new ExpressionTransform("baz", "qux + 3", ExprMacroTable.nil())
+            )
+        ),
+        new AggregatorFactory[]{
+            new LongSumAggregatorFactory("billy", "bob")
+        }
+    );
+
+    Assert.assertEquals(
+        ColumnsFilter.exclusionBased(
+            ImmutableSet.of(
+                "foo"
+            )
+        ),
+        columnsFilter
+    );
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 49b2ce7..120c709 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -89,7 +89,8 @@ public class SingleTaskBackgroundRunnerTest
         true,
         null,
         null,
-        null
+        null,
+        false
     );
     final ServiceEmitter emitter = new NoopServiceEmitter();
     final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 8bb8e3e..d39905e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -599,7 +599,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
         new TaskAuditLogConfig(true)
     );
     File tmpDir = temporaryFolder.newFolder();
-    taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null);
+    taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false);
 
     return new TaskToolboxFactory(
         taskConfig,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
index fba7687..036f230 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.seekablestream;
 
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -79,7 +80,7 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
         new InputRowSchema(
             new TimestampSpec("col_0", "auto", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(colNames.subList(1, colNames.size()))),
-            Collections.emptyList()
+            ColumnsFilter.all()
         ),
         inputFormat,
         temporaryFolder.newFolder()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
index 97dc280..1cab704 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.seekablestream;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
@@ -109,7 +110,7 @@ public class StreamChunkParserTest
     final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
         null,
         inputFormat,
-        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
+        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()),
         TransformSpec.NONE,
         temporaryFolder.newFolder(),
         row -> true,
@@ -157,7 +158,7 @@ public class StreamChunkParserTest
     final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
         parser,
         inputFormat,
-        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
+        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()),
         TransformSpec.NONE,
         temporaryFolder.newFolder(),
         row -> true,
@@ -179,7 +180,7 @@ public class StreamChunkParserTest
     final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
         null,
         inputFormat,
-        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
+        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()),
         TransformSpec.NONE,
         temporaryFolder.newFolder(),
         row -> true,
@@ -202,7 +203,7 @@ public class StreamChunkParserTest
     final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
         null,
         inputFormat,
-        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
+        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()),
         TransformSpec.NONE,
         temporaryFolder.newFolder(),
         row -> true,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 8e5984c..dbc2233 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -88,7 +88,8 @@ public class WorkerTaskManagerTest
         false,
         null,
         null,
-        null
+        null,
+        false
     );
     TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
     TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index b0bdbc8..fdf7aa4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -162,7 +162,8 @@ public class WorkerTaskMonitorTest
         false,
         null,
         null,
-        null
+        null,
+        false
     );
     TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
     TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
index dcce480..76f26fe 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
@@ -87,7 +87,8 @@ public class IntermediaryDataManagerAutoCleanupTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null))
+        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
+        false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
     {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
index 4db1b39..3c87bf4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -70,7 +70,8 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null))
+        ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)),
+        false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
     intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
index 0604742..54f6c0e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
@@ -69,7 +69,8 @@ public class ShuffleDataSegmentPusherTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null))
+        ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
+        false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
     intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
index bd1b211..741956a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
@@ -95,7 +95,8 @@ public class ShuffleResourceTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null))
+        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
+        false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
     {
diff --git a/integration-tests/docker/environment-configs/common b/integration-tests/docker/environment-configs/common
index 72c797b..815596f 100644
--- a/integration-tests/docker/environment-configs/common
+++ b/integration-tests/docker/environment-configs/common
@@ -68,3 +68,7 @@ druid_indexer_logs_directory=/shared/tasklogs
 druid_sql_enable=true
 druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
 druid_request_logging_type=slf4j
+
+# Tests the legacy config from https://github.com/apache/druid/pull/10267.
+# This can be removed once the flag is no longer needed.
+druid_indexer_task_ignoreTimestampSpecForDruidInputSource=true
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 833e5eb..e2b141f 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -129,7 +129,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
           fullDatasourceName,
           AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
           0,
-          22482,
+          22481,
           0,
           0,
           3,
@@ -275,7 +275,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
           fullDatasourceName,
           AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
           0,
-          22482,
+          22481,
           0,
           0,
           3,
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json
index 91702a4..46af17a 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_druid_input_source_index_task.json
@@ -10,7 +10,8 @@
         ]
       },
       "timestampSpec": {
-        "column": "timestamp"
+        "column": "ignored-see-ignoreTimestampSpecForDruidInputSource",
+        "format": "iso"
       },
       "metricsSpec": [
         {
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json b/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json
index 3a5934c..cf2415c 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json
@@ -24,7 +24,7 @@
             },
             "timestampSpec": {
                 "column": "__time",
-                "format": "iso"
+                "format": "millis"
             },
             "dimensionsSpec": {
                 "dimensionExclusions" : ["robot", "continent"]
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task_with_transforms.json b/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task_with_transforms.json
index 3e8a44c..2c2b037 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task_with_transforms.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task_with_transforms.json
@@ -24,7 +24,7 @@
             },
             "timestampSpec": {
                 "column": "__time",
-                "format": "iso"
+                "format": "millis"
             },
             "dimensionsSpec": {
                 "dimensions": [
diff --git a/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java b/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
index 16bad31..2ace9b0 100644
--- a/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
+++ b/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprMacroTable;
@@ -32,12 +33,15 @@ import org.apache.druid.segment.virtual.ExpressionSelectors;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
 
 public class ExpressionTransform implements Transform
 {
   private final String name;
   private final String expression;
   private final ExprMacroTable macroTable;
+  private final Supplier<Expr> parsedExpression;
 
   @JsonCreator
   public ExpressionTransform(
@@ -49,6 +53,9 @@ public class ExpressionTransform implements Transform
     this.name = Preconditions.checkNotNull(name, "name");
     this.expression = Preconditions.checkNotNull(expression, "expression");
     this.macroTable = macroTable;
+    this.parsedExpression = Suppliers.memoize(
+        () -> Parser.parse(expression, Preconditions.checkNotNull(this.macroTable, "macroTable"))
+    )::get;
   }
 
   @JsonProperty
@@ -67,8 +74,13 @@ public class ExpressionTransform implements Transform
   @Override
   public RowFunction getRowFunction()
   {
-    final Expr expr = Parser.parse(expression, Preconditions.checkNotNull(this.macroTable, "macroTable"));
-    return new ExpressionRowFunction(expr);
+    return new ExpressionRowFunction(parsedExpression.get());
+  }
+
+  @Override
+  public Set<String> getRequiredColumns()
+  {
+    return parsedExpression.get().analyzeInputs().getRequiredBindings();
   }
 
   static class ExpressionRowFunction implements RowFunction
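
The `Suppliers.memoize(...)::get` idiom above parses the expression at most once and caches the result, so `getRowFunction()` and the new `getRequiredColumns()` share a single parse instead of re-parsing on every call. A minimal standalone sketch of the same Guava pattern, independent of Druid classes:

    import com.google.common.base.Suppliers;
    import java.util.function.Supplier;

    public class MemoizeSketch
    {
      // Guava's memoize returns a com.google.common.base.Supplier;
      // the ::get method reference adapts it to java.util.function.Supplier.
      private final Supplier<String> parsed = Suppliers.memoize(() -> {
        System.out.println("parsing once");
        return "parsed-expression";
      })::get;

      public static void main(String[] args)
      {
        MemoizeSketch sketch = new MemoizeSketch();
        sketch.parsed.get();  // runs the lambda, prints "parsing once"
        sketch.parsed.get();  // served from cache; no second parse
      }
    }

Note the parse stays lazy: nothing runs until the supplier is first consulted, so construction cost is unchanged.
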
diff --git a/processing/src/main/java/org/apache/druid/segment/transform/Transform.java b/processing/src/main/java/org/apache/druid/segment/transform/Transform.java
index a481a4c..8b6f75f 100644
--- a/processing/src/main/java/org/apache/druid/segment/transform/Transform.java
+++ b/processing/src/main/java/org/apache/druid/segment/transform/Transform.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.druid.guice.annotations.ExtensionPoint;
 
+import java.util.Set;
+
 /**
  * A row transform that is part of a {@link TransformSpec}. Transforms allow adding new fields to input rows. Each
  * one has a "name" (the name of the new field) which can be referred to by DimensionSpecs, AggregatorFactories, etc.
@@ -52,4 +54,9 @@ public interface Transform
    * as output.
    */
   RowFunction getRowFunction();
+
+  /**
+   * Returns the names of all columns that this transform is going to read.
+   */
+  Set<String> getRequiredColumns();
 }
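
For extension authors, the contract of the new method is: return exactly the set of input fields your row function reads. A hypothetical sketch (the class and column names are invented for illustration, and it assumes `RowFunction` is the single-method `eval(Row)` interface):

    import org.apache.druid.data.input.Row;
    import org.apache.druid.segment.transform.RowFunction;
    import org.apache.druid.segment.transform.Transform;

    import java.util.Collections;
    import java.util.Set;

    // Hypothetical transform that upper-cases a single source column.
    public class UpperCaseTransform implements Transform
    {
      private final String name;
      private final String sourceColumn;

      public UpperCaseTransform(String name, String sourceColumn)
      {
        this.name = name;
        this.sourceColumn = sourceColumn;
      }

      @Override
      public String getName()
      {
        return name;
      }

      @Override
      public RowFunction getRowFunction()
      {
        return new RowFunction()
        {
          @Override
          public Object eval(Row row)
          {
            final Object value = row.getRaw(sourceColumn);
            return value == null ? null : value.toString().toUpperCase();
          }
        };
      }

      @Override
      public Set<String> getRequiredColumns()
      {
        // Exactly the columns that the row function above reads.
        return Collections.singleton(sourceColumn);
      }
    }
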
diff --git a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java
index 6de7ac9..1391da3 100644
--- a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java
+++ b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java
@@ -126,6 +126,21 @@ public class TransformSpec
     return new Transformer(this);
   }
 
+  public Set<String> getRequiredColumns()
+  {
+    final Set<String> requiredColumns = new HashSet<>();
+
+    if (filter != null) {
+      requiredColumns.addAll(filter.getRequiredColumns());
+    }
+
+    for (Transform transform : transforms) {
+      requiredColumns.addAll(transform.getRequiredColumns());
+    }
+
+    return requiredColumns;
+  }
+
   @Override
   public boolean equals(final Object o)
   {
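
`TransformSpec.getRequiredColumns()` gives callers the full set of input columns the transform pipeline touches: the union of the filter's inputs and each transform's inputs. A hedged usage sketch (the column names are invented; `ExprMacroTable.nil()` keeps it dependency-free):

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.math.expr.ExprMacroTable;
    import org.apache.druid.query.filter.SelectorDimFilter;
    import org.apache.druid.segment.transform.ExpressionTransform;
    import org.apache.druid.segment.transform.TransformSpec;

    public class RequiredColumnsSketch
    {
      public static void main(String[] args)
      {
        final TransformSpec spec = new TransformSpec(
            new SelectorDimFilter("country", "US", null),
            ImmutableList.of(
                new ExpressionTransform("fullName", "concat(first, last)", ExprMacroTable.nil())
            )
        );

        // Union of the filter's inputs and each transform's inputs:
        // [country, first, last] (order not guaranteed; it is a HashSet).
        System.out.println(spec.getRequiredColumns());
      }
    }
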
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/TransformSpecTest.java b/processing/src/test/java/org/apache/druid/segment/transform/TransformSpecTest.java
similarity index 91%
rename from server/src/test/java/org/apache/druid/segment/indexing/TransformSpecTest.java
rename to processing/src/test/java/org/apache/druid/segment/transform/TransformSpecTest.java
index 763a924..00d491e 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/TransformSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/transform/TransformSpecTest.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.druid.segment.indexing;
+package org.apache.druid.segment.transform;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
@@ -33,8 +34,6 @@ import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.AndDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.transform.ExpressionTransform;
-import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -80,6 +79,11 @@ public class TransformSpecTest extends InitializedNullHandlingTest
         )
     );
 
+    Assert.assertEquals(
+        ImmutableSet.of("x", "y", "a", "b", "f", "g"),
+        transformSpec.getRequiredColumns()
+    );
+
     final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
     final InputRow row = parser.parseBatch(ROW1).get(0);
 
@@ -108,6 +112,11 @@ public class TransformSpecTest extends InitializedNullHandlingTest
         )
     );
 
+    Assert.assertEquals(
+        ImmutableSet.of("x", "y"),
+        transformSpec.getRequiredColumns()
+    );
+
     final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
     final InputRow row = parser.parseBatch(ROW1).get(0);
 
@@ -139,6 +148,12 @@ public class TransformSpecTest extends InitializedNullHandlingTest
         )
     );
 
+    Assert.assertEquals(
+        ImmutableSet.of("x", "f", "g", "y", "a", "b"),
+        transformSpec.getRequiredColumns()
+    );
+
+
     final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
     Assert.assertNotNull(parser.parseBatch(ROW1).get(0));
     Assert.assertNull(parser.parseBatch(ROW2).get(0));
@@ -154,6 +169,12 @@ public class TransformSpecTest extends InitializedNullHandlingTest
         )
     );
 
+    Assert.assertEquals(
+        ImmutableSet.of("a", "b"),
+        transformSpec.getRequiredColumns()
+    );
+
+
     final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
     final InputRow row = parser.parseBatch(ROW1).get(0);
 
@@ -172,6 +193,11 @@ public class TransformSpecTest extends InitializedNullHandlingTest
         )
     );
 
+    Assert.assertEquals(
+        ImmutableSet.of("__time"),
+        transformSpec.getRequiredColumns()
+    );
+
     final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
     final InputRow row = parser.parseBatch(ROW1).get(0);
 
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index f36f754..04be8c4 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -27,8 +27,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
@@ -36,6 +36,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.transform.TransformSpec;
@@ -45,8 +46,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 
 /**
@@ -55,7 +55,6 @@ import java.util.stream.Collectors;
 public class DataSchema
 {
   private static final Logger log = new Logger(DataSchema.class);
-  private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");
   private final String dataSource;
   private final AggregatorFactory[] aggregators;
   private final GranularitySpec granularitySpec;
@@ -150,35 +149,47 @@ public class DataSchema
     IdUtils.validateId("dataSource", dataSource);
   }
 
+  /**
+   * Computes the {@link DimensionsSpec} that we will actually use. It is derived from, but not necessarily identical
+   * to, the one that we were given.
+   */
   private static DimensionsSpec computeDimensionsSpec(
-      TimestampSpec timestampSpec,
-      DimensionsSpec dimensionsSpec,
-      AggregatorFactory[] aggregators
+      final TimestampSpec timestampSpec,
+      final DimensionsSpec dimensionsSpec,
+      final AggregatorFactory[] aggregators
   )
   {
-    final Set<String> dimensionExclusions = new HashSet<>();
-
-    final String timestampColumn = timestampSpec.getTimestampColumn();
-    if (!(dimensionsSpec.hasCustomDimensions() && dimensionsSpec.getDimensionNames().contains(timestampColumn))) {
-      dimensionExclusions.add(timestampColumn);
-    }
-
-    for (AggregatorFactory aggregator : aggregators) {
-      dimensionExclusions.addAll(aggregator.requiredFields());
-      dimensionExclusions.add(aggregator.getName());
-    }
+    final Set<String> inputFieldNames = new HashSet<>();
+    final Set<String> outputFieldNames = new HashSet<>();
+
+    // Populate inputFieldNames.
+    inputFieldNames.add(timestampSpec.getTimestampColumn());
+    inputFieldNames.addAll(dimensionsSpec.getDimensionNames());
+    Arrays.stream(aggregators)
+          .flatMap(aggregator -> aggregator.requiredFields().stream())
+          .forEach(inputFieldNames::add);
+
+    // Populate outputFieldNames, validating along the way for lack of duplicates.
+    outputFieldNames.add(ColumnHolder.TIME_COLUMN_NAME);
+
+    Stream.concat(
+        dimensionsSpec.getDimensions().stream().map(DimensionSchema::getName),
+        Arrays.stream(aggregators).map(AggregatorFactory::getName)
+    ).forEach(
+        field -> {
+          if (!outputFieldNames.add(field)) {
+            throw new IAE("Cannot specify field [%s] more than once", field);
+          }
+        }
+    );
 
-    final Set<String> metSet = Arrays.stream(aggregators).map(AggregatorFactory::getName).collect(Collectors.toSet());
-    final Set<String> dimSet = new HashSet<>(dimensionsSpec.getDimensionNames());
-    final Set<String> overlap = Sets.intersection(metSet, dimSet);
-    if (!overlap.isEmpty()) {
-      throw new IAE(
-          "Cannot have overlapping dimensions and metrics of the same name. Please change the name of the metric. Overlap: %s",
-          overlap
-      );
-    }
+    // Set up additional exclusions: all inputs and outputs, minus defined dimensions.
+    final Set<String> additionalDimensionExclusions = new HashSet<>();
+    additionalDimensionExclusions.addAll(inputFieldNames);
+    additionalDimensionExclusions.addAll(outputFieldNames);
+    additionalDimensionExclusions.removeAll(dimensionsSpec.getDimensionNames());
 
-    return dimensionsSpec.withDimensionExclusions(Sets.difference(dimensionExclusions, dimSet));
+    return dimensionsSpec.withDimensionExclusions(additionalDimensionExclusions);
   }
 
   @JsonProperty
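
The rewritten computeDimensionsSpec folds the old timestamp-column and metric-name checks into two sets, inputs and outputs, and excludes both from schemaless dimension discovery unless a column was explicitly declared as a dimension; duplicate output names (for example a metric named after a dimension) now fail fast with the "more than once" error instead of the old overlap message. A standalone sketch of the exclusion arithmetic on plain sets, mirroring the updated DataSchemaTest expectation below:

    import java.util.HashSet;
    import java.util.Set;

    public class ExclusionSketch
    {
      public static void main(String[] args)
      {
        // Example schema: timestamp read from "time", no declared dimensions,
        // metric1 aggregating col1 and metric2 aggregating col2.
        final Set<String> inputFields = new HashSet<>();
        inputFields.add("time");      // timestampSpec column
        inputFields.add("col1");      // aggregator inputs
        inputFields.add("col2");

        final Set<String> outputFields = new HashSet<>();
        outputFields.add("__time");   // always an output
        outputFields.add("metric1");  // aggregator names
        outputFields.add("metric2");

        final Set<String> declaredDimensions = new HashSet<>(); // schemaless

        final Set<String> exclusions = new HashSet<>();
        exclusions.addAll(inputFields);
        exclusions.addAll(outputFields);
        exclusions.removeAll(declaredDimensions);

        // {__time, time, col1, col2, metric1, metric2}, matching the
        // updated assertion in DataSchemaTest.
        System.out.println(exclusions);
      }
    }
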
diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
index caeb39e..1e65c7b 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.commons.io.FileUtils;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -54,7 +55,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
@@ -80,7 +80,7 @@ public class SqlInputSourceTest
           new ArrayList<>(),
           new ArrayList<>()
       ),
-      Collections.emptyList()
+      ColumnsFilter.all()
   );
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
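
`ColumnsFilter.all()` is the drop-in replacement for the removed metric-name list: it tells a reader to read every column. A sketch of how a columnar reader might consult a narrower filter; `inclusionBased(...)` and `apply(...)` are assumed names for the selective factory and the per-column test on the new class, so treat this as illustrative:

    import com.google.common.collect.ImmutableSet;
    import org.apache.druid.data.input.ColumnsFilter;

    import java.util.Arrays;
    import java.util.List;

    public class ColumnsFilterSketch
    {
      public static void main(String[] args)
      {
        // Assumed API: inclusionBased(...) keeps only the listed columns,
        // and apply(column) tests a single column name.
        final ColumnsFilter filter =
            ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "page"));

        final List<String> available = Arrays.asList("__time", "page", "user", "added");
        for (String column : available) {
          if (filter.apply(column)) {
            System.out.println("read: " + column);  // __time, page
          } else {
            System.out.println("skip: " + column);  // user, added
          }
        }
      }
    }
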
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 83fc9c6..0b9eb39 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -101,7 +101,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
     );
 
     Assert.assertEquals(
-        ImmutableSet.of("time", "col1", "col2", "metric1", "metric2"),
+        ImmutableSet.of("__time", "time", "col1", "col2", "metric1", "metric2"),
         schema.getDimensionsSpec().getDimensionExclusions()
     );
   }
@@ -139,7 +139,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
     );
 
     Assert.assertEquals(
-        ImmutableSet.of("dimC", "col1", "metric1", "metric2"),
+        ImmutableSet.of("__time", "dimC", "col1", "metric1", "metric2"),
         schema.getParser().getParseSpec().getDimensionsSpec().getDimensionExclusions()
     );
   }
@@ -409,7 +409,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
         actual.getParser().getParseSpec(),
         new JSONParseSpec(
             new TimestampSpec("xXx", null, null),
-            new DimensionsSpec(null, Arrays.asList("metric1", "xXx", "col1"), null),
+            new DimensionsSpec(null, Arrays.asList("__time", "metric1", "xXx", "col1"), null),
             null,
             null,
             null
diff --git a/web-console/e2e-tests/reindexing.spec.ts b/web-console/e2e-tests/reindexing.spec.ts
index a14262a..d32a6d9 100644
--- a/web-console/e2e-tests/reindexing.spec.ts
+++ b/web-console/e2e-tests/reindexing.spec.ts
@@ -115,50 +115,50 @@ function validateConnectLocalData(preview: string) {
   expect(firstLine).toBe(
     'Druid row: {' +
       '"__time":1442018818771' +
-      ',"isRobot":"false"' +
-      ',"countryIsoCode":null' +
-      ',"added":"36"' +
-      ',"regionName":null' +
       ',"channel":"#en.wikipedia"' +
-      ',"delta":"36"' +
-      ',"isUnpatrolled":"false"' +
-      ',"isNew":"false"' +
-      ',"isMinor":"false"' +
+      ',"comment":"added project"' +
       ',"isAnonymous":"false"' +
-      ',"deleted":"0"' +
-      ',"cityName":null' +
-      ',"metroCode":null' +
+      ',"isMinor":"false"' +
+      ',"isNew":"false"' +
+      ',"isRobot":"false"' +
+      ',"isUnpatrolled":"false"' +
       ',"namespace":"Talk"' +
-      ',"comment":"added project"' +
-      ',"countryName":null' +
       ',"page":"Talk:Oswald Tilghman"' +
       ',"user":"GELongstreet"' +
+      ',"added":"36"' +
+      ',"deleted":"0"' +
+      ',"delta":"36"' +
+      ',"cityName":null' +
+      ',"countryIsoCode":null' +
+      ',"countryName":null' +
       ',"regionIsoCode":null' +
+      ',"regionName":null' +
+      ',"metroCode":null' +
       '}',
   );
   const lastLine = lines[lines.length - 1];
   expect(lastLine).toBe(
     'Druid row: {' +
       '"__time":1442020314823' +
-      ',"isRobot":"false"' +
-      ',"countryIsoCode":null' +
-      ',"added":"1"' +
-      ',"regionName":null' +
       ',"channel":"#en.wikipedia"' +
-      ',"delta":"1"' +
-      ',"isUnpatrolled":"false"' +
-      ',"isNew":"false"' +
-      ',"isMinor":"true"' +
+      ',"comment":"/* History */[[WP:AWB/T|Typo fixing]], [[WP:AWB/T|typo(s) fixed]]: nothern → northern using [[Project:AWB|AWB]]"' +
       ',"isAnonymous":"false"' +
-      ',"deleted":"0"' +
-      ',"cityName":null' +
-      ',"metroCode":null' +
+      ',"isMinor":"true"' +
+      ',"isNew":"false"' +
+      ',"isRobot":"false"' +
+      ',"isUnpatrolled":"false"' +
       ',"namespace":"Main"' +
-      ',"comment":"/* History */[[WP:AWB/T|Typo fixing]], [[WP:AWB/T|typo(s) fixed]]: nothern → northern using [[Project:AWB|AWB]]"' +
-      ',"countryName":null' +
       ',"page":"Hapoel Katamon Jerusalem F.C."' +
       ',"user":"The Quixotic Potato"' +
+      ',"added":"1"' +
+      ',"deleted":"0"' +
+      ',"delta":"1"' +
+      ',"cityName":null' +
+      ',"countryIsoCode":null' +
+      ',"countryName":null' +
       ',"regionIsoCode":null' +
+      ',"regionName":null' +
+      ',"metroCode":null' +
       '}',
   );
 }
diff --git a/web-console/src/druid-models/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec.tsx
index d4c9bdc..53146f5 100644
--- a/web-console/src/druid-models/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec.tsx
@@ -486,32 +486,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
           ),
         },
         {
-          name: 'inputSource.dimensions',
-          label: 'Dimensions',
-          type: 'string-array',
-          placeholder: '(optional)',
-          hideInMore: true,
-          info: (
-            <p>
-              The list of dimensions to select. If left empty, no dimensions are returned. If left
-              null or not defined, all dimensions are returned.
-            </p>
-          ),
-        },
-        {
-          name: 'inputSource.metrics',
-          label: 'Metrics',
-          type: 'string-array',
-          placeholder: '(optional)',
-          hideInMore: true,
-          info: (
-            <p>
-              The list of metrics to select. If left empty, no metrics are returned. If left null or
-              not defined, all metrics are selected.
-            </p>
-          ),
-        },
-        {
           name: 'inputSource.filter',
           label: 'Filter',
           type: 'json',
diff --git a/web-console/src/druid-models/timestamp-spec.tsx b/web-console/src/druid-models/timestamp-spec.tsx
index b6c595b..f6a8263 100644
--- a/web-console/src/druid-models/timestamp-spec.tsx
+++ b/web-console/src/druid-models/timestamp-spec.tsx
@@ -32,11 +32,18 @@ import { Transform } from './transform-spec';
 
 const NO_SUCH_COLUMN = '!!!_no_such_column_!!!';
 
+export const TIME_COLUMN = '__time';
+
 export const PLACEHOLDER_TIMESTAMP_SPEC: TimestampSpec = {
   column: NO_SUCH_COLUMN,
   missingValue: '1970-01-01T00:00:00Z',
 };
 
+export const REINDEX_TIMESTAMP_SPEC: TimestampSpec = {
+  column: TIME_COLUMN,
+  format: 'millis',
+};
+
 export const CONSTANT_TIMESTAMP_SPEC: TimestampSpec = {
   column: NO_SUCH_COLUMN,
   missingValue: '2010-01-01T00:00:00Z',
@@ -48,7 +55,7 @@ export function getTimestampSchema(spec: IngestionSpec): TimestampSchema {
   const transforms: Transform[] =
     deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || EMPTY_ARRAY;
 
-  const timeTransform = transforms.find(transform => transform.name === '__time');
+  const timeTransform = transforms.find(transform => transform.name === TIME_COLUMN);
   if (timeTransform) return 'expression';
 
   const timestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec') || EMPTY_OBJECT;
@@ -74,7 +81,7 @@ export function getTimestampSpecExpressionFromSpec(spec: IngestionSpec): string
   const transforms: Transform[] =
     deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || EMPTY_ARRAY;
 
-  const timeTransform = transforms.find(transform => transform.name === '__time');
+  const timeTransform = transforms.find(transform => transform.name === TIME_COLUMN);
   if (!timeTransform) return;
   return timeTransform.expression;
 }
diff --git a/web-console/src/utils/sampler.ts b/web-console/src/utils/sampler.ts
index 3f37c9e..0bd9ae8 100644
--- a/web-console/src/utils/sampler.ts
+++ b/web-console/src/utils/sampler.ts
@@ -29,6 +29,8 @@ import {
   isDruidSource,
   MetricSpec,
   PLACEHOLDER_TIMESTAMP_SPEC,
+  REINDEX_TIMESTAMP_SPEC,
+  TIME_COLUMN,
   TimestampSpec,
   Transform,
   TransformSpec,
@@ -152,13 +154,13 @@ export function headerFromSampleResponse(options: HeaderFromSampleResponseOption
 
   let columns = sortWithPrefixSuffix(
     dedupe(sampleResponse.data.flatMap(s => (s.parsed ? Object.keys(s.parsed) : []))).sort(),
-    columnOrder || ['__time'],
+    columnOrder || [TIME_COLUMN],
     suffixColumnOrder || [],
     alphanumericCompare,
   );
 
   if (ignoreTimeColumn) {
-    columns = columns.filter(c => c !== '__time');
+    columns = columns.filter(c => c !== TIME_COLUMN);
   }
 
   return columns;
@@ -290,7 +292,7 @@ export async function sampleForConnect(
       ioConfig,
       dataSchema: {
         dataSource: 'sample',
-        timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC,
+        timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
         dimensionsSpec: {},
       },
     } as any,
@@ -338,13 +340,15 @@ export async function sampleForParser(
     sampleStrategy,
   );
 
+  const reingestMode = isDruidSource(spec);
+
   const sampleSpec: SampleSpec = {
     type: samplerType,
     spec: {
       ioConfig,
       dataSchema: {
         dataSource: 'sample',
-        timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC,
+        timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
         dimensionsSpec: {},
       },
     },
@@ -398,7 +402,7 @@ export async function sampleForTimestamp(
         dimensionsSpec: {},
         timestampSpec,
         transformSpec: {
-          transforms: transforms.filter(transform => transform.name === '__time'),
+          transforms: transforms.filter(transform => transform.name === TIME_COLUMN),
         },
       },
     },
@@ -459,7 +463,7 @@ export async function sampleForTransform(
       headerFromSampleResponse({
         sampleResponse: sampleResponseHack,
         ignoreTimeColumn: true,
-        columnOrder: ['__time'].concat(inputFormatColumns),
+        columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
       }).concat(transforms.map(t => t.name)),
     );
   }
@@ -518,7 +522,7 @@ export async function sampleForFilter(
       headerFromSampleResponse({
         sampleResponse: sampleResponseHack,
         ignoreTimeColumn: true,
-        columnOrder: ['__time'].concat(inputFormatColumns),
+        columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
       }).concat(transforms.map(t => t.name)),
     );
   }
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx
index 783b448..447c9f6 100644
--- a/web-console/src/views/load-data-view/load-data-view.tsx
+++ b/web-console/src/views/load-data-view/load-data-view.tsx
@@ -55,6 +55,7 @@ import {
 } from '../../components';
 import { FormGroupWithInfo } from '../../components/form-group-with-info/form-group-with-info';
 import { AsyncActionDialog } from '../../dialogs';
+import { TIME_COLUMN } from '../../druid-models';
 import {
   addTimestampTransform,
   adjustId,
@@ -1221,8 +1222,8 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
 
             if (druidSource) {
               let newSpec = deepSet(spec, 'spec.dataSchema.timestampSpec', {
-                column: '__time',
-                format: 'iso',
+                column: TIME_COLUMN,
+                format: 'millis',
               });
 
               if (typeof inputData.rollup === 'boolean') {
@@ -1247,7 +1248,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
                   newSpec,
                   'spec.dataSchema.dimensionsSpec.dimensions',
                   Object.keys(inputData.columns)
-                    .filter(k => k !== '__time' && !aggregators[k])
+                    .filter(k => k !== TIME_COLUMN && !aggregators[k])
                     .map(k => ({
                       name: k,
                       type: String(inputData.columns![k].type || 'string').toLowerCase(),
@@ -1455,7 +1456,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
             let possibleTimestampSpec: TimestampSpec;
             if (isDruidSource(spec)) {
               possibleTimestampSpec = {
-                column: '__time',
+                column: TIME_COLUMN,
                 format: 'auto',
               };
             } else {
@@ -1608,7 +1609,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
         data: {
           headerAndRows: headerAndRowsFromSampleResponse({
             sampleResponse,
-            columnOrder: ['__time'].concat(inputFormatColumns),
+            columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
           }),
           spec,
         },
@@ -1802,7 +1803,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
       transformQueryState: new QueryState({
         data: headerAndRowsFromSampleResponse({
           sampleResponse,
-          columnOrder: ['__time'].concat(inputFormatColumns),
+          columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
         }),
         lastData: transformQueryState.getSomeData(),
       }),
@@ -2020,7 +2021,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
         filterQueryState: new QueryState({
           data: headerAndRowsFromSampleResponse({
             sampleResponse,
-            columnOrder: ['__time'].concat(inputFormatColumns),
+            columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
             parsedOnly: true,
           }),
           lastData: filterQueryState.getSomeData(),
@@ -2043,7 +2044,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
 
     const headerAndRowsNoFilter = headerAndRowsFromSampleResponse({
       sampleResponse: sampleResponseNoFilter,
-      columnOrder: ['__time'].concat(inputFormatColumns),
+      columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
       parsedOnly: true,
     });
 
@@ -2232,7 +2233,9 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
         data: {
           headerAndRows: headerAndRowsFromSampleResponse({
             sampleResponse,
-            columnOrder: ['__time'].concat(dimensions ? dimensions.map(getDimensionSpecName) : []),
+            columnOrder: [TIME_COLUMN].concat(
+              dimensions ? dimensions.map(getDimensionSpecName) : [],
+            ),
             suffixColumnOrder: metricsSpec ? metricsSpec.map(getMetricSpecName) : undefined,
           }),
           dimensions,
diff --git a/website/.spelling b/website/.spelling
index f89b29a..3a85dc6 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1038,6 +1038,7 @@ baseDir
 chatHandlerNumRetries
 chatHandlerTimeout
 connectorConfig
+countryName
 dataSchema's
 foldCase
 forceGuaranteedRollup
@@ -1782,6 +1783,7 @@ successfulSending
 taskBlackListCleanupPeriod
 tasklogs
 timeBoundary
+timestampSpec
 tmp
 tmpfs
 truststore
