Posted to commits@druid.apache.org by jo...@apache.org on 2019/05/02 05:37:20 UTC

[incubator-druid] branch master updated: Data loader (sampler component) (#7531)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ec8562c  Data loader (sampler component) (#7531)
ec8562c is described below

commit ec8562c885f212d1c070a40322001a3da8048a0f
Author: David Lim <da...@gmail.com>
AuthorDate: Wed May 1 23:37:14 2019 -0600

    Data loader (sampler component) (#7531)
    
    * sampler initial check-in
    fix checkstyle issues
    add sampler fix to process CSV files from cache properly
    change to composition and rename some classes
    add tests and report num rows read and indexed
    remove excludedByFilter flag and don't send filtered out data
    fix tests to handle both settings for druid.generic.useDefaultValueForNull
    
    * wrap sampler firehose in TimedShutoffFirehoseFactory to support timeouts
    
    * code review changes - add additional comments, limit maxRows
---
 .../java/org/apache/druid/data/input/Firehose.java |  25 +-
 .../apache/druid/data/input/FirehoseFactory.java   |  12 +
 .../apache/druid/data/input/InputRowPlusRaw.java   |  84 +++
 .../data/input/impl/FileIteratingFirehose.java     |  21 +-
 .../druid/data/input/impl/TimestampSpec.java       |   4 +-
 .../PrefetchableTextFilesFirehoseFactory.java      |  19 +
 .../indexing/overlord/sampler/FirehoseSampler.java | 299 ++++++++
 .../overlord/sampler/IndexTaskSamplerSpec.java     |  62 ++
 .../indexing/overlord/sampler/SamplerCache.java    | 186 +++++
 .../indexing/overlord/sampler/SamplerConfig.java   | 117 +++
 .../overlord/sampler/SamplerException.java         |  30 +
 .../overlord/sampler/SamplerExceptionMapper.java   |  41 +
 .../indexing/overlord/sampler/SamplerInputRow.java | 105 +++
 .../indexing/overlord/sampler/SamplerModule.java   |  58 ++
 .../indexing/overlord/sampler/SamplerResource.java |  43 ++
 .../indexing/overlord/sampler/SamplerResponse.java | 152 ++++
 .../indexing/overlord/sampler/SamplerSpec.java     |  28 +
 .../apache/druid/indexing/common/TestFirehose.java | 205 +++++
 .../common/task/RealtimeIndexTaskTest.java         |  92 +--
 .../overlord/sampler/FirehoseSamplerTest.java      | 839 +++++++++++++++++++++
 .../overlord/sampler/IndexTaskSamplerSpecTest.java | 129 ++++
 .../overlord/sampler/SamplerCacheTest.java         | 177 +++++
 .../overlord/sampler/SamplerResponseTest.java      |  60 ++
 .../firehose/TimedShutoffFirehoseFactory.java      |  21 +-
 .../java/org/apache/druid/cli/CliOverlord.java     |   4 +-
 25 files changed, 2713 insertions(+), 100 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/Firehose.java b/core/src/main/java/org/apache/druid/data/input/Firehose.java
index 39556a8..f2951df 100644
--- a/core/src/main/java/org/apache/druid/data/input/Firehose.java
+++ b/core/src/main/java/org/apache/druid/data/input/Firehose.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input;
 
 import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.java.util.common.parsers.ParseException;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
@@ -67,6 +68,24 @@ public interface Firehose extends Closeable
   InputRow nextRow();
 
   /**
+   * Returns an InputRowPlusRaw object containing the InputRow plus the raw, unparsed data corresponding to the next row
+   * available. Used in the sampler to provide the caller with information to assist in configuring a parse spec. If a
+   * ParseException is thrown by the parser, it should be caught and returned in the InputRowPlusRaw so that the
+   * caller can be given information on the raw row that failed to parse. Should only be called if hasMore returns true.
+   *
+   * @return an InputRowPlusRaw which may contain any of: an InputRow, the raw data, or a ParseException
+   */
+  default InputRowPlusRaw nextRowWithRaw()
+  {
+    try {
+      return InputRowPlusRaw.of(nextRow(), null);
+    }
+    catch (ParseException e) {
+      return InputRowPlusRaw.of(null, e);
+    }
+  }
+
+  /**
    * Returns a runnable that will "commit" everything read up to the point at which commit() is called.  This is
    * often equivalent to everything that has been read since the last commit() call (or instantiation of the object),
    * but doesn't necessarily have to be.
@@ -79,9 +98,9 @@ public interface Firehose extends Closeable
    * The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has
    * been committed on the writer side of this interface protocol.
    * <p>
-   * A simple implementation of this interface might do nothing when run() is called 
-   * (in which case the same do-nothing instance can be returned every time), or 
-   * a more complex implementation might clean up temporary resources that are no longer needed 
+   * A simple implementation of this interface might do nothing when run() is called
+   * (in which case the same do-nothing instance can be returned every time), or
+   * a more complex implementation might clean up temporary resources that are no longer needed
    * because of InputRows delivered by prior calls to {@link #nextRow()}.
    * </p>
    */
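
A minimal sketch of how a caller might drain a Firehose through the new method; firehose is assumed to be
any Firehose implementation, and handleParsed/handleUnparseable are hypothetical stand-ins for caller logic:

    while (firehose.hasMore()) {                       // required before each nextRowWithRaw() call
      InputRowPlusRaw row = firehose.nextRowWithRaw();
      if (row.getParseException() != null) {
        // the raw data, when available, lets the caller see which input failed to parse
        handleUnparseable(row.getRaw(), row.getParseException());
      } else if (row.getInputRow() != null) {
        handleParsed(row.getInputRow(), row.getRaw()); // raw may be null for non-row-based sources
      }
    }
    firehose.commit().run();                           // often a no-op, per the javadoc above
    firehose.close();
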
diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
index 64cb368..e4c5d38 100644
--- a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
+++ b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
@@ -73,6 +73,18 @@ public interface FirehoseFactory<T extends InputRowParser>
     return connect(parser);
   }
 
+  /**
+   * Initialization method that connects up the firehose. This method is intended for use by the sampler, and allows
+   * implementors to return a more efficient firehose, knowing that only a small number of rows will be read.
+   *
+   * @param parser             an input row parser
+   * @param temporaryDirectory a directory where temporary files are stored
+   */
+  default Firehose connectForSampler(T parser, @Nullable File temporaryDirectory) throws IOException, ParseException
+  {
+    return connect(parser, temporaryDirectory);
+  }
+
   default boolean isSplittable()
   {
     return false;
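
A hedged sketch of a factory using the new hook; the class and its two helper methods are hypothetical, but
the override pattern mirrors what PrefetchableTextFilesFirehoseFactory does further down in this commit:

    public abstract class SampleAwareFirehoseFactory implements FirehoseFactory<InputRowParser>
    {
      @Override
      public Firehose connect(InputRowParser parser, @Nullable File temporaryDirectory) throws IOException
      {
        return openPrefetchingFirehose(parser, temporaryDirectory);   // hypothetical full-featured path
      }

      @Override
      public Firehose connectForSampler(InputRowParser parser, @Nullable File temporaryDirectory) throws IOException
      {
        // only a small number of rows will be read, so skip prefetching and caching
        return openSimpleFirehose(parser, temporaryDirectory);        // hypothetical lightweight path
      }

      protected abstract Firehose openPrefetchingFirehose(InputRowParser parser, @Nullable File dir) throws IOException;

      protected abstract Firehose openSimpleFirehose(InputRowParser parser, @Nullable File dir) throws IOException;
    }
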
diff --git a/core/src/main/java/org/apache/druid/data/input/InputRowPlusRaw.java b/core/src/main/java/org/apache/druid/data/input/InputRowPlusRaw.java
new file mode 100644
index 0000000..e5696c9
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/InputRowPlusRaw.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import javax.annotation.Nullable;
+
+public class InputRowPlusRaw
+{
+  @Nullable
+  private final InputRow inputRow;
+
+  @Nullable
+  private final byte[] raw;
+
+  @Nullable
+  private final ParseException parseException;
+
+  private InputRowPlusRaw(@Nullable InputRow inputRow, @Nullable byte[] raw, @Nullable ParseException parseException)
+  {
+    this.inputRow = inputRow;
+    this.raw = raw;
+    this.parseException = parseException;
+  }
+
+  @Nullable
+  public InputRow getInputRow()
+  {
+    return inputRow;
+  }
+
+  /**
+   * The raw, unparsed event (as opposed to an {@link InputRow} which is the output of a parser). The interface default
+   * for {@link Firehose#nextRowWithRaw()} sets this to null, so this will only be non-null if nextRowWithRaw() is
+   * overridden by an implementation, such as in
+   * {@link org.apache.druid.data.input.impl.FileIteratingFirehose#nextRowWithRaw()}. Note that returning the raw row
+   * does not make sense for some sources (e.g. non-row based types), so clients should be able to handle this field
+   * being unset.
+   */
+  @Nullable
+  public byte[] getRaw()
+  {
+    return raw;
+  }
+
+  @Nullable
+  public ParseException getParseException()
+  {
+    return parseException;
+  }
+
+  public boolean isEmpty()
+  {
+    return inputRow == null && raw == null && parseException == null;
+  }
+
+  public static InputRowPlusRaw of(@Nullable InputRow inputRow, @Nullable byte[] raw)
+  {
+    return new InputRowPlusRaw(inputRow, raw, null);
+  }
+
+  public static InputRowPlusRaw of(@Nullable byte[] raw, @Nullable ParseException parseException)
+  {
+    return new InputRowPlusRaw(null, raw, parseException);
+  }
+}
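
Since all three fields are nullable, consumers check them in order; a hypothetical helper making the
possible states explicit:

    // Hypothetical classifier for the states an InputRowPlusRaw instance can be in.
    static String describe(InputRowPlusRaw row)
    {
      if (row.getParseException() != null) {
        return row.getRaw() != null ? "unparseable (raw data available)" : "unparseable";
      }
      if (row.getInputRow() != null) {
        return row.getRaw() != null ? "parsed (raw data available)" : "parsed";
      }
      return row.isEmpty() ? "empty" : "raw data only";
    }
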
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java b/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java
index 6b75ed0..23224fb 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java
@@ -22,6 +22,9 @@ package org.apache.druid.data.input.impl;
 import org.apache.commons.io.LineIterator;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.utils.Runnables;
 
 import javax.annotation.Nullable;
@@ -30,8 +33,6 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-/**
- */
 public class FileIteratingFirehose implements Firehose
 {
   private final Iterator<LineIterator> lineIterators;
@@ -81,6 +82,22 @@ public class FileIteratingFirehose implements Firehose
     return parser.parse(lineIterator.next());
   }
 
+  @Override
+  public InputRowPlusRaw nextRowWithRaw()
+  {
+    if (!hasMore()) {
+      throw new NoSuchElementException();
+    }
+
+    String raw = lineIterator.next();
+    try {
+      return InputRowPlusRaw.of(parser.parse(raw), StringUtils.toUtf8(raw));
+    }
+    catch (ParseException e) {
+      return InputRowPlusRaw.of(StringUtils.toUtf8(raw), e);
+    }
+  }
+
   private LineIterator getNextLineIterator()
   {
     if (lineIterator != null) {
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
index a61f983..7883ae0 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
@@ -47,7 +47,7 @@ public class TimestampSpec
 
   private final String timestampColumn;
   private final String timestampFormat;
-  // this value should never be set for production data
+  // this value should never be set for production data; the data loader uses it before a timestamp column is chosen
   private final DateTime missingValue;
   /** This field is a derivative of {@link #timestampFormat}; not checked in {@link #equals} and {@link #hashCode} */
   private final Function<Object, DateTime> timestampConverter;
@@ -59,7 +59,7 @@ public class TimestampSpec
   public TimestampSpec(
       @JsonProperty("column") String timestampColumn,
       @JsonProperty("format") String format,
-      // this value should never be set for production data
+      // this value should never be set for production data; the data loader uses it before a timestamp column is chosen
       @JsonProperty("missingValue") DateTime missingValue
   )
   {
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java
index 2a0d917..f237fb9 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/PrefetchableTextFilesFirehoseFactory.java
@@ -91,6 +91,9 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
 {
   private static final Logger LOG = new Logger(PrefetchableTextFilesFirehoseFactory.class);
 
+  private static final CacheManager DISABLED_CACHE_MANAGER = new CacheManager(0);
+  private static final PrefetchConfig DISABLED_PREFETCH_CONFIG = new PrefetchConfig(0L, 0L, 0L, 0L);
+
   public static final int DEFAULT_MAX_FETCH_RETRY = 3;
 
   private final CacheManager<T> cacheManager;
@@ -158,6 +161,22 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
   @Override
   public Firehose connect(StringInputRowParser firehoseParser, @Nullable File temporaryDirectory) throws IOException
   {
+    return connectInternal(firehoseParser, temporaryDirectory, this.prefetchConfig, this.cacheManager);
+  }
+
+  @Override
+  public Firehose connectForSampler(StringInputRowParser parser, @Nullable File temporaryDirectory) throws IOException
+  {
+    return connectInternal(parser, temporaryDirectory, DISABLED_PREFETCH_CONFIG, DISABLED_CACHE_MANAGER);
+  }
+
+  private Firehose connectInternal(
+      StringInputRowParser firehoseParser,
+      @Nullable File temporaryDirectory,
+      PrefetchConfig prefetchConfig,
+      CacheManager cacheManager
+  ) throws IOException
+  {
     if (objects == null) {
       objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "objects"));
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/FirehoseSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/FirehoseSampler.java
new file mode 100644
index 0000000..e0ae3eb
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/FirehoseSampler.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.druid.common.utils.UUIDUtils;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.java.util.common.parsers.Parser;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class FirehoseSampler
+{
+  private static final EmittingLogger log = new EmittingLogger(FirehoseSampler.class);
+
+  // These are convenience shims so that the data loader does not need to provide a dummy parseSpec during the early
+  // stages, when the parameters for the parseSpec are still unknown and the caller is only interested in the unparsed rows.
+  // We need two of these because firehose factories based on AbstractTextFilesFirehoseFactory expect to be used with
+  // StringInputRowParser, while all the others expect InputRowParser.
+  // ---------------------------
+  private static final InputRowParser EMPTY_STRING_PARSER_SHIM = new StringInputRowParser(
+      new ParseSpec(new TimestampSpec(null, null, DateTimes.EPOCH), new DimensionsSpec(null))
+      {
+        @Override
+        public Parser<String, Object> makeParser()
+        {
+          return new Parser<String, Object>()
+          {
+            @Nullable
+            @Override
+            public Map<String, Object> parseToMap(String input)
+            {
+              throw new ParseException(null);
+            }
+
+            @Override
+            public void setFieldNames(Iterable<String> fieldNames)
+            {
+            }
+
+            @Override
+            public List<String> getFieldNames()
+            {
+              return ImmutableList.of();
+            }
+          };
+        }
+      }, null);
+
+  private static final InputRowParser EMPTY_PARSER_SHIM = new InputRowParser()
+  {
+    @Override
+    public ParseSpec getParseSpec()
+    {
+      return null;
+    }
+
+    @Override
+    public InputRowParser withParseSpec(ParseSpec parseSpec)
+    {
+      return null;
+    }
+
+    @Override
+    public List<InputRow> parseBatch(Object input)
+    {
+      throw new ParseException(null);
+    }
+
+    @Override
+    public InputRow parse(Object input)
+    {
+      throw new ParseException(null);
+    }
+  };
+  // ---------------------------
+
+  // We want to be able to sort the list of processed results back into the same order that we read them from the
+  // firehose so that the rows in the data loader are not always changing. To do this, we add a temporary column to the
+  // InputRow (in SamplerInputRow) and tag each row with a sortKey. We use an aggregator so that it will not affect
+  // rollup, and we use a longMin aggregator so that as rows get rolled up, the earlier rows stay stable and later
+  // rows may get rolled into these rows. After getting the results back from the IncrementalIndex, we sort by this
+  // column and then exclude it from the response.
+  private static final AggregatorFactory INTERNAL_ORDERING_AGGREGATOR = new LongMinAggregatorFactory(
+      SamplerInputRow.SAMPLER_ORDERING_COLUMN,
+      SamplerInputRow.SAMPLER_ORDERING_COLUMN
+  );
+
+  private final ObjectMapper objectMapper;
+  private final SamplerCache samplerCache;
+
+  @Inject
+  public FirehoseSampler(ObjectMapper objectMapper, SamplerCache samplerCache)
+  {
+    this.objectMapper = objectMapper;
+    this.samplerCache = samplerCache;
+  }
+
+  public SamplerResponse sample(FirehoseFactory firehoseFactory, DataSchema dataSchema, SamplerConfig samplerConfig)
+  {
+    Preconditions.checkNotNull(firehoseFactory, "firehoseFactory required");
+
+    if (dataSchema == null) {
+      dataSchema = new DataSchema("sampler", null, null, null, null, objectMapper);
+    }
+
+    if (samplerConfig == null) {
+      samplerConfig = SamplerConfig.empty();
+    }
+
+    final InputRowParser parser = dataSchema.getParser() != null
+                                  ? dataSchema.getParser()
+                                  : (firehoseFactory instanceof AbstractTextFilesFirehoseFactory
+                                     ? EMPTY_STRING_PARSER_SHIM
+                                     : EMPTY_PARSER_SHIM);
+
+    final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
+        .withTimestampSpec(parser)
+        .withQueryGranularity(dataSchema.getGranularitySpec().getQueryGranularity())
+        .withDimensionsSpec(parser)
+        .withMetrics(ArrayUtils.addAll(dataSchema.getAggregators(), INTERNAL_ORDERING_AGGREGATOR))
+        .withRollup(dataSchema.getGranularitySpec().isRollup())
+        .build();
+
+    FirehoseFactory myFirehoseFactory = null;
+    boolean usingCachedData = true;
+    if (!samplerConfig.isSkipCache() && samplerConfig.getCacheKey() != null) {
+      myFirehoseFactory = samplerCache.getAsFirehoseFactory(samplerConfig.getCacheKey(), parser);
+    }
+    if (myFirehoseFactory == null) {
+      myFirehoseFactory = firehoseFactory;
+      usingCachedData = false;
+    }
+
+    if (samplerConfig.getTimeoutMs() > 0) {
+      myFirehoseFactory = new TimedShutoffFirehoseFactory(
+          myFirehoseFactory,
+          DateTimes.nowUtc().plusMillis(samplerConfig.getTimeoutMs())
+      );
+    }
+
+    final File tempDir = Files.createTempDir();
+    try (final Firehose firehose = myFirehoseFactory.connectForSampler(parser, tempDir);
+         final IncrementalIndex index = new IncrementalIndex.Builder().setIndexSchema(indexSchema)
+                                                                      .setMaxRowCount(samplerConfig.getNumRows())
+                                                                      .buildOnheap()) {
+
+      List<byte[]> dataToCache = new ArrayList<>();
+      SamplerResponse.SamplerResponseRow[] responseRows = new SamplerResponse.SamplerResponseRow[samplerConfig.getNumRows()];
+      int counter = 0, numRowsIndexed = 0;
+
+      while (counter < responseRows.length && firehose.hasMore()) {
+        String raw = null;
+        try {
+          final InputRowPlusRaw row = firehose.nextRowWithRaw();
+
+          if (row == null || row.isEmpty()) {
+            continue;
+          }
+
+          if (row.getRaw() != null) {
+            raw = StringUtils.fromUtf8(row.getRaw());
+
+            if (!usingCachedData) {
+              dataToCache.add(row.getRaw());
+            }
+          }
+
+          if (row.getParseException() != null) {
+            throw row.getParseException();
+          }
+
+          if (row.getInputRow() == null) {
+            continue;
+          }
+
+          if (!Intervals.ETERNITY.contains(row.getInputRow().getTimestamp())) {
+            throw new ParseException("Timestamp cannot be represented as a long: [%s]", row.getInputRow());
+          }
+
+          IncrementalIndexAddResult result = index.add(new SamplerInputRow(row.getInputRow(), counter), true);
+          if (result.getParseException() != null) {
+            throw result.getParseException();
+          } else {
+            // store the raw value; will be merged with the data from the IncrementalIndex later
+            responseRows[counter] = new SamplerResponse.SamplerResponseRow(raw, null, null, null);
+            counter++;
+            numRowsIndexed++;
+          }
+        }
+        catch (ParseException e) {
+          responseRows[counter] = new SamplerResponse.SamplerResponseRow(raw, null, true, e.getMessage());
+          counter++;
+        }
+      }
+
+      final List<String> columnNames = index.getColumnNames();
+      columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+
+      for (Row row : (Iterable<Row>) index) {
+        Map<String, Object> parsed = new HashMap<>();
+
+        columnNames.forEach(k -> {
+          if (row.getRaw(k) != null) {
+            parsed.put(k, row.getRaw(k));
+          }
+        });
+        parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
+
+        Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+        if (sortKey != null) {
+          responseRows[sortKey.intValue()] = responseRows[sortKey.intValue()].withParsed(parsed);
+        }
+      }
+
+      // cache raw data if available
+      String cacheKey = usingCachedData ? samplerConfig.getCacheKey() : null;
+      if (!samplerConfig.isSkipCache() && !dataToCache.isEmpty()) {
+        cacheKey = samplerCache.put(UUIDUtils.generateUuid(), dataToCache);
+      }
+
+      return new SamplerResponse(
+          cacheKey,
+          counter,
+          numRowsIndexed,
+          Arrays.stream(responseRows)
+                .filter(Objects::nonNull)
+                .filter(x -> x.getParsed() != null || x.isUnparseable() != null)
+                .collect(Collectors.toList())
+      );
+    }
+    catch (Exception e) {
+      throw new SamplerException(e, "Failed to sample data: %s", e.getMessage());
+    }
+    finally {
+      try {
+        FileUtils.deleteDirectory(tempDir);
+      }
+      catch (IOException e) {
+        log.warn(e, "Failed to clean up temporary directory");
+      }
+    }
+  }
+}
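
A hedged sketch of driving the sampler directly, outside the REST endpoint; objectMapper, samplerCache and
myFirehoseFactory are assumed to come from the surrounding context:

    FirehoseSampler sampler = new FirehoseSampler(objectMapper, samplerCache);
    SamplerConfig config = new SamplerConfig(
        100,    // numRows: cap the sample at 100 rows (must be <= 5000)
        null,   // cacheKey: none on a first request
        false,  // skipCache: let the response populate the cache
        10000   // timeoutMs: stop reading after 10 seconds
    );
    // A null dataSchema makes sample() fall back to an empty schema and a parser shim, so every
    // response row carries just the raw data (flagged unparseable).
    SamplerResponse response = sampler.sample(myFirehoseFactory, null, config);
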
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
new file mode 100644
index 0000000..241651c
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.segment.indexing.DataSchema;
+
+public class IndexTaskSamplerSpec implements SamplerSpec
+{
+  private final DataSchema dataSchema;
+  private final FirehoseFactory firehoseFactory;
+  private final SamplerConfig samplerConfig;
+  private final FirehoseSampler firehoseSampler;
+
+  @JsonCreator
+  public IndexTaskSamplerSpec(
+      @JsonProperty("spec") final IndexTask.IndexIngestionSpec ingestionSpec,
+      @JsonProperty("samplerConfig") final SamplerConfig samplerConfig,
+      @JacksonInject FirehoseSampler firehoseSampler
+  )
+  {
+    this.dataSchema = Preconditions.checkNotNull(ingestionSpec, "[spec] is required").getDataSchema();
+
+    Preconditions.checkNotNull(ingestionSpec.getIOConfig(), "[spec.ioConfig] is required");
+
+    this.firehoseFactory = Preconditions.checkNotNull(
+        ingestionSpec.getIOConfig().getFirehoseFactory(),
+        "[spec.ioConfig.firehose] is required"
+    );
+
+    this.samplerConfig = samplerConfig;
+    this.firehoseSampler = firehoseSampler;
+  }
+
+  @Override
+  public SamplerResponse sample()
+  {
+    return firehoseSampler.sample(firehoseFactory, dataSchema, samplerConfig);
+  }
+}
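
In JSON form, the spec this class deserializes looks roughly like the following; the dataSchema and
firehose bodies are elided since they depend on the source being sampled, and the ioConfig type name is an
assumption:

    {
      "type": "index",
      "spec": {
        "dataSchema": { ... },
        "ioConfig": {
          "type": "index",
          "firehose": { ... }
        }
      },
      "samplerConfig": {
        "numRows": 200,
        "timeoutMs": 10000
      }
    }
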
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java
new file mode 100644
index 0000000..3c3d664
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.data.input.ByteBufferInputRowParser;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.utils.Runnables;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class SamplerCache
+{
+  private static final EmittingLogger log = new EmittingLogger(SamplerCache.class);
+  private static final String NAMESPACE = "sampler";
+
+  private final Cache cache;
+
+  @Inject
+  public SamplerCache(Cache cache)
+  {
+    this.cache = cache;
+  }
+
+  @Nullable
+  public String put(String key, Collection<byte[]> values)
+  {
+    if (values == null) {
+      return null;
+    }
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(new ArrayList<>(values));
+      cache.put(new Cache.NamedKey(NAMESPACE, StringUtils.toUtf8(key)), baos.toByteArray());
+      return key;
+    }
+    catch (IOException e) {
+      log.warn(e, "Exception while serializing to sampler cache");
+      return null;
+    }
+  }
+
+  @Nullable
+  public FirehoseFactory getAsFirehoseFactory(String key, InputRowParser parser)
+  {
+    if (!(parser instanceof ByteBufferInputRowParser)) {
+      log.warn("SamplerCache expects a ByteBufferInputRowParser");
+      return null;
+    }
+
+    Collection<byte[]> data = get(key);
+    if (data == null) {
+      return null;
+    }
+
+    return new FirehoseFactory<ByteBufferInputRowParser>()
+    {
+      @Override
+      public Firehose connect(ByteBufferInputRowParser parser, @Nullable File temporaryDirectory)
+      {
+        return new SamplerCacheFirehose(parser, data);
+      }
+    };
+  }
+
+  @Nullable
+  private Collection<byte[]> get(String key)
+  {
+    byte[] data = cache.get(new Cache.NamedKey(NAMESPACE, StringUtils.toUtf8(key)));
+    if (data == null) {
+      return null;
+    }
+
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
+         ObjectInputStream ois = new ObjectInputStream(bais)) {
+      return (ArrayList) ois.readObject();
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception while deserializing from sampler cache");
+      return null;
+    }
+  }
+
+  public static class SamplerCacheFirehose implements Firehose
+  {
+    private final ByteBufferInputRowParser parser;
+    private final Iterator<byte[]> it;
+
+    public SamplerCacheFirehose(ByteBufferInputRowParser parser, Collection<byte[]> data)
+    {
+      this.parser = parser;
+      this.it = data != null ? data.iterator() : Collections.emptyIterator();
+
+      if (parser instanceof StringInputRowParser) {
+        ((StringInputRowParser) parser).startFileFromBeginning();
+      }
+    }
+
+    @Override
+    public boolean hasMore()
+    {
+      return it.hasNext();
+    }
+
+    @Nullable
+    @Override
+    public InputRow nextRow()
+    {
+      if (!hasMore()) {
+        throw new NoSuchElementException();
+      }
+
+      List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(it.next()));
+      return rows.isEmpty() ? null : rows.get(0);
+    }
+
+    @Override
+    public InputRowPlusRaw nextRowWithRaw()
+    {
+      if (!hasMore()) {
+        throw new NoSuchElementException();
+      }
+
+      byte[] raw = it.next();
+
+      try {
+        List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(raw));
+        return InputRowPlusRaw.of(rows.isEmpty() ? null : rows.get(0), raw);
+      }
+      catch (ParseException e) {
+        return InputRowPlusRaw.of(raw, e);
+      }
+    }
+
+    @Override
+    public Runnable commit()
+    {
+      return Runnables.getNoopRunnable();
+    }
+
+    @Override
+    public void close()
+    {
+    }
+  }
+}
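
A hedged sketch of the cache round trip; rawRows is a Collection<byte[]> and parser is assumed to be a
StringInputRowParser (a ByteBufferInputRowParser, which getAsFirehoseFactory requires):

    String key = samplerCache.put(UUIDUtils.generateUuid(), rawRows);   // may return null on failure
    FirehoseFactory cached = key != null ? samplerCache.getAsFirehoseFactory(key, parser) : null;
    if (cached != null) {                                  // also null on a miss or wrong parser type
      try (Firehose firehose = cached.connect(parser, null)) {
        while (firehose.hasMore()) {
          InputRowPlusRaw row = firehose.nextRowWithRaw(); // raw bytes are always available here
          // ... inspect row ...
        }
      }
    }
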
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
new file mode 100644
index 0000000..a7d6ca9
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+public class SamplerConfig
+{
+  private static final int DEFAULT_NUM_ROWS = 200;
+  private static final int MAX_NUM_ROWS = 5000;
+  private static final boolean DEFAULT_SKIP_CACHE = false;
+  private static final int DEFAULT_TIMEOUT_MS = 10000;
+
+  private final int numRows;
+  private final String cacheKey;
+  private final boolean skipCache;
+  private final int timeoutMs;
+
+  @JsonCreator
+  public SamplerConfig(
+      @JsonProperty("numRows") Integer numRows,
+      @JsonProperty("cacheKey") String cacheKey,
+      @JsonProperty("skipCache") Boolean skipCache,
+      @JsonProperty("timeoutMs") Integer timeoutMs
+  )
+  {
+    this.numRows = numRows != null ? numRows : DEFAULT_NUM_ROWS;
+    this.cacheKey = cacheKey;
+    this.skipCache = skipCache != null ? skipCache : DEFAULT_SKIP_CACHE;
+    this.timeoutMs = timeoutMs != null ? timeoutMs : DEFAULT_TIMEOUT_MS;
+
+    Preconditions.checkArgument(this.numRows <= MAX_NUM_ROWS, "numRows must be <= %s", MAX_NUM_ROWS);
+  }
+
+  /**
+   * The maximum number of rows to return in a response. The actual number of returned rows may be fewer if:
+   *   - The sampled source contains fewer rows.
+   *   - We are reading from the cache ({@link SamplerConfig#cacheKey} is set and {@link SamplerConfig#isSkipCache()}
+   *     is false) and the cache contains less data.
+   *   - {@link SamplerConfig#timeoutMs} elapses before this value is reached.
+   *   - {@link org.apache.druid.segment.indexing.granularity.GranularitySpec#isRollup()} is true and input rows get
+   *     rolled-up into fewer indexed rows.
+   *
+   * @return maximum number of sampled rows to return
+   */
+  public int getNumRows()
+  {
+    return numRows;
+  }
+
+  /**
+   * The sampler makes a best-effort attempt to cache the raw data so that future requests to the sampler can be
+   * answered without reading again from the source. Besides improving responsiveness, this provides a better user
+   * experience for sources such as streams, where repeated calls to the sampler (which happen as the user tweaks
+   * data schema configurations) would otherwise return a different set of sampled data every time. For the caching
+   * system to work, 1) the sampler must have access to the raw data (e.g. for {@link FirehoseSampler},
+   * {@link org.apache.druid.data.input.InputRowPlusRaw#getRaw()} must be non-null) and 2) the parser must be an
+   * implementation of {@link org.apache.druid.data.input.ByteBufferInputRowParser} since the data is cached as a byte
+   * array. If these conditions are not satisfied, the cache will miss and the sampler will read from the source.
+   * <p>
+   * {@link SamplerResponse} returns a {@link SamplerResponse#cacheKey} which should be supplied here in
+   * {@link SamplerConfig} for future requests to prefer the cache if available. This field is ignored if
+   * {@link SamplerConfig#skipCache} is true.
+   *
+   * @return key to use for locating previously cached raw data
+   */
+  public String getCacheKey()
+  {
+    return cacheKey;
+  }
+
+  /**
+   * Whether to read/write to the cache. See cache description in {@link SamplerConfig#getCacheKey()}.
+   *
+   * @return true if cache reads and writes should be skipped
+   */
+  public boolean isSkipCache()
+  {
+    return skipCache;
+  }
+
+  /**
+   * Time to wait in milliseconds before closing the sampler and returning the data which has already been read.
+   * Particularly useful for handling streaming input sources where the rate of data is unknown, to prevent the sampler
+   * from taking an excessively long time trying to reach {@link SamplerConfig#numRows}.
+   *
+   * @return timeout in milliseconds
+   */
+  public int getTimeoutMs()
+  {
+    return timeoutMs;
+  }
+
+  public static SamplerConfig empty()
+  {
+    return new SamplerConfig(null, null, null, null);
+  }
+}
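
Per the javadocs above, the intended flow is: a first request omits cacheKey, the SamplerResponse hands one
back, and follow-up requests echo it so that re-sampling reads the cached raw data. An illustrative
samplerConfig for such a follow-up request:

    {
      "numRows": 200,
      "cacheKey": "<cacheKey returned by the previous SamplerResponse>",
      "skipCache": false,
      "timeoutMs": 10000
    }
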
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java
new file mode 100644
index 0000000..b7937b8
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import org.apache.druid.java.util.common.StringUtils;
+
+public class SamplerException extends RuntimeException
+{
+  public SamplerException(Throwable cause, String formatText, Object... arguments)
+  {
+    super(StringUtils.nonStrictFormat(formatText, arguments), cause);
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
new file mode 100644
index 0000000..d35e5ab
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.google.common.collect.ImmutableMap;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class SamplerExceptionMapper implements ExceptionMapper<SamplerException>
+{
+  @Override
+  public Response toResponse(SamplerException exception)
+  {
+    return Response.status(Response.Status.BAD_REQUEST)
+                   .entity(ImmutableMap.of(
+                       "error",
+                       exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage()
+                   ))
+                   .build();
+  }
+}
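
A SamplerException therefore reaches HTTP clients as a 400 Bad Request whose body looks like the following
(message text illustrative, built from the format string used in FirehoseSampler):

    {
      "error": "Failed to sample data: <message of the underlying exception>"
    }
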
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerInputRow.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerInputRow.java
new file mode 100644
index 0000000..aa187e0
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerInputRow.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.Row;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+
+public class SamplerInputRow implements InputRow
+{
+  public static final String SAMPLER_ORDERING_COLUMN = "__internal_sampler_order";
+
+  private final InputRow row;
+  private final int sortKey;
+
+  public SamplerInputRow(InputRow row, int sortKey)
+  {
+    this.row = row;
+    this.sortKey = sortKey;
+  }
+
+  @Override
+  public List<String> getDimensions()
+  {
+    return row.getDimensions();
+  }
+
+  @Override
+  public long getTimestampFromEpoch()
+  {
+    return row.getTimestampFromEpoch();
+  }
+
+  @Override
+  public DateTime getTimestamp()
+  {
+    return row.getTimestamp();
+  }
+
+  @Override
+  public List<String> getDimension(String dimension)
+  {
+    return row.getDimension(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Object getRaw(String dimension)
+  {
+    return SAMPLER_ORDERING_COLUMN.equals(dimension) ? sortKey : row.getRaw(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Number getMetric(String metric)
+  {
+    return SAMPLER_ORDERING_COLUMN.equals(metric) ? sortKey : row.getMetric(metric);
+  }
+
+  @Override
+  public int compareTo(Row o)
+  {
+    return row.compareTo(o);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SamplerInputRow that = (SamplerInputRow) o;
+    return sortKey == that.sortKey && Objects.equals(row, that.row);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(row, sortKey);
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerModule.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerModule.java
new file mode 100644
index 0000000..a4f23c5
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerModule.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Singleton;
+import org.apache.druid.guice.CacheModule;
+import org.apache.druid.guice.Jerseys;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.List;
+
+public class SamplerModule implements DruidModule
+{
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return ImmutableList.of(
+        new SimpleModule(getClass().getSimpleName())
+            .registerSubtypes(
+                new NamedType(IndexTaskSamplerSpec.class, "index")
+            )
+    );
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    Jerseys.addResource(binder, SamplerResource.class);
+
+    binder.install(new CacheModule());
+
+    binder.bind(FirehoseSampler.class).in(Singleton.class);
+    binder.bind(SamplerExceptionMapper.class).in(Singleton.class);
+    binder.bind(SamplerCache.class).in(Singleton.class);
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResource.java
new file mode 100644
index 0000000..3b8276c
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.google.common.base.Preconditions;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.server.http.security.StateResourceFilter;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/druid/indexer/v1/sampler")
+public class SamplerResource
+{
+  @POST
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(StateResourceFilter.class)
+  public SamplerResponse post(final SamplerSpec sampler)
+  {
+    return Preconditions.checkNotNull(sampler, "Request body cannot be empty").sample();
+  }
+}
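
Putting the pieces together, a request against a locally running Overlord might look like this; the host
and port are assumptions, the path comes from the @Path annotation above, and sampler-spec.json is a body
like the one sketched after IndexTaskSamplerSpec:

    curl -X POST -H 'Content-Type: application/json' \
         -d @sampler-spec.json \
         http://localhost:8090/druid/indexer/v1/sampler
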
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResponse.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResponse.java
new file mode 100644
index 0000000..2320a37
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerResponse.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class SamplerResponse
+{
+  private final String cacheKey;
+  private final Integer numRowsRead;
+  private final Integer numRowsIndexed;
+  private final List<SamplerResponseRow> data;
+
+  public SamplerResponse(String cacheKey, Integer numRowsRead, Integer numRowsIndexed, List<SamplerResponseRow> data)
+  {
+    this.cacheKey = cacheKey;
+    this.numRowsRead = numRowsRead;
+    this.numRowsIndexed = numRowsIndexed;
+    this.data = data;
+  }
+
+  @JsonProperty
+  public String getCacheKey()
+  {
+    return cacheKey;
+  }
+
+  @JsonProperty
+  public Integer getNumRowsRead()
+  {
+    return numRowsRead;
+  }
+
+  @JsonProperty
+  public Integer getNumRowsIndexed()
+  {
+    return numRowsIndexed;
+  }
+
+  @JsonProperty
+  public List<SamplerResponseRow> getData()
+  {
+    return data;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class SamplerResponseRow
+  {
+    private final String raw;
+    private final Map<String, Object> parsed;
+    private final Boolean unparseable;
+    private final String error;
+
+    public SamplerResponseRow(
+        String raw,
+        Map<String, Object> parsed,
+        Boolean unparseable,
+        String error
+    )
+    {
+      this.raw = raw;
+      this.parsed = parsed;
+      this.unparseable = unparseable;
+      this.error = error;
+    }
+
+    @JsonProperty
+    public String getRaw()
+    {
+      return raw;
+    }
+
+    @JsonProperty
+    public Map<String, Object> getParsed()
+    {
+      return parsed;
+    }
+
+    @JsonProperty
+    public Boolean isUnparseable()
+    {
+      return unparseable;
+    }
+
+    @JsonProperty
+    public String getError()
+    {
+      return error;
+    }
+
+    public SamplerResponseRow withParsed(Map<String, Object> parsed)
+    {
+      return new SamplerResponseRow(raw, parsed, unparseable, error);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SamplerResponseRow that = (SamplerResponseRow) o;
+      return Objects.equals(raw, that.raw) &&
+             Objects.equals(parsed, that.parsed) &&
+             Objects.equals(unparseable, that.unparseable) &&
+             Objects.equals(error, that.error);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(raw, parsed, unparseable, error);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "SamplerResponseRow{" +
+             "raw='" + raw + '\'' +
+             ", parsed=" + parsed +
+             ", unparseable=" + unparseable +
+             ", error='" + error + '\'' +
+             '}';
+    }
+  }
+}
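
An illustrative (not captured) response with one parsed row and one unparseable row; the dimension and
metric names are hypothetical, null-valued fields are omitted per @JsonInclude(NON_NULL), and __time comes
from ColumnHolder.TIME_COLUMN_NAME:

    {
      "cacheKey": "<generated uuid>",
      "numRowsRead": 2,
      "numRowsIndexed": 1,
      "data": [
        {
          "raw": "2019-04-30T00:00:00Z,foo,1",
          "parsed": {
            "__time": 1556582400000,
            "dim1": "foo",
            "met1": 1
          }
        },
        {
          "raw": "not,a,valid,row",
          "unparseable": true,
          "error": "<parse exception message>"
        }
      ]
    }
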
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerSpec.java
new file mode 100644
index 0000000..0c1505d
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerSpec.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+public interface SamplerSpec
+{
+  SamplerResponse sample();
+}
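
SamplerSpec is resolved polymorphically through the "type" property, so implementations register
as Jackson named subtypes (this commit's IndexTaskSamplerSpec binds to "index" via SamplerModule).
A minimal sketch of an implementation, where the class, type name, and returned counts are all
hypothetical:

    @JsonTypeName("noop")
    public class NoopSamplerSpec implements SamplerSpec
    {
      @Override
      public SamplerResponse sample()
      {
        // No cache key, zero rows read or indexed, no row data.
        return new SamplerResponse(null, 0, 0, null);
      }
    }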
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java
new file mode 100644
index 0000000..49896d9
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import org.apache.druid.data.input.FiniteFirehoseFactory;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.Runnables;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+
+public class TestFirehose implements Firehose
+{
+  public static class TestFirehoseFactory implements FirehoseFactory<InputRowParser>
+  {
+    private boolean waitForClose = true;
+    private List<Object> seedRows;
+
+    public TestFirehoseFactory() {}
+
+    public TestFirehoseFactory(boolean waitForClose, List<Object> seedRows)
+    {
+      this.waitForClose = waitForClose;
+      this.seedRows = seedRows;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Firehose connect(InputRowParser parser, File temporaryDirectory) throws ParseException
+    {
+      return new TestFirehose(parser, waitForClose, seedRows);
+    }
+  }
+
+  public static class TestAbstractTextFilesFirehoseFactory extends AbstractTextFilesFirehoseFactory
+  {
+    private boolean waitForClose;
+    private List<Object> seedRows;
+
+    public TestAbstractTextFilesFirehoseFactory(boolean waitForClose, List<Object> seedRows)
+    {
+      this.waitForClose = waitForClose;
+      this.seedRows = seedRows;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Firehose connect(StringInputRowParser parser, File temporaryDirectory) throws ParseException
+    {
+      return new TestFirehose(parser, waitForClose, seedRows);
+    }
+
+    @Override
+    protected Collection initObjects()
+    {
+      return null;
+    }
+
+    @Override
+    protected InputStream openObjectStream(Object object)
+    {
+      return null;
+    }
+
+    @Override
+    protected InputStream wrapObjectStream(Object object, InputStream stream)
+    {
+      return null;
+    }
+
+    @Override
+    public FiniteFirehoseFactory withSplit(InputSplit split)
+    {
+      return null;
+    }
+  }
+
+  public static final String FAIL_DIM = "__fail__";
+
+  private final Deque<Optional<Object>> queue = new ArrayDeque<>();
+
+  private InputRowParser parser;
+  private boolean closed;
+
+  private TestFirehose(InputRowParser parser, boolean waitForClose, List<Object> seedRows)
+  {
+    this.parser = parser;
+    this.closed = !waitForClose;
+
+    if (parser instanceof StringInputRowParser) {
+      ((StringInputRowParser) parser).startFileFromBeginning();
+    }
+
+    if (seedRows != null) {
+      seedRows.stream().map(Optional::ofNullable).forEach(queue::add);
+    }
+  }
+
+  public void addRows(List<Object> rows)
+  {
+    synchronized (this) {
+      rows.stream().map(Optional::ofNullable).forEach(queue::add);
+      notifyAll();
+    }
+  }
+
+  @Override
+  public boolean hasMore()
+  {
+    try {
+      synchronized (this) {
+        while (queue.isEmpty() && !closed) {
+          wait();
+        }
+        return !queue.isEmpty();
+      }
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public InputRow nextRow()
+  {
+    synchronized (this) {
+      final InputRow row = parser instanceof StringInputRowParser
+                           ? ((StringInputRowParser) parser).parse((String) queue.removeFirst().orElse(null))
+                           : (InputRow) parser.parseBatch(queue.removeFirst().orElse(null)).get(0);
+      if (row != null && row.getRaw(FAIL_DIM) != null) {
+        throw new ParseException(FAIL_DIM);
+      }
+      return row;
+    }
+  }
+
+  @Override
+  public InputRowPlusRaw nextRowWithRaw()
+  {
+    synchronized (this) {
+      // Dequeue inside the synchronized block so the queue is not mutated concurrently with addRows().
+      Object next = queue.removeFirst().orElse(null);
+      try {
+        final InputRow row = parser instanceof StringInputRowParser
+                             ? ((StringInputRowParser) parser).parse((String) next)
+                             : (InputRow) parser.parseBatch(next).get(0);
+
+        if (row != null && row.getRaw(FAIL_DIM) != null) {
+          throw new ParseException(FAIL_DIM);
+        }
+        return InputRowPlusRaw.of(row, next != null ? StringUtils.toUtf8(next.toString()) : null);
+      }
+      catch (ParseException e) {
+        return InputRowPlusRaw.of(next != null ? StringUtils.toUtf8(next.toString()) : null, e);
+      }
+    }
+  }
+
+  @Override
+  public Runnable commit()
+  {
+    return Runnables.getNoopRunnable();
+  }
+
+  @Override
+  public void close()
+  {
+    synchronized (this) {
+      closed = true;
+      notifyAll();
+    }
+  }
+}
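
A short usage sketch for TestFirehoseFactory (parser and temporaryDirectory stand for a suitable
InputRowParser and scratch directory supplied by the caller):

    Firehose firehose = new TestFirehose.TestFirehoseFactory(
        false, // waitForClose = false, so hasMore() returns false once the queue drains
        ImmutableList.of(ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo"))
    ).connect(parser, temporaryDirectory);

    while (firehose.hasMore()) {
      InputRow row = firehose.nextRow();
      // ... assert on row ...
    }
    firehose.close();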
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 348c349..e2a24f5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -31,11 +31,8 @@ import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.MapCache;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
-import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -47,6 +44,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.TestFirehose;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -117,7 +115,6 @@ import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.Runnables;
 import org.easymock.EasyMock;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.DateTime;
@@ -134,14 +131,11 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.nio.file.Files;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -156,86 +150,6 @@ public class RealtimeIndexTaskTest
       new NoopEmitter()
   );
 
-  private static final String FAIL_DIM = "__fail__";
-
-  private static class TestFirehose implements Firehose
-  {
-    private final InputRowParser<Map<String, Object>> parser;
-    private final Deque<Optional<Map<String, Object>>> queue = new ArrayDeque<>();
-    private boolean closed = false;
-
-    public TestFirehose(final InputRowParser<Map<String, Object>> parser)
-    {
-      this.parser = parser;
-    }
-
-    public void addRows(List<Map<String, Object>> rows)
-    {
-      synchronized (this) {
-        rows.stream().map(Optional::ofNullable).forEach(queue::add);
-        notifyAll();
-      }
-    }
-
-    @Override
-    public boolean hasMore()
-    {
-      try {
-        synchronized (this) {
-          while (queue.isEmpty() && !closed) {
-            wait();
-          }
-          return !queue.isEmpty();
-        }
-      }
-      catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public InputRow nextRow()
-    {
-      synchronized (this) {
-        final InputRow row = parser.parseBatch(queue.removeFirst().orElse(null)).get(0);
-        if (row != null && row.getRaw(FAIL_DIM) != null) {
-          throw new ParseException(FAIL_DIM);
-        }
-        return row;
-      }
-    }
-
-    @Override
-    public Runnable commit()
-    {
-      return Runnables.getNoopRunnable();
-    }
-
-    @Override
-    public void close()
-    {
-      synchronized (this) {
-        closed = true;
-        notifyAll();
-      }
-    }
-  }
-
-  private static class TestFirehoseFactory implements FirehoseFactory<InputRowParser>
-  {
-    public TestFirehoseFactory()
-    {
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public Firehose connect(InputRowParser parser, File temporaryDirectory) throws ParseException
-    {
-      return new TestFirehose(parser);
-    }
-  }
-
   @Rule
   public final ExpectedException expectedException = ExpectedException.none();
 
@@ -521,7 +435,7 @@ public class RealtimeIndexTaskTest
             ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"),
 
             // Bad row- will be unparseable.
-            ImmutableMap.of("dim1", "foo", "met1", 2.0, FAIL_DIM, "x"),
+            ImmutableMap.of("dim1", "foo", "met1", 2.0, TestFirehose.FAIL_DIM, "x"),
 
             // Old row- will be thrownAway.
             ImmutableMap.of("t", now.minus(Period.days(1)).getMillis(), "dim1", "foo", "met1", 2.0),
@@ -907,7 +821,7 @@ public class RealtimeIndexTaskTest
         objectMapper
     );
     RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
-        new TestFirehoseFactory(),
+        new TestFirehose.TestFirehoseFactory(),
         null,
         null
     );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/FirehoseSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/FirehoseSamplerTest.java
new file mode 100644
index 0000000..9c3da5b
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/FirehoseSamplerTest.java
@@ -0,0 +1,839 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.impl.DelimitedParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.TestFirehose;
+import org.apache.druid.indexing.overlord.sampler.SamplerResponse.SamplerResponseRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@RunWith(Parameterized.class)
+public class FirehoseSamplerTest
+{
+  private enum ParserType
+  {
+    MAP, STR_JSON, STR_CSV
+  }
+
+  private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+  private static final boolean USE_DEFAULT_VALUE_FOR_NULL = Boolean.valueOf(System.getProperty(
+      NullHandling.NULL_HANDLING_CONFIG_STRING,
+      "true"
+  ));
+
+  private static final List<Object> MAP_ROWS = ImmutableList.of(
+      ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo", "met1", "1"),
+      ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo", "met1", "2"),
+      ImmutableMap.of("t", "2019-04-22T12:01", "dim1", "foo", "met1", "3"),
+      ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo2", "met1", "4"),
+      ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo", "dim2", "bar", "met1", "5"),
+      ImmutableMap.of("t", "bad_timestamp", "dim1", "foo", "met1", "6")
+  );
+
+  private static final List<Object> STR_JSON_ROWS = ImmutableList.of(
+      "{ \"t\": \"2019-04-22T12:00\", \"dim1\": \"foo\", \"met1\": 1 }",
+      "{ \"t\": \"2019-04-22T12:00\", \"dim1\": \"foo\", \"met1\": 2 }",
+      "{ \"t\": \"2019-04-22T12:01\", \"dim1\": \"foo\", \"met1\": 3 }",
+      "{ \"t\": \"2019-04-22T12:00\", \"dim1\": \"foo2\", \"met1\": 4 }",
+      "{ \"t\": \"2019-04-22T12:00\", \"dim1\": \"foo\", \"dim2\": \"bar\", \"met1\": 5 }",
+      "{ \"t\": \"bad_timestamp\", \"dim1\": \"foo\", \"met1\": 6 }"
+  );
+
+  private static final List<Object> STR_CSV_ROWS = ImmutableList.of(
+      "2019-04-22T12:00,foo,,1",
+      "2019-04-22T12:00,foo,,2",
+      "2019-04-22T12:01,foo,,3",
+      "2019-04-22T12:00,foo2,,4",
+      "2019-04-22T12:00,foo,bar,5",
+      "bad_timestamp,foo,,6"
+  );
+
+  private SamplerCache samplerCache;
+  private FirehoseSampler firehoseSampler;
+  private ParserType parserType;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Parameterized.Parameters(name = "parserType = {0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(
+        new Object[]{ParserType.MAP},
+        new Object[]{ParserType.STR_JSON},
+        new Object[]{ParserType.STR_CSV}
+    );
+  }
+
+  public FirehoseSamplerTest(ParserType parserType)
+  {
+    this.parserType = parserType;
+  }
+
+  @Before
+  public void setupTest()
+  {
+    samplerCache = new SamplerCache(MapCache.create(100000));
+    firehoseSampler = new FirehoseSampler(objectMapper, samplerCache);
+  }
+
+  @Test
+  public void testNoParams()
+  {
+    expectedException.expect(NullPointerException.class);
+    expectedException.expectMessage("firehoseFactory required");
+
+    firehoseSampler.sample(null, null, null);
+  }
+
+  @Test
+  public void testNoDataSchema()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, null, null);
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(0, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(6, response.getData().size());
+
+    List<SamplerResponseRow> data = response.getData();
+
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(0).toString(), null, true, null), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(1).toString(), null, true, null), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(2).toString(), null, true, null), data.get(2));
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(3).toString(), null, true, null), data.get(3));
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(4).toString(), null, true, null), data.get(4));
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(5).toString(), null, true, null), data.get(5));
+  }
+
+  @Test
+  public void testNoDataSchemaNumRows()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, null, new SamplerConfig(3, null, true, null));
+
+    Assert.assertNull(response.getCacheKey());
+    Assert.assertEquals(3, (int) response.getNumRowsRead());
+    Assert.assertEquals(0, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(3, response.getData().size());
+
+    List<SamplerResponseRow> data = response.getData();
+
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(0).toString(), null, true, null), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(1).toString(), null, true, null), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(2).toString(), null, true, null), data.get(2));
+  }
+
+  @Test
+  public void testNoDataSchemaNumRowsCacheReplay()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, null, new SamplerConfig(3, null, false, null));
+    String cacheKey = response.getCacheKey();
+
+    Assert.assertNotNull(cacheKey);
+    Assert.assertEquals(3, (int) response.getNumRowsRead());
+    Assert.assertEquals(0, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(3, response.getData().size());
+
+    List<SamplerResponseRow> data = response.getData();
+
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(0).toString(), null, true, null), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(1).toString(), null, true, null), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(getTestRows().get(2).toString(), null, true, null), data.get(2));
+
+    response = firehoseSampler.sample(firehoseFactory, null, new SamplerConfig(3, cacheKey, false, null));
+
+    Assert.assertTrue(!isCacheable() || cacheKey.equals(response.getCacheKey()));
+    Assert.assertEquals(3, (int) response.getNumRowsRead());
+    Assert.assertEquals(0, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(3, response.getData().size());
+    Assert.assertEquals(data, response.getData());
+  }
+
+  @Test
+  public void testMissingValueTimestampSpec()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(new TimestampSpec(null, null, DateTimes.of("1970")), new DimensionsSpec(null));
+    DataSchema dataSchema = new DataSchema("sampler", getParser(parseSpec), null, null, null, objectMapper);
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(6, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(6, response.getData().size());
+
+    List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo", "met1", "1"),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(1).toString(),
+        ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo", "met1", "2"),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(2).toString(),
+        ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:01", "dim1", "foo", "met1", "3"),
+        null,
+        null
+    ), data.get(2));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(3).toString(),
+        ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo2", "met1", "4"),
+        null,
+        null
+    ), data.get(3));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(4).toString(),
+        ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo", "dim2", "bar", "met1", "5"),
+        null,
+        null
+    ), data.get(4));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        ImmutableMap.of("__time", 0L, "t", "bad_timestamp", "dim1", "foo", "met1", "6"),
+        null,
+        null
+    ), data.get(5));
+  }
+
+  @Test
+  public void testWithTimestampSpec()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(new TimestampSpec("t", null, null), new DimensionsSpec(null));
+    DataSchema dataSchema = new DataSchema("sampler", getParser(parseSpec), null, null, null, objectMapper);
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(5, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(6, response.getData().size());
+
+    List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "1"),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(1).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "2"),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(2).toString(),
+        ImmutableMap.of("__time", 1555934460000L, "dim1", "foo", "met1", "3"),
+        null,
+        null
+    ), data.get(2));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(3).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", "4"),
+        null,
+        null
+    ), data.get(3));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(4).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", "5"),
+        null,
+        null
+    ), data.get(4));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        null,
+        true,
+        getUnparseableTimestampString()
+    ), data.get(5));
+  }
+
+  @Test
+  public void testWithDimensionSpec()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(
+        new TimestampSpec("t", null, null),
+        new DimensionsSpec(ImmutableList.of(
+            StringDimensionSchema.create("dim1"),
+            StringDimensionSchema.create("met1")
+        ))
+    );
+    DataSchema dataSchema = new DataSchema("sampler", getParser(parseSpec), null, null, null, objectMapper);
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(5, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(6, response.getData().size());
+
+    List<SamplerResponseRow> data = response.getData();
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "1"),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(1).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "2"),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(2).toString(),
+        ImmutableMap.of("__time", 1555934460000L, "dim1", "foo", "met1", "3"),
+        null,
+        null
+    ), data.get(2));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(3).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", "4"),
+        null,
+        null
+    ), data.get(3));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(4).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "5"),
+        null,
+        null
+    ), data.get(4));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        null,
+        true,
+        getUnparseableTimestampString()
+    ), data.get(5));
+  }
+
+  @Test
+  public void testWithNoRollup()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(new TimestampSpec("t", null, null), new DimensionsSpec(null));
+    AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, false, null);
+    DataSchema dataSchema = new DataSchema(
+        "sampler",
+        getParser(parseSpec),
+        aggregatorFactories,
+        granularitySpec,
+        null,
+        objectMapper
+    );
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(5, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(6, response.getData().size());
+
+    List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 1L),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(1).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 2L),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(2).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 3L),
+        null,
+        null
+    ), data.get(2));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(3).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
+        null,
+        null
+    ), data.get(3));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(4).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
+        null,
+        null
+    ), data.get(4));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        null,
+        true,
+        getUnparseableTimestampString()
+    ), data.get(5));
+  }
+
+  @Test
+  public void testWithRollup()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(new TimestampSpec("t", null, null), new DimensionsSpec(null));
+    AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
+    DataSchema dataSchema = new DataSchema(
+        "sampler",
+        getParser(parseSpec),
+        aggregatorFactories,
+        granularitySpec,
+        null,
+        objectMapper
+    );
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(5, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(4, response.getData().size());
+
+    List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 6L),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(3).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(4).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
+        null,
+        null
+    ), data.get(2));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        null,
+        true,
+        getUnparseableTimestampString()
+    ), data.get(3));
+  }
+
+  @Test
+  public void testWithMoreRollup()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(
+        new TimestampSpec("t", null, null),
+        new DimensionsSpec(ImmutableList.of(StringDimensionSchema.create("dim1")))
+    );
+    AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
+    DataSchema dataSchema = new DataSchema(
+        "sampler",
+        getParser(parseSpec),
+        aggregatorFactories,
+        granularitySpec,
+        null,
+        objectMapper
+    );
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(5, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(3, response.getData().size());
+
+    List<SamplerResponseRow> data = response.getData();
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 11L),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(3).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        null,
+        true,
+        getUnparseableTimestampString()
+    ), data.get(2));
+  }
+
+  @Test
+  public void testWithMoreRollupCacheReplay()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(
+        new TimestampSpec("t", null, null),
+        new DimensionsSpec(ImmutableList.of(StringDimensionSchema.create("dim1")))
+    );
+    AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
+    DataSchema dataSchema = new DataSchema(
+        "sampler",
+        getParser(parseSpec),
+        aggregatorFactories,
+        granularitySpec,
+        null,
+        objectMapper
+    );
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+    String cacheKey = response.getCacheKey();
+
+    response = firehoseSampler.sample(firehoseFactory, dataSchema, new SamplerConfig(null, cacheKey, false, null));
+
+    Assert.assertTrue(!isCacheable() || cacheKey.equals(response.getCacheKey()));
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(5, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(3, response.getData().size());
+
+    List<SamplerResponseRow> data = response.getData();
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 11L),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(3).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        null,
+        true,
+        getUnparseableTimestampString()
+    ), data.get(2));
+  }
+
+  @Test
+  public void testWithTransformsAutoDimensions()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(
+        new TimestampSpec("t", null, null),
+        new DimensionsSpec(null)
+    );
+    AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
+    );
+
+    DataSchema dataSchema = new DataSchema(
+        "sampler",
+        getParser(parseSpec),
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec,
+        objectMapper
+    );
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(5, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(4, response.getData().size());
+
+    List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 6L),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(3).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(4).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
+        null,
+        null
+    ), data.get(2));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        null,
+        true,
+        getUnparseableTimestampString()
+    ), data.get(3));
+  }
+
+  @Test
+  public void testWithTransformsDimensionsSpec()
+  {
+    // There's a bug in the CSV parser that prevents a column added by a transform from being put in the dimensions
+    // list if the 'columns' field is specified (it will complain that the dimension name is not a valid column).
+    if (ParserType.STR_CSV.equals(parserType)) {
+      return;
+    }
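+    // For the record, the rejected combination looks like the spec constructed below: the DelimitedParseSpec
+    // declares columns ("t", "dim1", "dim2", "met1"), so naming the transform output "dim1PlusBar" in the
+    // DimensionsSpec fails validation because it is not one of the declared columns.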
+
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(
+        new TimestampSpec("t", null, null),
+        new DimensionsSpec(ImmutableList.of(StringDimensionSchema.create("dim1PlusBar")))
+    );
+    AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
+    );
+
+    DataSchema dataSchema = new DataSchema(
+        "sampler",
+        getParser(parseSpec),
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec,
+        objectMapper
+    );
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+
+    Assert.assertEquals(6, (int) response.getNumRowsRead());
+    Assert.assertEquals(5, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(3, response.getData().size());
+
+    List<SamplerResponseRow> data = response.getData();
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1PlusBar", "foobar", "met1", 11L),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(3).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1PlusBar", "foo2bar", "met1", 4L),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        null,
+        true,
+        getUnparseableTimestampString()
+    ), data.get(2));
+  }
+
+  @Test
+  public void testWithFilter()
+  {
+    FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
+
+    ParseSpec parseSpec = getParseSpec(
+        new TimestampSpec("t", null, null),
+        new DimensionsSpec(null)
+    );
+    AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
+    TransformSpec transformSpec = new TransformSpec(new SelectorDimFilter("dim1", "foo", null), null);
+    DataSchema dataSchema = new DataSchema(
+        "sampler",
+        getParser(parseSpec),
+        aggregatorFactories,
+        granularitySpec,
+        transformSpec,
+        objectMapper
+    );
+
+    SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
+
+    Assert.assertEquals(5, (int) response.getNumRowsRead());
+    Assert.assertEquals(4, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(3, response.getData().size());
+
+    List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
+
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(0).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 6L),
+        null,
+        null
+    ), data.get(0));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(4).toString(),
+        ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
+        null,
+        null
+    ), data.get(1));
+    Assert.assertEquals(new SamplerResponseRow(
+        getTestRows().get(5).toString(),
+        null,
+        true,
+        getUnparseableTimestampString()
+    ), data.get(2));
+  }
+
+  private Map<String, Object> getParser(ParseSpec parseSpec)
+  {
+    return objectMapper.convertValue(
+        ParserType.MAP.equals(parserType)
+        ? new MapInputRowParser(parseSpec)
+        : new StringInputRowParser(parseSpec, StandardCharsets.UTF_8.name()),
+        new TypeReference<Map<String, Object>>()
+        {
+        }
+    );
+  }
+
+  private List<Object> getTestRows()
+  {
+    switch (parserType) {
+      case MAP:
+        return MAP_ROWS;
+      case STR_JSON:
+        return STR_JSON_ROWS;
+      case STR_CSV:
+        return STR_CSV_ROWS;
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
+
+  private FirehoseFactory<? extends InputRowParser> getFirehoseFactory(List<Object> seedRows)
+  {
+    return ParserType.MAP.equals(parserType)
+           ? new TestFirehose.TestFirehoseFactory(false, seedRows)
+           : new TestFirehose.TestAbstractTextFilesFirehoseFactory(false, seedRows);
+  }
+
+  private boolean isCacheable()
+  {
+    return !ParserType.MAP.equals(parserType);
+  }
+
+  private ParseSpec getParseSpec(TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec)
+  {
+    return ParserType.STR_CSV.equals(parserType) ? new DelimitedParseSpec(
+        timestampSpec,
+        dimensionsSpec,
+        ",",
+        null,
+        ImmutableList.of("t", "dim1", "dim2", "met1"),
+        false,
+        0
+    ) : new JSONParseSpec(timestampSpec, dimensionsSpec, null, null);
+  }
+
+  private String getUnparseableTimestampString()
+  {
+    return ParserType.STR_CSV.equals(parserType)
+           ? (USE_DEFAULT_VALUE_FOR_NULL
+              ? "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
+              : "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, dim2=, met1=6}")
+           : "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, met1=6}";
+  }
+
+  private List<SamplerResponseRow> removeEmptyColumns(List<SamplerResponseRow> rows)
+  {
+    return USE_DEFAULT_VALUE_FOR_NULL
+           ? rows
+           : rows.stream().map(x -> x.withParsed(removeEmptyValues(x.getParsed()))).collect(Collectors.toList());
+  }
+
+  @Nullable
+  private Map<String, Object> removeEmptyValues(Map<String, Object> data)
+  {
+    return data == null
+           ? null : data.entrySet()
+                        .stream()
+                        .filter(x -> !(x.getValue() instanceof String) || !((String) x.getValue()).isEmpty())
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java
new file mode 100644
index 0000000..1964a80
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.guice.FirehoseModule;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class IndexTaskSamplerSpecTest extends EasyMockSupport
+{
+  private static final ObjectMapper mapper = TestHelper.makeJsonMapper();
+
+  private final FirehoseSampler firehoseSampler = createMock(FirehoseSampler.class);
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  public IndexTaskSamplerSpecTest()
+  {
+    mapper.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(FirehoseSampler.class, firehoseSampler)
+            .addValue(ObjectMapper.class, mapper)
+    );
+    mapper.registerModules((Iterable<Module>) new SamplerModule().getJacksonModules());
+    mapper.registerModules((Iterable<Module>) new FirehoseModule().getJacksonModules());
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    String json = "{\n"
+                  + "  \"type\": \"index\",\n"
+                  + "  \"samplerConfig\": {\n"
+                  + "    \"numRows\": 123,\n"
+                  + "    \"cacheKey\": \"eaebbfd87ec34bc6a9f8c03ecee4dd7a\",\n"
+                  + "    \"skipCache\": false,\n"
+                  + "    \"timeoutMs\": 2345\n"
+                  + "  },\n"
+                  + "  \"spec\": {\n"
+                  + "    \"dataSchema\": {\n"
+                  + "      \"dataSource\": \"sampler\",\n"
+                  + "      \"parser\": {\n"
+                  + "        \"type\": \"string\",\n"
+                  + "        \"parseSpec\": {\n"
+                  + "          \"format\": \"json\",\n"
+                  + "          \"dimensionsSpec\": {},\n"
+                  + "          \"timestampSpec\": {\n"
+                  + "            \"missingValue\": \"1970\"\n"
+                  + "          }\n"
+                  + "        }\n"
+                  + "      }\n"
+                  + "    },\n"
+                  + "    \"ioConfig\": {\n"
+                  + "      \"type\": \"index\",\n"
+                  + "      \"firehose\": {\n"
+                  + "        \"type\": \"local\",\n"
+                  + "        \"baseDir\": \"/tmp\",\n"
+                  + "        \"filter\": \"wikiticker-2015-09-12-sampled.json\"\n"
+                  + "      }\n"
+                  + "    }\n"
+                  + "  }\n"
+                  + "}";
+
+    Capture<FirehoseFactory> capturedFirehoseFactory = EasyMock.newCapture();
+    Capture<DataSchema> capturedDataSchema = EasyMock.newCapture();
+    Capture<SamplerConfig> capturedSamplerConfig = EasyMock.newCapture();
+
+    IndexTaskSamplerSpec spec = mapper.readValue(json, IndexTaskSamplerSpec.class);
+
+    EasyMock.expect(firehoseSampler.sample(
+        EasyMock.capture(capturedFirehoseFactory),
+        EasyMock.capture(capturedDataSchema),
+        EasyMock.capture(capturedSamplerConfig)
+    )).andReturn(new SamplerResponse(null, null, null, null));
+
+    replayAll();
+
+    spec.sample();
+    verifyAll();
+
+    FirehoseFactory firehoseFactory = capturedFirehoseFactory.getValue();
+    Assert.assertEquals(new File("/tmp"), ((LocalFirehoseFactory) firehoseFactory).getBaseDir());
+    Assert.assertEquals("wikiticker-2015-09-12-sampled.json", ((LocalFirehoseFactory) firehoseFactory).getFilter());
+
+    DataSchema dataSchema = capturedDataSchema.getValue();
+    Assert.assertEquals("sampler", dataSchema.getDataSource());
+    Assert.assertEquals("json", ((Map) dataSchema.getParserMap().get("parseSpec")).get("format"));
+
+    SamplerConfig samplerConfig = capturedSamplerConfig.getValue();
+    Assert.assertEquals(123, samplerConfig.getNumRows());
+    Assert.assertEquals("eaebbfd87ec34bc6a9f8c03ecee4dd7a", samplerConfig.getCacheKey());
+    Assert.assertFalse(samplerConfig.isSkipCache());
+    Assert.assertEquals(2345, samplerConfig.getTimeoutMs());
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerCacheTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerCacheTest.java
new file mode 100644
index 0000000..ff53cf4
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerCacheTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class SamplerCacheTest
+{
+  private static final String KEY_1 = "abcdefghijklmnopqrstuvwxyz";
+  private static final String KEY_2 = "1234567890!@#$%^&*()";
+
+  private static final byte[] VALUE_1_1 = StringUtils.toUtf8("The quick");
+  private static final byte[] VALUE_1_2 = StringUtils.toUtf8("brown fox");
+  private static final byte[] VALUE_1_3 = StringUtils.toUtf8("jumps over");
+  private static final byte[] VALUE_2_1 = StringUtils.toUtf8("the lazy");
+  private static final byte[] VALUE_2_2 = StringUtils.toUtf8("Druid");
+
+  private static final StringInputRowParser PARSER = new StringInputRowParser(
+      new CSVParseSpec(
+          new TimestampSpec(null, null, DateTimes.of("1970")),
+          new DimensionsSpec(null),
+          null,
+          ImmutableList.of("col"),
+          false,
+          0
+      ),
+      StandardCharsets.UTF_8.name()
+  );
+
+  private SamplerCache cache;
+  private File tempDir;
+
+  @Before
+  public void setupTest()
+  {
+    cache = new SamplerCache(MapCache.create(100000));
+    tempDir = Files.createTempDir();
+  }
+
+  @After
+  public void teardownTest() throws IOException
+  {
+    FileUtils.deleteDirectory(tempDir);
+  }
+
+  @Test
+  public void testOneEntryNextRowWithRaw() throws IOException
+  {
+    cache.put(KEY_1, ImmutableList.of(VALUE_1_1, VALUE_1_2, VALUE_1_3));
+
+    for (int i = 0; i < 4; i++) {
+      Firehose firehose1 = cache.getAsFirehoseFactory(KEY_1, PARSER).connectForSampler(PARSER, tempDir);
+
+      Assert.assertTrue(firehose1.hasMore());
+
+      InputRowPlusRaw row = firehose1.nextRowWithRaw();
+      Assert.assertArrayEquals(VALUE_1_1, row.getRaw());
+      Assert.assertEquals("The quick", row.getInputRow().getDimension("col").get(0));
+      row = firehose1.nextRowWithRaw();
+      Assert.assertArrayEquals(VALUE_1_2, row.getRaw());
+      Assert.assertEquals("brown fox", row.getInputRow().getDimension("col").get(0));
+      row = firehose1.nextRowWithRaw();
+      Assert.assertArrayEquals(VALUE_1_3, row.getRaw());
+      Assert.assertEquals("jumps over", row.getInputRow().getDimension("col").get(0));
+
+      Assert.assertFalse(firehose1.hasMore());
+
+      firehose1.close();
+
+      if (i % 2 == 1) {
+        FirehoseFactory firehoseFactory2 = cache.getAsFirehoseFactory(KEY_2, PARSER);
+        Assert.assertNull(firehoseFactory2);
+      }
+    }
+  }
+
+  @Test
+  public void testOneEntryNextRow() throws IOException
+  {
+    cache.put(KEY_1, ImmutableList.of(VALUE_1_1, VALUE_1_2, VALUE_1_3));
+
+    Firehose firehose = cache.getAsFirehoseFactory(KEY_1, PARSER).connectForSampler(PARSER, tempDir);
+
+    Assert.assertTrue(firehose.hasMore());
+
+    InputRow row = firehose.nextRow();
+    Assert.assertEquals("The quick", row.getDimension("col").get(0));
+    row = firehose.nextRow();
+    Assert.assertEquals("brown fox", row.getDimension("col").get(0));
+    row = firehose.nextRow();
+    Assert.assertEquals("jumps over", row.getDimension("col").get(0));
+
+    Assert.assertFalse(firehose.hasMore());
+
+    firehose.close();
+  }
+
+  @Test
+  public void testTwoEntriesNextRowWithRaw() throws IOException
+  {
+    cache.put(KEY_1, ImmutableList.of(VALUE_1_1, VALUE_1_2, VALUE_1_3));
+    cache.put(KEY_2, ImmutableList.of(VALUE_2_1, VALUE_2_2));
+
+    for (int i = 0; i < 4; i++) {
+      Firehose firehose1 = cache.getAsFirehoseFactory(KEY_1, PARSER).connectForSampler(PARSER, tempDir);
+
+      Assert.assertTrue(firehose1.hasMore());
+
+      InputRowPlusRaw row = firehose1.nextRowWithRaw();
+      Assert.assertArrayEquals(VALUE_1_1, row.getRaw());
+      Assert.assertEquals("The quick", row.getInputRow().getDimension("col").get(0));
+      row = firehose1.nextRowWithRaw();
+      Assert.assertArrayEquals(VALUE_1_2, row.getRaw());
+      Assert.assertEquals("brown fox", row.getInputRow().getDimension("col").get(0));
+      row = firehose1.nextRowWithRaw();
+      Assert.assertArrayEquals(VALUE_1_3, row.getRaw());
+      Assert.assertEquals("jumps over", row.getInputRow().getDimension("col").get(0));
+
+      Assert.assertFalse(firehose1.hasMore());
+
+      firehose1.close();
+
+      Firehose firehose2 = cache.getAsFirehoseFactory(KEY_2, PARSER).connectForSampler(PARSER, tempDir);
+
+      Assert.assertTrue(firehose2.hasMore());
+
+      row = firehose2.nextRowWithRaw();
+      Assert.assertArrayEquals(VALUE_2_1, row.getRaw());
+      Assert.assertEquals("the lazy", row.getInputRow().getDimension("col").get(0));
+      row = firehose2.nextRowWithRaw();
+      Assert.assertArrayEquals(VALUE_2_2, row.getRaw());
+      Assert.assertEquals("Druid", row.getInputRow().getDimension("col").get(0));
+
+      Assert.assertFalse(firehose2.hasMore());
+
+      firehose2.close();
+    }
+  }
+}
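
The tests above exercise the cache's replay semantics: rows put() under a key
can be re-read any number of times through connectForSampler(), and an unknown
key yields a null factory. A minimal sketch of draining such a firehose with
the new nextRowWithRaw() API follows; the helper name is illustrative, and the
factory, parser, and temporary directory are assumed to come from the caller:

    import org.apache.druid.data.input.Firehose;
    import org.apache.druid.data.input.FirehoseFactory;
    import org.apache.druid.data.input.InputRowPlusRaw;
    import org.apache.druid.data.input.impl.InputRowParser;

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class FirehoseDrainExample
    {
      // Illustrative helper (not part of this commit): reads every row from a
      // sampler firehose, keeping the raw bytes alongside the parsed row.
      static List<InputRowPlusRaw> drain(FirehoseFactory factory, InputRowParser parser, File tmpDir)
          throws IOException
      {
        List<InputRowPlusRaw> rows = new ArrayList<>();
        try (Firehose firehose = factory.connectForSampler(parser, tmpDir)) {
          while (firehose.hasMore()) {
            rows.add(firehose.nextRowWithRaw());
          }
        }
        return rows;
      }
    }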
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java
new file mode 100644
index 0000000..503045d
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.sampler;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SamplerResponseTest
+{
+  private static final ObjectMapper mapper = TestHelper.makeJsonMapper();
+
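+  // Serializes a response with two parsed rows and one unparseable row and
+  // checks the exact JSON, including that null fields are omitted.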
+  @Test
+  public void testSerde() throws IOException
+  {
+    List<SamplerResponse.SamplerResponseRow> data = ImmutableList.of(
+        new SamplerResponse.SamplerResponseRow(
+            "parsed1",
+            ImmutableMap.of("t", 123456, "dim1", "foo", "met1", 6),
+            null,
+            null
+        ),
+        new SamplerResponse.SamplerResponseRow(
+            "parsed2",
+            ImmutableMap.of("t", 123457, "dim1", "foo2", "met1", 7),
+            null,
+            null
+        ),
+        new SamplerResponse.SamplerResponseRow("unparsed", null, true, "Could not parse")
+    );
+
+    String out = mapper.writeValueAsString(new SamplerResponse("eaebbfd87ec34bc6a9f8c03ecee4dd7a", 1123, 1112, data));
+    String expected = "{\"cacheKey\":\"eaebbfd87ec34bc6a9f8c03ecee4dd7a\",\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"data\":[{\"raw\":\"parsed1\",\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"raw\":\"parsed2\",\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"raw\":\"unparsed\",\"unparseable\":true,\"error\":\"Could not parse\"}]}";
+
+    Assert.assertEquals(expected, out);
+  }
+}
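
Note the expected string above: fields that are null on a SamplerResponseRow
("parsed" for the unparseable row, "unparseable" and "error" for the parsed
ones) disappear from the serialized form, which suggests something like
Jackson's @JsonInclude(Include.NON_NULL) on the response classes. A standalone
sketch of that mechanism, using an illustrative POJO rather than the actual
response class:

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.databind.ObjectMapper;

    @JsonInclude(JsonInclude.Include.NON_NULL)
    public class NullDroppingRow
    {
      public String raw = "unparsed";
      public Boolean unparseable = true;
      public String error;  // left null, so omitted from the JSON

      public static void main(String[] args) throws Exception
      {
        // Prints {"raw":"unparsed","unparseable":true} -- the null field is
        // skipped (property order follows field declaration order here).
        System.out.println(new ObjectMapper().writeValueAsString(new NullDroppingRow()));
      }
    }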
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
index 705b90e..2ace21c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
@@ -25,6 +25,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -63,7 +64,13 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
   @Override
   public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException
   {
-    return new TimedShutoffFirehose(parser, temporaryDirectory);
+    return new TimedShutoffFirehose(parser, temporaryDirectory, false);
+  }
+
+  @Override
+  public Firehose connectForSampler(InputRowParser parser, File temporaryDirectory) throws IOException
+  {
+    return new TimedShutoffFirehose(parser, temporaryDirectory, true);
   }
 
   class TimedShutoffFirehose implements Firehose
@@ -73,9 +80,11 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
     @GuardedBy("this")
     private boolean closed = false;
 
-    TimedShutoffFirehose(InputRowParser parser, File temporaryDirectory) throws IOException
+    TimedShutoffFirehose(InputRowParser parser, File temporaryDirectory, boolean sampling) throws IOException
     {
-      firehose = delegateFactory.connect(parser, temporaryDirectory);
+      firehose = sampling
+                 ? delegateFactory.connectForSampler(parser, temporaryDirectory)
+                 : delegateFactory.connect(parser, temporaryDirectory);
 
       shutdownExec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d");
 
@@ -111,6 +120,12 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
     }
 
     @Override
+    public InputRowPlusRaw nextRowWithRaw()
+    {
+      return firehose.nextRowWithRaw();
+    }
+
+    @Override
     public Runnable commit()
     {
       return firehose.commit();
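
The change keeps TimedShutoffFirehose a pure forwarding decorator: the only new
behavior is choosing connectForSampler() at construction when sampling is
requested, while nextRowWithRaw() delegates straight through just like
nextRow() and commit(). Reduced to a self-contained sketch (the class name is
illustrative; the committed class layers the timed shutoff on top of this
skeleton):

    import org.apache.druid.data.input.Firehose;
    import org.apache.druid.data.input.InputRow;
    import org.apache.druid.data.input.InputRowPlusRaw;

    import java.io.IOException;

    public class ForwardingFirehose implements Firehose
    {
      private final Firehose delegate;

      public ForwardingFirehose(Firehose delegate)
      {
        this.delegate = delegate;
      }

      @Override
      public boolean hasMore()
      {
        return delegate.hasMore();
      }

      @Override
      public InputRow nextRow()
      {
        return delegate.nextRow();
      }

      @Override
      public InputRowPlusRaw nextRowWithRaw()
      {
        return delegate.nextRowWithRaw();
      }

      @Override
      public Runnable commit()
      {
        return delegate.commit();
      }

      @Override
      public void close() throws IOException
      {
        delegate.close();
      }
    }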
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 4a1c22a..a0f565f 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -87,6 +87,7 @@ import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
 import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerResource;
 import org.apache.druid.indexing.overlord.http.OverlordRedirectInfo;
 import org.apache.druid.indexing.overlord.http.OverlordResource;
+import org.apache.druid.indexing.overlord.sampler.SamplerModule;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
@@ -331,7 +332,8 @@ public class CliOverlord extends ServerRunnable
           }
         },
         new IndexingServiceFirehoseModule(),
-        new IndexingServiceTaskLogsModule()
+        new IndexingServiceTaskLogsModule(),
+        new SamplerModule()
     );
   }
 

