You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/08/25 06:55:27 UTC

[ignite] branch master updated: IGNITE-12852 SQL command COPY: fix parsing quoted delimiters in CSV fields content (#10141)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e2008f46525 IGNITE-12852 SQL command COPY: fix parsing quoted delimiters in CSV fields content (#10141)
e2008f46525 is described below

commit e2008f4652544e51c75a8586ef8aee82fb2923b9
Author: Anton K <an...@gmail.com>
AuthorDate: Thu Aug 25 09:55:17 2022 +0300

    IGNITE-12852 SQL command COPY: fix parsing quoted delimiters in CSV fields content (#10141)
---
 .../ignite/jdbc/thin/JdbcThinBulkLoadSelfTest.java | 428 ++++++++++++++++++++-
 .../src/test/resources/bulkload1_unmatched1.csv    |   2 +
 .../src/test/resources/bulkload1_unmatched2.csv    |   2 +
 .../src/test/resources/bulkload1_unmatched3.csv    |   2 +
 .../src/test/resources/bulkload1_unmatched4.csv    |   2 +
 .../src/test/resources/bulkload1_unmatched5.csv    |   2 +
 .../src/test/resources/bulkload_empty_numeric.csv  |   3 +
 .../bulkload_empty_numeric_with_null_string.csv    |   3 +
 .../bulkload_empty_numeric_with_trim_off.csv       |   3 +
 .../src/test/resources/bulkload_rfc4180_comma.csv  |   7 +
 .../src/test/resources/bulkload_rfc4180_pipe.csv   |   7 +
 .../processors/bulkload/BulkLoadCsvFormat.java     |  48 +++
 .../processors/bulkload/BulkLoadCsvParser.java     |   2 +-
 .../bulkload/pipeline/CsvLineProcessorBlock.java   | 170 ++++++--
 .../org/apache/ignite/internal/sql/SqlKeyword.java |   9 +
 .../internal/sql/command/SqlBulkLoadCommand.java   |  56 ++-
 .../internal/sql/SqlParserBulkLoadSelfTest.java    |  20 +
 .../indexing/src/test/resources/bulkload_bad.csv   |   4 +-
 .../indexing/src/test/resources/bulkload_ok.csv    |   4 +-
 19 files changed, 730 insertions(+), 44 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadSelfTest.java
index 1442b01c5ec..b58105a4c4c 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadSelfTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvParser;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.ComparisonFailure;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -86,6 +87,46 @@ public class JdbcThinBulkLoadSelfTest extends JdbcThinAbstractDmlStatementSelfTe
     private static final String BULKLOAD_CP1251_CSV_FILE =
         Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload2_windows1251.csv")).getAbsolutePath();
 
+    /** A CSV file with comma-delimited fields that exercises RFC 4180 quoting rules. */
+    private static final String BULKLOAD_RFC4180_COMMA_CSV_FILE =
+            Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload_rfc4180_comma.csv")).getAbsolutePath();
+
+    /** A CSV file with pipe-delimited fields that exercises RFC 4180 quoting rules. */
+    private static final String BULKLOAD_RFC4180_PIPE_CSV_FILE_ =
+            Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload_rfc4180_pipe.csv")).getAbsolutePath();
+
+    /** A CSV file with one record and unmatched quote at the start of field. */
+    private static final String BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE1 =
+        Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload1_unmatched1.csv")).getAbsolutePath();
+
+    /** A CSV file with one record and unmatched quote at the end of the line. */
+    private static final String BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE2 =
+        Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload1_unmatched2.csv")).getAbsolutePath();
+
+    /** A CSV file with one record and unmatched quote as the field content. */
+    private static final String BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE3 =
+        Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload1_unmatched3.csv")).getAbsolutePath();
+
+    /** A CSV file with one record and unmatched quote in the unquoted field content. */
+    private static final String BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE4 =
+        Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload1_unmatched4.csv")).getAbsolutePath();
+
+    /** A CSV file with one record and unmatched quote in the quoted field content. */
+    private static final String BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE5 =
+        Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload1_unmatched5.csv")).getAbsolutePath();
+
+    /** A CSV file with three records, two of which have an empty numeric (age) field. */
+    private static final String BULKLOAD_THREE_LINE_CSV_FILE_EMPTY_NUMERIC =
+        Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload_empty_numeric.csv")).getAbsolutePath();
+
+    /** A CSV file with three records that uses 'a' (also with a leading space) in the numeric (age) field. */
+    private static final String BULKLOAD_WITH_NULL_STRING =
+        Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload_empty_numeric_with_null_string.csv")).getAbsolutePath();
+
+    /** A CSV file with three records that uses 'a' in the numeric (age) field and a leading space in a quoted name. */
+    private static final String BULKLOAD_WITH_TRIM_OFF =
+        Objects.requireNonNull(resolveIgnitePath(CSV_FILE_SUBDIR + "bulkload_empty_numeric_with_trim_off.csv")).getAbsolutePath();
+
     /** Basic COPY statement used in majority of the tests. */
     public static final String BASIC_SQL_COPY_STMT =
         "copy from '" + BULKLOAD_TWO_LINES_CSV_FILE + "'" +
@@ -251,6 +292,255 @@ public class JdbcThinBulkLoadSelfTest extends JdbcThinAbstractDmlStatementSelfTe
         checkCacheContents(TBL_NAME, true, 1);
     }
 
+    /**
+     * Imports three-entry CSV file into a table and checks the entry created using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testThreeLineFileWithEmptyNumericColumn() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from '" + BULKLOAD_THREE_LINE_CSV_FILE_EMPTY_NUMERIC + "' into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(3, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 3);
+    }
+
+    /**
+     * Imports three-entry CSV file into a table and checks the entry created using SELECT statement with specified
+     * null string and trim mode (ON).
+     * This test verifies that specific null string values will be correctly interpreted as null and will be inserted.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testThreeLineFileWithEmptyNumericColumnWithNullString() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from '" + BULKLOAD_WITH_NULL_STRING + "' into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv nullstring 'a'");
+
+        assertEquals(3, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 3);
+    }
+
+    /**
+     * Imports three-entry CSV file into a table and checks the entry created using SELECT statement with default null
+     * string and trim mode (ON).
+     * This test verifies that the import fails on the second record when a field that is expected to be numeric
+     * contains an unexpected text value.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testThreeLineFileWithEmptyNumericColumnWithEmptyNullString() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from '" + BULKLOAD_WITH_NULL_STRING + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Value conversion failed");
+
+        checkCacheContents(TBL_NAME, true, 1);
+    }
+
+    /**
+     * Imports three-entry CSV file into a table and checks the entry created using SELECT statement with null string
+     * specified and trim OFF.
+     * This test verifies that the field which is equal to nullstring after trimming whitespaces will fail on insert
+     * with trim turned off with 'Value conversion failed' message.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testThreeLineFileWithEmptyNumericColumnWithNullStringAndTrimOff() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from '" + BULKLOAD_WITH_NULL_STRING + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv nullstring 'a' trim off");
+
+                return null;
+            }
+        }, SQLException.class, "Value conversion failed");
+    }
+
+    /**
+     * Imports three-entry CSV file into a table and checks the entry created using SELECT statement with null string
+     * specified and trim ON.
+     * This test verifies that the field which is equal to nullstring after trimming whitespaces will be correctly
+     * interpreted as null and will result in integer default value (0).
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testThreeLineFileWithEmptyNumericColumnWithNullStringAndTrimOn() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from '" + BULKLOAD_WITH_NULL_STRING + "' into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv nullstring 'a' trim on");
+
+        assertEquals(3, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 3);
+    }
+
+    /**
+     * Imports three-entry CSV file into a table and checks the entry created using SELECT statement with trim OFF.
+     * This test verifies that values will be inserted and whitespace in the field content is expected in this case.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testThreeLineFileWithEmptyNumericColumnWithTrimOff() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from '" + BULKLOAD_WITH_TRIM_OFF + "' into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv nullstring 'a' trim off");
+
+        assertEquals(3, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 3);
+    }
+
+    /**
+     * Imports three-entry CSV file into a table and checks the entry created using SELECT statement with trim ON.
+     * This test verifies that values will be inserted, but the content check will fail on the trimmed whitespace ([ ]).
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testThreeLineFileWithEmptyNumericColumnWithTrimOn() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int updatesCnt = stmt.executeUpdate(
+                    "copy from '" + BULKLOAD_WITH_TRIM_OFF + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv nullstring 'a' trim on");
+
+                assertEquals(3, updatesCnt);
+
+                checkCacheContents(TBL_NAME, true, 3);
+
+                return null;
+            }
+        }, ComparisonFailure.class, "expected:<[ ]FirstName104");
+    }
+
+    /**
+     * Verifies exception thrown if CSV row contains unmatched quote at the beginning of the field content.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testOneLineFileForUnmatchedStartQuote() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from '" + BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE1 + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Unmatched quote found at the end of line");
+
+        checkCacheContents(TBL_NAME, true, 0);
+    }
+
+    /**
+     * Verifies exception thrown if CSV row contains unmatched quote at the end of the field content.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testOneLineFileForUnmatchedEndQuote() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from '" + BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE2 + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Unexpected quote in the field, line");
+
+        checkCacheContents(TBL_NAME, true, 0);
+    }
+
+    /**
+     * Verifies exception thrown if CSV row contains unmatched quote as the only field content.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testOneLineFileForSingleEndQuote() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from '" + BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE3 + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Unmatched quote found at the end of line");
+
+        checkCacheContents(TBL_NAME, true, 0);
+    }
+
+    /**
+     * Verifies exception thrown if CSV row contains single unmatched quote as the field content.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testOneLineFileForQuoteInContent() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from '" + BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE4 + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Unmatched quote found at the end of line");
+
+        checkCacheContents(TBL_NAME, true, 0);
+    }
+
+    /**
+     * Verifies exception thrown if CSV row contains unmatched quote in the quoted field content.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testOneLineFileForQuoteInQuotedContent() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from '" + BULKLOAD_ONE_LINE_CSV_FILE_UNMATCHED_QUOTE5 + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Unexpected quote in the field, line");
+
+        checkCacheContents(TBL_NAME, true, 0);
+    }
+
     /**
      * Verifies that error is reported for empty charset name.
      */
@@ -320,6 +610,60 @@ public class JdbcThinBulkLoadSelfTest extends JdbcThinAbstractDmlStatementSelfTe
         checkCacheContents(TBL_NAME, true, 2);
     }
 
+    /**
+     * Imports seven-entry CSV file with default delimiter into a table and checks that
+     * the entry is created using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testCsvLoadWithDefaultDelimiter() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+                "copy from '" + BULKLOAD_RFC4180_COMMA_CSV_FILE + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+        assertEquals(7, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 7);
+    }
+
+    /**
+     * Imports seven-entry CSV file with comma delimiter into a table and checks that
+     * the entry is created using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testCsvLoadWithCommaDelimiter() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+                "copy from '" + BULKLOAD_RFC4180_COMMA_CSV_FILE + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv delimiter ','");
+
+        assertEquals(7, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 7, ',');
+    }
+
+    /**
+     * Imports seven-entry CSV file with pipe delimiter into a table and checks that
+     * the entry is created using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    @Test
+    public void testCsvLoadWithPipeDelimiter() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+                "copy from '" + BULKLOAD_RFC4180_PIPE_CSV_FILE_ + "' into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv delimiter '|'");
+
+        assertEquals(7, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 7, '|');
+    }
+
     /**
      * Imports two-entry CSV file with UTF-8 characters into a table and checks
      * the created entries using SELECT statement.
@@ -757,6 +1101,21 @@ public class JdbcThinBulkLoadSelfTest extends JdbcThinAbstractDmlStatementSelfTe
      * @throws SQLException When one of checks has failed.
      */
     private void checkCacheContents(String tblName, boolean checkLastName, int recCnt) throws SQLException {
+        checkCacheContents(tblName, checkLastName, recCnt, ',');
+    }
+
+    /**
+     * Checks cache contents after bulk loading data in the above tests: ASCII version.
+     * <p>
+     * Uses SQL SELECT command for querying entries.
+     *
+     * @param tblName Table name to query.
+     * @param checkLastName Check 'lastName' column (not imported in some tests).
+     * @param recCnt Number of records to expect.
+     * @param delimiter The delimiter of fields.
+     * @throws SQLException When one of checks has failed.
+     */
+    private void checkCacheContents(String tblName, boolean checkLastName, int recCnt, char delimiter) throws SQLException {
         ResultSet rs = stmt.executeQuery("select _key, age, firstName, lastName from " + tblName);
 
         assert rs != null;
@@ -766,18 +1125,34 @@ public class JdbcThinBulkLoadSelfTest extends JdbcThinAbstractDmlStatementSelfTe
         while (rs.next()) {
             int id = rs.getInt("_key");
 
-            if (id == 123) {
-                assertEquals(12, rs.getInt("age"));
-                assertEquals("FirstName123 MiddleName123", rs.getString("firstName"));
-                if (checkLastName)
-                    assertEquals("LastName123", rs.getString("lastName"));
-            }
-            else if (id == 456) {
-                assertEquals(45, rs.getInt("age"));
-                assertEquals("FirstName456", rs.getString("firstName"));
-                if (checkLastName)
-                    assertEquals("LastName456", rs.getString("lastName"));
-            }
+            SyntheticPerson sp = new SyntheticPerson(rs.getInt("age"),
+                rs.getString("firstName"), rs.getString("lastName"));
+
+            if (id == 101)
+                sp.validateValues(0, "FirstName101 MiddleName101", "LastName101", checkLastName);
+            else if (id == 102)
+                sp.validateValues(0, "FirstName102 MiddleName102", "LastName102", checkLastName);
+            else if (id == 103)
+                sp.validateValues(0, "FirstName103 MiddleName103", "LastName103", checkLastName);
+            else if (id == 104)
+                sp.validateValues(0, " FirstName104 MiddleName104", "LastName104", checkLastName);
+            else if (id == 123)
+                sp.validateValues(12, "FirstName123 MiddleName123", "LastName123", checkLastName);
+            else if (id == 234)
+                sp.validateValues(23, "FirstName|234", null, checkLastName);
+            else if (id == 345)
+                sp.validateValues(34, "FirstName,345", null, checkLastName);
+            else if (id == 456)
+                sp.validateValues(45, "FirstName456", "LastName456", checkLastName);
+            else if (id == 567)
+                sp.validateValues(56, null, null, checkLastName);
+            else if (id == 678)
+                sp.validateValues(67, null, null, checkLastName);
+            else if (id == 789)
+                sp.validateValues(78, "FirstName789 plus \"quoted\"", "LastName 789", checkLastName);
+            else if (id == 101112)
+                sp.validateValues(1011, "FirstName 101112",
+                    "LastName\"" + delimiter + "\" 1011" + delimiter + " 12", checkLastName);
             else
                 fail("Wrong ID: " + id);
 
@@ -920,4 +1295,33 @@ public class JdbcThinBulkLoadSelfTest extends JdbcThinAbstractDmlStatementSelfTe
             return appliedCharset.decode(encodedBuf).toString();
         }
     }
+
+    /**
+     *
+     */
+    private class SyntheticPerson {
+        /** */
+        int age;
+
+        /** */
+        String firstName;
+
+        /** */
+        String lastName;
+
+        /** */
+        public SyntheticPerson(int age, String firstName, String lastName) {
+            this.age = age;
+            this.firstName = firstName;
+            this.lastName = lastName;
+        }
+
+        /** */
+        public void validateValues(int age, String firstName, String lastName, boolean checkLastName) {
+            assertEquals(age, this.age);
+            assertEquals(firstName, this.firstName);
+            if (checkLastName)
+                assertEquals(lastName, this.lastName);
+        }
+    }
 }
diff --git a/modules/clients/src/test/resources/bulkload1_unmatched1.csv b/modules/clients/src/test/resources/bulkload1_unmatched1.csv
new file mode 100644
index 00000000000..f47dba20c6e
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload1_unmatched1.csv
@@ -0,0 +1,2 @@
+123,12,"FirstName123 MiddleName123","LastName123
+456,45,"FirstName456","LastName456"
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload1_unmatched2.csv b/modules/clients/src/test/resources/bulkload1_unmatched2.csv
new file mode 100644
index 00000000000..fbcd3f308aa
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload1_unmatched2.csv
@@ -0,0 +1,2 @@
+123,12,"FirstName123 MiddleName123",LastName123"
+456,45,"FirstName456","LastName456"
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload1_unmatched3.csv b/modules/clients/src/test/resources/bulkload1_unmatched3.csv
new file mode 100644
index 00000000000..598bd6d2462
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload1_unmatched3.csv
@@ -0,0 +1,2 @@
+123,12,"FirstName123 MiddleName123","
+456,45,"FirstName456","LastName456"
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload1_unmatched4.csv b/modules/clients/src/test/resources/bulkload1_unmatched4.csv
new file mode 100644
index 00000000000..f47dba20c6e
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload1_unmatched4.csv
@@ -0,0 +1,2 @@
+123,12,"FirstName123 MiddleName123","LastName123
+456,45,"FirstName456","LastName456"
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload1_unmatched5.csv b/modules/clients/src/test/resources/bulkload1_unmatched5.csv
new file mode 100644
index 00000000000..610fec78396
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload1_unmatched5.csv
@@ -0,0 +1,2 @@
+123,12,"FirstName123 " MiddleName123",LastName123
+456,45,"FirstName456","LastName456"
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload_empty_numeric.csv b/modules/clients/src/test/resources/bulkload_empty_numeric.csv
new file mode 100644
index 00000000000..b3fb6fddbae
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload_empty_numeric.csv
@@ -0,0 +1,3 @@
+101,"0","FirstName101 MiddleName101",LastName101
+102,,"FirstName102 MiddleName102",LastName102
+103,"","FirstName103 MiddleName103",LastName103
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload_empty_numeric_with_null_string.csv b/modules/clients/src/test/resources/bulkload_empty_numeric_with_null_string.csv
new file mode 100644
index 00000000000..fd7fd4aebe4
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload_empty_numeric_with_null_string.csv
@@ -0,0 +1,3 @@
+101,"0","FirstName101 MiddleName101",LastName101
+102,a,"FirstName102 MiddleName102",LastName102
+103," a"," FirstName103 MiddleName103",LastName103
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload_empty_numeric_with_trim_off.csv b/modules/clients/src/test/resources/bulkload_empty_numeric_with_trim_off.csv
new file mode 100644
index 00000000000..cf44d790b3e
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload_empty_numeric_with_trim_off.csv
@@ -0,0 +1,3 @@
+101,"0","FirstName101 MiddleName101",LastName101
+102,a,"FirstName102 MiddleName102",LastName102
+104,"a"," FirstName104 MiddleName104",LastName104
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload_rfc4180_comma.csv b/modules/clients/src/test/resources/bulkload_rfc4180_comma.csv
new file mode 100644
index 00000000000..91458b80a30
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload_rfc4180_comma.csv
@@ -0,0 +1,7 @@
+123,12,"FirstName123 MiddleName123",LastName123
+234,23,FirstName|234,""
+456,45,"FirstName456","LastName456"
+567,56,,
+678,67,""
+789,78,"FirstName789 plus ""quoted""","LastName 789"
+101112,1011,"FirstName 101112","LastName"","" 1011, 12"
\ No newline at end of file
diff --git a/modules/clients/src/test/resources/bulkload_rfc4180_pipe.csv b/modules/clients/src/test/resources/bulkload_rfc4180_pipe.csv
new file mode 100644
index 00000000000..ee97b14dbbf
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload_rfc4180_pipe.csv
@@ -0,0 +1,7 @@
+123|12|"FirstName123 MiddleName123"|LastName123
+345|34|FirstName,345||""
+456|45|"FirstName456"|"LastName456"
+567|56||
+678|67|""
+789|78|"FirstName789 plus ""quoted"""|"LastName 789"
+101112|1011|"FirstName 101112"|"LastName""|"" 1011| 12"
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java
index 73115acd43c..04998b62f89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java
@@ -40,6 +40,12 @@ public class BulkLoadCsvFormat extends BulkLoadFormat {
     /** Line comment start pattern. */
     @Nullable public static final Pattern DEFAULT_COMMENT_CHARS = null;
 
+    /** A string value to be replaced with NULL. */
+    @Nullable public static final String DEFAULT_NULL_STRING = "";
+
+    /** Whether leading and trailing spaces should be trimmed. */
+    @Nullable public static final boolean DEFAULT_TRIM_SPACES = true;
+
     /** Format name. */
     public static final String NAME = "CSV";
 
@@ -61,6 +67,12 @@ public class BulkLoadCsvFormat extends BulkLoadFormat {
     /** File charset. */
     @Nullable private String inputCharsetName;
 
+    /** The string to be used for null values. */
+    @Nullable private String nullString;
+
+    /** Whether the field content must be trimmed of leading and trailing spaces. */
+    @Nullable private boolean trim;
+
     /**
      * Returns the name of the format.
      *
@@ -177,4 +189,40 @@ public class BulkLoadCsvFormat extends BulkLoadFormat {
     public void inputCharsetName(@Nullable String inputCharsetName) {
         this.inputCharsetName = inputCharsetName;
     }
+
+    /**
+     * Returns the string to be used for null values, null if not specified.
+     *
+     * @return The string to be used for null values, null if not specified.
+     */
+    public String nullString() {
+        return nullString;
+    }
+
+    /**
+     * Sets the string to be used for null values.
+     *
+     * @param nullString The string to be used for null values.
+     */
+    public void nullString(@Nullable String nullString) {
+        this.nullString = nullString;
+    }
+
+    /**
+     * Returns whether the field content must be trimmed of leading and trailing spaces.
+     *
+     * @return Whether the field content must be trimmed.
+     */
+    public boolean trim() {
+        return trim;
+    }
+
+    /**
+     * Sets whether the field content must be trimmed of leading and trailing spaces.
+     *
+     * @param trim Whether the field content must be trimmed.
+     */
+    public void trim(@Nullable boolean trim) {
+        this.trim = trim;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java
index 05df6d1d113..079274b78ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java
@@ -63,7 +63,7 @@ public class BulkLoadCsvParser extends BulkLoadParser {
 
         // Handling of the other options is to be implemented in IGNITE-7537.
         inputBlock.append(new LineSplitterBlock(format.lineSeparator()))
-               .append(new CsvLineProcessorBlock(format.fieldSeparator(), format.quoteChars()))
+               .append(new CsvLineProcessorBlock(format))
                .append(collectorBlock);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java
index f932822e8df..8801e053c58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java
@@ -17,55 +17,173 @@
 
 package org.apache.ignite.internal.processors.bulkload.pipeline;
 
-import java.util.regex.Pattern;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat;
 
 /**
- * A {@link PipelineBlock}, which splits line according to CSV format rules and unquotes fields.
- * The next block {@link PipelineBlock#accept(Object, boolean)} is called per-line.
+ * A {@link PipelineBlock}, which splits line according to CSV format rules and unquotes fields. The next block {@link
+ * PipelineBlock#accept(Object, boolean)} is called per-line.
  */
 public class CsvLineProcessorBlock extends PipelineBlock<String, String[]> {
+    /** Empty string array. */
+    public static final String[] EMPTY_STR_ARRAY = new String[0];
+
     /** Field delimiter pattern. */
-    private final Pattern fldDelim;
+    private final char fldDelim;
 
     /** Quote character. */
-    private final String quoteChars;
+    private final char quoteChars;
+
+    /** Null string. */
+    private final String nullString;
+
+    /** Trim field string content. */
+    private final boolean trim;
+
+    /** Number of the line being processed, counted from 1; reported in error messages. */
+    private int line = 0;
+
+    /** Number of symbols consumed so far in the current line; reported in error messages. */
+    private int symbol = 0;
 
     /**
      * Creates a CSV line parser.
-     *
-     * @param fldDelim The pattern for the field delimiter.
-     * @param quoteChars Quoting character.
      */
-    public CsvLineProcessorBlock(Pattern fldDelim, String quoteChars) {
-        this.fldDelim = fldDelim;
-        this.quoteChars = quoteChars;
+    public CsvLineProcessorBlock(BulkLoadCsvFormat format) {
+        this.fldDelim = format.fieldSeparator().toString().charAt(0);
+        this.quoteChars = format.quoteChars().charAt(0);
+        this.nullString = format.nullString();
+        this.trim = format.trim();
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override public void accept(String input, boolean isLastPortion) throws IgniteCheckedException {
-        // Currently we don't process quoted field delimiter properly, will be fixed in IGNITE-7537.
-        String[] fields = fldDelim.split(input);
+        List<String> fields = new ArrayList<>();
+
+        StringBuilder currentField = new StringBuilder(256);
+
+        ReaderState state = ReaderState.IDLE;
+
+        final int length = input.length();
+        int copy = 0;
+        int current = 0;
+        int prev = -1;
+        int copyStart = 0;
+
+        boolean quotesMatched = true;
+
+        line++;
+        symbol = 0;
+
+        while (true) {
+            if (current == length) {
+                if (!quotesMatched)
+                    throw new IgniteCheckedException(new SQLException("Unmatched quote found at the end of line "
+                        + line + ", symbol " + symbol));
+
+                if (copy > 0)
+                    currentField.append(input, copyStart, copyStart + copy);
+
+                addField(fields, currentField, prev == quoteChars);
+
+                break;
+            }
+
+            final char c = input.charAt(current++);
+            symbol++;
 
-        for (int i = 0; i < fields.length; i++)
-            fields[i] = trim(fields[i]);
+            if (state == ReaderState.QUOTED) {
+                if (c == quoteChars) {
+                    state = ReaderState.IDLE;
+                    quotesMatched = !quotesMatched;
+
+                    if (copy > 0) {
+                        currentField.append(input, copyStart, copyStart + copy);
+
+                        copy = 0;
+                    }
+
+                    copyStart = current;
+                }
+                else
+                    copy++;
+            }
+            else {
+                if (c == fldDelim) {
+                    if (copy > 0) {
+                        currentField.append(input, copyStart, copyStart + copy);
+
+                        copy = 0;
+                    }
+
+                    addField(fields, currentField, prev == quoteChars);
+
+                    currentField = new StringBuilder();
+                    copyStart = current;
+
+                    state = ReaderState.IDLE;
+                }
+                else if (c == quoteChars && state != ReaderState.UNQUOTED) {
+                    state = ReaderState.QUOTED;
+
+                    quotesMatched = !quotesMatched;
+
+                    if (prev == quoteChars)
+                        copy++;
+                    else
+                        copyStart = current;
+                }
+                else {
+                    if (c == quoteChars) {
+                        if (state == ReaderState.UNQUOTED)
+                            throw new IgniteCheckedException(
+                                new SQLException("Unexpected quote in the field, line " + line
+                                    + ", symbol " + symbol));
+
+                        quotesMatched = !quotesMatched;
+                    }
+
+                    copy++;
+
+                    if (state == ReaderState.IDLE)
+                        state = ReaderState.UNQUOTED;
+                }
+            }
+
+            prev = c;
+        }
+
+        nextBlock.accept(fields.toArray(EMPTY_STR_ARRAY), isLastPortion);
+    }
+
+    /**
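+     * Adds a field to the row: trims it if enabled, maps the null string to {@code null}; {@code quoted} is unused.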
+     * @param fields row fields.
+     * @param fieldVal field value.
+     */
+    private void addField(List<String> fields, StringBuilder fieldVal, boolean quoted) {
+        final String val = trim ? fieldVal.toString().trim() : fieldVal.toString();
 
-        nextBlock.accept(fields, isLastPortion);
+        fields.add(val.equals(nullString) ? null : val);
     }
 
     /**
-     * Trims quote characters from beginning and end of the line.
      *
-     * @param str String to trim.
-     * @return The trimmed string.
      */
-    private String trim(String str) {
-        if (str.isEmpty())
-            return null;
+    private enum ReaderState {
+        /** Not inside any field content yet: at the start of a field or right after a closing quote. */
+        IDLE,
 
-        int startPos = quoteChars.indexOf(str.charAt(0)) != -1 ? 1 : 0;
-        int endPos = quoteChars.indexOf(str.charAt(str.length() - 1)) != -1 ? str.length() - 1 : str.length();
+        /** Inside unquoted field content; a quote char met in this state is an error. */
+        UNQUOTED,
 
-        return str.substring(startPos, endPos);
+        /** Inside a quoted section, where the field delimiter is treated as ordinary content. */
+        QUOTED
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
index 355095dbbc8..4b001b661fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
@@ -107,6 +107,9 @@ public class SqlKeyword {
     /** Keyword: DECIMAL. */
     public static final String DECIMAL = "DECIMAL";
 
+    /** Keyword: DELIMITER. */
+    public static final String DELIMITER = "DELIMITER";
+
     /** Keyword: DESC. */
     public static final String DESC = "DESC";
 
@@ -191,6 +194,9 @@ public class SqlKeyword {
     /** Keyword: NOT. */
     public static final String NOT = "NOT";
 
+    /** Keyword: NULLSTRING. */
+    public static final String NULLSTRING = "NULLSTRING";
+
     /** Keyword: NUMBER. */
     public static final String NUMBER = "NUMBER";
 
@@ -269,6 +275,9 @@ public class SqlKeyword {
     /** Keyword: TRANSACTION. */
     public static final String TRANSACTION = "TRANSACTION";
 
+    /** Keyword: TRIM. */
+    public static final String TRIM = "TRIM";
+
     /** Keyword: UNIQUE. */
     public static final String UNIQUE = "UNIQUE";
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java
index d50fbbcc6a8..54021bde93d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.command;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Pattern;
 import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
 import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat;
 import org.apache.ignite.internal.processors.bulkload.BulkLoadFormat;
@@ -26,9 +27,9 @@ import org.apache.ignite.internal.sql.SqlKeyword;
 import org.apache.ignite.internal.sql.SqlLexer;
 import org.apache.ignite.internal.sql.SqlLexerTokenType;
 import org.apache.ignite.internal.util.typedef.internal.S;
-
 import static org.apache.ignite.internal.sql.SqlParserUtils.error;
 import static org.apache.ignite.internal.sql.SqlParserUtils.errorUnexpectedToken;
+import static org.apache.ignite.internal.sql.SqlParserUtils.parseBoolean;
 import static org.apache.ignite.internal.sql.SqlParserUtils.parseIdentifier;
 import static org.apache.ignite.internal.sql.SqlParserUtils.parseInt;
 import static org.apache.ignite.internal.sql.SqlParserUtils.parseQualifiedIdentifier;
@@ -149,9 +150,13 @@ public class SqlBulkLoadCommand implements SqlCommand {
                 fmt.quoteChars(BulkLoadCsvFormat.DEFAULT_QUOTE_CHARS);
                 fmt.commentChars(BulkLoadCsvFormat.DEFAULT_COMMENT_CHARS);
                 fmt.escapeChars(BulkLoadCsvFormat.DEFAULT_ESCAPE_CHARS);
+                fmt.nullString(BulkLoadCsvFormat.DEFAULT_NULL_STRING);
+                fmt.trim(BulkLoadCsvFormat.DEFAULT_TRIM_SPACES);
 
                 parseCsvOptions(lex, fmt);
 
+                validateCsvParserFormat(lex, fmt);
+
                 inputFormat = fmt;
 
                 break;
@@ -181,6 +186,36 @@ public class SqlBulkLoadCommand implements SqlCommand {
                     break;
                 }
 
+                case SqlKeyword.DELIMITER: {
+                    lex.shift();
+
+                    String delimiter = parseString(lex);
+
+                    format.fieldSeparator(Pattern.compile(delimiter));
+
+                    break;
+                }
+
+                case SqlKeyword.TRIM: {
+                    lex.shift();
+
+                    Boolean trim = parseBoolean(lex);
+
+                    format.trim(trim);
+
+                    break;
+                }
+
+                case SqlKeyword.NULLSTRING: {
+                    lex.shift();
+
+                    String nullString = parseString(lex);
+
+                    format.nullString(nullString);
+
+                    break;
+                }
+
                 default:
                     return;
             }
@@ -213,6 +248,25 @@ public class SqlBulkLoadCommand implements SqlCommand {
         }
     }
 
+    /**
+     * Validates that the field delimiter and the quote char are two different single characters.
+     *
+     * @param lex The lexer.
+     * @param format CSV format object to validate.
+     */
+    private void validateCsvParserFormat(SqlLexer lex, BulkLoadCsvFormat format) {
+        String delimiter = format.fieldSeparator().toString();
+        String quoteChars = format.quoteChars();
+        // Exactly one char each: CsvLineProcessorBlock reads charAt(0), so an empty value must be rejected too.
+        if (delimiter.length() != 1 || quoteChars.length() != 1)
+            throw error(lex, "Delimiter or quote chars must consist of single character: delim is '" + delimiter
+                + "', quote char is '" + quoteChars + "'");
+
+        if (delimiter.equals(quoteChars))
+            throw error(lex, "Invalid delimiter or quote chars: delim is '" + delimiter
+                + "', quote char is '" + quoteChars + "'");
+    }
+
     /**
      * Returns the schemaName.
      *
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java
index aec49fa9c4a..3b627e0a327 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java
@@ -58,6 +58,26 @@ public class SqlParserBulkLoadSelfTest extends SqlParserAbstractSelfTest {
             "copy from 'into' into Person (_key, age, firstName, lastName) format csv")
             .nextCommand();
 
+        new SqlParser(null,
+            "copy from 'into' into Person (_key, age, firstName, lastName) format csv delimiter ','")
+            .nextCommand();
+
+        new SqlParser(null,
+            "copy from 'into' into Person (_key, age, firstName, lastName) format csv trim on")
+            .nextCommand();
+
+        new SqlParser(null,
+            "copy from 'into' into Person (_key, age, firstName, lastName) format csv nullstring 'a'")
+            .nextCommand();
+
+        assertParseError(null,
+            "copy from 'into' into Person (_key, age, firstName, lastName) format csv delimiter '\"'",
+            "Invalid delimiter or quote chars: delim is '\"', quote char is '\"'");
+
+        assertParseError(null,
+            "copy from 'into' into Person (_key, age, firstName, lastName) format csv delimiter ',.'",
+            "Delimiter or quote chars must consist of single character: delim is ',.', quote char is '\"'");
+
         assertParseError(null,
             "copy from 'any.file' to Person (_key, age, firstName, lastName) format csv",
             "Unexpected token: \"TO\" (expected: \"INTO\")");
diff --git a/modules/indexing/src/test/resources/bulkload_bad.csv b/modules/indexing/src/test/resources/bulkload_bad.csv
index dc9e6923c15..f24060c591e 100644
--- a/modules/indexing/src/test/resources/bulkload_bad.csv
+++ b/modules/indexing/src/test/resources/bulkload_bad.csv
@@ -1,2 +1,2 @@
-"0xAGFASD", "OK1"
-"alsdkjfasd", "OK2"
\ No newline at end of file
+"0xAGFASD","OK1"
+"alsdkjfasd","OK2"
\ No newline at end of file
diff --git a/modules/indexing/src/test/resources/bulkload_ok.csv b/modules/indexing/src/test/resources/bulkload_ok.csv
index 349b10ba8ea..363dc56e016 100644
--- a/modules/indexing/src/test/resources/bulkload_ok.csv
+++ b/modules/indexing/src/test/resources/bulkload_ok.csv
@@ -1,2 +1,2 @@
-1, "OK1"
-2, "OK2"
\ No newline at end of file
+1,"OK1"
+2,"OK2"
\ No newline at end of file