Posted to commits@druid.apache.org by ab...@apache.org on 2022/02/28 17:06:42 UTC

[druid] branch master updated: Make ParseExceptions more informative (#12259)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f709db  Make ParseExceptions more informative (#12259)
3f709db is described below

commit 3f709db173d779db1466e57a1564a5f557b4b0cf
Author: Laksh Singla <30...@users.noreply.github.com>
AuthorDate: Mon Feb 28 22:31:15 2022 +0530

    Make ParseExceptions more informative (#12259)
    
    This PR makes the ParseExceptions in Druid more informative by attaching additional context (metadata) to each ParseException, for example the path of the file that produced the error, or the line number where it can be fetched cheaply (as in CsvReader).
    
    The following changes are made in this PR:
    
    A new interface, CloseableIteratorWithMetadata, has been added. It is like CloseableIterator but also has a currentMetadata() method that returns a context Map<String, Object> describing the element last returned by next().
    IntermediateRowParsingReader#read() now attaches the InputEntity and the "record number" of the record that generated the exception while being parsed; IntermediateRowParsingReader#sample attaches the InputEntity (but not the "record number").
    TextReader (and its subclasses), a specific implementation of IntermediateRowParsingReader, also includes the line number that generated the error.
    This also helps in triaging issues when an InputSourceReader generates a ParseException, because the message can point to the specific InputEntity that caused the exception while being read.
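    
    As an illustration only (this sketch is not part of the commit, and the
    OffsetTrackingIterator name is hypothetical), a reader built on these APIs
    can surface per-record context by implementing currentMetadata():
    
        import org.apache.druid.java.util.common.parsers.CloseableIterator;
        import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
        
        import java.io.IOException;
        import java.util.Collections;
        import java.util.Map;
        
        // Hypothetical example: wraps a plain CloseableIterator and tracks the
        // 1-based position of the element last returned by next(). The map
        // returned by currentMetadata() is appended to ParseException messages
        // as "key: value" pairs.
        class OffsetTrackingIterator implements CloseableIteratorWithMetadata<String>
        {
          private final CloseableIterator<String> delegate;
          private long currentOffset = 0;
        
          OffsetTrackingIterator(CloseableIterator<String> delegate)
          {
            this.delegate = delegate;
          }
        
          @Override
          public Map<String, Object> currentMetadata()
          {
            return Collections.singletonMap("Offset", currentOffset);
          }
        
          @Override
          public boolean hasNext()
          {
            return delegate.hasNext();
          }
        
          @Override
          public String next()
          {
            currentOffset++;
            return delegate.next();
          }
        
          @Override
          public void close() throws IOException
          {
            delegate.close();
          }
        }
    
    A reader that returns such an iterator from intermediateRowIteratorWithMetadata()
    and its InputEntity from source() would then produce messages like
    "Unable to parse row [foo] (Path: /tmp/data.json, Record: 3, Offset: 42)".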
---
 .../data/input/IntermediateRowParsingReader.java   | 199 +++++++++++++++++----
 .../org/apache/druid/data/input/TextReader.java    |  25 ++-
 .../apache/druid/data/input/impl/JsonReader.java   |   6 +
 .../parsers/CloseableIteratorWithMetadata.java     |  81 +++++++++
 .../druid/data/input/avro/AvroOCFReader.java       |   6 +
 .../druid/data/input/avro/AvroStreamReader.java    |   7 +
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  10 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  12 +-
 .../org/apache/druid/data/input/orc/OrcReader.java |   6 +
 .../druid/data/input/parquet/ParquetReader.java    |   6 +
 .../druid/data/input/protobuf/ProtobufReader.java  |   6 +
 .../druid/indexing/input/DruidSegmentReader.java   |   6 +
 .../druid/indexing/common/task/IndexTaskTest.java  | 161 +++++++++++------
 .../parallel/SinglePhaseParallelIndexingTest.java  |  12 +-
 .../overlord/sampler/InputSourceSamplerTest.java   |  45 +++--
 .../org/apache/druid/metadata/input/SqlReader.java |   6 +
 16 files changed, 479 insertions(+), 115 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
index 935c985..de0b58a 100644
--- a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
@@ -19,10 +19,15 @@
 
 package org.apache.druid.data.input;
 
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.utils.CollectionUtils;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -41,7 +46,7 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
   @Override
   public CloseableIterator<InputRow> read() throws IOException
   {
-    final CloseableIterator<T> intermediateRowIterator = intermediateRowIterator();
+    final CloseableIteratorWithMetadata<T> intermediateRowIteratorWithMetadata = intermediateRowIteratorWithMetadata();
 
     return new CloseableIterator<InputRow>()
     {
@@ -52,28 +57,42 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
       // good idea. Subclasses could implement read() with some duplicate codes to avoid unnecessary iteration on
       // a singleton list.
       Iterator<InputRow> rows = null;
+      long currentRecordNumber = 1;
 
       @Override
       public boolean hasNext()
       {
         if (rows == null || !rows.hasNext()) {
-          if (!intermediateRowIterator.hasNext()) {
+          if (!intermediateRowIteratorWithMetadata.hasNext()) {
             return false;
           }
-          final T row = intermediateRowIterator.next();
+          final T row = intermediateRowIteratorWithMetadata.next();
           try {
             rows = parseInputRows(row).iterator();
+            ++currentRecordNumber;
           }
           catch (IOException e) {
+            final Map<String, Object> metadata = intermediateRowIteratorWithMetadata.currentMetadata();
             rows = new ExceptionThrowingIterator(new ParseException(
                 String.valueOf(row),
                 e,
-                "Unable to parse row [%s]",
-                row
+                buildParseExceptionMessage(
+                    StringUtils.format("Unable to parse row [%s]", row),
+                    source(),
+                    currentRecordNumber,
+                    metadata
+                )
             ));
           }
           catch (ParseException e) {
-            rows = new ExceptionThrowingIterator(e);
+            final Map<String, Object> metadata = intermediateRowIteratorWithMetadata.currentMetadata();
+            // Replace the message of the ParseException e
+            rows = new ExceptionThrowingIterator(
+                new ParseException(
+                    e.getInput(),
+                    e.isFromPartiallyValidRow(),
+                    buildParseExceptionMessage(e.getMessage(), source(), currentRecordNumber, metadata)
+                ));
           }
         }
 
@@ -93,7 +112,7 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
       @Override
       public void close() throws IOException
       {
-        intermediateRowIterator.close();
+        intermediateRowIteratorWithMetadata.close();
       }
     };
   }
@@ -101,43 +120,128 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
   @Override
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
-    return intermediateRowIterator().map(row -> {
 
-      final List<Map<String, Object>> rawColumnsList;
-      try {
-        rawColumnsList = toMap(row);
-      }
-      catch (Exception e) {
-        return InputRowListPlusRawValues.of(null,
-                                            new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into JSON", row));
-      }
+    final CloseableIteratorWithMetadata<T> delegate = intermediateRowIteratorWithMetadata();
 
-      if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
-        return InputRowListPlusRawValues.of(null,
-                                            new ParseException(String.valueOf(row), "No map object parsed for row [%s]", row));
+    return new CloseableIterator<InputRowListPlusRawValues>()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        delegate.close();
       }
 
-      List<InputRow> rows;
-      try {
-        rows = parseInputRows(row);
-      }
-      catch (ParseException e) {
-        return InputRowListPlusRawValues.ofList(rawColumnsList, e);
+      @Override
+      public boolean hasNext()
+      {
+        return delegate.hasNext();
       }
-      catch (IOException e) {
-        ParseException exception = new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into inputRow", row);
-        return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+
+      @Override
+      public InputRowListPlusRawValues next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+
+        return sampleIntermediateRow(delegate.next(), delegate.currentMetadata());
       }
+    };
+  }
+
+  /**
+   * Parses and samples the intermediate row and returns input row and the raw values in it. Metadata supplied can
+   * contain information about the source which will get surfaced in case an exception occurs while parsing the
+   * intermediate row
+   *
+   * @param row intermediate row
+   * @param metadata additional information about the source and the record getting parsed
+   * @return sampled data from the intermediate row
+   */
+  private InputRowListPlusRawValues sampleIntermediateRow(T row, Map<String, Object> metadata)
+  {
+
+    final List<Map<String, Object>> rawColumnsList;
+    try {
+      rawColumnsList = toMap(row);
+    }
+    catch (Exception e) {
+      return InputRowListPlusRawValues.of(
+          null,
+          new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
+              StringUtils.nonStrictFormat("Unable to parse row [%s] into JSON", row),
+              source(),
+              null,
+              metadata
+          ))
+      );
+    }
 
-      return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
-    });
+    if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
+      return InputRowListPlusRawValues.of(
+          null,
+          new ParseException(String.valueOf(row), buildParseExceptionMessage(
+              StringUtils.nonStrictFormat("No map object parsed for row [%s]", row),
+              source(),
+              null,
+              metadata
+          ))
+      );
+    }
+
+    List<InputRow> rows;
+    try {
+      rows = parseInputRows(row);
+    }
+    catch (ParseException e) {
+      return InputRowListPlusRawValues.ofList(rawColumnsList, new ParseException(
+          String.valueOf(row),
+          e,
+          buildParseExceptionMessage(e.getMessage(), source(), null, metadata)
+      ));
+    }
+    catch (IOException e) {
+      ParseException exception = new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
+          StringUtils.nonStrictFormat("Unable to parse row [%s] into inputRow", row),
+          source(),
+          null,
+          metadata
+      ));
+      return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+    }
+
+    return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
   }
 
   /**
    * Creates an iterator of intermediate rows. The returned rows will be consumed by {@link #parseInputRows} and
-   * {@link #toMap}.
+   * {@link #toMap}. Either this or {@link #intermediateRowIteratorWithMetadata()} should be implemented
+   */
+  protected CloseableIterator<T> intermediateRowIterator() throws IOException
+  {
+    throw new UOE("intermediateRowIterator not implemented");
+  }
+
+  /**
+   * Same as {@code intermediateRowIterator}, but it also contains the metadata such as the line number to generate
+   * more informative {@link ParseException}.
    */
-  protected abstract CloseableIterator<T> intermediateRowIterator() throws IOException;
+  protected CloseableIteratorWithMetadata<T> intermediateRowIteratorWithMetadata() throws IOException
+  {
+    return CloseableIteratorWithMetadata.withEmptyMetadata(intermediateRowIterator());
+  }
+
+  /**
+   * @return InputEntity which the implementation is reading from. Useful in generating informative {@link ParseException}s.
+   * For example, in case of {@link org.apache.druid.data.input.impl.FileEntity}, file name containing erroneous records
+   * or in case of {@link org.apache.druid.data.input.impl.HttpEntity}, the endpoint containing the erroneous data can
+   * be attached to the error message
+   */
+  @Nullable
+  protected InputEntity source()
+  {
+    return null;
+  }
 
   /**
    * Parses the given intermediate row into a list of {@link InputRow}s.
@@ -155,6 +259,37 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
    */
   protected abstract List<Map<String, Object>> toMap(T intermediateRow) throws IOException;
 
+  /**
+   * A helper method which enriches the base parse exception message with additional information. The returned message
+   * has a format: "baseExceptionMessage (key1: value1, key2: value2)" if additional properties are present. Else it
+   * returns the baseException message without any modification
+   */
+  private static String buildParseExceptionMessage(
+      @Nonnull String baseExceptionMessage,
+      @Nullable InputEntity source,
+      @Nullable Long recordNumber,
+      @Nullable Map<String, Object> metadata
+  )
+  {
+    StringBuilder sb = new StringBuilder();
+    if (source != null && source.getUri() != null) {
+      sb.append(StringUtils.format("Path: %s, ", source.getUri()));
+    }
+    if (recordNumber != null) {
+      sb.append(StringUtils.format("Record: %d, ", recordNumber));
+    }
+    if (metadata != null) {
+      metadata.entrySet().stream()
+              .map(entry -> StringUtils.format("%s: %s, ", entry.getKey(), entry.getValue().toString()))
+              .forEach(sb::append);
+    }
+    if (sb.length() == 0) {
+      return baseExceptionMessage;
+    }
+    sb.setLength(sb.length() - 2); // Erase the last stray ", "
+    return baseExceptionMessage + " (" + sb + ")"; // Wrap extra information in a bracket before returning
+  }
+
   private static class ExceptionThrowingIterator implements CloseableIterator<InputRow>
   {
     private final Exception exception;
diff --git a/core/src/main/java/org/apache/druid/data/input/TextReader.java b/core/src/main/java/org/apache/druid/data/input/TextReader.java
index 7214d32..e6f541c 100644
--- a/core/src/main/java/org/apache/druid/data/input/TextReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/TextReader.java
@@ -22,14 +22,16 @@ package org.apache.druid.data.input;
 import com.google.common.base.Strings;
 import org.apache.commons.io.LineIterator;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.common.parsers.ParserUtils;
 
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Abstract {@link InputEntityReader} for text format readers such as CSV or JSON.
@@ -51,8 +53,7 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
   }
 
   @Override
-  public CloseableIterator<String> intermediateRowIterator()
-      throws IOException
+  public CloseableIteratorWithMetadata<String> intermediateRowIteratorWithMetadata() throws IOException
   {
     final LineIterator delegate = new LineIterator(
         new InputStreamReader(source.open(), StringUtils.UTF8_STRING)
@@ -65,8 +66,17 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
       processHeaderLine(delegate.nextLine());
     }
 
-    return new CloseableIterator<String>()
+    return new CloseableIteratorWithMetadata<String>()
     {
+      private static final String LINE_KEY = "Line";
+      private long currentLineNumber = numHeaderLines + (needsToProcessHeaderLine() ? 1 : 0);
+
+      @Override
+      public Map<String, Object> currentMetadata()
+      {
+        return Collections.singletonMap(LINE_KEY, currentLineNumber);
+      }
+
       @Override
       public boolean hasNext()
       {
@@ -76,6 +86,7 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
       @Override
       public String next()
       {
+        currentLineNumber++;
         return delegate.nextLine();
       }
 
@@ -87,6 +98,12 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
     };
   }
 
+  @Override
+  protected InputEntity source()
+  {
+    return source;
+  }
+
   /**
    * Parses the given line into a list of {@link InputRow}s. Note that some file formats can explode a single line of
    * input into multiple inputRows.
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
index 0437103..8d0f667 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
@@ -91,6 +91,12 @@ public class JsonReader extends IntermediateRowParsingReader<String>
   }
 
   @Override
+  protected InputEntity source()
+  {
+    return source;
+  }
+
+  @Override
   protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
   {
     final List<InputRow> inputRows;
diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
new file mode 100644
index 0000000..e1e4895
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.parsers;
+
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Like {@link CloseableIterator}, but has a currentMetadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the source of last value returned by next()
+ *
+ * The returned metadata is read-only and cannot be modified.
+ *
+ * This metadata can be used as additional information to pin-point the root cause of a parse exception.
+ * So it can include information that helps with such exercise. For example, for a {@link org.apache.druid.data.input.TextReader}
+ * that information can be the line number. Only per row context needs to be passed here so for kafka it could be an offset.
+ * The source information is already available via {@link IntermediateRowParsingReader#source()} method and needn't be included
+ */
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+
+  /**
+   * @return A map containing the information about the source of the last value returned by {@link #next()}
+   */
+  Map<String, Object> currentMetadata();
+
+  /**
+   * Creates an instance of CloseableIteratorWithMetadata from a {@link CloseableIterator}. {@link #currentMetadata()}
+   * for the instance is guaranteed to return an empty map
+   */
+  static <T> CloseableIteratorWithMetadata<T> withEmptyMetadata(CloseableIterator<T> delegate)
+  {
+    return new CloseableIteratorWithMetadata<T>()
+    {
+
+      @Override
+      public Map<String, Object> currentMetadata()
+      {
+        return Collections.emptyMap();
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        delegate.close();
+      }
+
+      @Override
+      public boolean hasNext()
+      {
+        return delegate.hasNext();
+      }
+
+      @Override
+      public T next()
+      {
+        return delegate.next();
+      }
+    };
+  }
+}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index 09888f7..66552f9 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -112,6 +112,12 @@ public class AvroOCFReader extends IntermediateRowParsingReader<GenericRecord>
   }
 
   @Override
+  protected InputEntity source()
+  {
+    return source;
+  }
+
+  @Override
   protected List<InputRow> parseInputRows(GenericRecord intermediateRow) throws ParseException
   {
     return Collections.singletonList(
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
index 50da0e5..4ed6a8a 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -72,6 +72,13 @@ public class AvroStreamReader extends IntermediateRowParsingReader<GenericRecord
   }
 
   @Override
+  protected InputEntity source()
+  {
+    return source;
+  }
+
+
+  @Override
   protected List<InputRow> parseInputRows(GenericRecord intermediateRow) throws ParseException
   {
     return Collections.singletonList(
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 629358f..94c0106 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1579,9 +1579,9 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
         "Unable to parse value[notanumber] for field[met1]",
         "could not convert value [notanumber] to float",
         "could not convert value [notanumber] to long",
-        "Unable to parse [] as the intermediateRow resulted in empty input row",
-        "Unable to parse row [unparseable]",
-        "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
+        "Unable to parse [] as the intermediateRow resulted in empty input row (Record: 1)",
+        "Unable to parse row [unparseable] (Record: 1)",
+        "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}] (Record: 1)"
     );
     List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
@@ -1665,8 +1665,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
         .get(RowIngestionMeters.BUILD_SEGMENTS);
 
     List<String> expectedMessages = Arrays.asList(
-        "Unable to parse [] as the intermediateRow resulted in empty input row",
-        "Unable to parse row [unparseable]"
+        "Unable to parse [] as the intermediateRow resulted in empty input row (Record: 1)",
+        "Unable to parse row [unparseable] (Record: 1)"
     );
     List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 8a90862..db7c927 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -1351,10 +1351,10 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
         "Unable to parse value[notanumber] for field[met1]",
         "could not convert value [notanumber] to float",
         "could not convert value [notanumber] to long",
-        "Timestamp[null] is unparseable! Event: {}",
-        "Unable to parse [] as the intermediateRow resulted in empty input row",
-        "Unable to parse row [unparseable]",
-        "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
+        "Timestamp[null] is unparseable! Event: {} (Record: 1)",
+        "Unable to parse [] as the intermediateRow resulted in empty input row (Record: 1)",
+        "Unable to parse row [unparseable] (Record: 1)",
+        "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}] (Record: 1)"
     );
     List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
@@ -1457,8 +1457,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
         .get(RowIngestionMeters.BUILD_SEGMENTS);
 
     List<String> expectedMessages = Arrays.asList(
-        "Unable to parse [] as the intermediateRow resulted in empty input row",
-        "Unable to parse row [unparseable]"
+        "Unable to parse [] as the intermediateRow resulted in empty input row (Record: 1)",
+        "Unable to parse row [unparseable] (Record: 1)"
     );
     List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
index ca10988..5bcec8c 100644
--- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -144,6 +144,12 @@ public class OrcReader extends IntermediateRowParsingReader<OrcStruct>
   }
 
   @Override
+  protected InputEntity source()
+  {
+    return source;
+  }
+
+  @Override
   protected List<InputRow> parseInputRows(OrcStruct intermediateRow) throws ParseException
   {
     return Collections.singletonList(
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index bbd45f6..aced110 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -133,6 +133,12 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
   }
 
   @Override
+  protected InputEntity source()
+  {
+    return source;
+  }
+
+  @Override
   protected List<InputRow> parseInputRows(Group intermediateRow) throws ParseException
   {
     return Collections.singletonList(
diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
index e24579f..d512d10 100644
--- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
+++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
@@ -85,6 +85,12 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
   }
 
   @Override
+  protected InputEntity source()
+  {
+    return source;
+  }
+
+  @Override
   protected List<InputRow> parseInputRows(DynamicMessage intermediateRow) throws ParseException, JsonProcessingException
   {
     Map<String, Object> record;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
index 322526b..7117eea 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
@@ -142,6 +142,12 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
   }
 
   @Override
+  protected InputEntity source()
+  {
+    return source;
+  }
+
+  @Override
   protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
   {
     return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, intermediateRow));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index b714947..9168f53 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -1323,6 +1323,7 @@ public class IndexTaskTest extends IngestionTestBase
     // report parse exception
     final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
     final IndexIngestionSpec indexIngestionSpec;
+    List<String> expectedMessages;
     if (useInputFormatApi) {
       indexIngestionSpec = createIngestionSpec(
           jsonMapper,
@@ -1336,6 +1337,12 @@ public class IndexTaskTest extends IngestionTestBase
           false,
           false
       );
+      expectedMessages = ImmutableList.of(
+          StringUtils.format(
+              "Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a, val=1} (Path: %s, Record: 1, Line: 2)",
+              tmpFile.toURI()
+          )
+      );
     } else {
       indexIngestionSpec = createIngestionSpec(
           jsonMapper,
@@ -1347,6 +1354,9 @@ public class IndexTaskTest extends IngestionTestBase
           false,
           false
       );
+      expectedMessages = ImmutableList.of(
+          "Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a, val=1}"
+      );
     }
 
     IndexTask indexTask = new IndexTask(
@@ -1366,9 +1376,6 @@ public class IndexTaskTest extends IngestionTestBase
         .getUnparseableEvents()
         .get(RowIngestionMeters.BUILD_SEGMENTS);
 
-    List<String> expectedMessages = ImmutableList.of(
-        "Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a, val=1}"
-    );
     List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
     }).collect(Collectors.toList());
@@ -1500,39 +1507,42 @@ public class IndexTaskTest extends IngestionTestBase
     );
     Assert.assertEquals(expectedMetrics, reportData.getRowStats());
 
-    Map<String, Object> expectedUnparseables = ImmutableMap.of(
-        RowIngestionMeters.DETERMINE_PARTITIONS,
-        Arrays.asList(
-            "Unable to parse row [this is not JSON]",
-            "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
-            "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
-            "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
-        ),
-        RowIngestionMeters.BUILD_SEGMENTS,
-        Arrays.asList(
-            "Unable to parse row [this is not JSON]",
-            "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
-            "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
-            "Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=4.0, val=notnumber}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [Unable to parse value[notnumber] for field[val]]",
-            "Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=notnumber, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to float]",
-            "Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=notnumber, dimFloat=3.0, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long]",
-            "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
-        )
-    );
-
     List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
         .getUnparseableEvents()
         .get(RowIngestionMeters.BUILD_SEGMENTS);
 
-    List<String> expectedMessages = Arrays.asList(
-        "Unable to parse row [this is not JSON]",
-        "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
-        "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
-        "Unable to parse value[notnumber] for field[val]",
-        "could not convert value [notnumber] to float",
-        "could not convert value [notnumber] to long",
-        "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
-    );
+    List<String> expectedMessages;
+    if (useInputFormatApi) {
+      expectedMessages = Arrays.asList(
+          StringUtils.format("Unable to parse row [this is not JSON] (Path: %s, Record: 6, Line: 9)", tmpFile.toURI()),
+          StringUtils.format(
+              "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 6, Line: 8)",
+              tmpFile.toURI()
+          ),
+          StringUtils.format(
+              "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}] (Path: %s, Record: 5, Line: 6)",
+              tmpFile.toURI()
+          ),
+          "Unable to parse value[notnumber] for field[val]",
+          "could not convert value [notnumber] to float",
+          "could not convert value [notnumber] to long",
+          StringUtils.format(
+              "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, Line: 1)",
+              tmpFile.toURI()
+          )
+      );
+    } else {
+      expectedMessages = Arrays.asList(
+          "Unable to parse row [this is not JSON]",
+          "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+          "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
+          "Unable to parse value[notnumber] for field[val]",
+          "could not convert value [notnumber] to float",
+          "could not convert value [notnumber] to long",
+          "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+      );
+    }
+
     List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
     }).collect(Collectors.toList());
@@ -1556,12 +1566,31 @@ public class IndexTaskTest extends IngestionTestBase
         .getUnparseableEvents()
         .get(RowIngestionMeters.DETERMINE_PARTITIONS);
 
-    expectedMessages = Arrays.asList(
-        "Unable to parse row [this is not JSON]",
-        "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
-        "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
-        "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
-    );
+    if (useInputFormatApi) {
+      expectedMessages = Arrays.asList(
+          StringUtils.format("Unable to parse row [this is not JSON] (Path: %s, Record: 6, Line: 9)", tmpFile.toURI()),
+          StringUtils.format(
+              "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 6, Line: 8)",
+              tmpFile.toURI()
+          ),
+          StringUtils.format(
+              "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}] (Path: %s, Record: 5, Line: 6)",
+              tmpFile.toURI()
+          ),
+          StringUtils.format(
+              "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, Line: 1)",
+              tmpFile.toURI()
+          )
+      );
+    } else {
+      expectedMessages = Arrays.asList(
+          "Unable to parse row [this is not JSON]",
+          "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+          "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
+          "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+      );
+    }
+
     actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
     }).collect(Collectors.toList());
@@ -1634,6 +1663,8 @@ public class IndexTaskTest extends IngestionTestBase
     );
     final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val");
     final IndexIngestionSpec ingestionSpec;
+
+    List<String> expectedMessages;
     if (useInputFormatApi) {
       ingestionSpec = createIngestionSpec(
           jsonMapper,
@@ -1647,6 +1678,20 @@ public class IndexTaskTest extends IngestionTestBase
           false,
           false
       );
+      expectedMessages = Arrays.asList(
+          StringUtils.format(
+              "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 3, Line: 6)",
+              tmpFile.toURI()
+          ),
+          StringUtils.format(
+              "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 2, Line: 4)",
+              tmpFile.toURI()
+          ),
+          StringUtils.format(
+              "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, Line: 2)",
+              tmpFile.toURI()
+          )
+      );
     } else {
       ingestionSpec = createIngestionSpec(
           jsonMapper,
@@ -1658,6 +1703,11 @@ public class IndexTaskTest extends IngestionTestBase
           false,
           false
       );
+      expectedMessages = Arrays.asList(
+          "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+          "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
+          "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+      );
     }
 
     IndexTask indexTask = new IndexTask(
@@ -1696,11 +1746,6 @@ public class IndexTaskTest extends IngestionTestBase
         .getUnparseableEvents()
         .get(RowIngestionMeters.BUILD_SEGMENTS);
 
-    List<String> expectedMessages = Arrays.asList(
-        "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
-        "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
-        "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
-    );
     List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
     }).collect(Collectors.toList());
@@ -1771,6 +1816,8 @@ public class IndexTaskTest extends IngestionTestBase
     );
     final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val");
     final IndexIngestionSpec ingestionSpec;
+
+    List<String> expectedMessages;
     if (useInputFormatApi) {
       ingestionSpec = createIngestionSpec(
           jsonMapper,
@@ -1784,6 +1831,11 @@ public class IndexTaskTest extends IngestionTestBase
           false,
           false
       );
+      expectedMessages = Arrays.asList(
+          StringUtils.format("Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 3, Line: 6)", tmpFile.toURI()),
+          StringUtils.format("Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 2, Line: 4)", tmpFile.toURI()),
+          StringUtils.format("Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, Line: 2)", tmpFile.toURI())
+      );
     } else {
       ingestionSpec = createIngestionSpec(
           jsonMapper,
@@ -1795,6 +1847,11 @@ public class IndexTaskTest extends IngestionTestBase
           false,
           false
       );
+      expectedMessages = Arrays.asList(
+          "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+          "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
+          "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
+      );
     }
 
     IndexTask indexTask = new IndexTask(
@@ -1833,11 +1890,6 @@ public class IndexTaskTest extends IngestionTestBase
         .getUnparseableEvents()
         .get(RowIngestionMeters.DETERMINE_PARTITIONS);
 
-    List<String> expectedMessages = Arrays.asList(
-        "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
-        "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
-        "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
-    );
     List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
     }).collect(Collectors.toList());
@@ -1957,6 +2009,7 @@ public class IndexTaskTest extends IngestionTestBase
     // report parse exception
     final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
     final IndexIngestionSpec ingestionSpec;
+    List<String> expectedMessages;
     if (useInputFormatApi) {
       ingestionSpec = createIngestionSpec(
           jsonMapper,
@@ -1970,6 +2023,12 @@ public class IndexTaskTest extends IngestionTestBase
           false,
           false
       );
+      expectedMessages = ImmutableList.of(
+          StringUtils.format(
+              "Timestamp[null] is unparseable! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1} (Path: %s, Record: 1, Line: 2)",
+              tmpFile.toURI()
+          )
+      );
     } else {
       ingestionSpec = createIngestionSpec(
           jsonMapper,
@@ -1981,6 +2040,9 @@ public class IndexTaskTest extends IngestionTestBase
           false,
           false
       );
+      expectedMessages = ImmutableList.of(
+          "Timestamp[null] is unparseable! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}"
+      );
     }
 
     IndexTask indexTask = new IndexTask(
@@ -2001,9 +2063,6 @@ public class IndexTaskTest extends IngestionTestBase
         .getUnparseableEvents()
         .get(RowIngestionMeters.BUILD_SEGMENTS);
 
-    List<String> expectedMessages = ImmutableList.of(
-        "Timestamp[null] is unparseable! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}"
-    );
     List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
       return ((List<String>) r.get("details")).get(0);
     }).collect(Collectors.toList());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 91ea632..b57e55b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -341,7 +341,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
             new ParseExceptionReport(
                 "{ts=2017unparseable}",
                 "unparseable",
-                ImmutableList.of("Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable}"),
+                ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
                 1L
             ),
             new ParseExceptionReport(
@@ -462,7 +462,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
         new ParseExceptionReport(
             "{ts=2017unparseable}",
             "unparseable",
-            ImmutableList.of("Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable}"),
+            ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
             1L
         ),
         new ParseExceptionReport(
@@ -989,6 +989,14 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
     );
   }
 
+  private String getErrorMessageForUnparseableTimestamp()
+  {
+    return useInputFormatApi ? StringUtils.format(
+        "Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable} (Path: %s, Record: 5, Line: 5)",
+        new File(inputDir, "test_0").toURI()
+    ) : "Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable}";
+  }
+
   private static class SettableSplittableLocalInputSource extends LocalInputSource
   {
     private final boolean splittableInputSource;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index b8725b1..89a4642 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -186,7 +186,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
             getRawColumns().get(0),
             null,
             true,
-            unparseableTimestampErrorString(data.get(0).getInput())
+            unparseableTimestampErrorString(data.get(0).getInput(), 1)
         ),
         data.get(0)
     );
@@ -195,7 +195,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
             getRawColumns().get(1),
             null,
             true,
-            unparseableTimestampErrorString(data.get(1).getInput())
+            unparseableTimestampErrorString(data.get(1).getInput(), 2)
         ),
         data.get(1)
     );
@@ -204,7 +204,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
             getRawColumns().get(2),
             null,
             true,
-            unparseableTimestampErrorString(data.get(2).getInput())
+            unparseableTimestampErrorString(data.get(2).getInput(), 3)
         ),
         data.get(2)
     );
@@ -213,7 +213,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
             getRawColumns().get(3),
             null,
             true,
-            unparseableTimestampErrorString(data.get(3).getInput())
+            unparseableTimestampErrorString(data.get(3).getInput(), 4)
         ),
         data.get(3)
     );
@@ -222,7 +222,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
             getRawColumns().get(4),
             null,
             true,
-            unparseableTimestampErrorString(data.get(4).getInput())
+            unparseableTimestampErrorString(data.get(4).getInput(), 5)
         ),
         data.get(4)
     );
@@ -231,7 +231,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
             getRawColumns().get(5),
             null,
             true,
-            unparseableTimestampErrorString(data.get(5).getInput())
+            unparseableTimestampErrorString(data.get(5).getInput(), 6)
         ),
         data.get(5)
     );
@@ -259,7 +259,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
             getRawColumns().get(0),
             null,
             true,
-            unparseableTimestampErrorString(data.get(0).getInput())
+            unparseableTimestampErrorString(data.get(0).getInput(), 1)
         ),
         data.get(0)
     );
@@ -268,7 +268,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
             getRawColumns().get(1),
             null,
             true,
-            unparseableTimestampErrorString(data.get(1).getInput())
+            unparseableTimestampErrorString(data.get(1).getInput(), 2)
         ),
         data.get(1)
     );
@@ -277,7 +277,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
             getRawColumns().get(2),
             null,
             true,
-            unparseableTimestampErrorString(data.get(2).getInput())
+            unparseableTimestampErrorString(data.get(2).getInput(), 3)
         ),
         data.get(2)
     );
@@ -1248,7 +1248,12 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
     //
     // first n rows are related to the first json block which fails to parse
     //
-    String parseExceptionMessage = "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
+    String parseExceptionMessage;
+    if (useInputFormatApi) {
+      parseExceptionMessage = "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
+    } else {
+      parseExceptionMessage = "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
+    }
     for (; index < illegalRows; index++) {
       assertEqualsSamplerResponseRow(
           new SamplerResponseRow(
@@ -1436,14 +1441,24 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
 
   private String getUnparseableTimestampString()
   {
-    return ParserType.STR_CSV.equals(parserType)
-           ? "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
-           : "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
+    if (useInputFormatApi) {
+      return ParserType.STR_CSV.equals(parserType)
+             ? "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6} (Line: 6)"
+             : "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6} (Line: 6)";
+    } else {
+      return ParserType.STR_CSV.equals(parserType)
+             ? "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
+             : "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
+    }
   }
 
-  private String unparseableTimestampErrorString(Map<String, Object> rawColumns)
+  private String unparseableTimestampErrorString(Map<String, Object> rawColumns, int line)
   {
-    return StringUtils.format("Timestamp[null] is unparseable! Event: %s", rawColumns);
+    if (useInputFormatApi) {
+      return StringUtils.format("Timestamp[null] is unparseable! Event: %s (Line: %d)", rawColumns, line);
+    } else {
+      return StringUtils.format("Timestamp[null] is unparseable! Event: %s", rawColumns);
+    }
   }
 
   @Nullable
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
index 0709799..0a503eb 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
@@ -76,6 +76,12 @@ public class SqlReader extends IntermediateRowParsingReader<Map<String, Object>>
   }
 
   @Override
+  protected InputEntity source()
+  {
+    return source;
+  }
+
+  @Override
   protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
   {
     return Collections.singletonList(
