You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2020/05/14 20:25:55 UTC

[asterixdb] 02/26: [ASTERIXDB-2713][EXT] CSV & TSV support for external dataset p3

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 2bbcdd8c8dd2dbb734528bd074d278acbe85908c
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Tue Apr 21 23:16:29 2020 -0700

    [ASTERIXDB-2713][EXT] CSV & TSV support for external dataset p3
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    	IRecordDataParser, IRecordReader, IRecordConverter
    
    Details:
    - record parser:
    	- delimited-data (CSV/TSV) parser: ignore and warn for invalid records.
    	- other parsers: continue to use their existing behaviour.
    - stream parser:
    	continue to use their existing behaviour.
    - fixes:
    	- fixed S3 stream read() to properly advance to next files and also
    	  to notify consumers to handle properties like header properly.
    	- fixed localfs stream read() when the end of the current file is
    	  reached, including notifying consumers of a new file source.
    	- extracted the read() of both streams since now they are identical.
    - report file, record number and field number in warnings of parser
    - propagate stream name to parsers that need to report the stream name
    - add test cases
    
    Change-Id: Ie1ba545d753d8afef9cef4e290e058019a465201
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5926
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
 asterixdb/asterix-app/data/csv/empty.csv           |  5 --
 .../data/csv/{empty.csv => empty_lines.csv}        |  0
 .../asterix-app/data/csv/header/h_mul_rec.csv      |  4 ++
 .../data/csv/header/h_mul_rec_with_ln.csv          |  4 ++
 .../asterix-app/data/csv/header/h_one_rec.csv      |  2 +
 .../data/csv/header/h_one_rec_with_ln.csv          |  2 +
 asterixdb/asterix-app/data/csv/header/h_only.csv   |  1 +
 .../asterix-app/data/csv/header/h_only_with_ln.csv |  1 +
 .../data/csv/no_header/no_h_missing_fields.csv     |  3 +
 .../data/csv/no_header/no_h_mul_rec.csv            |  3 +
 .../data/csv/no_header/no_h_mul_rec_with_ln.csv    |  3 +
 .../data/csv/no_header/no_h_no_closing_q.csv       |  1 +
 .../data/csv/no_header/no_h_one_rec.csv            |  1 +
 .../data/csv/no_header/no_h_one_rec_with_ln.csv    |  1 +
 .../csv_07/csv_07.1.adm => data/tsv/empty.tsv}     |  0
 .../data/{csv/empty.csv => tsv/empty_lines.tsv}    |  0
 .../asterix-app/data/tsv/header/h_mul_rec.tsv      |  4 ++
 .../data/tsv/header/h_mul_rec_with_ln.tsv          |  4 ++
 .../asterix-app/data/tsv/header/h_one_rec.tsv      |  2 +
 .../data/tsv/header/h_one_rec_with_ln.tsv          |  2 +
 asterixdb/asterix-app/data/tsv/header/h_only.tsv   |  1 +
 .../asterix-app/data/tsv/header/h_only_with_ln.tsv |  1 +
 .../data/tsv/no_header/no_h_missing_fields.tsv     |  3 +
 .../data/tsv/no_header/no_h_mul_rec.tsv            |  3 +
 .../data/tsv/no_header/no_h_mul_rec_with_ln.tsv    |  3 +
 .../data/tsv/no_header/no_h_one_rec.tsv            |  1 +
 .../data/tsv/no_header/no_h_one_rec_with_ln.tsv    |  1 +
 .../aws/AwsS3ExternalDatasetOnePartitionTest.java  | 55 ++++++++++++++
 .../aws/AwsS3ExternalDatasetTest.java              | 78 +++++++++++++++++++-
 .../csv-parser-001/csv-parser-001.1.ddl.sqlpp      |  2 +-
 .../csv-header/query-dataset.000.s3bucket.sqlpp}   | 17 +----
 .../aws/s3/csv-header/query-dataset.001.ddl.sqlpp} | 28 ++++----
 .../s3/csv-header/query-dataset.002.query.sqlpp}   | 18 ++---
 .../csv-header/query-dataset.003.s3bucket.sqlpp}   | 15 +---
 .../s3/csv-header/query-dataset.005.query.sqlpp}   | 18 ++---
 .../csv-header/query-dataset.006.s3bucket.sqlpp}   | 15 +---
 .../s3/csv-header/query-dataset.008.query.sqlpp}   | 18 ++---
 .../csv-header/query-dataset.009.s3bucket.sqlpp}   | 15 +---
 .../s3/csv-header/query-dataset.011.query.sqlpp}   | 18 ++---
 .../aws/s3/csv-header/query-dataset.099.ddl.sqlpp} | 15 +---
 .../query-dataset.000.s3bucket.sqlpp}              | 17 +----
 .../s3/csv-no-header/query-dataset.001.ddl.sqlpp}  | 28 ++++----
 .../csv-no-header/query-dataset.002.query.sqlpp}   | 18 ++---
 .../query-dataset.003.s3bucket.sqlpp}              | 15 +---
 .../csv-no-header/query-dataset.005.query.sqlpp}   | 18 ++---
 .../query-dataset.006.s3bucket.sqlpp}              | 15 +---
 .../csv-no-header/query-dataset.008.query.sqlpp}   | 18 ++---
 .../query-dataset.009.s3bucket.sqlpp}              | 15 +---
 .../csv-no-header/query-dataset.011.query.sqlpp}   | 18 ++---
 .../s3/csv-no-header/query-dataset.099.ddl.sqlpp}  | 15 +---
 .../csv-warnings/query-dataset.001.s3bucket.sqlpp} | 15 +---
 .../s3/csv-warnings/query-dataset.002.ddl.sqlpp}   | 28 ++++----
 .../s3/csv-warnings/query-dataset.003.query.sqlpp} | 18 ++---
 .../csv-warnings/query-dataset.004.s3bucket.sqlpp} | 15 +---
 .../s3/csv-warnings/query-dataset.006.query.sqlpp} | 18 ++---
 .../s3/csv-warnings/query-dataset.099.ddl.sqlpp}   | 15 +---
 .../tsv-header/query-dataset.000.s3bucket.sqlpp}   | 17 +----
 .../aws/s3/tsv-header/query-dataset.001.ddl.sqlpp} | 28 ++++----
 .../s3/tsv-header/query-dataset.002.query.sqlpp}   | 18 ++---
 .../tsv-header/query-dataset.003.s3bucket.sqlpp}   | 15 +---
 .../s3/tsv-header/query-dataset.005.query.sqlpp}   | 18 ++---
 .../tsv-header/query-dataset.006.s3bucket.sqlpp}   | 15 +---
 .../s3/tsv-header/query-dataset.008.query.sqlpp}   | 18 ++---
 .../tsv-header/query-dataset.009.s3bucket.sqlpp}   | 15 +---
 .../s3/tsv-header/query-dataset.011.query.sqlpp}   | 18 ++---
 .../aws/s3/tsv-header/query-dataset.099.ddl.sqlpp} | 15 +---
 .../query-dataset.000.s3bucket.sqlpp}              | 17 +----
 .../s3/tsv-no-header/query-dataset.001.ddl.sqlpp}  | 28 ++++----
 .../tsv-no-header/query-dataset.002.query.sqlpp}   | 18 ++---
 .../query-dataset.003.s3bucket.sqlpp}              | 15 +---
 .../tsv-no-header/query-dataset.005.query.sqlpp}   | 18 ++---
 .../query-dataset.006.s3bucket.sqlpp}              | 15 +---
 .../tsv-no-header/query-dataset.008.query.sqlpp}   | 18 ++---
 .../query-dataset.009.s3bucket.sqlpp}              | 15 +---
 .../tsv-no-header/query-dataset.011.query.sqlpp}   | 18 ++---
 .../s3/tsv-no-header/query-dataset.099.ddl.sqlpp}  | 15 +---
 .../tsv-warnings/query-dataset.001.s3bucket.sqlpp} | 15 +---
 .../s3/tsv-warnings/query-dataset.002.ddl.sqlpp}   | 28 ++++----
 .../s3/tsv-warnings/query-dataset.003.query.sqlpp} | 18 ++---
 .../s3/tsv-warnings/query-dataset.099.ddl.sqlpp}   | 15 +---
 .../aws/s3/csv-header/external_dataset.001.adm}    |  0
 .../aws/s3/csv-header/external_dataset.002.adm}    |  0
 .../aws/s3/csv-header/external_dataset.003.adm     |  2 +
 .../aws/s3/csv-header/external_dataset.004.adm     |  6 ++
 .../aws/s3/csv-no-header/external_dataset.001.adm} |  0
 .../aws/s3/csv-no-header/external_dataset.002.adm} |  0
 .../aws/s3/csv-no-header/external_dataset.003.adm  |  2 +
 .../aws/s3/csv-no-header/external_dataset.004.adm  |  6 ++
 .../aws/s3/csv-warnings/external_dataset.001.adm   |  2 +
 .../aws/s3/csv-warnings/external_dataset.002.adm   |  1 +
 .../aws/s3/tsv-header/external_dataset.001.adm}    |  0
 .../aws/s3/tsv-header/external_dataset.002.adm}    |  0
 .../aws/s3/tsv-header/external_dataset.003.adm     |  2 +
 .../aws/s3/tsv-header/external_dataset.004.adm     |  6 ++
 .../aws/s3/tsv-no-header/external_dataset.001.adm} |  0
 .../aws/s3/tsv-no-header/external_dataset.002.adm} |  0
 .../aws/s3/tsv-no-header/external_dataset.003.adm  |  2 +
 .../aws/s3/tsv-no-header/external_dataset.004.adm  |  6 ++
 .../aws/s3/tsv-warnings/external_dataset.001.adm   |  2 +
 .../runtimets/results/load/csv_06/csv_06.1.adm     |  3 +
 .../runtimets/results/load/csv_07/csv_07.1.adm     |  3 +
 .../src/test/resources/runtimets/testsuite.xml     |  3 -
 .../testsuite_external_dataset_one_partition.xml   | 56 +++++++++++++++
 .../test/resources/runtimets/testsuite_sqlpp.xml   |  6 --
 .../asterix/common/exceptions/ErrorCode.java       |  1 -
 .../src/main/resources/asx_errormsg/en.properties  |  1 -
 .../asterix/external/api/AsterixInputStream.java   |  4 ++
 .../asterix/external/api/IRecordDataParser.java    | 25 +++++--
 .../apache/asterix/external/api/IRecordReader.java |  6 ++
 .../dataflow/FeedRecordDataFlowController.java     |  7 +-
 .../dataflow/RecordDataFlowController.java         |  9 +--
 .../external/dataset/adapter/LookupAdapter.java    |  7 +-
 .../CSVToRecordWithMetadataAndPKConverter.java     | 15 +++-
 .../converter/CSVWithRecordConverterFactory.java   |  5 +-
 .../record/converter/DCPConverterFactory.java      |  3 +-
 .../record/converter/IRecordConverterFactory.java  |  3 +-
 .../input/record/reader/aws/AwsS3InputStream.java  | 64 ++++++-----------
 .../record/reader/stream/LineRecordReader.java     | 37 +++++-----
 .../reader/stream/QuotedLineRecordReader.java      | 61 +++++++++-------
 .../record/reader/stream/StreamRecordReader.java   | 19 ++++-
 .../input/stream/AbstractMultipleInputStream.java  | 78 ++++++++++++++++++++
 .../input/stream/AsterixInputStreamReader.java     |  4 ++
 .../external/input/stream/LocalFSInputStream.java  | 48 +++----------
 .../asterix/external/parser/ADMDataParser.java     | 10 +--
 .../external/parser/DelimitedDataParser.java       | 84 +++++++++++++++-------
 .../asterix/external/parser/HiveRecordParser.java  |  3 +-
 .../asterix/external/parser/JSONDataParser.java    |  3 +-
 .../apache/asterix/external/parser/RSSParser.java  |  3 +-
 .../external/parser/RecordWithMetadataParser.java  |  5 +-
 .../external/parser/RecordWithPKDataParser.java    |  5 +-
 .../asterix/external/parser/TweetParser.java       |  3 +-
 .../factory/RecordWithMetadataParserFactory.java   |  2 +-
 .../provider/DataflowControllerProvider.java       |  6 ++
 .../external/util/ExternalDataConstants.java       |  5 ++
 .../ParseUtil.java}                                | 32 ++++-----
 .../external/classad/test/ClassAdToADMTest.java    |  7 +-
 .../asterix/external/library/ClassAdParser.java    |  7 +-
 .../external/parser/TestRecordWithPKParser.java    |  4 +-
 .../external/parser/test/RecordWithMetaTest.java   | 21 +++---
 .../external/parser/test/TweetParserTest.java      |  9 ++-
 .../apache/hyracks/api/exceptions/ErrorCode.java   |  1 +
 .../src/main/resources/errormsg/en.properties      |  1 +
 .../std/file/DelimitedDataTupleParserFactory.java  | 21 ++++--
 .../file/FieldCursorForDelimitedDataParser.java    | 70 ++++++++++++++----
 .../hyracks/dataflow/std/file/CursorTest.java      | 11 ++-
 145 files changed, 938 insertions(+), 964 deletions(-)

