You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2020/05/14 20:26:14 UTC

[asterixdb] 21/26: [ASTERIXDB-2726][EXT] Report line number instead of record number in messages of parsers

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit e844fad3435568452d6f738b45570d3bc1295039
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Mon May 11 20:36:17 2020 -0700

    [ASTERIXDB-2726][EXT] Report line number instead of record number in messages of parsers
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
       IRecordReader:
          added getLineNumber() to provide line number for parsers and converters.
       IRecordConverter:
          added configure() to pass the line number supplier to the record converter.
       IRecordDataParser:
          pass line number supplier from the Reader to the Parser.
    
    Details:
    Report line number instead of record number in messages of parsers.
    
    - added getPreviousStreamName() to allow readers to report errors happening on
       the previous stream when the underlying stream has already switched to a new one.
    - changed the test executor to compare actual warnings issued by a test case with
       the expected warnings properly.
    
    Change-Id: I00508d8eeca4d9bae95f55ab51ecfb0ce2ced6b0
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6245
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Hussain Towaileb <hu...@gmail.com>
---
 asterixdb/asterix-app/data/csv/error1_line_num.csv |  3 +
 asterixdb/asterix-app/data/csv/error2_line_num.csv |  5 ++
 .../data/json/malformed-json-no-closing.json       |  2 +
 .../test/common/CancellationTestExecutor.java      |  4 +-
 .../org/apache/asterix/test/common/IPollTask.java  |  5 +-
 .../apache/asterix/test/common/TestExecutor.java   | 70 ++++++++++++++--------
 .../aws/AwsS3ExternalDatasetTest.java              | 35 +++++++----
 .../s3/csv-warnings/query-dataset.002.ddl.sqlpp    |  2 +
 .../csv-warnings/query-dataset.014.s3bucket.sqlpp} |  9 +--
 ...t.002.ddl.sqlpp => query-dataset.015.ddl.sqlpp} | 11 +---
 .../s3/csv-warnings/query-dataset.016.query.sqlpp} | 12 ++--
 .../csv-warnings/query-dataset.017.s3bucket.sqlpp} |  9 +--
 .../s3/csv-warnings/query-dataset.018.query.sqlpp} | 12 ++--
 .../json-warnings.001.s3bucket.sqlpp}              |  9 +--
 .../json-warnings.002.ddl.sqlpp}                   |  7 +--
 .../json-warnings/json-warnings.003.query.sqlpp}   | 12 ++--
 .../s3/json-warnings/json-warnings.099.ddl.sqlpp}  |  9 +--
 .../queries_sqlpp/objects/ObjectsQueries.xml       |  2 +-
 .../aws/s3/csv-warnings/external_dataset.006.adm   |  1 +
 .../aws/s3/csv-warnings/external_dataset.007.adm   |  2 +
 .../aws/s3/json-warnings/json-warnings.003.adm     |  1 +
 .../testsuite_external_dataset_one_partition.xml   | 53 +++++++++-------
 .../asterix/external/api/AsterixInputStream.java   |  4 ++
 .../asterix/external/api/IRecordConverter.java     |  9 +++
 .../asterix/external/api/IRecordDataParser.java    |  7 ++-
 .../apache/asterix/external/api/IRecordReader.java |  5 ++
 .../CSVToRecordWithMetadataAndPKConverter.java     |  9 ++-
 .../input/record/reader/aws/AwsS3InputStream.java  | 17 ++++--
 .../record/reader/stream/LineRecordReader.java     | 20 ++++++-
 .../reader/stream/QuotedLineRecordReader.java      | 27 +++++----
 .../reader/stream/SemiStructuredRecordReader.java  | 51 ++++++++++++----
 .../record/reader/stream/StreamRecordReader.java   |  6 ++
 .../input/stream/AsterixInputStreamReader.java     |  4 ++
 .../external/input/stream/LocalFSInputStream.java  | 11 ++++
 .../external/parser/DelimitedDataParser.java       | 45 ++++++++++----
 .../external/parser/RecordWithMetadataParser.java  |  8 +++
 .../provider/DataflowControllerProvider.java       |  2 +-
 .../external/util/ExternalDataConstants.java       |  4 +-
 .../apache/asterix/external/util/ParseUtil.java    |  4 +-
 .../src/main/resources/errormsg/en.properties      |  2 +-
 .../file/FieldCursorForDelimitedDataParser.java    | 34 ++++++++---
 41 files changed, 351 insertions(+), 193 deletions(-)