diff --git a/asterixdb/asterix-app/data/csv/empty.csv b/asterixdb/asterix-app/data/csv/empty.csv
index 3f2ff2d..e69de29 100644
--- a/asterixdb/asterix-app/data/csv/empty.csv
+++ b/asterixdb/asterix-app/data/csv/empty.csv
@@ -1,5 +0,0 @@
-
-
-
-
-
diff --git a/asterixdb/asterix-app/data/csv/empty.csv b/asterixdb/asterix-app/data/csv/empty_lines.csv
similarity index 100%
copy from asterixdb/asterix-app/data/csv/empty.csv
copy to asterixdb/asterix-app/data/csv/empty_lines.csv
diff --git a/asterixdb/asterix-app/data/csv/header/h_mul_rec.csv b/asterixdb/asterix-app/data/csv/header/h_mul_rec.csv
new file mode 100644
index 0000000..23d0bcd
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/header/h_mul_rec.csv
@@ -0,0 +1,4 @@
+f1,f2,f3,f4
+1,2,3,"str"
+4,5,6,"rts"
+7,8,9,"srt"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/header/h_mul_rec_with_ln.csv b/asterixdb/asterix-app/data/csv/header/h_mul_rec_with_ln.csv
new file mode 100644
index 0000000..3ea2987
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/header/h_mul_rec_with_ln.csv
@@ -0,0 +1,4 @@
+f1,f2,f3,f4
+1,2,3,"str"
+4,5,6,"rts"
+7,8,9,"srt"
diff --git a/asterixdb/asterix-app/data/csv/header/h_one_rec.csv b/asterixdb/asterix-app/data/csv/header/h_one_rec.csv
new file mode 100644
index 0000000..236141c
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/header/h_one_rec.csv
@@ -0,0 +1,2 @@
+f1,f2,f3,f4
+1,2,3,"str"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/header/h_one_rec_with_ln.csv b/asterixdb/asterix-app/data/csv/header/h_one_rec_with_ln.csv
new file mode 100644
index 0000000..83b19b3
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/header/h_one_rec_with_ln.csv
@@ -0,0 +1,2 @@
+f1,f2,f3,f4
+1,2,3,"str"
diff --git a/asterixdb/asterix-app/data/csv/header/h_only.csv b/asterixdb/asterix-app/data/csv/header/h_only.csv
new file mode 100644
index 0000000..7988898
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/header/h_only.csv
@@ -0,0 +1 @@
+f1,f2,f3,f4
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/header/h_only_with_ln.csv b/asterixdb/asterix-app/data/csv/header/h_only_with_ln.csv
new file mode 100644
index 0000000..33ddfb1
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/header/h_only_with_ln.csv
@@ -0,0 +1 @@
+f1,f2,f3,f4
diff --git a/asterixdb/asterix-app/data/csv/no_header/no_h_missing_fields.csv b/asterixdb/asterix-app/data/csv/no_header/no_h_missing_fields.csv
new file mode 100644
index 0000000..8dbe35c
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/no_header/no_h_missing_fields.csv
@@ -0,0 +1,3 @@
+1,2,3,"str"
+4,5,6
+7,8,9,"srt"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/no_header/no_h_mul_rec.csv b/asterixdb/asterix-app/data/csv/no_header/no_h_mul_rec.csv
new file mode 100644
index 0000000..85abbfb
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/no_header/no_h_mul_rec.csv
@@ -0,0 +1,3 @@
+1,2,3,"str"
+4,5,6,"rts"
+7,8,9,"srt"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/no_header/no_h_mul_rec_with_ln.csv b/asterixdb/asterix-app/data/csv/no_header/no_h_mul_rec_with_ln.csv
new file mode 100644
index 0000000..e20e795
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/no_header/no_h_mul_rec_with_ln.csv
@@ -0,0 +1,3 @@
+1,2,3,"str"
+4,5,6,"rts"
+7,8,9,"srt"
diff --git a/asterixdb/asterix-app/data/csv/no_header/no_h_no_closing_q.csv b/asterixdb/asterix-app/data/csv/no_header/no_h_no_closing_q.csv
new file mode 100644
index 0000000..abefcf7
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/no_header/no_h_no_closing_q.csv
@@ -0,0 +1 @@
+1,2,3,"5
diff --git a/asterixdb/asterix-app/data/csv/no_header/no_h_one_rec.csv b/asterixdb/asterix-app/data/csv/no_header/no_h_one_rec.csv
new file mode 100644
index 0000000..e80e3a2
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/no_header/no_h_one_rec.csv
@@ -0,0 +1 @@
+1,2,3,"str"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/no_header/no_h_one_rec_with_ln.csv b/asterixdb/asterix-app/data/csv/no_header/no_h_one_rec_with_ln.csv
new file mode 100644
index 0000000..a884ca6
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/no_header/no_h_one_rec_with_ln.csv
@@ -0,0 +1 @@
+1,2,3,"str"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/data/tsv/empty.tsv
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
copy to asterixdb/asterix-app/data/tsv/empty.tsv
diff --git a/asterixdb/asterix-app/data/csv/empty.csv b/asterixdb/asterix-app/data/tsv/empty_lines.tsv
similarity index 100%
copy from asterixdb/asterix-app/data/csv/empty.csv
copy to asterixdb/asterix-app/data/tsv/empty_lines.tsv
diff --git a/asterixdb/asterix-app/data/tsv/header/h_mul_rec.tsv b/asterixdb/asterix-app/data/tsv/header/h_mul_rec.tsv
new file mode 100644
index 0000000..ba73bb2
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/header/h_mul_rec.tsv
@@ -0,0 +1,4 @@
+f1	f2	f3	f4
+1	2	3	"str"
+4	5	6	"rts"
+7	8	9	"srt"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/header/h_mul_rec_with_ln.tsv b/asterixdb/asterix-app/data/tsv/header/h_mul_rec_with_ln.tsv
new file mode 100644
index 0000000..eaf1fab
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/header/h_mul_rec_with_ln.tsv
@@ -0,0 +1,4 @@
+f1	f2	f3	f4
+1	2	3	"str"
+4	5	6	"rts"
+7	8	9	"srt"
diff --git a/asterixdb/asterix-app/data/tsv/header/h_one_rec.tsv b/asterixdb/asterix-app/data/tsv/header/h_one_rec.tsv
new file mode 100644
index 0000000..167b56b
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/header/h_one_rec.tsv
@@ -0,0 +1,2 @@
+f1	f2	f3	f4
+1	2	3	"str"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/header/h_one_rec_with_ln.tsv b/asterixdb/asterix-app/data/tsv/header/h_one_rec_with_ln.tsv
new file mode 100644
index 0000000..95d2d55
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/header/h_one_rec_with_ln.tsv
@@ -0,0 +1,2 @@
+f1	f2	f3	f4
+1	2	3	"str"
diff --git a/asterixdb/asterix-app/data/tsv/header/h_only.tsv b/asterixdb/asterix-app/data/tsv/header/h_only.tsv
new file mode 100644
index 0000000..b72029a
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/header/h_only.tsv
@@ -0,0 +1 @@
+f1	f2	f3	f4
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/header/h_only_with_ln.tsv b/asterixdb/asterix-app/data/tsv/header/h_only_with_ln.tsv
new file mode 100644
index 0000000..8180e15
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/header/h_only_with_ln.tsv
@@ -0,0 +1 @@
+f1	f2	f3	f4
diff --git a/asterixdb/asterix-app/data/tsv/no_header/no_h_missing_fields.tsv b/asterixdb/asterix-app/data/tsv/no_header/no_h_missing_fields.tsv
new file mode 100644
index 0000000..75002ac
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/no_header/no_h_missing_fields.tsv
@@ -0,0 +1,3 @@
+1	2	3	"str"
+4	5	6
+7	8	9	"srt"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/no_header/no_h_mul_rec.tsv b/asterixdb/asterix-app/data/tsv/no_header/no_h_mul_rec.tsv
new file mode 100644
index 0000000..517974a
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/no_header/no_h_mul_rec.tsv
@@ -0,0 +1,3 @@
+1	2	3	"str"
+4	5	6	"rts"
+7	8	9	"srt"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/no_header/no_h_mul_rec_with_ln.tsv b/asterixdb/asterix-app/data/tsv/no_header/no_h_mul_rec_with_ln.tsv
new file mode 100644
index 0000000..869fefb
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/no_header/no_h_mul_rec_with_ln.tsv
@@ -0,0 +1,3 @@
+1	2	3	"str"
+4	5	6	"rts"
+7	8	9	"srt"
diff --git a/asterixdb/asterix-app/data/tsv/no_header/no_h_one_rec.tsv b/asterixdb/asterix-app/data/tsv/no_header/no_h_one_rec.tsv
new file mode 100644
index 0000000..d076cbe
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/no_header/no_h_one_rec.tsv
@@ -0,0 +1 @@
+1	2	3	"str"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/no_header/no_h_one_rec_with_ln.tsv b/asterixdb/asterix-app/data/tsv/no_header/no_h_one_rec_with_ln.tsv
new file mode 100644
index 0000000..53d0de6
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/no_header/no_h_one_rec_with_ln.tsv
@@ -0,0 +1 @@
+1	2	3	"str"
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
new file mode 100644
index 0000000..6ac1259
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.aws;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.FixMethodOrder;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+
+/**
+ * Runs an AWS S3 mock server and test it as an external dataset using one node one partition.
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class AwsS3ExternalDatasetOnePartitionTest extends AwsS3ExternalDatasetTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final String SUITE_PATH = "testsuite_external_dataset_one_partition.xml";
+
+    @Parameterized.Parameters(name = "SqlppExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf";
+        PREPARE_S3_BUCKET = AwsS3ExternalDatasetOnePartitionTest::prepareS3Bucket;
+        return LangExecutionUtil.tests("only_external_dataset.xml", SUITE_PATH);
+    }
+
+    public AwsS3ExternalDatasetOnePartitionTest(TestCaseContext tcCtx) {
+        super(tcCtx);
+    }
+
+    private static void prepareS3Bucket() {
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index d2158ba..b3fff7a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -20,26 +20,36 @@ package org.apache.asterix.test.external_dataset.aws;
 
 import static org.apache.hyracks.util.file.FileUtil.joinPath;
 
+import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
 import org.apache.asterix.test.runtime.LangExecutionUtil;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.context.TestFileContext;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
@@ -50,17 +60,21 @@ import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 
 /**
  * Runs an AWS S3 mock server and test it as an external dataset
  */
 @RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class AwsS3ExternalDatasetTest {
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+    protected static String TEST_CONFIG_FILE_NAME;
+    static Runnable PREPARE_S3_BUCKET;
 
     // S3 mock server
     private static S3Mock s3MockServer;
@@ -76,10 +90,14 @@ public class AwsS3ExternalDatasetTest {
     private static final String S3_MOCK_SERVER_HOSTNAME = "http://localhost:" + S3_MOCK_SERVER_PORT;
     private static final String CSV_DATA_PATH = joinPath("data", "csv");
     private static final String TSV_DATA_PATH = joinPath("data", "tsv");
+    private static final Set<String> fileNames = new HashSet<>();
+    private static final CreateBucketRequest.Builder CREATE_BUCKET_BUILDER = CreateBucketRequest.builder();
+    private static final DeleteBucketRequest.Builder DELETE_BUCKET_BUILDER = DeleteBucketRequest.builder();
+    private static final PutObjectRequest.Builder PUT_OBJECT_BUILDER = PutObjectRequest.builder();
 
     @BeforeClass
     public static void setUp() throws Exception {
-        final TestExecutor testExecutor = new TestExecutor();
+        final TestExecutor testExecutor = new AwsTestExecutor();
         LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
         setNcEndpoints(testExecutor);
         startAwsS3MockServer();
@@ -102,6 +120,8 @@ public class AwsS3ExternalDatasetTest {
 
     @Parameters(name = "SqlppExecutionTest {index}: {0}")
     public static Collection<Object[]> tests() throws Exception {
+        TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+        PREPARE_S3_BUCKET = AwsS3ExternalDatasetTest::prepareS3Bucket;
         return LangExecutionUtil.tests("only_external_dataset.xml", "testsuite_external_dataset.xml");
     }
 
@@ -149,7 +169,7 @@ public class AwsS3ExternalDatasetTest {
         LOGGER.info("Client created successfully");
 
         // Create the bucket and upload some json files
-        prepareS3Bucket();
+        PREPARE_S3_BUCKET.run();
     }
 
     /**
@@ -239,4 +259,56 @@ public class AwsS3ExternalDatasetTest {
                 RequestBody.fromFile(Paths.get(TSV_DATA_PATH, "02.tsv")));
         LOGGER.info("Files added successfully");
     }
+
+    static class AwsTestExecutor extends TestExecutor {
+
+        public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
+                String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, TestCase.CompilationUnit cUnit,
+                MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
+                MutableInt actualWarnCount) throws Exception {
+            String[] lines;
+            switch (ctx.getType()) {
+                case "s3bucket":
+                    // <bucket_name> <def_name> <file1,file2,file3>
+                    lines = TestExecutor.stripAllComments(statement).trim().split("\n");
+                    String lastLine = lines[lines.length - 1];
+                    String[] command = lastLine.trim().split(" ");
+                    int length = command.length;
+                    if (length != 3) {
+                        throw new Exception("invalid create bucket format");
+                    }
+                    dropRecreateBucket(command[0], command[1], command[2]);
+                    break;
+                default:
+                    super.executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
+                            queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
+            }
+        }
+    }
+
+    private static void dropRecreateBucket(String bucketName, String definition, String files) {
+        String definitionPath = definition + (definition.endsWith("/") ? "" : "/");
+        String[] fileSplits = files.split(",");
+
+        LOGGER.info("Dropping bucket");
+        try {
+            client.deleteBucket(DELETE_BUCKET_BUILDER.bucket(bucketName).build());
+        } catch (NoSuchBucketException e) {
+            // ignore
+        }
+        LOGGER.info("Creating bucket " + bucketName);
+        client.createBucket(CREATE_BUCKET_BUILDER.bucket(bucketName).build());
+        LOGGER.info("Uploading to bucket " + bucketName + " definition " + definitionPath);
+        fileNames.clear();
+        for (int i = 0; i < fileSplits.length; i++) {
+            String fileName = FilenameUtils.getName(fileSplits[i]);
+            while (fileNames.contains(fileName)) {
+                fileName = (i + 1) + fileName;
+            }
+            fileNames.add(fileName);
+            client.putObject(PUT_OBJECT_BUILDER.bucket(bucketName).key(definitionPath + fileName).build(),
+                    RequestBody.fromFile(Paths.get(fileSplits[i])));
+        }
+        LOGGER.info("Done creating bucket with data");
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
index f7fe18c..113ace3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
@@ -32,4 +32,4 @@ CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sa
 CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv"), ("header"="FALSE"));
 CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"));
 CREATE EXTERNAL DATASET ds5(t4) USING localfs(("path"="asterix_nc1://data/csv/sample_13.csv"), ("format"="csv"), ("header"="True"));
-CREATE EXTERNAL DATASET ds6(t4) USING localfs(("path"="asterix_nc1://data/csv/empty.csv"), ("format"="csv"), ("header"="false"));
\ No newline at end of file
+CREATE EXTERNAL DATASET ds6(t4) USING localfs(("path"="asterix_nc1://data/csv/empty_lines.csv"), ("format"="csv"), ("header"="false"));
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.000.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.000.s3bucket.sqlpp
index be17744..dc8b719 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.000.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
-
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+// create S3 bucket with data
+playground data_dir data/csv/empty.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.001.ddl.sqlpp
similarity index 64%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.001.ddl.sqlpp
index be17744..4830d68 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.001.ddl.sqlpp
@@ -16,18 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+DROP TYPE t1 IF EXISTS;
+CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
 
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATASET ds1 IF EXISTS;
+CREATE EXTERNAL DATASET ds1(t1) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="CSV"),
+("header"="true")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.002.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.002.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.002.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.003.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.003.s3bucket.sqlpp
index be17744..2b31301 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.003.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/csv/empty.csv,data/csv/empty.csv,data/csv/empty.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.005.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.005.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.005.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.006.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.006.s3bucket.sqlpp
index be17744..5b7650a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.006.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/csv/header/h_one_rec.csv,data/csv/header/h_only.csv,data/csv/header/h_only_with_ln.csv,data/csv/header/h_one_rec_with_ln.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.008.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.008.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.008.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.009.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.009.s3bucket.sqlpp
index be17744..d7f55e9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.009.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/csv/header/h_mul_rec.csv,data/csv/empty.csv,data/csv/header/h_only.csv,data/csv/header/h_mul_rec_with_ln.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.011.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.011.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.011.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.099.ddl.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.099.ddl.sqlpp
index be17744..36b2bab 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-header/query-dataset.099.ddl.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.000.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.000.s3bucket.sqlpp
index be17744..dc8b719 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.000.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
-
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+// create S3 bucket with data
+playground data_dir data/csv/empty.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.001.ddl.sqlpp
similarity index 64%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.001.ddl.sqlpp
index be17744..191ddff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.001.ddl.sqlpp
@@ -16,18 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+DROP TYPE t1 IF EXISTS;
+CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
 
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATASET ds1 IF EXISTS;
+CREATE EXTERNAL DATASET ds1(t1) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="CSV"),
+("header"="false")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.002.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.002.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.002.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.003.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.003.s3bucket.sqlpp
index be17744..2b31301 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.003.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/csv/empty.csv,data/csv/empty.csv,data/csv/empty.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.005.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.005.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.005.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.006.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.006.s3bucket.sqlpp
index be17744..d42d322 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.006.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/csv/no_header/no_h_one_rec.csv,data/csv/empty.csv,data/csv/empty_lines.csv,data/csv/no_header/no_h_one_rec_with_ln.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.008.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.008.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.008.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.009.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.009.s3bucket.sqlpp
index be17744..b9a9796 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.009.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/csv/no_header/no_h_mul_rec.csv,data/csv/empty.csv,data/csv/no_header/no_h_mul_rec_with_ln.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.011.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.011.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.011.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.099.ddl.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.099.ddl.sqlpp
index be17744..36b2bab 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-no-header/query-dataset.099.ddl.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.001.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.001.s3bucket.sqlpp
index be17744..132b862 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.001.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/csv/no_header/no_h_missing_fields.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
similarity index 64%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
index be17744..191ddff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
@@ -16,18 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+DROP TYPE t1 IF EXISTS;
+CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
 
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATASET ds1 IF EXISTS;
+CREATE EXTERNAL DATASET ds1(t1) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="CSV"),
+("header"="false")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.003.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.003.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.003.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.004.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.004.s3bucket.sqlpp
index be17744..6aa6fc8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.004.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/csv/no_header/no_h_no_closing_q.csv,data/csv/no_header/no_h_one_rec.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.006.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.006.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.006.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.099.ddl.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.099.ddl.sqlpp
index be17744..36b2bab 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.099.ddl.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.000.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.000.s3bucket.sqlpp
index be17744..929442e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.000.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
-
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+// create S3 bucket with data
+playground data_dir data/tsv/empty.tsv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.001.ddl.sqlpp
similarity index 64%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.001.ddl.sqlpp
index be17744..aab904a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.001.ddl.sqlpp
@@ -16,18 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+DROP TYPE t1 IF EXISTS;
+CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
 
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATASET ds1 IF EXISTS;
+CREATE EXTERNAL DATASET ds1(t1) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="tsv"),
+("header"="true")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.002.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.002.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.002.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.003.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.003.s3bucket.sqlpp
index be17744..dea81a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.003.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/tsv/empty.tsv,data/tsv/empty.tsv,data/tsv/empty.tsv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.005.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.005.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.005.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.006.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.006.s3bucket.sqlpp
index be17744..bbdba98 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.006.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/tsv/header/h_one_rec.tsv,data/tsv/header/h_only.tsv,data/tsv/header/h_only_with_ln.tsv,data/tsv/header/h_one_rec_with_ln.tsv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.008.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.008.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.008.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.009.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.009.s3bucket.sqlpp
index be17744..a2a20d2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.009.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/tsv/header/h_mul_rec.tsv,data/tsv/empty.tsv,data/tsv/header/h_only.tsv,data/tsv/header/h_mul_rec_with_ln.tsv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.011.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.011.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.011.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.099.ddl.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.099.ddl.sqlpp
index be17744..0ff713d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-header/query-dataset.099.ddl.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.000.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.000.s3bucket.sqlpp
index be17744..929442e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.000.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
-
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+// create S3 bucket with data
+playground data_dir data/tsv/empty.tsv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.001.ddl.sqlpp
similarity index 64%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.001.ddl.sqlpp
index be17744..135c9fd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.001.ddl.sqlpp
@@ -16,18 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+DROP TYPE t1 IF EXISTS;
+CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
 
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATASET ds1 IF EXISTS;
+CREATE EXTERNAL DATASET ds1(t1) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="tsv"),
+("header"="false")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.002.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.002.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.002.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.003.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.003.s3bucket.sqlpp
index be17744..dea81a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.003.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/tsv/empty.tsv,data/tsv/empty.tsv,data/tsv/empty.tsv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.005.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.005.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.005.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.006.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.006.s3bucket.sqlpp
index be17744..2a7fa79 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.006.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/tsv/no_header/no_h_one_rec.tsv,data/tsv/empty.tsv,data/tsv/empty_lines.tsv,data/tsv/no_header/no_h_one_rec_with_ln.tsv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.008.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.008.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.008.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.009.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.009.s3bucket.sqlpp
index be17744..ce3de3c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.009.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/tsv/no_header/no_h_mul_rec.tsv,data/tsv/empty.tsv,data/tsv/no_header/no_h_mul_rec_with_ln.tsv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.011.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.011.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.011.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.099.ddl.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.099.ddl.sqlpp
index be17744..36b2bab 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-no-header/query-dataset.099.ddl.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.001.s3bucket.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.001.s3bucket.sqlpp
index be17744..2bd6d52 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.001.s3bucket.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+playground data_dir data/tsv/no_header/no_h_missing_fields.tsv
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.002.ddl.sqlpp
similarity index 64%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.002.ddl.sqlpp
index be17744..135c9fd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.002.ddl.sqlpp
@@ -16,18 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+DROP TYPE t1 IF EXISTS;
+CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
 
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATASET ds1 IF EXISTS;
+CREATE EXTERNAL DATASET ds1(t1) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="tsv"),
+("header"="false")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.003.query.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.003.query.sqlpp
index be17744..26ccfa7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.003.query.sqlpp
@@ -16,18 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+FROM ds1 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.099.ddl.sqlpp
similarity index 68%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.099.ddl.sqlpp
index be17744..36b2bab 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/tsv-warnings/query-dataset.099.ddl.sqlpp
@@ -16,18 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
 
-import java.io.DataOutput;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IRecordDataParser<T> extends IDataParser {
-
-    /**
-     * @param record
-     * @param out
-     * @throws Exception
-     */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-}
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.001.adm
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.001.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.002.adm
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.002.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.003.adm
new file mode 100644
index 0000000..dc4d9c9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.003.adm
@@ -0,0 +1,2 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.004.adm
new file mode 100644
index 0000000..1aae8c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-header/external_dataset.004.adm
@@ -0,0 +1,6 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
+{ "f1": 4, "f2": 5, "f3": 6, "f4": "rts" }
+{ "f1": 4, "f2": 5, "f3": 6, "f4": "rts" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "srt" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "srt" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.001.adm
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.001.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.002.adm
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.002.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.003.adm
new file mode 100644
index 0000000..dc4d9c9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.003.adm
@@ -0,0 +1,2 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.004.adm
new file mode 100644
index 0000000..1aae8c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-no-header/external_dataset.004.adm
@@ -0,0 +1,6 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
+{ "f1": 4, "f2": 5, "f3": 6, "f4": "rts" }
+{ "f1": 4, "f2": 5, "f3": 6, "f4": "rts" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "srt" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "srt" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.001.adm
new file mode 100644
index 0000000..245b406
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.001.adm
@@ -0,0 +1,2 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "srt" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.002.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.002.adm
new file mode 100644
index 0000000..7d3c940
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.002.adm
@@ -0,0 +1 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "str" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.001.adm
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.001.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.002.adm
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.002.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.003.adm
new file mode 100644
index 0000000..eb83800
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.003.adm
@@ -0,0 +1,2 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "\"str\"" }
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "\"str\"" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.004.adm
new file mode 100644
index 0000000..8d60432
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-header/external_dataset.004.adm
@@ -0,0 +1,6 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "\"str\"" }
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "\"str\"" }
+{ "f1": 4, "f2": 5, "f3": 6, "f4": "\"rts\"" }
+{ "f1": 4, "f2": 5, "f3": 6, "f4": "\"rts\"" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "\"srt\"" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "\"srt\"" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.001.adm
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.001.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.002.adm
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
copy to asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.002.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.003.adm
new file mode 100644
index 0000000..eb83800
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.003.adm
@@ -0,0 +1,2 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "\"str\"" }
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "\"str\"" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.004.adm
new file mode 100644
index 0000000..8d60432
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-no-header/external_dataset.004.adm
@@ -0,0 +1,6 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "\"str\"" }
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "\"str\"" }
+{ "f1": 4, "f2": 5, "f3": 6, "f4": "\"rts\"" }
+{ "f1": 4, "f2": 5, "f3": 6, "f4": "\"rts\"" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "\"srt\"" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "\"srt\"" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-warnings/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-warnings/external_dataset.001.adm
new file mode 100644
index 0000000..ac2354d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/tsv-warnings/external_dataset.001.adm
@@ -0,0 +1,2 @@
+{ "f1": 1, "f2": 2, "f3": 3, "f4": "\"str\"" }
+{ "f1": 7, "f2": 8, "f3": 9, "f4": "\"srt\"" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_06/csv_06.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_06/csv_06.1.adm
index e69de29..1cbcc6c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_06/csv_06.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_06/csv_06.1.adm
@@ -0,0 +1,3 @@
+{ "id": 2, "float": 0.2, "stringa": "test2a", "stringb": "test2b" }
+{ "id": 3, "float": 0.3, "stringa": "test,3a,3a,3a", "stringb": "\"\"test\"\"" }
+{ "id": 4, "float": 0.4, "stringa": "test\"4a\",4a", "stringb": " test with\nline break " }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
index e69de29..1cbcc6c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/load/csv_07/csv_07.1.adm
@@ -0,0 +1,3 @@
+{ "id": 2, "float": 0.2, "stringa": "test2a", "stringb": "test2b" }
+{ "id": 3, "float": 0.3, "stringa": "test,3a,3a,3a", "stringb": "\"\"test\"\"" }
+{ "id": 4, "float": 0.4, "stringa": "test\"4a\",4a", "stringb": " test with\nline break " }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index f16b187..cfb2e5d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -1679,19 +1679,16 @@
     <test-case FilePath="load">
       <compilation-unit name="csv_05"><!-- Someone should check and verify -->
         <output-dir compare="Text">csv_05</output-dir>
-        <expected-error>At record: 1</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
       <compilation-unit name="csv_06"><!-- Someone should check and verify -->
         <output-dir compare="Text">csv_06</output-dir>
-        <expected-error>At record: 1</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
       <compilation-unit name="csv_07"><!-- Someone should check and verify -->
         <output-dir compare="Text">csv_07</output-dir>
-        <expected-error>At record: 1</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml
new file mode 100644
index 0000000..876a8ef
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
+  <test-group name="external-dataset">
+    <test-case FilePath="external-dataset"  check-warnings="true">
+      <compilation-unit name="aws/s3/csv-header">
+        <output-dir compare="Text">aws/s3/csv-header</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="aws/s3/csv-no-header">
+        <output-dir compare="Text">aws/s3/csv-no-header</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="aws/s3/tsv-header">
+        <output-dir compare="Text">aws/s3/tsv-header</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="aws/s3/tsv-no-header">
+        <output-dir compare="Text">aws/s3/tsv-no-header</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="aws/s3/csv-warnings">
+        <output-dir compare="Text">aws/s3/csv-warnings</output-dir>
+        <expected-warn>Parsing error in data_dir/no_h_missing_fields.csv at record 2 field 3: some fields are missing</expected-warn>
+        <expected-warn>Parsing error in data_dir/no_h_no_closing_q.csv at record 0 field 0: malformed input record ended inside quote</expected-warn>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset" check-warnings="true">
+      <compilation-unit name="aws/s3/tsv-warnings">
+        <output-dir compare="Text">aws/s3/tsv-warnings</output-dir>
+        <expected-warn>Parsing error in data_dir/no_h_missing_fields.tsv at record 2 field 3: some fields are missing</expected-warn>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index cf4a7a3..bed1a09 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -11260,22 +11260,16 @@
     <test-case FilePath="load">
       <compilation-unit name="csv_05">
         <output-dir compare="Text">csv_05</output-dir>
-        <expected-error>At record: 1, field#: 4 - a quote enclosing a field needs to be placed in the beginning of that field</expected-error>
-        <source-location>false</source-location>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
       <compilation-unit name="csv_06">
         <output-dir compare="Text">csv_06</output-dir>
-        <expected-error>At record: 1, field#: 3 - a quote enclosing a field needs to be placed in the beginning of that field</expected-error>
-        <source-location>false</source-location>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
       <compilation-unit name="csv_07">
         <output-dir compare="Text">csv_07</output-dir>
-        <expected-error>At record: 1, field#: 3 -  A quote enclosing a field needs to be followed by the delimiter</expected-error>
-        <source-location>false</source-location>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 14f7c83..cfe4646 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -308,7 +308,6 @@ public class ErrorCode {
     public static final int INPUT_DECODE_FAILURE = 3116;
     public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
     public static final int PARAMETERS_REQUIRED = 3118;
-    public static final int MALFORMED_RECORD = 3119;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 98ae1a7..666b190 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -306,7 +306,6 @@
 3116 = Failed to decode input
 3117 = Failed to parse record, malformed log record
 3118 = Parameter(s) %1$s must be specified
-3119 = Record number %1$s is malformed
 
 # Lifecycle management errors
 4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index a4c2fae..4dfcbb5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@ -47,4 +47,8 @@ public abstract class AsterixInputStream extends InputStream {
     public void setNotificationHandler(IStreamNotificationHandler notificationHandler) {
         this.notificationHandler = notificationHandler;
     }
+
+    public String getStreamName() {
+        return "";
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
index be17744..9c9ec1c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
@@ -19,15 +19,32 @@
 package org.apache.asterix.external.api;
 
 import java.io.DataOutput;
+import java.util.function.Supplier;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IRecordDataParser<T> extends IDataParser {
 
     /**
-     * @param record
-     * @param out
-     * @throws Exception
+     * Parses the input record and writes the result into {@code out}. Implementations should only write to
+     * {@code out} if the record is parsed successfully. If parsing fails, {@code out} should never be touched. In
+     * other words, no partial writes are allowed in case of failure. Additionally, implementations may choose to
+     * issue a warning and/or throw an exception in case of failure.
+     *
+     * @param record input record to parse
+     * @param out output where the parsed record is written into
+     *
+     * @return {@code true} if the record was parsed successfully and written to {@code out}; {@code false} otherwise.
+     * @throws HyracksDataException if the implementation chooses to throw an exception when parsing fails
      */
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
+    public boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
+
+    /**
+     * Sets the supplier of the name of the data source that this parser is receiving records from. The data source
+     * name could be used for reporting, for example.
+     *
+     * @param dataSourceName data source name supplier
+     */
+    default void setDataSourceName(Supplier<String> dataSourceName) {
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 9033814..95e83f2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -20,8 +20,10 @@ package org.apache.asterix.external.api;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.function.Supplier;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -84,4 +86,8 @@ public interface IRecordReader<T> extends Closeable {
     default String getStats() {
         return null;
     }
+
+    default Supplier<String> getDataSourceName() {
+        return ExternalDataConstants.EMPTY_STRING;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index f392139..7a089b8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -177,14 +177,19 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
     }
 
     private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException {
+        boolean success;
         try {
-            dataParser.parse(record, tb.getDataOutput());
+            success = dataParser.parse(record, tb.getDataOutput());
         } catch (Exception e) {
             LOGGER.log(Level.WARN, ExternalDataConstants.ERROR_PARSE_RECORD, e);
             feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
             // continue the outer loop
             return false;
         }
+        if (!success) {
+            // continue the outer loop
+            return false;
+        }
         tb.addFieldEndOffset();
         addMetaPart(tb, record);
         addPrimaryKeys(tb, record);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index c028965..34379e9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -49,10 +49,11 @@ public class RecordDataFlowController<T> extends AbstractDataFlowController {
             while (recordReader.hasNext()) {
                 IRawRecord<? extends T> record = recordReader.next();
                 tb.reset();
-                dataParser.parse(record, tb.getDataOutput());
-                tb.addFieldEndOffset();
-                appendOtherTupleFields(tb);
-                tupleForwarder.addTuple(tb);
+                if (dataParser.parse(record, tb.getDataOutput())) {
+                    tb.addFieldEndOffset();
+                    appendOtherTupleFields(tb);
+                    tupleForwarder.addTuple(tb);
+                }
             }
             tupleForwarder.complete();
         } catch (Exception e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index 82e9bba..5700f9d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -127,9 +127,10 @@ public final class LookupAdapter<T> implements IFrameWriter {
                     propagateInputFields(tupleIndex);
                 }
                 if (record != null) {
-                    dataParser.parse(record, tb.getDataOutput());
-                    tb.addFieldEndOffset();
-                    DataflowUtils.addTupleToFrame(appender, tb, writer);
+                    if (dataParser.parse(record, tb.getDataOutput())) {
+                        tb.addFieldEndOffset();
+                        DataflowUtils.addTupleToFrame(appender, tb, writer);
+                    }
                 } else if (retainNull) {
                     tb.getDataOutput().write(missingTupleBuild.getByteArray());
                     tb.addFieldEndOffset();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
index 5c8f219..78240a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
@@ -20,12 +20,15 @@ package org.apache.asterix.external.input.record.converter;
 
 import java.io.IOException;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.input.record.CharArrayRecord;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
 
 public class CSVToRecordWithMetadataAndPKConverter
@@ -37,8 +40,10 @@ public class CSVToRecordWithMetadataAndPKConverter
     private final CharArrayRecord record;
 
     public CSVToRecordWithMetadataAndPKConverter(final int valueIndex, final char delimiter, final ARecordType metaType,
-            final ARecordType recordType, final int[] keyIndicator, final int[] keyIndexes, final IAType[] keyTypes) {
-        this.cursor = new FieldCursorForDelimitedDataParser(null, delimiter, ExternalDataConstants.QUOTE);
+            final ARecordType recordType, final int[] keyIndicator, final int[] keyIndexes, final IAType[] keyTypes,
+            IWarningCollector warningCollector) {
+        this.cursor = new FieldCursorForDelimitedDataParser(null, delimiter, ExternalDataConstants.QUOTE,
+                warningCollector, ExternalDataConstants.EMPTY_STRING);
         this.record = new CharArrayRecord();
         this.valueIndex = valueIndex;
         this.recordWithMetadata = new RecordWithMetadataAndPK<>(record, metaType.getFieldTypes(), recordType,
@@ -52,7 +57,8 @@ public class CSVToRecordWithMetadataAndPKConverter
         cursor.nextRecord(input.get(), input.size());
         int i = 0;
         int j = 0;
-        while (cursor.nextField()) {
+        FieldCursorForDelimitedDataParser.Result lastResult;
+        while ((lastResult = cursor.nextField()) == FieldCursorForDelimitedDataParser.Result.OK) {
             if (cursor.fieldHasDoubleQuote()) {
                 cursor.eliminateDoubleQuote();
             }
@@ -66,6 +72,9 @@ public class CSVToRecordWithMetadataAndPKConverter
             }
             i++;
         }
+        if (lastResult == FieldCursorForDelimitedDataParser.Result.ERROR) {
+            throw new RuntimeDataException(ErrorCode.FAILED_TO_PARSE_RECORD);
+        }
         return recordWithMetadata;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
index ee16228..a9e5bc7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
@@ -27,6 +27,7 @@ import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class CSVWithRecordConverterFactory implements IRecordConverterFactory<char[], RecordWithMetadataAndPK<char[]>> {
 
@@ -40,9 +41,9 @@ public class CSVWithRecordConverterFactory implements IRecordConverterFactory<ch
     private IAType[] keyTypes;
 
     @Override
-    public IRecordConverter<char[], RecordWithMetadataAndPK<char[]>> createConverter() {
+    public IRecordConverter<char[], RecordWithMetadataAndPK<char[]>> createConverter(IHyracksTaskContext ctx) {
         return new CSVToRecordWithMetadataAndPKConverter(recordIndex, delimiter, metaType, recordType, keyIndicators,
-                keyIndexes, keyTypes);
+                keyIndexes, keyTypes, ctx.getWarningCollector());
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
index dc93533..b228b94 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
@@ -24,6 +24,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IRecordConverter;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 import com.couchbase.client.core.message.dcp.DCPRequest;
 
@@ -50,7 +51,7 @@ public class DCPConverterFactory implements IRecordConverterFactory<DCPRequest,
     }
 
     @Override
-    public IRecordConverter<DCPRequest, RecordWithMetadataAndPK<char[]>> createConverter() {
+    public IRecordConverter<DCPRequest, RecordWithMetadataAndPK<char[]>> createConverter(IHyracksTaskContext ctx) {
         return new DCPMessageToRecordConverter();
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
index 4990527..875a331 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
@@ -24,10 +24,11 @@ import java.util.Map;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IRecordConverter;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface IRecordConverterFactory<I, O> extends Serializable {
 
-    public IRecordConverter<I, O> createConverter();
+    public IRecordConverter<I, O> createConverter(IHyracksTaskContext ctx);
 
     public void configure(Map<String, String> configuration) throws AsterixException;
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index cfa1f6a..78b0797 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -21,13 +21,11 @@ package org.apache.asterix.external.input.record.reader.aws;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
 import org.apache.hyracks.api.util.CleanupUtils;
 
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -37,7 +35,7 @@ import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 
-public class AwsS3InputStream extends AsterixInputStream {
+public class AwsS3InputStream extends AbstractMultipleInputStream {
 
     // Configuration
     private final Map<String, String> configuration;
@@ -48,67 +46,39 @@ public class AwsS3InputStream extends AsterixInputStream {
     private final List<String> filePaths;
     private int nextFileIndex = 0;
 
-    // File reading fields
-    private InputStream inputStream;
-
     public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) {
         this.configuration = configuration;
         this.filePaths = filePaths;
-
         this.s3Client = buildAwsS3Client(configuration);
     }
 
     @Override
-    public int read() throws IOException {
-        throw new HyracksDataException(
-                "read() is not supported with this stream. use read(byte[] b, int off, int len)");
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        if (inputStream == null) {
-            if (!advance()) {
-                return -1;
-            }
-        }
-
-        int result = inputStream.read(b, off, len);
-
-        // If file reading is done, go to the next file, or finish up if no files are left
-        if (result < 0) {
-            if (advance()) {
-                result = inputStream.read(b, off, len);
-            } else {
-                return -1;
-            }
-        }
-
-        return result;
-    }
-
-    private boolean advance() throws IOException {
+    protected boolean advance() throws IOException {
         // No files to read for this partition
         if (filePaths == null || filePaths.isEmpty()) {
             return false;
         }
 
         // Finished reading all the files
-        if (nextFileIndex == filePaths.size()) {
-            if (inputStream != null) {
-                inputStream.close();
+        if (nextFileIndex >= filePaths.size()) {
+            if (in != null) {
+                in.close();
             }
             return false;
         }
 
         // Close the current stream before going to the next one
-        if (inputStream != null) {
-            inputStream.close();
+        if (in != null) {
+            in.close();
         }
 
         String bucket = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME);
         GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
         GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
-        inputStream = s3Client.getObject(getObjectRequest);
+        in = s3Client.getObject(getObjectRequest);
+        if (notificationHandler != null) {
+            notificationHandler.notifyNewSource();
+        }
 
         // Current file ready, point to the next file
         nextFileIndex++;
@@ -127,11 +97,17 @@ public class AwsS3InputStream extends AsterixInputStream {
 
     @Override
     public void close() throws IOException {
-        if (inputStream != null) {
-            CleanupUtils.close(inputStream, null);
+        if (in != null) {
+            CleanupUtils.close(in, null);
         }
     }
 
+    @Override
+    public String getStreamName() {
+        int currentFileIndex = nextFileIndex - 1;
+        return currentFileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(currentFileIndex);
+    }
+
     /**
      * Prepares and builds the Amazon S3 client with the provided configuration
      *
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index a27397e..0d16e0c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -32,11 +32,11 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class LineRecordReader extends StreamRecordReader {
 
-    private boolean hasHeader;
+    protected boolean hasHeader;
     protected boolean prevCharCR;
     protected int newlineLength;
     protected int recordNumber = 0;
-    protected boolean nextIsHeader = false;
+    protected boolean newSource = false;
     private static final List<String> recordReaderFormats =
             Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT,
                     ExternalDataConstants.FORMAT_CSV, ExternalDataConstants.FORMAT_TSV));
@@ -47,17 +47,22 @@ public class LineRecordReader extends StreamRecordReader {
             throws HyracksDataException {
         super.configure(inputStream);
         this.hasHeader = ExternalDataUtils.hasHeader(config);
-        if (hasHeader) {
-            // TODO(ali): revisit this and notifyNewSource
-            inputStream.setNotificationHandler(this);
-        }
+        this.newSource = true;
+        inputStream.setNotificationHandler(this);
     }
 
     @Override
     public void notifyNewSource() {
-        if (hasHeader) {
-            nextIsHeader = true;
-        }
+        resetForNewSource();
+    }
+
+    @Override
+    public void resetForNewSource() {
+        super.resetForNewSource();
+        newSource = true;
+        recordNumber = 0;
+        prevCharCR = false;
+        newlineLength = 0;
     }
 
     @Override
@@ -108,11 +113,7 @@ public class LineRecordReader extends StreamRecordReader {
                             return false; //EOF
                         }
                         record.endRecord();
-                        if (record.isEmptyRecord()) {
-                            return false;
-                        }
-                        recordNumber++;
-                        return true;
+                        break;
                     }
                 }
                 for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
@@ -128,10 +129,6 @@ public class LineRecordReader extends StreamRecordReader {
                     prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
                 }
                 readLength = bufferPosn - startPosn;
-                if (prevCharCR && newlineLength == 0) {
-                    --readLength; //CR at the end of the buffer
-                    prevCharCR = false;
-                }
                 if (readLength > 0) {
                     record.append(inputBuffer, startPosn, readLength);
                 }
@@ -139,8 +136,8 @@ public class LineRecordReader extends StreamRecordReader {
             if (record.isEmptyRecord()) {
                 continue;
             }
-            if (nextIsHeader) {
-                nextIsHeader = false;
+            if (newSource && hasHeader) {
+                newSource = false;
                 continue;
             }
             recordNumber++;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 564df4b..0ed1238 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -18,28 +18,30 @@
  */
 package org.apache.asterix.external.input.record.reader.stream;
 
+import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_IN_Q;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.WarningUtil;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.ParseUtil;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class QuotedLineRecordReader extends LineRecordReader {
 
     private char quote;
     private char quoteEscape;
-    private IWarningCollector warningCollector;
-    private final SourceLocation srcLoc = new SourceLocation(-1, -1);
+    private boolean prevCharEscape;
+    private int readLength;
+    private boolean inQuote;
+    private IWarningCollector warnings;
     private static final List<String> recordReaderFormats = Collections.unmodifiableList(
             Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV));
     private static final String REQUIRED_CONFIGS = ExternalDataConstants.KEY_QUOTE;
@@ -48,7 +50,7 @@ public class QuotedLineRecordReader extends LineRecordReader {
     public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
             throws HyracksDataException {
         super.configure(ctx, inputStream, config);
-        this.warningCollector = ctx.getWarningCollector();
+        this.warnings = ctx.getWarningCollector();
         String quoteString = config.get(ExternalDataConstants.KEY_QUOTE);
         ExternalDataUtils.validateQuote(quoteString);
         this.quote = quoteString.charAt(0);
@@ -56,6 +58,23 @@ public class QuotedLineRecordReader extends LineRecordReader {
     }
 
     @Override
+    public void notifyNewSource() {
+        if (!record.isEmptyRecord() && warnings.shouldWarn()) {
+            ParseUtil.warn(warnings, reader.getStreamName(), recordNumber, 0, REC_ENDED_IN_Q);
+        }
+        // restart for a new record from a new source
+        resetForNewSource();
+    }
+
+    @Override
+    public void resetForNewSource() {
+        super.resetForNewSource();
+        prevCharEscape = false;
+        readLength = 0;
+        inQuote = false;
+    }
+
+    @Override
     public List<String> getRecordReaderFormats() {
         return recordReaderFormats;
     }
@@ -73,10 +92,10 @@ public class QuotedLineRecordReader extends LineRecordReader {
             }
             newlineLength = 0;
             prevCharCR = false;
-            boolean prevCharEscape = false;
+            prevCharEscape = false;
             record.reset();
-            int readLength = 0;
-            boolean inQuote = false;
+            readLength = 0;
+            inQuote = false;
             do {
                 int startPosn = bufferPosn;
                 if (bufferPosn >= bufferLength) {
@@ -86,19 +105,14 @@ public class QuotedLineRecordReader extends LineRecordReader {
                         // reached end of stream
                         if (readLength <= 0 || inQuote) {
                             // haven't read anything previously OR have read and in the middle and hit the end
-                            if (inQuote && warningCollector.shouldWarn()) {
-                                warningCollector
-                                        .warn(WarningUtil.forAsterix(srcLoc, ErrorCode.MALFORMED_RECORD, recordNumber));
+                            if (inQuote && warnings.shouldWarn()) {
+                                ParseUtil.warn(warnings, reader.getStreamName(), recordNumber, 0, REC_ENDED_IN_Q);
                             }
                             close();
                             return false;
                         }
                         record.endRecord();
-                        if (record.isEmptyRecord()) {
-                            return false;
-                        }
-                        recordNumber++;
-                        return true;
+                        break;
                     }
                 }
                 boolean maybeInQuote = false;
@@ -119,10 +133,8 @@ public class QuotedLineRecordReader extends LineRecordReader {
                             break;
                         }
                         prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
-                        if (inputBuffer[bufferPosn] == quote && !prevCharEscape) {
-                            // this is an opening quote
-                            inQuote = true;
-                        }
+                        // if this is an opening quote, mark it
+                        inQuote = inputBuffer[bufferPosn] == quote && !prevCharEscape;
                         // the quoteEscape != quote is for making an opening quote not an escape
                         prevCharEscape =
                                 inputBuffer[bufferPosn] == quoteEscape && !prevCharEscape && quoteEscape != quote;
@@ -138,9 +150,6 @@ public class QuotedLineRecordReader extends LineRecordReader {
                     }
                 }
                 readLength = bufferPosn - startPosn;
-                if (prevCharCR && newlineLength == 0) {
-                    --readLength;
-                }
                 if (readLength > 0) {
                     record.append(inputBuffer, startPosn, readLength);
                 }
@@ -148,8 +157,8 @@ public class QuotedLineRecordReader extends LineRecordReader {
             if (record.isEmptyRecord()) {
                 continue;
             }
-            if (nextIsHeader) {
-                nextIsHeader = false;
+            if (newSource && hasHeader) {
+                newSource = false;
                 continue;
             }
             recordNumber++;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 4aed741..a70f1fe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.input.record.reader.stream;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IRawRecord;
@@ -56,10 +57,13 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre
 
     @Override
     public void close() throws IOException {
-        if (!done) {
-            reader.close();
+        try {
+            if (!done) {
+                reader.close();
+            }
+        } finally {
+            done = true;
         }
-        done = true;
     }
 
     @Override
@@ -97,6 +101,15 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre
         throw new UnsupportedOperationException();
     }
 
+    protected void resetForNewSource() {
+        record.reset();
+    }
+
+    @Override
+    public Supplier<String> getDataSourceName() {
+        return reader::getStreamName;
+    }
+
     public abstract List<String> getRecordReaderFormats();
 
     public abstract String getRequiredConfigs();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java
new file mode 100644
index 0000000..8f032d8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IStreamNotificationHandler;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base class for a source stream that is composed of multiple separate input streams. Reading proceeds one stream at
+ * a time.
+ */
+public abstract class AbstractMultipleInputStream extends AsterixInputStream {
+
+    protected InputStream in;
+    private byte lastByte;
+
+    protected AbstractMultipleInputStream() {
+    }
+
+    /**
+     * Closes the current input stream and opens the next one, if any. Implementations should call
+     * {@link IStreamNotificationHandler#notifyNewSource()} using {@link #notificationHandler} if there exists a
+     * notification handler and the handler needs to know when a new input stream has started. This method must
+     * set {@link #in} upon successfully opening the next stream and return true; it returns false if none is left.
+     */
+    protected abstract boolean advance() throws IOException;
+
+    @Override
+    public int read() throws IOException {
+        throw new HyracksDataException(
+                "read() is not supported with this stream. use read(byte[] b, int off, int len)");
+    }
+
+    @Override
+    public final int read(byte[] b, int off, int len) throws IOException {
+        if (in == null) {
+            if (!advance()) {
+                return -1;
+            }
+        }
+        int result = in.read(b, off, len);
+        if (result < 0 && (lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_CR)) {
+            // append a newline at the end of every file that does not already end with LF or CR. NOTE: this
+            // might create problems for some cases, depending on the parser implementation
+            lastByte = ExternalDataConstants.BYTE_LF;
+            b[off] = ExternalDataConstants.BYTE_LF;
+            return 1;
+        }
+        while ((result < 0) && advance()) {
+            result = in.read(b, off, len);
+        }
+        if (result > 0) {
+            lastByte = b[(off + result) - 1];
+        }
+        return result;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index d35ad26..f5f68fe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@ -127,4 +127,8 @@ public class AsterixInputStreamReader extends Reader {
     public void reset() throws IOException {
         byteBuffer.limit(0);
     }
+
+    public String getStreamName() {
+        return in.getStreamName();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 2207bd7..3d23b24 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -24,9 +24,7 @@ import java.io.IOException;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,12 +32,10 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-public class LocalFSInputStream extends AsterixInputStream {
+public class LocalFSInputStream extends AbstractMultipleInputStream {
 
     private static final Logger LOGGER = LogManager.getLogger();
     private final FileSystemWatcher watcher;
-    private FileInputStream in;
-    private byte lastByte;
     private File currentFile;
 
     public LocalFSInputStream(FileSystemWatcher watcher) {
@@ -92,10 +88,8 @@ public class LocalFSInputStream extends AsterixInputStream {
         }
     }
 
-    /**
-     * Closes the current input stream and opens the next one, if any.
-     */
-    private boolean advance() throws IOException {
+    @Override
+    protected boolean advance() throws IOException {
         closeFile();
         currentFile = watcher.poll();
         if (currentFile == null) {
@@ -115,37 +109,6 @@ public class LocalFSInputStream extends AsterixInputStream {
     }
 
     @Override
-    public int read() throws IOException {
-        throw new HyracksDataException(
-                "read() is not supported with this stream. use read(byte[] b, int off, int len)");
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        if (in == null) {
-            if (!advance()) {
-                return -1;
-            }
-        }
-        int result = in.read(b, off, len);
-        while ((result < 0) && advance()) {
-            // return a new line at the end of every file <--Might create problems for some cases
-            // depending on the parser implementation-->
-            if ((lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_LF)) {
-                lastByte = ExternalDataConstants.BYTE_LF;
-                b[off] = ExternalDataConstants.BYTE_LF;
-                return 1;
-            }
-            // recursive call
-            result = in.read(b, off, len);
-        }
-        if (result > 0) {
-            lastByte = b[(off + result) - 1];
-        }
-        return result;
-    }
-
-    @Override
     public boolean stop() throws Exception {
         closeFile();
         watcher.close();
@@ -178,4 +141,9 @@ public class LocalFSInputStream extends AsterixInputStream {
         LOGGER.log(Level.WARN, "Failed to recover from failure", th);
         return false;
     }
+
+    @Override
+    public String getStreamName() {
+        return currentFile == null ? "" : currentFile.getPath();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 42caf12..4a46717 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -74,8 +74,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
 
     private final TokenImage tmpTokenImage = new TokenImage();
 
-    private final String mismatchErrorMessage = "Mismatch Type, expecting a value of type ";
-    private final String mismatchErrorMessage2 = " got a value of type ";
+    private static final String mismatchErrorMessage = "Mismatch Type, expecting a value of type ";
+    private static final String mismatchErrorMessage2 = " got a value of type ";
 
     public ADMDataParser(ARecordType recordType, boolean isStream) {
         this(null, recordType, isStream);
@@ -103,11 +103,11 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
     }
 
     @Override
-    public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
+    public boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
         try {
             resetPools();
             admLexer.setBuffer(record.get());
-            parseAdmInstance(recordType, out);
+            return parseAdmInstance(recordType, out);
         } catch (ParseException e) {
             e.setLocation(filename, admLexer.getLine(), admLexer.getColumn());
             throw e;
@@ -121,7 +121,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
         admLexer = new AdmLexer(new java.io.InputStreamReader(in));
     }
 
-    protected boolean parseAdmInstance(IAType objectType, DataOutput out) throws IOException {
+    private boolean parseAdmInstance(IAType objectType, DataOutput out) throws IOException {
         int token = admLexer.next();
         if (token == AdmLexer.TOKEN_EOF) {
             return false;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 505acbd..86b95e1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -18,10 +18,13 @@
  */
 package org.apache.asterix.external.parser;
 
+import static org.apache.asterix.external.util.ExternalDataConstants.MISSING_FIELDS;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.function.Supplier;
 
 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.RecordBuilder;
@@ -31,12 +34,15 @@ import org.apache.asterix.external.api.IDataParser;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ParseUtil;
 import org.apache.asterix.om.base.AMutableString;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -44,7 +50,7 @@ import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
 
 public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
 
-    private final IHyracksTaskContext ctx;
+    private final IWarningCollector warnings;
     private final char fieldDelimiter;
     private final char quote;
     private final boolean hasHeader;
@@ -54,13 +60,15 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
     private final DataOutput fieldValueBufferOutput;
     private final IValueParser[] valueParsers;
     private FieldCursorForDelimitedDataParser cursor;
+    private Supplier<String> dataSourceName;
     private final byte[] fieldTypeTags;
     private final int[] fldIds;
     private final ArrayBackedValueStorage[] nameBuffers;
 
     public DelimitedDataParser(IHyracksTaskContext ctx, IValueParserFactory[] valueParserFactories, char fieldDelimiter,
             char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
-        this.ctx = ctx;
+        this.dataSourceName = ExternalDataConstants.EMPTY_STRING;
+        this.warnings = ctx.getWarningCollector();
         this.fieldDelimiter = fieldDelimiter;
         this.quote = quote;
         this.hasHeader = hasHeader;
@@ -100,7 +108,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
             }
         }
         if (!isStreamParser) {
-            cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, quote);
+            cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, quote, warnings, dataSourceName);
         }
     }
 
@@ -108,9 +116,13 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
     public boolean parse(DataOutput out) throws HyracksDataException {
         try {
             while (cursor.nextRecord()) {
-                parseRecord();
-                recBuilder.write(out, true);
-                return true;
+                if (parseRecord()) {
+                    recBuilder.write(out, true);
+                    return true;
+                } else {
+                    // keeping the behaviour of throwing exception for stream parsers
+                    throw new RuntimeDataException(ErrorCode.FAILED_TO_PARSE_RECORD);
+                }
             }
             return false;
         } catch (IOException e) {
@@ -118,21 +130,29 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
         }
     }
 
-    private void parseRecord() throws HyracksDataException {
+    private boolean parseRecord() throws HyracksDataException {
         recBuilder.reset(recordType);
         recBuilder.init();
 
         for (int i = 0; i < valueParsers.length; ++i) {
             try {
-                if (!cursor.nextField()) {
-                    break;
+                FieldCursorForDelimitedDataParser.Result result = cursor.nextField();
+                switch (result) {
+                    case OK:
+                        break;
+                    case END:
+                        if (warnings.shouldWarn()) {
+                            ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(),
+                                    cursor.getFieldCount(), MISSING_FIELDS);
+                        }
+                        return false;
+                    case ERROR:
+                        return false;
+                    default:
+                        throw new IllegalStateException();
                 }
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            }
-            fieldValueBuffer.reset();
+                fieldValueBuffer.reset();
 
-            try {
                 if (cursor.isFieldEmpty() && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.STRING
                         && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.NULL) {
                     // if the field is empty and the type is optional, insert
@@ -161,32 +181,46 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
                 throw HyracksDataException.create(e);
             }
         }
-        if (valueParsers.length != cursor.getFieldCount()) {
-            throw new HyracksDataException("Record #" + cursor.getRecordCount() + " is missing some fields");
-        }
+        return true;
     }
 
     @Override
-    public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
+    public boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
         cursor.nextRecord(record.get(), record.size());
-        parseRecord();
-        recBuilder.write(out, true);
+        if (parseRecord()) {
+            recBuilder.write(out, true);
+            return true;
+        }
+        return false;
     }
 
     @Override
     public void setInputStream(InputStream in) throws IOException {
-        cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
-        if (in != null && hasHeader) {
+        // TODO(ali): revisit this in regards to stream
+        cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote, warnings,
+                dataSourceName);
+        if (hasHeader) {
             cursor.nextRecord();
-            while (cursor.nextField()) {
-                ;
+            FieldCursorForDelimitedDataParser.Result result;
+            do {
+                result = cursor.nextField();
+            } while (result == FieldCursorForDelimitedDataParser.Result.OK);
+            if (result == FieldCursorForDelimitedDataParser.Result.ERROR) {
+                throw new RuntimeDataException(ErrorCode.FAILED_TO_PARSE_RECORD);
             }
         }
     }
 
     @Override
     public boolean reset(InputStream in) throws IOException {
-        cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
+        // TODO(ali): revisit this in regards to stream
+        cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote, warnings,
+                dataSourceName);
         return true;
     }
+
+    @Override
+    public void setDataSourceName(Supplier<String> dataSourceName) {
+        this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java
index 1a88d08..9ab6c7d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java
@@ -110,7 +110,7 @@ public class HiveRecordParser implements IRecordDataParser<Writable> {
     }
 
     @Override
-    public void parse(IRawRecord<? extends Writable> record, DataOutput out) throws HyracksDataException {
+    public boolean parse(IRawRecord<? extends Writable> record, DataOutput out) throws HyracksDataException {
         try {
             Writable hiveRawRecord = record.get();
             Object hiveObject = hiveSerde.deserialize(hiveRawRecord);
@@ -129,6 +129,7 @@ public class HiveRecordParser implements IRecordDataParser<Writable> {
                 recBuilder.addField(i, fieldValueBuffer);
             }
             recBuilder.write(out, true);
+            return true;
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
index 72f86c1..3216aef 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
@@ -90,13 +90,14 @@ public class JSONDataParser extends AbstractNestedDataParser<ADMToken>
      */
 
     @Override
-    public final void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
+    public final boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
         try {
             //TODO(wyk): find a way to reset byte[] instead of creating a new parser for each record.
             jsonParser = jsonFactory.createParser(record.get(), 0, record.size());
             geometryCoParser.reset(jsonParser);
             nextToken();
             parseObject(rootType, out);
+            return true;
         } catch (IOException e) {
             throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java
index e260083..84a2d90 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java
@@ -48,7 +48,7 @@ public class RSSParser implements IRecordDataParser<SyndEntry> {
     }
 
     @Override
-    public void parse(IRawRecord<? extends SyndEntry> record, DataOutput out) throws HyracksDataException {
+    public boolean parse(IRawRecord<? extends SyndEntry> record, DataOutput out) throws HyracksDataException {
         SyndEntry entry = record.get();
         tupleFieldValues[0] = String.valueOf(id);
         tupleFieldValues[1] = entry.getTitle();
@@ -62,5 +62,6 @@ public class RSSParser implements IRecordDataParser<SyndEntry> {
         recordBuilder.init();
         IDataParser.writeRecord(mutableRecord, out, recordBuilder);
         id++;
+        return true;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
index 6c9298e..d799f22 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -72,14 +72,15 @@ public class RecordWithMetadataParser<T, O> implements IRecordWithMetadataParser
     }
 
     @Override
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException {
+    public boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException {
         try {
             rwm = converter.convert(record);
             if (rwm.getRecord().size() == 0) {
                 // null record
                 out.writeByte(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
+                return true;
             } else {
-                recordParser.parse(rwm.getRecord(), out);
+                return recordParser.parse(rwm.getRecord(), out);
             }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithPKDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithPKDataParser.java
index aa0db53..8c00a9f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithPKDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithPKDataParser.java
@@ -37,15 +37,16 @@ public class RecordWithPKDataParser<T> implements IRecordWithPKDataParser<T> {
     }
 
     @Override
-    public void parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException {
+    public boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException {
         if (record.size() == 0) {
             try {
                 out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                return true;
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
             }
         } else {
-            recordParser.parse(record, out);
+            return recordParser.parse(record, out);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
index 1cbf0be..4726a50 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -259,12 +259,13 @@ public class TweetParser extends AbstractDataParser implements IRecordDataParser
     }
 
     @Override
-    public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
+    public boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
         try {
             //TODO get rid of this temporary json
             resetPools();
             ObjectMapper om = new ObjectMapper();
             writeRecord(om.readTree(record.getBytes()), out, recordType);
+            return true;
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
index 704ea29..f940f3d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -99,7 +99,7 @@ public class RecordWithMetadataParserFactory<I, O> implements IRecordDataParserF
     @Override
     public IRecordDataParser<I> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
         IRecordDataParser<O> recordParser = recordParserFactory.createRecordParser(ctx);
-        return new RecordWithMetadataParser<I, O>(metaType, recordParser, converterFactory.createConverter());
+        return new RecordWithMetadataParser<I, O>(metaType, recordParser, converterFactory.createConverter(ctx));
     }
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index f9b012e..2644f3d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -54,6 +54,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class DataflowControllerProvider {
 
+    private DataflowControllerProvider() {
+    }
+
     // TODO: Instead, use a factory just like data source and data parser.
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
@@ -67,6 +70,8 @@ public class DataflowControllerProvider {
                     IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
                     IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
                     IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
+                    // TODO(ali): revisit to think about passing data source name via setter or via createRecordParser
+                    dataParser.setDataSourceName(recordReader.getDataSourceName());
                     if (indexingOp) {
                         return new IndexingDataFlowController(ctx, dataParser, recordReader,
                                 ((IIndexingDatasource) recordReader).getIndexer());
@@ -96,6 +101,7 @@ public class DataflowControllerProvider {
                     IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
                     AsterixInputStream stream = streamFactory.createInputStream(ctx, partition);
                     IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
+                    // TODO(ali): revisit to think about passing data source name to parser
                     IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
                     streamParser.setInputStream(stream);
                     if (isFeed) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a05ad77..f08c6e6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.util;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.Supplier;
 
 public class ExternalDataConstants {
 
@@ -245,6 +246,8 @@ public class ExternalDataConstants {
     public static final int DEFAULT_QUEUE_SIZE = 64;
     public static final int MAX_RECORD_SIZE = 32000000;
 
+    public static final Supplier<String> EMPTY_STRING = () -> "";
+
     /**
      * Expected parameter values
      */
@@ -260,6 +263,8 @@ public class ExternalDataConstants {
     public static final String READER_RSS = "rss_feed";
 
     public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
+    public static final String MISSING_FIELDS = "some fields are missing";
+    public static final String REC_ENDED_IN_Q = "malformed input record ended inside quote";
 
     public static class AwsS3Constants {
         public static final String REGION_FIELD_NAME = "region";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java
similarity index 53%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java
index 4990527..129f28a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java
@@ -16,27 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.converter;
+package org.apache.asterix.external.util;
 
-import java.io.Serializable;
-import java.util.Map;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
 
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IRecordConverter;
-import org.apache.asterix.om.types.ARecordType;
+public class ParseUtil {
 
-public interface IRecordConverterFactory<I, O> extends Serializable {
+    private static final SourceLocation SRC_LOC = new SourceLocation(-1, -1);
 
-    public IRecordConverter<I, O> createConverter();
-
-    public void configure(Map<String, String> configuration) throws AsterixException;
-
-    public Class<?> getInputClass();
-
-    public Class<?> getOutputClass();
-
-    public void setRecordType(ARecordType recordType);
-
-    public void setMetaType(ARecordType metaType);
+    private ParseUtil() {
+    }
 
+    public static void warn(IWarningCollector warningCollector, String dataSourceName, int recordNum, int fieldNum,
+            String warnMessage) {
+        warningCollector.warn(
+                Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName, recordNum, fieldNum, warnMessage));
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index 8ef6273..0bbb2b8 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@ -132,9 +132,10 @@ public class ClassAdToADMTest extends TestCase {
                 while (recordReader.hasNext()) {
                     tb.reset();
                     IRawRecord<char[]> record = recordReader.next();
-                    parser.parse(record, tb.getDataOutput());
-                    tb.addFieldEndOffset();
-                    printTuple(tb, printers, printStream);
+                    if (parser.parse(record, tb.getDataOutput())) {
+                        tb.addFieldEndOffset();
+                        printTuple(tb, printers, printStream);
+                    }
                 }
                 recordReader.close();
                 printStream.close();
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
index 4a1c740..70116b3 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
@@ -290,7 +290,7 @@ public class ClassAdParser extends AbstractDataParser implements IRecordDataPars
         return null;
     }
 
-    private void parseRecord(ARecordType recType, ClassAd pAd, DataOutput out) throws IOException, AsterixException {
+    private boolean parseRecord(ARecordType recType, ClassAd pAd, DataOutput out) throws IOException, AsterixException {
         ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
         ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
         IARecordBuilder recBuilder = getRecordBuilder();
@@ -359,6 +359,7 @@ public class ClassAdParser extends AbstractDataParser implements IRecordDataPars
             }
         }
         recBuilder.write(out, true);
+        return true;
     }
 
     private void writeFieldValueToBuffer(IAType fieldType, DataOutput out, String name, ExprTree tree, ClassAd pAd)
@@ -1742,7 +1743,7 @@ public class ClassAdParser extends AbstractDataParser implements IRecordDataPars
     }
 
     @Override
-    public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
+    public boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
         try {
             resetPools();
             if (oldFormat) {
@@ -1768,7 +1769,7 @@ public class ClassAdParser extends AbstractDataParser implements IRecordDataPars
                 rootAd.reset();
                 asterixParseClassAd(rootAd);
             }
-            parseRecord(recordType, rootAd, out);
+            return parseRecord(recordType, rootAd, out);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
index dad9cfd..925eeee 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
@@ -36,9 +36,9 @@ public class TestRecordWithPKParser<T> implements IRecordWithPKDataParser<Record
     }
 
     @Override
-    public void parse(final IRawRecord<? extends RecordWithPK<T>> record, final DataOutput out)
+    public boolean parse(final IRawRecord<? extends RecordWithPK<T>> record, final DataOutput out)
             throws HyracksDataException {
-        recordParser.parse(record.get().getRecord(), out);
+        return recordParser.parse(record.get().getRecord(), out); // forward the wrapped parser's result as-is
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 47c6ffe..888de08 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -95,8 +95,9 @@ public class RecordWithMetaTest {
             LineRecordReader lineReader = new LineRecordReader();
             lineReader.configure(ctx, inputStream, config);
             // create csv with json record reader
-            CSVToRecordWithMetadataAndPKConverter recordConverter = new CSVToRecordWithMetadataAndPKConverter(
-                    valueIndex, delimiter, metaType, recordType, pkIndicators, pkIndexes, keyTypes);
+            CSVToRecordWithMetadataAndPKConverter recordConverter =
+                    new CSVToRecordWithMetadataAndPKConverter(valueIndex, delimiter, metaType, recordType, pkIndicators,
+                            pkIndexes, keyTypes, ctx.getWarningCollector());
             // create the value parser <ADM in this case>
             ADMDataParser valueParser = new ADMDataParser(recordType, false);
             // create parser.
@@ -124,14 +125,14 @@ public class RecordWithMetaTest {
             while (lineReader.hasNext()) {
                 IRawRecord<char[]> record = lineReader.next();
                 tb.reset();
-                parser.parse(record, tb.getDataOutput());
-                tb.addFieldEndOffset();
-                parser.parseMeta(tb.getDataOutput());
-                tb.addFieldEndOffset();
-                parser.appendLastParsedPrimaryKeyToTuple(tb);
-                //print tuple
-                printTuple(tb, printers, printStream);
-
+                if (parser.parse(record, tb.getDataOutput())) {
+                    tb.addFieldEndOffset();
+                    parser.parseMeta(tb.getDataOutput());
+                    tb.addFieldEndOffset();
+                    parser.appendLastParsedPrimaryKeyToTuple(tb);
+                    //print tuple
+                    printTuple(tb, printers, printStream);
+                }
             }
             lineReader.close();
             printStream.close();
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
index e183a84..b39ca45 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
@@ -72,7 +72,9 @@ public class TweetParserTest {
         for (int iter1 = 0; iter1 < lines.size(); iter1++) {
             record.set(lines.get(iter1));
             try {
-                parser.parse(record, output);
+                if (!parser.parse(record, output)) {
+                    Assert.fail("Unexpected failure in parser.");
+                }
             } catch (HyracksDataException e) {
                 e.printStackTrace();
                 Assert.fail("Unexpected failure in parser.");
@@ -98,8 +100,9 @@ public class TweetParserTest {
         for (int iter1 = 0; iter1 < lines.size(); iter1++) {
             record.set(lines.get(iter1));
             try {
-                parser.parse(record, output);
-                regularCount++;
+                if (parser.parse(record, output)) {
+                    regularCount++;
+                }
             } catch (HyracksDataException e) {
                 Assert.assertTrue(e.toString().contains("Non-null") && (iter1 == 0 || iter1 == 1));
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 38f49dc..16ba168 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -157,6 +157,7 @@ public class ErrorCode {
     public static final int NUMERIC_PROMOTION_ERROR = 121;
     public static final int ERROR_PRINTING_PLAN = 122;
     public static final int INSUFFICIENT_MEMORY = 123;
+    public static final int PARSING_ERROR = 124;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 87b963a..bed21f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -140,6 +140,7 @@
 121 = A numeric type promotion error has occurred: %1$s
 122 = Encountered an error while printing the plan
 123 = Insufficient memory is provided for the join operators, please increase the join memory budget.
+124 = Parsing error in %1$s at record %2$s field %3$s: %4$s
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 2eb882a..e91992d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -28,12 +28,16 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 
+/**
+ * Currently used by tests only. Reconsider the code if it is ever used elsewhere.
+ */
 public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
     private static final long serialVersionUID = 1L;
     private IValueParserFactory[] valueParserFactories;
@@ -54,6 +58,8 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
     @Override
     public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
         return new ITupleParser() {
+            final IWarningCollector warningCollector = ctx.getWarningCollector();
+
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
                 try {
@@ -67,13 +73,20 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
                     ArrayTupleBuilder tb = new ArrayTupleBuilder(valueParsers.length);
                     DataOutput dos = tb.getDataOutput();
 
-                    FieldCursorForDelimitedDataParser cursor =
-                            new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
+                    FieldCursorForDelimitedDataParser cursor = new FieldCursorForDelimitedDataParser(
+                            new InputStreamReader(in), fieldDelimiter, quote, warningCollector, () -> "");
                     while (cursor.nextRecord()) {
                         tb.reset();
                         for (int i = 0; i < valueParsers.length; ++i) {
-                            if (!cursor.nextField()) {
-                                break;
+                            FieldCursorForDelimitedDataParser.Result result = cursor.nextField();
+                            switch (result) {
+                                case OK:
+                                    break;
+                                case END:
+                                case ERROR:
+                                    throw new HyracksDataException("Failed to parse record");
+                                default:
+                                    throw new IllegalStateException();
                             }
                             // Eliminate double quotes in the field that we are going to parse
                             if (cursor.fieldHasDoubleQuote()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index fd3e4c3..ed2777b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -21,6 +21,12 @@ package org.apache.hyracks.dataflow.std.file;
 import java.io.IOException;
 import java.io.Reader;
 import java.util.Arrays;
+import java.util.function.Supplier;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
 
 public class FieldCursorForDelimitedDataParser {
 
@@ -29,9 +35,22 @@ public class FieldCursorForDelimitedDataParser {
         IN_RECORD, //cursor is inside record
         EOR, //cursor is at end of record
         CR, //cursor at carriage return
-        EOF //end of stream reached
+        EOF, //end of stream reached
+        FAILED // cursor failed to parse a field
+    }
+
+    public enum Result { // outcome of a single nextField() call
+        OK, // a field was read; its bounds are available from the cursor
+        ERROR, // the field is malformed; the cursor has moved to (or already was in) State.FAILED
+        END // no field to read: the cursor is not inside a record (INIT, EOR, CR or EOF)
     }
 
+    private static final SourceLocation SRC_LOC = new SourceLocation(-1, -1);
+    private static final String CLOSING_Q = "missing a closing quote";
+    private static final String OPENING_Q = "a quote should be in the beginning";
+    private static final String DELIMITER_AFTER_Q = "a quote enclosing a field needs to be followed by the delimiter";
+    private final IWarningCollector warnings;
+    private final Supplier<String> dataSourceName;
     private char[] buffer; //buffer to holds the input coming form the underlying input stream
     private int fStart; //start position for field
     private int fEnd; //end position for field
@@ -58,7 +77,10 @@ public class FieldCursorForDelimitedDataParser {
     private final char quote; //the quote character
     private final char fieldDelimiter; //the delimiter
 
-    public FieldCursorForDelimitedDataParser(Reader in, char fieldDelimiter, char quote) {
+    public FieldCursorForDelimitedDataParser(Reader in, char fieldDelimiter, char quote,
+            IWarningCollector warningCollector, Supplier<String> dataSourceName) {
+        this.warnings = warningCollector;
+        this.dataSourceName = dataSourceName;
         this.in = in;
         if (in != null) {
             buffer = new char[INITIAL_BUFFER_SIZE];
@@ -217,17 +239,21 @@ public class FieldCursorForDelimitedDataParser {
 
                 case EOF:
                     return false;
+                case FAILED:
+                    return false;
             }
         }
     }
 
-    public boolean nextField() throws IOException {
+    public Result nextField() throws IOException {
         switch (state) {
             case INIT:
             case EOR:
             case EOF:
             case CR:
-                return false;
+                return Result.END;
+            case FAILED:
+                return Result.ERROR;
 
             case IN_RECORD:
                 fieldCount++;
@@ -260,11 +286,14 @@ public class FieldCursorForDelimitedDataParser {
                                     fStart = start + 1;
                                     fEnd = p - 1;
                                 } else {
-                                    throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
-                                            + " - missing a closing quote");
+                                    state = State.FAILED;
+                                    if (warnings.shouldWarn()) {
+                                        warn(CLOSING_Q);
+                                    }
+                                    return Result.ERROR;
                                 }
                             }
-                            return true;
+                            return Result.OK;
                         }
                     }
                     char ch = buffer[p];
@@ -275,8 +304,11 @@ public class FieldCursorForDelimitedDataParser {
                                 startedQuote = true;
                             } else {
                                 // In this case, we don't have a quote in the beginning of a field.
-                                throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
-                                        + " - a quote enclosing a field needs to be placed in the beginning of that field");
+                                state = State.FAILED;
+                                if (warnings.shouldWarn()) {
+                                    warn(OPENING_Q);
+                                }
+                                return Result.ERROR;
                             }
                         }
                         // Check double quotes - "". We check [start != p-2]
@@ -300,7 +332,7 @@ public class FieldCursorForDelimitedDataParser {
                             fEnd = p;
                             start = p + 1;
                             lastDelimiterPosition = p;
-                            return true;
+                            return Result.OK;
                         }
 
                         if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
@@ -313,14 +345,17 @@ public class FieldCursorForDelimitedDataParser {
                             start = p + 1;
                             lastDelimiterPosition = p;
                             startedQuote = false;
-                            return true;
+                            return Result.OK;
                         } else if (lastQuotePosition < p - 1 && lastQuotePosition != lastDoubleQuotePosition
                                 && quoteCount == doubleQuoteCount * 2 + 2) {
                             // There is a quote before the delimiter, however it is not directly placed before the delimiter.
                             // In this case, we throw an exception.
                             // quoteCount == doubleQuoteCount * 2 + 2 : only true when we have two quotes except double-quotes.
-                            throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
-                                    + " -  A quote enclosing a field needs to be followed by the delimiter.");
+                            state = State.FAILED;
+                            if (warnings.shouldWarn()) {
+                                warn(DELIMITER_AFTER_Q);
+                            }
+                            return Result.ERROR;
                         }
                         // If the control flow reaches here: we have a delimiter in this field and
                         // there should be a quote in the beginning and the end of
@@ -332,7 +367,7 @@ public class FieldCursorForDelimitedDataParser {
                             start = p + 1;
                             state = ch == '\n' ? State.EOR : State.CR;
                             lastDelimiterPosition = p;
-                            return true;
+                            return Result.OK;
                         } else if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
                                 && quoteCount == doubleQuoteCount * 2 + 2) {
                             // set the position of fStart to +1, fEnd to -1 to remove quote character
@@ -342,7 +377,7 @@ public class FieldCursorForDelimitedDataParser {
                             start = p + 1;
                             state = ch == '\n' ? State.EOR : State.CR;
                             startedQuote = false;
-                            return true;
+                            return Result.OK;
                         }
                     }
                     ++p;
@@ -397,4 +432,9 @@ public class FieldCursorForDelimitedDataParser {
         fEnd -= doubleQuoteCount;
         isDoubleQuoteIncludedInThisField = false;
     }
+
+    private void warn(String message) { // reports a PARSING_ERROR warning; callers check shouldWarn() first
+        warnings.warn(Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName.get(), recordCount,
+                fieldCount, message)); // message args: data source name, record number, field number, detail
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
index 8edcafc..5561ad1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
@@ -30,6 +30,7 @@ import org.junit.Assert;
 public class CursorTest {
 
     // @Test commented out due to ASTERIXDB-1881
+    // before enabling: pass a real warning collector (null is used below) and count the first header field
     public void test() {
         FileInputStream in = null;
         BufferedReader reader = null;
@@ -38,24 +39,28 @@ public class CursorTest {
                     Paths.get(getClass().getResource("/data/beer.txt").toURI()).toAbsolutePath().toString());
             reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
             // skip header
-            final FieldCursorForDelimitedDataParser cursor = new FieldCursorForDelimitedDataParser(reader, ',', '"');
+            final FieldCursorForDelimitedDataParser cursor =
+                    new FieldCursorForDelimitedDataParser(reader, ',', '"', null, () -> "");
             // get number of fields from header (first record is header)
             cursor.nextRecord();
             int numOfFields = 0;
             int expectedNumberOfRecords = 7307;
-            while (cursor.nextField()) {
+            FieldCursorForDelimitedDataParser.Result lastResult = cursor.nextField();
+            while ((lastResult = cursor.nextField()) == FieldCursorForDelimitedDataParser.Result.OK) {
                 numOfFields++;
             }
+            Assert.assertNotEquals(lastResult, FieldCursorForDelimitedDataParser.Result.ERROR);
 
             int recordNumber = 0;
             while (cursor.nextRecord()) {
                 int fieldNumber = 0;
-                while (cursor.nextField()) {
+                while ((lastResult = cursor.nextField()) == FieldCursorForDelimitedDataParser.Result.OK) {
                     if (cursor.fieldHasDoubleQuote()) {
                         cursor.eliminateDoubleQuote();
                     }
                     fieldNumber++;
                 }
+                Assert.assertNotEquals(lastResult, FieldCursorForDelimitedDataParser.Result.ERROR);
                 if ((fieldNumber > numOfFields) || (fieldNumber < numOfFields)) {
                     System.err.println("Test case failed. Expected number of fields in each record is " + numOfFields
                             + " and record number " + recordNumber + " was found to have " + fieldNumber);