diff --git a/asterixdb/asterix-app/data/csv/error1_line_num.csv b/asterixdb/asterix-app/data/csv/error1_line_num.csv
new file mode 100644
index 0000000..34bcee9
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/error1_line_num.csv
@@ -0,0 +1,3 @@
+1,"good","recommend"
+
+2,"bad" ,"not recommend"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/error2_line_num.csv b/asterixdb/asterix-app/data/csv/error2_line_num.csv
new file mode 100644
index 0000000..0f1286f
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/error2_line_num.csv
@@ -0,0 +1,5 @@
+1,"good","recommend"
+2,"bad and
+not so good and
+bad" ,"not recommend"
+3,"good","recommend"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/json/malformed-json-no-closing.json b/asterixdb/asterix-app/data/json/malformed-json-no-closing.json
new file mode 100644
index 0000000..83f3087
--- /dev/null
+++ b/asterixdb/asterix-app/data/json/malformed-json-no-closing.json
@@ -0,0 +1,2 @@
+{ "field1": 1, "field2": "text"
+
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index 36a06c0..d4f92fb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.net.URI;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -129,8 +130,7 @@ public class CancellationTestExecutor extends TestExecutor {
     }
 
     @Override
-    protected void ensureWarnings(int actualWarnCount, int expectedWarnCount, TestCase.CompilationUnit cUnit)
-            throws Exception {
+    protected void ensureWarnings(BitSet expectedWarnings, TestCase.CompilationUnit cUnit) throws Exception {
         // skip checking warnings as currently cancelled queries with warnings might not run successfully at all
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java
index ab90244..a1ed12b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.test.common;
 
 import java.io.File;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -43,11 +44,11 @@ public interface IPollTask {
      * @param expectedResultFileCtxs
      * @param testFile
      * @param actualPath
-     * @param actualWarnCount
+     * @param expectedWarnings
      */
     void execute(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, String statement,
             boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
-            List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, MutableInt actualWarnCount)
+            List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, BitSet expectedWarnings)
             throws Exception;
 
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index a10dc54..10869c1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -45,6 +45,7 @@ import java.text.MessageFormat;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -53,6 +54,7 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -65,6 +67,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.asterix.api.http.server.QueryServiceRequestParameters;
@@ -159,9 +162,9 @@ public class TestExecutor {
     private static final ContentType TEXT_PLAIN_UTF8 = ContentType.create(HttpUtil.ContentType.APPLICATION_JSON, UTF_8);
 
     private final IPollTask plainExecutor = (testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
-            queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount) -> executeTestFile(testCaseCtx,
+            queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings) -> executeTestFile(testCaseCtx,
                     ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs,
-                    testFile, actualPath, actualWarnCount);
+                    testFile, actualPath, expectedWarnings);
 
     public static final String DELIVERY_ASYNC = "async";
     public static final String DELIVERY_DEFERRED = "deferred";
@@ -947,7 +950,7 @@ public class TestExecutor {
     public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
             String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
             MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
-            MutableInt actualWarnCount) throws Exception {
+            BitSet expectedWarnings) throws Exception {
         InputStream resultStream;
         File qbcFile;
         boolean failed = false;
@@ -975,11 +978,11 @@ public class TestExecutor {
             case "pollquery":
                 poll(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
                         expectedResultFileCtxs, testFile, actualPath, ctx.getType().substring("poll".length()),
-                        actualWarnCount, plainExecutor);
+                        expectedWarnings, plainExecutor);
                 break;
             case "polldynamic":
                 polldynamic(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
-                        expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
+                        expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
                 break;
             case "query":
             case "async":
@@ -1006,7 +1009,7 @@ public class TestExecutor {
 
                 if (testCaseCtx.getTestCase().isCheckWarnings()) {
                     boolean expectedSourceLoc = testCaseCtx.isSourceLocationExpected(cUnit);
-                    validateWarnings(extractedResult.getWarnings(), cUnit.getExpectedWarn(), actualWarnCount,
+                    validateWarnings(extractedResult.getWarnings(), cUnit.getExpectedWarn(), expectedWarnings,
                             expectedSourceLoc);
                 }
                 break;
@@ -1413,17 +1416,17 @@ public class TestExecutor {
     private void polldynamic(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
             String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
             MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
-            MutableInt actualWarnCount) throws Exception {
+            BitSet expectedWarnings) throws Exception {
         IExpectedResultPoller poller = getExpectedResultPoller(statement);
         final String key = getKey(statement);
         poll(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs,
-                testFile, actualPath, "validate", actualWarnCount, new IPollTask() {
+                testFile, actualPath, "validate", expectedWarnings, new IPollTask() {
                     @Override
                     public void execute(TestCaseContext testCaseCtx, TestFileContext ctx,
                             Map<String, Object> variableCtx, String statement, boolean isDmlRecoveryTest,
                             ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
                             List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
-                            MutableInt actualWarnCount) throws Exception {
+                            BitSet expectedWarnings) throws Exception {
                         File actualResultFile = new File(actualPath, testCaseCtx.getTestCase().getFilePath()
                                 + File.separatorChar + cUnit.getName() + '.' + ctx.getSeqNum() + ".polled.adm");
                         if (actualResultFile.exists() && !actualResultFile.delete()) {
@@ -1461,7 +1464,7 @@ public class TestExecutor {
     private void poll(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
             String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
             MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
-            String newType, MutableInt actualWarnCount, IPollTask pollTask) throws Exception {
+            String newType, BitSet expectedWarnings, IPollTask pollTask) throws Exception {
         // polltimeoutsecs=nnn, polldelaysecs=nnn
         int timeoutSecs = getTimeoutSecs(statement);
         int retryDelaySecs = getRetryDelaySecs(statement);
@@ -1484,7 +1487,7 @@ public class TestExecutor {
                         try {
                             startSemaphore.release();
                             pollTask.execute(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
-                                    queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
+                                    queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
                         } finally {
                             endSemaphore.release();
                         }
@@ -1798,7 +1801,8 @@ public class TestExecutor {
         for (CompilationUnit cUnit : cUnits) {
             List<String> expectedErrors = cUnit.getExpectedError();
             int expectedWarnCount = cUnit.getExpectedWarn().size();
-            MutableInt actualWarnCount = new MutableInt(0);
+            BitSet expectedWarnings = new BitSet(cUnit.getExpectedWarn().size());
+            expectedWarnings.set(0, cUnit.getExpectedWarn().size());
             LOGGER.info(
                     "Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... ");
             Map<String, Object> variableCtx = new HashMap<>();
@@ -1818,7 +1822,7 @@ public class TestExecutor {
                 try {
                     if (!testFile.getName().startsWith(DIAGNOSE)) {
                         executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
-                                queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
+                                queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
                     }
                 } catch (TestLoop loop) {
                     // rewind the iterator until we find our target
@@ -1850,7 +1854,7 @@ public class TestExecutor {
                         throw new Exception(
                                 "Test \"" + cUnit.getName() + "\" FAILED; expected exception was not thrown...");
                     }
-                    ensureWarnings(actualWarnCount.getValue(), expectedWarnCount, cUnit);
+                    ensureWarnings(expectedWarnings, cUnit);
                     LOGGER.info(
                             "[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " PASSED ");
                     if (passedGroup != null) {
@@ -1873,7 +1877,7 @@ public class TestExecutor {
                         final File file = ctx.getFile();
                         final String statement = readTestFile(file);
                         executeTestFile(testCaseCtx, ctx, variableCtx, statement, false, pb, cUnit, new MutableInt(-1),
-                                Collections.emptyList(), file, null, new MutableInt(-1));
+                                Collections.emptyList(), file, null, new BitSet());
                     }
                 }
             } catch (Exception diagnosticFailure) {
@@ -2074,9 +2078,17 @@ public class TestExecutor {
         LOGGER.info("Cluster state now " + desiredState);
     }
 
-    protected void ensureWarnings(int actualWarnCount, int expectedWarnCount, CompilationUnit cUnit) throws Exception {
-        if (actualWarnCount < expectedWarnCount) {
-            LOGGER.error("Test {} failed to raise (an) expected warning(s)", cUnit.getName());
+    protected void ensureWarnings(BitSet expectedWarnings, CompilationUnit cUnit) throws Exception {
+        boolean fail = !expectedWarnings.isEmpty();
+        if (fail) {
+            LOGGER.error("Test {} failed to raise (an) expected warning(s):", cUnit.getName());
+        }
+        List<String> expectedWarn = cUnit.getExpectedWarn();
+        for (int i = expectedWarnings.nextSetBit(0); i >= 0; i = expectedWarnings.nextSetBit(i + 1)) {
+            String warning = expectedWarn.get(i);
+            LOGGER.error(warning);
+        }
+        if (fail) {
             throw new Exception("Test \"" + cUnit.getName() + "\" FAILED; expected warning(s) was not returned...");
         }
     }
@@ -2212,22 +2224,30 @@ public class TestExecutor {
         return extension.endsWith(AQL) ? getEndpoint(Servlets.QUERY_AQL) : getEndpoint(Servlets.QUERY_SERVICE);
     }
 
-    private void validateWarnings(List<String> actualWarnings, List<String> expectedWarn, MutableInt actualWarnCount,
+    private void validateWarnings(List<String> actualWarnings, List<String> expectedWarn, BitSet expectedWarnings,
             boolean expectedSourceLoc) throws Exception {
         if (actualWarnings != null) {
             for (String actualWarn : actualWarnings) {
-                if (expectedWarn.stream().noneMatch(actualWarn::contains)) {
-                    throw new Exception("unexpected warning was encountered (" + actualWarn + ")");
+                OptionalInt first = IntStream.range(0, expectedWarn.size())
+                        .filter(i -> actualWarn.contains(expectedWarn.get(i)) && expectedWarnings.get(i)).findFirst();
+                if (!first.isPresent()) {
+                    String msg = "unexpected warning was encountered or has already been matched (" + actualWarn + ")";
+                    LOGGER.error(msg);
+                    if (!expectedWarnings.isEmpty()) {
+                        LOGGER.error("was expecting the following warnings: ");
+                    }
+                    for (int i = expectedWarnings.nextSetBit(0); i >= 0; i = expectedWarnings.nextSetBit(i + 1)) {
+                        LOGGER.error(expectedWarn.get(i));
+                    }
+                    throw new Exception(msg);
                 }
                 if (expectedSourceLoc && !containsSourceLocation(actualWarn)) {
                     throw new Exception(MessageFormat.format(
                             "Expected to find source location \"{}, {}\" in warning text: +++++{}+++++",
                             ERR_MSG_SRC_LOC_LINE_REGEX, ERR_MSG_SRC_LOC_COLUMN_REGEX, actualWarn));
                 }
-                actualWarnCount.increment();
-                if (actualWarnCount.getValue() > expectedWarn.size()) {
-                    throw new Exception("returned warnings exceeded expected warnings");
-                }
+                int warningIndex = first.getAsInt();
+                expectedWarnings.clear(warningIndex);
             }
         }
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index 37a3916..55c78e3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -28,6 +28,7 @@ import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -385,11 +386,11 @@ public class AwsS3ExternalDatasetTest {
         public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
                 String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, TestCase.CompilationUnit cUnit,
                 MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
-                MutableInt actualWarnCount) throws Exception {
+                BitSet expectedWarnings) throws Exception {
             String[] lines;
             switch (ctx.getType()) {
                 case "s3bucket":
-                    // <bucket_name> <def_name> <sub-path:src_file1,sub-path:src_file2,sub-path:src_file3>
+                    // <bucket> <def> <sub-path:new_fname:src_file1,sub-path:new_fname:src_file2,sub-path:src_file3>
                     lines = TestExecutor.stripAllComments(statement).trim().split("\n");
                     String lastLine = lines[lines.length - 1];
                     String[] command = lastLine.trim().split(" ");
@@ -401,7 +402,7 @@ public class AwsS3ExternalDatasetTest {
                     break;
                 default:
                     super.executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
-                            queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
+                            queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
             }
         }
     }
@@ -425,23 +426,37 @@ public class AwsS3ExternalDatasetTest {
             int size = s3pathAndSourceFile.length;
             String path;
             String sourceFilePath;
-            String sourceFileName;
+            String uploadedFileName;
             if (size == 1) {
                 // case: playground json-data/reviews SOURCE_FILE1,SOURCE_FILE2
                 path = definitionPath;
                 sourceFilePath = s3pathAndSourceFile[0];
-                sourceFileName = FilenameUtils.getName(s3pathAndSourceFile[0]);
-            } else {
+                uploadedFileName = FilenameUtils.getName(s3pathAndSourceFile[0]);
+            } else if (size == 2) {
                 // case: playground json-data/reviews level1/sub-level:SOURCE_FILE1,level2/sub-level:SOURCE_FILE2
+                String subPathOrNewFileName = s3pathAndSourceFile[0];
+                if (subPathOrNewFileName.startsWith("$$")) {
+                    path = definitionPath;
+                    sourceFilePath = s3pathAndSourceFile[1];
+                    uploadedFileName = subPathOrNewFileName.substring(2);
+                } else {
+                    path = definitionPath + subPathOrNewFileName + (subPathOrNewFileName.endsWith("/") ? "" : "/");
+                    sourceFilePath = s3pathAndSourceFile[1];
+                    uploadedFileName = FilenameUtils.getName(s3pathAndSourceFile[1]);
+                }
+            } else if (size == 3) {
                 path = definitionPath + s3pathAndSourceFile[0] + (s3pathAndSourceFile[0].endsWith("/") ? "" : "/");
-                sourceFilePath = s3pathAndSourceFile[1];
-                sourceFileName = FilenameUtils.getName(s3pathAndSourceFile[1]);
+                uploadedFileName = s3pathAndSourceFile[1];
+                sourceFilePath = s3pathAndSourceFile[2];
+
+            } else {
+                throw new IllegalArgumentException();
             }
 
-            String keyPath = path + sourceFileName;
+            String keyPath = path + uploadedFileName;
             int k = 1;
             while (fileNames.contains(keyPath)) {
-                keyPath = path + (k++) + sourceFileName;
+                keyPath = path + (k++) + uploadedFileName;
             }
             fileNames.add(keyPath);
             client.putObject(PUT_OBJECT_BUILDER.bucket(bucketName).key(keyPath).build(),
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
index f7c5c61..6df570c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
@@ -23,8 +23,10 @@ USE test;
 
 DROP TYPE t1 IF EXISTS;
 DROP TYPE t2 IF EXISTS;
+DROP TYPE t3 IF EXISTS;
 CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
 CREATE TYPE t2 AS {f1: bigint, f2: bigint?, f3: double, f4: double?, f5: string, f6: string?, f7: boolean, f8: boolean?};
+CREATE TYPE t3 AS {f1: bigint, f2: string, f3: string};
 
 DROP DATASET ds1 IF EXISTS;
 CREATE EXTERNAL DATASET ds1(t1) USING S3 (
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.014.s3bucket.sqlpp
similarity index 79%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.014.s3bucket.sqlpp
index 9d9ff28..5d3989b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.014.s3bucket.sqlpp
@@ -16,12 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.IOException;
-
-@FunctionalInterface
-public interface IRecordConverter<I, O> {
-
-    public O convert(IRawRecord<? extends I> input) throws IOException;
-}
+playground data_dir data/csv/error1_line_num.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.015.ddl.sqlpp
similarity index 74%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.015.ddl.sqlpp
index f7c5c61..75ba5d6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.015.ddl.sqlpp
@@ -17,17 +17,10 @@
  * under the License.
  */
 
-DROP DATAVERSE test IF EXISTS;
-CREATE DATAVERSE test;
 USE test;
 
-DROP TYPE t1 IF EXISTS;
-DROP TYPE t2 IF EXISTS;
-CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
-CREATE TYPE t2 AS {f1: bigint, f2: bigint?, f3: double, f4: double?, f5: string, f6: string?, f7: boolean, f8: boolean?};
-
-DROP DATASET ds1 IF EXISTS;
-CREATE EXTERNAL DATASET ds1(t1) USING S3 (
+DROP DATASET ds2 IF EXISTS;
+CREATE EXTERNAL DATASET ds2(t3) USING S3 (
 ("accessKeyId"="dummyAccessKey"),
 ("secretAccessKey"="dummySecretKey"),
 ("region"="us-west-2"),
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.016.query.sqlpp
similarity index 79%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.016.query.sqlpp
index 9d9ff28..e6b24f3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.016.query.sqlpp
@@ -16,12 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.IOException;
-
-@FunctionalInterface
-public interface IRecordConverter<I, O> {
-
-    public O convert(IRawRecord<? extends I> input) throws IOException;
-}
+FROM ds2 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.017.s3bucket.sqlpp
similarity index 79%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.017.s3bucket.sqlpp
index 9d9ff28..c35d646 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.017.s3bucket.sqlpp
@@ -16,12 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.IOException;
-
-@FunctionalInterface
-public interface IRecordConverter<I, O> {
-
-    public O convert(IRawRecord<? extends I> input) throws IOException;
-}
+playground data_dir data/csv/error2_line_num.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.018.query.sqlpp
similarity index 79%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.018.query.sqlpp
index 9d9ff28..e6b24f3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.018.query.sqlpp
@@ -16,12 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.IOException;
-
-@FunctionalInterface
-public interface IRecordConverter<I, O> {
-
-    public O convert(IRawRecord<? extends I> input) throws IOException;
-}
+FROM ds2 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.001.s3bucket.sqlpp
similarity index 79%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.001.s3bucket.sqlpp
index 9d9ff28..2bd413b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.001.s3bucket.sqlpp
@@ -16,12 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.IOException;
-
-@FunctionalInterface
-public interface IRecordConverter<I, O> {
-
-    public O convert(IRawRecord<? extends I> input) throws IOException;
-}
+playground data_dir $$1.json:data/json/malformed-json-no-closing.json,$$2.json:data/json/double-150-11.json
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.002.ddl.sqlpp
similarity index 82%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.002.ddl.sqlpp
index f7c5c61..7112bb9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.002.ddl.sqlpp
@@ -22,9 +22,7 @@ CREATE DATAVERSE test;
 USE test;
 
 DROP TYPE t1 IF EXISTS;
-DROP TYPE t2 IF EXISTS;
-CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
-CREATE TYPE t2 AS {f1: bigint, f2: bigint?, f3: double, f4: double?, f5: string, f6: string?, f7: boolean, f8: boolean?};
+CREATE TYPE t1 AS {};
 
 DROP DATASET ds1 IF EXISTS;
 CREATE EXTERNAL DATASET ds1(t1) USING S3 (
@@ -34,6 +32,5 @@ CREATE EXTERNAL DATASET ds1(t1) USING S3 (
 ("serviceEndpoint"="http://localhost:8001"),
 ("container"="playground"),
 ("definition"="data_dir"),
-("format"="CSV"),
-("header"="false")
+("format"="JSON")
 );
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.003.query.sqlpp
similarity index 79%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.003.query.sqlpp
index 9d9ff28..5f2ad26 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.003.query.sqlpp
@@ -16,12 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.IOException;
-
-@FunctionalInterface
-public interface IRecordConverter<I, O> {
-
-    public O convert(IRawRecord<? extends I> input) throws IOException;
-}
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.099.ddl.sqlpp
similarity index 79%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.099.ddl.sqlpp
index 9d9ff28..36b2bab 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.099.ddl.sqlpp
@@ -16,12 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.IOException;
-
-@FunctionalInterface
-public interface IRecordConverter<I, O> {
-
-    public O convert(IRawRecord<? extends I> input) throws IOException;
-}
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml
index 87bd204..e8902b6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml
@@ -223,7 +223,7 @@
       <expected-warn>Duplicate field name "name" (in line 22, at column 30)</expected-warn>
       <expected-warn>Duplicate field name "id" (in line 22, at column 56)</expected-warn>
       <expected-warn>Duplicate field name "f1" (in line 22, at column 70)</expected-warn>
-      <expected-warn>Duplicate field name "id" (in line 22, at column 36)</expected-warn>
+      <expected-warn>Duplicate field name "id" (in line 22, at column 56)</expected-warn>
       <expected-warn>Duplicate field name "f1" (in line 22, at column 83)</expected-warn>
       <expected-warn>Duplicate field name "fname1" (in line 25, at column 45)</expected-warn>
     </compilation-unit>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.006.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.006.adm
new file mode 100644
index 0000000..c3ce0a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.006.adm
@@ -0,0 +1 @@
+{ "f1": 1, "f2": "good", "f3": "recommend" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.007.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.007.adm
new file mode 100644
index 0000000..c56f0c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.007.adm
@@ -0,0 +1,2 @@
+{ "f1": 1, "f2": "good", "f3": "recommend" }
+{ "f1": 3, "f2": "good", "f3": "recommend" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/json-warnings/json-warnings.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/json-warnings/json-warnings.003.adm
new file mode 100644
index 0000000..5bae5d1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/json-warnings/json-warnings.003.adm
@@ -0,0 +1 @@
+{ "double_value": 150.11 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml
index 0597d8f..6704d78 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml
@@ -42,35 +42,44 @@
     <test-case FilePath="external-dataset" check-warnings="true">
       <compilation-unit name="aws/s3/csv-warnings">
         <output-dir compare="Text">aws/s3/csv-warnings</output-dir>
-        <expected-warn>Parsing error at data_dir/no_h_missing_fields.csv record 2 field 3: some fields are missing</expected-warn>
-        <expected-warn>Parsing error at data_dir/no_h_no_closing_q.csv record 0 field 0: malformed input record ended inside quote</expected-warn>
-        <expected-warn>Parsing error at  record 0 field 0: malformed input record ended inside quote</expected-warn>
+        <expected-warn>Parsing error at data_dir/no_h_missing_fields.csv line 2 field 3: some fields are missing</expected-warn>
+        <expected-warn>Parsing error at data_dir/no_h_no_closing_q.csv line 2 field 0: malformed input record ended abruptly</expected-warn>
+        <expected-warn>Parsing error at  line 2 field 0: malformed input record ended abruptly</expected-warn>
 
-        <expected-warn>Parsing error at  record 4 field 3: invalid value</expected-warn>
-        <expected-warn>Parsing error at  record 1 field 1: invalid value</expected-warn>
-        <expected-warn>Parsing error at  record 10 field 1: invalid value</expected-warn>
-        <expected-warn>Parsing error at  record 2 field 1: invalid value</expected-warn>
-        <expected-warn>Parsing error at  record 3 field 1: invalid value</expected-warn>
-        <expected-warn>Parsing error at  record 6 field 7: invalid value</expected-warn>
-        <expected-warn>Parsing error at  record 12 field 7: invalid value</expected-warn>
-        <expected-warn>Parsing error at  record 11 field 3: invalid value</expected-warn>
-        <expected-warn>Parsing error at  record 8 field 6: a quote should be in the beginning</expected-warn>
+        <expected-warn>Parsing error at  line 5 field 3: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 2 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 11 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 3 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 4 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 7 field 7: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 13 field 7: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 12 field 3: invalid value</expected-warn>
+        <expected-warn>Parsing error at  line 9 field 6: a quote should be in the beginning</expected-warn>
 
-        <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 4 field 3: invalid value</expected-warn>
-        <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 1 field 1: invalid value</expected-warn>
-        <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 10 field 1: invalid value</expected-warn>
-        <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 2 field 1: invalid value</expected-warn>
-        <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 3 field 1: invalid value</expected-warn>
-        <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 6 field 7: invalid value</expected-warn>
-        <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 12 field 7: invalid value</expected-warn>
-        <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 11 field 3: invalid value</expected-warn>
-        <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 8 field 6: a quote should be in the beginning</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 5 field 3: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 2 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 11 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 3 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 4 field 1: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 7 field 7: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 13 field 7: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 12 field 3: invalid value</expected-warn>
+        <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 9 field 6: a quote should be in the beginning</expected-warn>
+
+        <expected-warn>Parsing error at data_dir/error1_line_num.csv line 3 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn>
+        <expected-warn>Parsing error at data_dir/error2_line_num.csv line 4 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset" check-warnings="true">
       <compilation-unit name="aws/s3/tsv-warnings">
         <output-dir compare="Text">aws/s3/tsv-warnings</output-dir>
-        <expected-warn>Parsing error at data_dir/no_h_missing_fields.tsv record 2 field 3: some fields are missing</expected-warn>
+        <expected-warn>Parsing error at data_dir/no_h_missing_fields.tsv line 2 field 3: some fields are missing</expected-warn>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="aws/s3/json-warnings">
+        <output-dir compare="Text">aws/s3/json-warnings</output-dir>
+        <expected-warn>Parsing error at data_dir/1.json line 3 field 0: malformed input record ended abruptly</expected-warn>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset" check-warnings="true">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index 4dfcbb5..f959f8d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@ -51,4 +51,8 @@ public abstract class AsterixInputStream extends InputStream {
     public String getStreamName() {
         return "";
     }
+
+    public String getPreviousStreamName() { // name of the stream that preceded the current one
+        return ""; // base implementation has no previous stream to name
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index 9d9ff28..f544ca0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -19,9 +19,18 @@
 package org.apache.asterix.external.api;
 
 import java.io.IOException;
+import java.util.function.LongSupplier;
 
 @FunctionalInterface
 public interface IRecordConverter<I, O> {
 
     public O convert(IRawRecord<? extends I> input) throws IOException;
+
+    /**
+     * Configures the converter with information suppliers from the {@link IRecordReader} data source.
+     *
+     * @param lineNumber line number supplier; implementations may keep it and query it during {@link #convert}
+     */
+    default void configure(LongSupplier lineNumber) { // optional hook: does nothing unless overridden
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
index 9c9ec1c..c4dfdd0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.api;
 
 import java.io.DataOutput;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -40,11 +41,11 @@ public interface IRecordDataParser<T> extends IDataParser {
     public boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
 
     /**
-     * Sets the data source name supplier that this parser is receiving records from. The data source name could be
-     * used for reporting, for example.
+     * Configures the parser with information suppliers from the {@link IRecordReader} data source.
      *
      * @param dataSourceName data source name supplier
+     * @param lineNumber line number supplier, e.g. for use in parser messages
      */
-    default void setDataSourceName(Supplier<String> dataSourceName) {
+    default void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) { // no-op unless overridden
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 95e83f2..cb97526 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.asterix.external.api;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -90,4 +91,8 @@ public interface IRecordReader<T> extends Closeable {
     default Supplier<String> getDataSourceName() {
         return ExternalDataConstants.EMPTY_STRING;
     }
+
+    default LongSupplier getLineNumber() { // readers that track lines override this
+        return ExternalDataConstants.NO_LINES; // fallback supplier defined in ExternalDataConstants
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
index 78240a0..8b930aa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.converter;
 
 import java.io.IOException;
+import java.util.function.LongSupplier;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -38,6 +39,7 @@ public class CSVToRecordWithMetadataAndPKConverter
     private final int valueIndex;
     private final RecordWithMetadataAndPK<char[]> recordWithMetadata;
     private final CharArrayRecord record;
+    private LongSupplier lineNumber = ExternalDataConstants.NO_LINES;
 
     public CSVToRecordWithMetadataAndPKConverter(final int valueIndex, final char delimiter, final ARecordType metaType,
             final ARecordType recordType, final int[] keyIndicator, final int[] keyIndexes, final IAType[] keyTypes,
@@ -54,7 +56,7 @@ public class CSVToRecordWithMetadataAndPKConverter
     public RecordWithMetadataAndPK<char[]> convert(final IRawRecord<? extends char[]> input) throws IOException {
         record.reset();
         recordWithMetadata.reset();
-        cursor.nextRecord(input.get(), input.size());
+        cursor.nextRecord(input.get(), input.size(), lineNumber.getAsLong());
         int i = 0;
         int j = 0;
         FieldCursorForDelimitedDataParser.Result lastResult;
@@ -77,4 +79,9 @@ public class CSVToRecordWithMetadataAndPKConverter
         }
         return recordWithMetadata;
     }
+
+    @Override
+    public void configure(LongSupplier lineNumber) { // null is accepted and replaced by NO_LINES
+        this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index a70848c..448d3f5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -88,12 +88,11 @@ public class AwsS3InputStream extends AbstractMultipleInputStream {
             in = new GZIPInputStream(s3Client.getObject(getObjectRequest), ExternalDataConstants.DEFAULT_BUFFER_SIZE);
         }
 
+        // Current file ready, point to the next file
+        nextFileIndex++;
         if (notificationHandler != null) {
             notificationHandler.notifyNewSource();
         }
-
-        // Current file ready, point to the next file
-        nextFileIndex++;
         return true;
     }
 
@@ -116,8 +115,16 @@ public class AwsS3InputStream extends AbstractMultipleInputStream {
 
     @Override
     public String getStreamName() {
-        int currentFileIndex = nextFileIndex - 1;
-        return currentFileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(currentFileIndex);
+        return getStreamNameAt(nextFileIndex - 1); // nextFileIndex - 1 is the file currently being read
+    }
+
+    @Override
+    public String getPreviousStreamName() {
+        return getStreamNameAt(nextFileIndex - 2); // one file before the current one; "" if there is none
+    }
+
+    private String getStreamNameAt(int fileIndex) { // "" if index < 0 or no paths; upper bound is not checked
+        return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(fileIndex);
     }
 
     /**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index a3f560d..4b86142 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.LongSupplier;
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -35,7 +36,8 @@ public class LineRecordReader extends StreamRecordReader {
     protected boolean hasHeader;
     protected boolean prevCharCR;
     protected int newlineLength;
-    protected int recordNumber = 0;
+    protected long beginLineNumber = 1;
+    protected long lineNumber = 1;
     protected boolean newSource = false;
     private static final List<String> recordReaderFormats =
             Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT,
@@ -60,7 +62,8 @@ public class LineRecordReader extends StreamRecordReader {
     public void resetForNewSource() {
         super.resetForNewSource();
         newSource = true;
-        recordNumber = 0;
+        beginLineNumber = 1; // line numbers are 1-based and restart for every source
+        lineNumber = 1;
         prevCharCR = false;
         newlineLength = 0;
     }
@@ -98,6 +101,7 @@ public class LineRecordReader extends StreamRecordReader {
              * consuming it until we have a chance to look at the char that
              * follows.
              */
+            beginLineNumber = lineNumber;
             newlineLength = 0; //length of terminating newline
             prevCharCR = false; //true of prev char was CR
             record.reset();
@@ -120,9 +124,11 @@ public class LineRecordReader extends StreamRecordReader {
                     if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
                         newlineLength = (prevCharCR) ? 2 : 1;
                         ++bufferPosn; // at next invocation proceed from following byte
+                        ++lineNumber;
                         break;
                     }
                     if (prevCharCR) { //CR + notLF, we are at notLF
+                        ++lineNumber;
                         newlineLength = 1;
                         break;
                     }
@@ -140,8 +146,16 @@ public class LineRecordReader extends StreamRecordReader {
                 newSource = false;
                 continue;
             }
-            recordNumber++;
             return true;
         }
     }
+
+    @Override
+    public LongSupplier getLineNumber() {
+        return this::getBeginLineNumber; // supplier reads the field on each call, so it follows the reader
+    }
+
+    private long getBeginLineNumber() {
+        return beginLineNumber; // line on which the record currently being read started
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 81b8e41..3a502d0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.external.input.record.reader.stream;
 
-import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_IN_Q;
+import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_AT_EOF;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -60,7 +60,7 @@ public class QuotedLineRecordReader extends LineRecordReader {
     @Override
     public void notifyNewSource() {
         if (!record.isEmptyRecord() && warnings.shouldWarn()) {
-            ParseUtil.warn(warnings, getDataSourceName().get(), recordNumber, 0, REC_ENDED_IN_Q);
+            ParseUtil.warn(warnings, getPreviousStreamName(), lineNumber, 0, REC_ENDED_AT_EOF);
         }
         // restart for a new record from a new source
         resetForNewSource();
@@ -90,6 +90,7 @@ public class QuotedLineRecordReader extends LineRecordReader {
             if (done) {
                 return false;
             }
+            beginLineNumber = lineNumber;
             newlineLength = 0;
             prevCharCR = false;
             prevCharEscape = false;
@@ -106,7 +107,7 @@ public class QuotedLineRecordReader extends LineRecordReader {
                         if (readLength <= 0 || inQuote) {
                             // haven't read anything previously OR have read and in the middle and hit the end
                             if (inQuote && warnings.shouldWarn()) {
-                                ParseUtil.warn(warnings, getDataSourceName().get(), recordNumber, 0, REC_ENDED_IN_Q);
+                                ParseUtil.warn(warnings, getDataSourceName().get(), lineNumber, 0, REC_ENDED_AT_EOF);
                             }
                             close();
                             return false;
@@ -117,13 +118,18 @@ public class QuotedLineRecordReader extends LineRecordReader {
                 }
                 boolean maybeInQuote = false;
                 for (; bufferPosn < bufferLength; ++bufferPosn) {
-                    if (inputBuffer[bufferPosn] == quote && escape == quote) {
+                    char ch = inputBuffer[bufferPosn];
+                    // count lines here since we need to also count the lines inside quotes
+                    if (ch == ExternalDataConstants.LF || prevCharCR) {
+                        lineNumber++;
+                    }
+                    if (ch == quote && escape == quote) {
                         inQuote |= maybeInQuote;
                         prevCharEscape |= maybeInQuote;
                     }
                     maybeInQuote = false;
                     if (!inQuote) {
-                        if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+                        if (ch == ExternalDataConstants.LF) {
                             newlineLength = (prevCharCR) ? 2 : 1;
                             ++bufferPosn;
                             break;
@@ -132,20 +138,20 @@ public class QuotedLineRecordReader extends LineRecordReader {
                             newlineLength = 1;
                             break;
                         }
-                        prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
                         // if this is an opening quote, mark it
-                        inQuote = inputBuffer[bufferPosn] == quote && !prevCharEscape;
+                        inQuote = ch == quote && !prevCharEscape;
                         // the escape != quote is for making an opening quote not an escape
-                        prevCharEscape = inputBuffer[bufferPosn] == escape && !prevCharEscape && escape != quote;
+                        prevCharEscape = ch == escape && !prevCharEscape && escape != quote;
                     } else {
                         // if quote == escape and current char is quote, then it could be closing or escaping
-                        if (inputBuffer[bufferPosn] == quote && !prevCharEscape) {
+                        if (ch == quote && !prevCharEscape) {
                             // this is most likely a closing quote. the outcome depends on the next char
                             inQuote = false;
                             maybeInQuote = true;
                         }
-                        prevCharEscape = inputBuffer[bufferPosn] == escape && !prevCharEscape && escape != quote;
+                        prevCharEscape = ch == escape && !prevCharEscape && escape != quote;
                     }
+                    prevCharCR = (ch == ExternalDataConstants.CR);
                 }
                 readLength = bufferPosn - startPosn;
                 if (readLength > 0) {
@@ -159,7 +165,6 @@ public class QuotedLineRecordReader extends LineRecordReader {
                 newSource = false;
                 continue;
             }
-            recordNumber++;
             return true;
         }
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 1fb5b25..dfc60bc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -24,6 +24,7 @@ import static org.apache.asterix.external.util.ExternalDataConstants.CR;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
 import static org.apache.asterix.external.util.ExternalDataConstants.LF;
 import static org.apache.asterix.external.util.ExternalDataConstants.OPEN_BRACKET;
+import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_AT_EOF;
 import static org.apache.asterix.external.util.ExternalDataConstants.SPACE;
 import static org.apache.asterix.external.util.ExternalDataConstants.TAB;
 
@@ -32,14 +33,17 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.LongSupplier;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.ParseUtil;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 
 public class SemiStructuredRecordReader extends StreamRecordReader {
 
@@ -50,6 +54,7 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
         AFTER_COMMA // valid chars at this state: '{' to start a new nested record
     }
 
+    private IWarningCollector warnings;
     private int depth;
     private boolean prevCharEscape;
     private boolean inString;
@@ -57,8 +62,10 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
     private char recordEnd;
     private boolean hasStarted;
     private boolean hasFinished;
-    private int recordNumber = 0;
+    private boolean isLastCharCR;
     private State state = State.TOP_LEVEL;
+    private long beginLineNumber = 1;
+    private long lineNumber = 1;
 
     private static final List<String> recordReaderFormats = Collections.unmodifiableList(
             Arrays.asList(ExternalDataConstants.FORMAT_ADM, ExternalDataConstants.FORMAT_JSON_LOWER_CASE,
@@ -70,6 +77,7 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
             throws HyracksDataException {
         super.configure(stream, config);
         stream.setNotificationHandler(this);
+        warnings = ctx.getWarningCollector();
         // set record opening char
         recordStart = ExternalDataUtils.validateGetRecordStart(config);
         // set record ending char
@@ -81,16 +89,22 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
 
     @Override
     public void notifyNewSource() {
-        if (hasStarted) {
-            // TODO(ali): WARN
+        if (hasStarted && warnings.shouldWarn()) {
+            ParseUtil.warn(warnings, getPreviousStreamName(), lineNumber, 0, REC_ENDED_AT_EOF);
         }
-        recordNumber = 0;
+        beginLineNumber = 1;
+        lineNumber = 1;
         state = State.TOP_LEVEL;
         resetForNewRecord();
     }
 
-    public int getRecordNumber() {
-        return recordNumber;
+    @Override
+    public LongSupplier getLineNumber() {
+        return this::getBeginLineNumber;
+    }
+
+    private long getBeginLineNumber() {
+        return beginLineNumber;
     }
 
     @Override
@@ -99,12 +113,16 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
             return false;
         }
         resetForNewRecord();
+        beginLineNumber = lineNumber;
         do {
             int startPosn = bufferPosn; // starting from where we left off the last time
             if (bufferPosn >= bufferLength) {
                 startPosn = bufferPosn = 0;
                 bufferLength = reader.read(inputBuffer);
                 if (bufferLength < 0) {
+                    if (hasStarted && warnings.shouldWarn()) {
+                        ParseUtil.warn(warnings, getDataSourceName().get(), lineNumber, 0, REC_ENDED_AT_EOF);
+                    }
                     close();
                     return false; // EOF
                 }
@@ -112,6 +130,10 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
             if (!hasStarted) {
                 for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
                     char c = inputBuffer[bufferPosn];
+                    if (c == LF || isLastCharCR) {
+                        lineNumber++;
+                    }
+                    isLastCharCR = c == CR;
                     if (c == SPACE || c == TAB || c == LF || c == CR) {
                         continue;
                     }
@@ -144,18 +166,22 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
             }
             if (hasStarted) {
                 for (; bufferPosn < bufferLength; ++bufferPosn) {
+                    char c = inputBuffer[bufferPosn];
+                    if (c == LF || isLastCharCR) {
+                        lineNumber++;
+                    }
                     if (inString) {
                         // we are in a string, we only care about the string end
-                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
+                        if (c == ExternalDataConstants.QUOTE && !prevCharEscape) {
                             inString = false;
                         }
-                        prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE && !prevCharEscape;
+                        prevCharEscape = c == ExternalDataConstants.ESCAPE && !prevCharEscape;
                     } else {
-                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
+                        if (c == ExternalDataConstants.QUOTE) {
                             inString = true;
-                        } else if (inputBuffer[bufferPosn] == recordStart) {
+                        } else if (c == recordStart) {
                             depth += 1;
-                        } else if (inputBuffer[bufferPosn] == recordEnd) {
+                        } else if (c == recordEnd) {
                             depth -= 1;
                             if (depth == 0) {
                                 hasFinished = true;
@@ -164,6 +190,7 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
                             }
                         }
                     }
+                    isLastCharCR = c == CR;
                 }
             }
 
@@ -179,7 +206,6 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
             }
         } while (!hasFinished);
         record.endRecord();
-        recordNumber++;
         return true;
     }
 
@@ -198,6 +224,7 @@ public class SemiStructuredRecordReader extends StreamRecordReader {
         hasStarted = false;
         hasFinished = false;
         prevCharEscape = false;
+        isLastCharCR = false;
         inString = false;
         depth = 0;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 6139f82..cb16de5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -48,6 +48,7 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre
     protected boolean done = false;
     protected FeedLogManager feedLogManager;
     private Supplier<String> dataSourceName = EMPTY_STRING;
+    private Supplier<String> previousDataSourceName = EMPTY_STRING;
 
     public void configure(AsterixInputStream inputStream, Map<String, String> config) {
         this.reader = new AsterixInputStreamReader(inputStream);
@@ -55,6 +56,7 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre
         inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
         if (!ExternalDataUtils.isTrue(config, KEY_REDACT_WARNINGS)) {
             this.dataSourceName = reader::getStreamName;
+            this.previousDataSourceName = reader::getPreviousStreamName;
         }
     }
 
@@ -118,6 +120,10 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre
         return dataSourceName;
     }
 
+    String getPreviousStreamName() {
+        return previousDataSourceName.get();
+    }
+
     public abstract List<String> getRecordReaderFormats();
 
     public abstract String getRequiredConfigs();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index f5f68fe..4e963e4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@ -131,4 +131,8 @@ public class AsterixInputStreamReader extends Reader {
     public String getStreamName() {
         return in.getStreamName();
     }
+
+    public String getPreviousStreamName() {
+        return in.getPreviousStreamName();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 4fec3c4..9e1b052 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -37,6 +37,7 @@ public class LocalFSInputStream extends AbstractMultipleInputStream {
     private static final Logger LOGGER = LogManager.getLogger();
     private final FileSystemWatcher watcher;
     private File currentFile;
+    private String lastFileName = "";
 
     public LocalFSInputStream(FileSystemWatcher watcher) {
         this.watcher = watcher;
@@ -90,6 +91,10 @@ public class LocalFSInputStream extends AbstractMultipleInputStream {
 
     @Override
     protected boolean advance() throws IOException {
+        String tmpLastFileName = "";
+        if (currentFile != null) {
+            tmpLastFileName = currentFile.getPath();
+        }
         closeFile();
         currentFile = watcher.poll();
         if (currentFile == null) {
@@ -100,6 +105,7 @@ public class LocalFSInputStream extends AbstractMultipleInputStream {
         }
         if (currentFile != null) {
             in = new FileInputStream(currentFile);
+            lastFileName = tmpLastFileName;
             if (notificationHandler != null) {
                 notificationHandler.notifyNewSource();
             }
@@ -156,4 +162,9 @@ public class LocalFSInputStream extends AbstractMultipleInputStream {
     public String getStreamName() {
         return currentFile == null ? "" : currentFile.getPath();
     }
+
+    @Override
+    public String getPreviousStreamName() {
+        return lastFileName;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 8ac483e..60e6e77 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -26,6 +26,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import org.apache.asterix.builders.IARecordBuilder;
@@ -65,6 +66,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
     private final IValueParser[] valueParsers;
     private FieldCursorForDelimitedDataParser cursor;
     private Supplier<String> dataSourceName;
+    private LongSupplier lineNumber;
     private final byte[] fieldTypeTags;
     private final int[] fldIds;
     private final ArrayBackedValueStorage[] nameBuffers;
@@ -74,6 +76,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
             char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser, String nullString)
             throws HyracksDataException {
         this.dataSourceName = ExternalDataConstants.EMPTY_STRING;
+        this.lineNumber = ExternalDataConstants.NO_LINES;
         this.warnings = ctx.getWarningCollector();
         this.fieldDelimiter = fieldDelimiter;
         this.quote = quote;
@@ -114,7 +117,8 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
             }
         }
         if (!isStreamParser) {
-            cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, quote, warnings, dataSourceName);
+            cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, quote, warnings,
+                    this::getDataSourceName);
         }
         this.nullChars = nullString != null ? nullString.toCharArray() : null;
     }
@@ -122,7 +126,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
     @Override
     public boolean parse(DataOutput out) throws HyracksDataException {
         try {
-            while (cursor.nextRecord()) {
+            if (cursor.nextRecord()) {
                 if (parseRecord()) {
                     recBuilder.write(out, true);
                     return true;
@@ -149,7 +153,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
                         break;
                     case END:
                         if (warnings.shouldWarn()) {
-                            ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(),
+                            ParseUtil.warn(warnings, dataSourceName.get(), cursor.getLineCount(),
                                     cursor.getFieldCount(), MISSING_FIELDS);
                         }
                         return false;
@@ -164,8 +168,10 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
                     fieldValueBufferOutput.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                 } else {
                     if (cursor.isFieldEmpty() && !canProcessEmptyField(recordType.getFieldTypes()[i])) {
-                        ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(), cursor.getFieldCount(),
-                                EMPTY_FIELD);
+                        if (warnings.shouldWarn()) {
+                            ParseUtil.warn(warnings, dataSourceName.get(), cursor.getLineCount(),
+                                    cursor.getFieldCount(), EMPTY_FIELD);
+                        }
                         return false;
                     }
                     fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
@@ -176,8 +182,10 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
                     boolean success = valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(),
                             cursor.getFieldLength(), fieldValueBufferOutput);
                     if (!success) {
-                        ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(), cursor.getFieldCount(),
-                                INVALID_VAL);
+                        if (warnings.shouldWarn()) {
+                            ParseUtil.warn(warnings, dataSourceName.get(), cursor.getLineCount(),
+                                    cursor.getFieldCount(), INVALID_VAL);
+                        }
                         return false;
                     }
                 }
@@ -190,12 +198,19 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
                 throw HyracksDataException.create(e);
             }
         }
-        return true;
+        try {
+            while (cursor.nextField() == FieldCursorForDelimitedDataParser.Result.OK) {
+                // keep reading and discarding the extra fields
+            }
+            return true;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
     }
 
     @Override
     public boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
-        cursor.nextRecord(record.get(), record.size());
+        cursor.nextRecord(record.get(), record.size(), lineNumber.getAsLong());
         if (parseRecord()) {
             recBuilder.write(out, true);
             return true;
@@ -207,7 +222,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
     public void setInputStream(InputStream in) throws IOException {
         // TODO(ali): revisit this in regards to stream
         cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote, warnings,
-                dataSourceName);
+                this::getDataSourceName);
         if (hasHeader) {
             cursor.nextRecord();
             FieldCursorForDelimitedDataParser.Result result;
@@ -224,13 +239,19 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
     public boolean reset(InputStream in) throws IOException {
         // TODO(ali): revisit this in regards to stream
         cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote, warnings,
-                dataSourceName);
+                this::getDataSourceName);
         return true;
     }
 
     @Override
-    public void setDataSourceName(Supplier<String> dataSourceName) {
+    public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
         this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName;
+        this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
+
+    }
+
+    private String getDataSourceName() {
+        return dataSourceName.get();
     }
 
     private static boolean canProcessEmptyField(IAType fieldType) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
index d799f22..820775c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -20,6 +20,8 @@ package org.apache.asterix.external.parser;
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.external.api.IDataParser;
@@ -111,4 +113,10 @@ public class RecordWithMetadataParser<T, O> implements IRecordWithMetadataParser
     public void appendLastParsedPrimaryKeyToTuple(ArrayTupleBuilder tb) throws HyracksDataException {
         rwm.appendPrimaryKeyToTuple(tb);
     }
+
+    @Override
+    public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
+        this.recordParser.configure(dataSourceName, lineNumber);
+        this.converter.configure(lineNumber);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 2644f3d..f60ecdc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -71,7 +71,7 @@ public class DataflowControllerProvider {
                     IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
                     IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
                     // TODO(ali): revisit to think about passing data source name via setter or via createRecordParser
-                    dataParser.setDataSourceName(recordReader.getDataSourceName());
+                    dataParser.configure(recordReader.getDataSourceName(), recordReader.getLineNumber());
                     if (indexingOp) {
                         return new IndexingDataFlowController(ctx, dataParser, recordReader,
                                 ((IIndexingDatasource) recordReader).getIndexer());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index b0acf44..63f57b6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.util;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 public class ExternalDataConstants {
@@ -253,6 +254,7 @@ public class ExternalDataConstants {
     public static final int MAX_RECORD_SIZE = 32000000;
 
     public static final Supplier<String> EMPTY_STRING = () -> "";
+    public static final LongSupplier NO_LINES = () -> -1;
 
     /**
      * Expected parameter values
@@ -270,7 +272,7 @@ public class ExternalDataConstants {
 
     public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
     public static final String MISSING_FIELDS = "some fields are missing";
-    public static final String REC_ENDED_IN_Q = "malformed input record ended inside quote";
+    public static final String REC_ENDED_AT_EOF = "malformed input record ended abruptly";
     public static final String EMPTY_FIELD = "empty value";
     public static final String INVALID_VAL = "invalid value";
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java
index 129f28a..598d9ff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java
@@ -30,9 +30,9 @@ public class ParseUtil {
     private ParseUtil() {
     }
 
-    public static void warn(IWarningCollector warningCollector, String dataSourceName, int recordNum, int fieldNum,
+    public static void warn(IWarningCollector warningCollector, String dataSourceName, long lineNum, int fieldNum,
             String warnMessage) {
         warningCollector.warn(
-                Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName, recordNum, fieldNum, warnMessage));
+                Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName, lineNum, fieldNum, warnMessage));
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 4e3cf4e..ec536c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -140,7 +140,7 @@
 121 = A numeric type promotion error has occurred: %1$s
 122 = Encountered an error while printing the plan
 123 = Insufficient memory is provided for the join operators, please increase the join memory budget.
-124 = Parsing error at %1$s record %2$s field %3$s: %4$s
+124 = Parsing error at %1$s line %2$s field %3$s: %4$s
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index ed2777b..48cfca6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -54,7 +54,7 @@ public class FieldCursorForDelimitedDataParser {
     private char[] buffer; //buffer that holds the input coming from the underlying input stream
     private int fStart; //start position for field
     private int fEnd; //end position for field
-    private int recordCount; //count of records
+    private long lineCount; //count of lines
     private int fieldCount; //count of fields in current record
     private int doubleQuoteCount; //count of double quotes
     private boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes
@@ -99,7 +99,7 @@ public class FieldCursorForDelimitedDataParser {
         doubleQuoteCount = 0;
         startedQuote = false;
         isDoubleQuoteIncludedInThisField = false;
-        recordCount = 0;
+        lineCount = 1;
         fieldCount = 0;
     }
 
@@ -127,12 +127,12 @@ public class FieldCursorForDelimitedDataParser {
         return fieldCount;
     }
 
-    public int getRecordCount() {
-        return recordCount;
+    public long getLineCount() {
+        return lineCount;
     }
 
-    public void nextRecord(char[] buffer, int recordLength) {
-        recordCount++;
+    public void nextRecord(char[] buffer, int recordLength, long lineNumber) {
+        lineCount = lineNumber;
         fieldCount = 0;
         lastDelimiterPosition = -1;
         lastQuotePosition = -1;
@@ -148,7 +148,6 @@ public class FieldCursorForDelimitedDataParser {
     }
 
     public boolean nextRecord() throws IOException {
-        recordCount++;
         fieldCount = 0;
         while (true) {
             switch (state) {
@@ -164,6 +163,7 @@ public class FieldCursorForDelimitedDataParser {
 
                 case IN_RECORD:
                     int p = start;
+                    char lastChar = '\0';
                     while (true) {
                         if (p >= end) {
                             int s = start;
@@ -204,6 +204,11 @@ public class FieldCursorForDelimitedDataParser {
                             lastDelimiterPosition = p;
                             break;
                         }
+                        // count lines inside quotes
+                        if (ch == '\r' || (ch == '\n' && lastChar != '\r')) {
+                            lineCount++;
+                        }
+                        lastChar = ch;
                         ++p;
                     }
                     break;
@@ -217,6 +222,10 @@ public class FieldCursorForDelimitedDataParser {
                         }
                     }
                     char ch = buffer[start];
+                    // if the next char "ch" is not \n, then count the \r
+                    if (ch != '\n') {
+                        lineCount++;
+                    }
                     if (ch == '\n' && !startedQuote) {
                         ++start;
                         state = State.EOR;
@@ -226,6 +235,7 @@ public class FieldCursorForDelimitedDataParser {
                     }
 
                 case EOR:
+                    lineCount++;
                     if (start >= end) {
                         eof = !readMore();
                         if (eof) {
@@ -265,6 +275,7 @@ public class FieldCursorForDelimitedDataParser {
                 quoteCount = 0;
                 doubleQuoteCount = 0;
 
+                char lastChar = '\0';
                 int p = start;
                 while (true) {
                     if (p >= end) {
@@ -380,6 +391,11 @@ public class FieldCursorForDelimitedDataParser {
                             return Result.OK;
                         }
                     }
+                    // count lines inside quotes
+                    if (ch == '\r' || (ch == '\n' && lastChar != '\r')) {
+                        lineCount++;
+                    }
+                    lastChar = ch;
                     ++p;
                 }
         }
@@ -434,7 +450,7 @@ public class FieldCursorForDelimitedDataParser {
     }
 
     private void warn(String message) {
-        warnings.warn(Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName.get(), recordCount,
-                fieldCount, message));
+        warnings.warn(Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName.get(), lineCount, fieldCount,
+                message));
     }
 }