Posted to commits@nifi.apache.org by pv...@apache.org on 2020/09/28 13:08:20 UTC

[nifi] branch main updated: NIFI-7565 Add support for DATE to Kudu NAR bundle - update Kudu dependencies to Kudu 1.13.0 - add support for passing java.sql.Date for Kudu DATE columns - add tests for passing java.sql.Date to DATE columns

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e093172  NIFI-7565 Add support for DATE to Kudu NAR bundle - update Kudu dependencies to Kudu 1.13.0 - add support for passing java.sql.Date for Kudu DATE columns - add tests for passing java.sql.Date to DATE columns
e093172 is described below

commit e09317223e0ec0571e16247691497daf1cdbb736
Author: Greg Solovyev <fi...@gmail.com>
AuthorDate: Mon Jun 22 20:32:23 2020 -0700

    NIFI-7565 Add support for DATE to Kudu NAR bundle
    - update Kudu dependencies to Kudu 1.13.0
    - add support for passing java.sql.Date for Kudu DATE columns
    - add tests for passing java.sql.Date to DATE columns
    
    more about DATE type support in Kudu:
    https://issues.apache.org/jira/browse/KUDU-2632
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4549.
---
 .../nifi-kudu-controller-service/pom.xml           |   2 +-
 .../nifi/controller/kudu/KuduLookupService.java    |   3 +
 .../controller/kudu/ITestKuduLookupService.java    |  11 +-
 .../nifi-kudu-bundle/nifi-kudu-processors/pom.xml  |   2 +-
 .../processors/kudu/AbstractKuduProcessor.java     |   6 +-
 .../org/apache/nifi/processors/kudu/ITPutKudu.java |  12 +-
 .../apache/nifi/processors/kudu/TestPutKudu.java   | 143 ++++++++++++---------
 7 files changed, 110 insertions(+), 69 deletions(-)
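
For context, the core of this change is that NiFi record DATE values, carried as java.sql.Date, are now passed straight through to Kudu DATE columns, a column type Kudu 1.13 exposes through PartialRow.addDate (see KUDU-2632 above). Below is a minimal sketch of the underlying client call, not taken from this commit, assuming a Kudu 1.13+ client, a hypothetical master at kudu-master:7051, and a hypothetical table example_table with an INT64 key "id" and a DATE column "sql_date":

    import java.sql.Date;
    import org.apache.kudu.client.Insert;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.PartialRow;

    public class DateColumnWriteSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical master address and table name, for illustration only
            KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
            KuduTable table = client.openTable("example_table");
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            row.addLong("id", 1L);                                 // hypothetical key column
            row.addDate("sql_date", Date.valueOf("2020-09-28"));   // DATE columns accept java.sql.Date (Kudu 1.13+)
            KuduSession session = client.newSession();
            session.apply(insert);                                 // default flush mode applies the write synchronously
            session.close();
            client.close();
        }
    }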

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
index d40fdb7..15f0c50 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
@@ -27,7 +27,7 @@
 
     <properties>
         <exclude.tests>None</exclude.tests>
-        <kudu.version>1.12.0</kudu.version>
+        <kudu.version>1.13.0</kudu.version>
     </properties>
     <build>
         <extensions>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
index 406c6e9..93b0ce6 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -333,6 +333,9 @@ public class KuduLookupService extends AbstractControllerService implements Reco
                 case FLOAT:
                     fields.add(new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType()));
                     break;
+                case DATE:
+                    fields.add(new RecordField(cs.getName(), RecordFieldType.DATE.getDataType()));
+                    break;
             }
         }
         return new SimpleRecordSchema(fields);
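
On the lookup-service side, the hunk above only maps Kudu DATE columns into the record schema as RecordFieldType.DATE; the values themselves come back from scans as java.sql.Date. A minimal sketch of that read path, not part of this commit, assuming a Kudu 1.13+ client, an already-built KuduScanner, and the sql_date column used in the tests below:

    import java.sql.Date;
    import org.apache.kudu.client.KuduScanner;
    import org.apache.kudu.client.RowResult;
    import org.apache.kudu.client.RowResultIterator;

    final class DateColumnReadSketch {
        // Drains an already-built scanner and prints the DATE column of each row.
        static void printDates(KuduScanner scanner) throws Exception {
            while (scanner.hasMoreRows()) {
                RowResultIterator rows = scanner.nextRows();
                for (RowResult row : rows) {
                    Date d = row.getDate("sql_date"); // returns java.sql.Date for DATE columns (Kudu 1.13+)
                    System.out.println(d);            // prints e.g. 2020-09-28
                }
            }
        }
    }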
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java
index 93bf7d7..4ca3546 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java
@@ -67,9 +67,12 @@ public class ITestKuduLookupService {
                 .addTabletServerFlag("--use_hybrid_clock=false")
     );
     private TestRunner testRunner;
-    private long nowMillis = System.currentTimeMillis();
     private KuduLookupService kuduLookupService;
 
+    private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis());
+    private final java.sql.Date pastDate = java.sql.Date.valueOf("2019-01-01");
+    private long nowMillis = System.currentTimeMillis();
+
     public static class SampleProcessor extends AbstractProcessor {
         @Override
         public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@@ -100,6 +103,7 @@ public class ITestKuduLookupService {
         columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("sql_date", Type.DATE).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("varchar_3", Type.VARCHAR).typeAttributes(
                 new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build()
         ).build());
@@ -124,6 +128,7 @@ public class ITestKuduLookupService {
         row.addInt("int32",3);
         row.addLong("int64",4L);
         row.addTimestamp("unixtime_micros", new Timestamp(nowMillis));
+        row.addDate("sql_date", today);
         row.addVarchar("varchar_3", "SFO");
         session.apply(insert);
 
@@ -140,6 +145,7 @@ public class ITestKuduLookupService {
         row.addInt("int32",13);
         row.addLong("int64",14L);
         row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year
+        row.addDate("sql_date", pastDate);
         row.addVarchar("varchar_3", "SJC");
         session.apply(insert);
 
@@ -204,6 +210,7 @@ public class ITestKuduLookupService {
         map.put("int32",3);
         map.put("int64",4L);
         map.put("unixtime_micros", new Timestamp(nowMillis));
+        map.put("sql_date", today);
         map.put("varchar_3", "SFO");
         Record result = kuduLookupService.lookup(map).get();
         validateRow1(result);
@@ -224,7 +231,6 @@ public class ITestKuduLookupService {
         assertEquals(true, result.getAsBoolean("bool"));
     }
     private void validateRow1(Record result){
-
         assertEquals("string1", result.getAsString("string"));
         assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary"));
         assertEquals(true, result.getAsBoolean("bool"));
@@ -237,6 +243,7 @@ public class ITestKuduLookupService {
         assertEquals(4L, (long)result.getAsLong("int64"));
         assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros"));
         assertEquals("SFO", result.getValue("varchar_3"));
+        assertEquals(today.toString(), result.getValue("sql_date").toString());
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index cba2878..34dea1e 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -26,7 +26,7 @@
 
     <properties>
         <exclude.tests>None</exclude.tests>
-        <kudu.version>1.12.0</kudu.version>
+        <kudu.version>1.13.0</kudu.version>
     </properties>
     <build>
         <extensions>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index ea82d85..520fca3 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -299,7 +299,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
             if (colIdx != -1) {
                 ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
                 Type colType = colSchema.getType();
-
                 if (record.getValue(recordFieldName) == null) {
                     if (schema.getColumnByIndex(colIdx).isKey()) {
                         throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
@@ -352,6 +351,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                         case VARCHAR:
                             row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName));
                             break;
+                        case DATE:
+                            row.addDate(colIdx, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
+                            break;
                         default:
                             throw new IllegalStateException(String.format("unknown column type %s", colType));
                     }
@@ -386,6 +388,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
             case CHAR:
             case STRING:
                 return Type.STRING;
+            case DATE:
+                return Type.DATE;
             default:
                 throw new IllegalArgumentException(String.format("unsupported type %s", nifiType));
         }
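
The new DATE branch above leans on NiFi's existing DataTypeUtils coercion, so string field values in the default record date format ("yyyy-MM-dd") and already-typed java.sql.Date values both arrive at PartialRow.addDate as java.sql.Date. A small standalone illustration using the same calls as the hunk; the field name "dateval" is just the one used in the tests:

    import java.sql.Date;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    final class DateCoercionSketch {
        static Date coerce(Object value, String fieldName) {
            // Same conversion the processor applies before row.addDate(...)
            return DataTypeUtils.toDate(
                    value,
                    () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()),
                    fieldName);
        }

        public static void main(String[] args) {
            System.out.println(coerce("2020-09-28", "dateval"));               // string parsed with the default yyyy-MM-dd format
            System.out.println(coerce(Date.valueOf("2020-09-28"), "dateval")); // already a java.sql.Date, returned as-is
        }
    }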
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
index 7d6aa0a..b6fa409 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
@@ -76,6 +76,8 @@ public class ITPutKudu {
 
     private MockRecordParser readerFactory;
 
+    private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis());
+
     @Before
     public void setUp() throws Exception {
         processor = new PutKudu();
@@ -109,6 +111,7 @@ public class ITPutKudu {
         ).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("dateval", Type.DATE).build());
         Schema schema = new Schema(columns);
         CreateTableOptions opts = new CreateTableOptions()
             .addHashPartitions(Collections.singletonList("id"), 4);
@@ -122,12 +125,13 @@ public class ITPutKudu {
         readerFactory.addSchemaField("varcharval", RecordFieldType.STRING);
         readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
         readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP);
+        readerFactory.addSchemaField("dateval", RecordFieldType.DATE);
         // Add two extra columns to test handleSchemaDrift = true.
         readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
         readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);
 
         for (int i = 0; i < numOfRecord; i++) {
-            readerFactory.addRecord(i, "val_" + i, "varchar_val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i);
+            readerFactory.addRecord(i, "val_" + i, "varchar_val_" + i, 1000 + i, NOW, today, 100.88 + i, 100.88 + i);
         }
 
         testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -193,7 +197,7 @@ public class ITPutKudu {
         KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
 
         // Verify the extra field was added.
-        Assert.assertEquals(7, kuduTable.getSchema().getColumnCount());
+        Assert.assertEquals(8, kuduTable.getSchema().getColumnCount());
         Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
         Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
 
@@ -202,6 +206,10 @@ public class ITPutKudu {
         int count = 0;
         for (RowResult row : scanner) {
             Assert.assertEquals(NOW, row.getTimestamp("timestampval"));
+            // Comparing string representations, because java.sql.Date does not override
+            // java.util.Date.equals method and therefore compares milliseconds instead of
+            // comparing dates, even though java.sql.Date is supposed to ignore time
+            Assert.assertEquals(today.toString(), row.getDate("dateval").toString());
             count++;
         }
         Assert.assertEquals(recordCount, count);
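
The comment in the hunk above is worth spelling out: java.sql.Date does not override equals, so two values on the same calendar day that were built from different millisecond instants compare as unequal, which is why the assertions compare string forms. A tiny illustration, independent of the commit:

    import java.sql.Date;

    final class SqlDateEqualsSketch {
        public static void main(String[] args) {
            Date midnight = Date.valueOf("2020-09-28");                    // local midnight of that day
            Date laterSameDay = new Date(midnight.getTime() + 3_600_000L); // one hour later, same calendar day
            System.out.println(midnight.equals(laterSameDay));             // false: inherited millisecond comparison
            System.out.println(midnight.toString()
                    .equals(laterSameDay.toString()));                     // true: both render as "2020-09-28"
        }
    }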
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index da42ec8..0c739ea 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -86,7 +86,7 @@ public class TestPutKudu {
     public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
     public static final String DEFAULT_MASTERS = "testLocalHost:7051";
     public static final String SKIP_HEAD_LINE = "false";
-    public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal";
+    public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal,dateVal";
 
     private TestRunner testRunner;
 
@@ -94,6 +94,8 @@ public class TestPutKudu {
 
     private MockRecordParser readerFactory;
 
+    private final java.sql.Date today = new java.sql.Date(System.currentTimeMillis());
+
     @Before
     public void setUp() throws InitializationException {
         processor = new MockPutKudu();
@@ -124,9 +126,10 @@ public class TestPutKudu {
         readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
         readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
         readerFactory.addSchemaField(new RecordField("decimalVal", RecordFieldType.DECIMAL.getDecimalDataType(6, 3)));
-
-        for (int i=0; i < numOfRecord; i++) {
-            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, new BigDecimal(111.111D).add(BigDecimal.valueOf(i)));
+        readerFactory.addSchemaField("dateVal", RecordFieldType.DATE);
+        for (int i = 0; i < numOfRecord; i++) {
+            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i,
+                    new BigDecimal(111.111D).add(BigDecimal.valueOf(i)), today);
         }
 
         testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -165,7 +168,7 @@ public class TestPutKudu {
 
         final String filename = "testWriteKudu-" + System.currentTimeMillis();
 
-        final Map<String,String> flowFileAttributes = new HashMap<>();
+        final Map<String, String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
 
         testRunner.enqueue("trigger", flowFileAttributes);
@@ -238,7 +241,7 @@ public class TestPutKudu {
 
         final String filename = "testInvalidAvroShouldRouteToFailure-" + System.currentTimeMillis();
 
-        final Map<String,String> flowFileAttributes = new HashMap<>();
+        final Map<String, String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
 
         testRunner.enqueue("trigger", flowFileAttributes);
@@ -252,7 +255,7 @@ public class TestPutKudu {
         final String filename = "testValidSchemaShouldBeSuccessful-" + System.currentTimeMillis();
 
         // don't provide my.schema as an attribute
-        final Map<String,String> flowFileAttributes = new HashMap<>();
+        final Map<String, String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
         flowFileAttributes.put("my.schema", TABLE_SCHEMA);
 
@@ -268,7 +271,7 @@ public class TestPutKudu {
         createRecordReader(5);
         final String filename = "testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed-" + System.currentTimeMillis();
 
-        final Map<String,String> flowFileAttributes = new HashMap<>();
+        final Map<String, String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
 
         testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, "true");
@@ -298,7 +301,7 @@ public class TestPutKudu {
 
         final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis();
 
-        final Map<String,String> flowFileAttributes = new HashMap<>();
+        final Map<String, String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
 
         testRunner.enqueue("trigger", flowFileAttributes);
@@ -310,11 +313,11 @@ public class TestPutKudu {
     public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException {
         createRecordReader(0);
         // add the favorite color as a string
-        readerFactory.addRecord(1, "name0", "0", "89.89", "111.111");
+        readerFactory.addRecord(1, "name0", "0", "89.89", "111.111", today);
 
         final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis();
 
-        final Map<String,String> flowFileAttributes = new HashMap<>();
+        final Map<String, String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
 
         testRunner.enqueue("trigger", flowFileAttributes);
@@ -325,11 +328,11 @@ public class TestPutKudu {
     @Test
     public void testMissingColumInReader() throws InitializationException, IOException {
         createRecordReader(0);
-        readerFactory.addRecord( "name0", "0", "89.89"); //missing id
+        readerFactory.addRecord("name0", "0", "89.89"); //missing id
 
         final String filename = "testMissingColumInReader-" + System.currentTimeMillis();
 
-        final Map<String,String> flowFileAttributes = new HashMap<>();
+        final Map<String, String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
 
         testRunner.enqueue("trigger", flowFileAttributes);
@@ -384,7 +387,7 @@ public class TestPutKudu {
         createRecordReader(50);
         testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.record.delete}");
 
-        final Map<String,String> attributes = new HashMap<>();
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("kudu.record.delete", "DELETE");
 
         testRunner.enqueue("string".getBytes(), attributes);
@@ -402,7 +405,7 @@ public class TestPutKudu {
         createRecordReader(50);
         testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.record.update}");
 
-        final Map<String,String> attributes = new HashMap<>();
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("kudu.record.update", "UPDATE");
 
         testRunner.enqueue("string".getBytes(), attributes);
@@ -417,74 +420,89 @@ public class TestPutKudu {
 
     @Test
     public void testBuildRow() {
-        buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO",false);
+        buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO", null, false);
     }
 
     @Test
     public void testBuildPartialRowNullable() {
-        buildPartialRow((long) 1, null, (short) 10, "id", "id", null, false);
+        buildPartialRow((long) 1, null, (short) 10, "id", "id", null, null, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowNullPrimaryKey() {
-        buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", false);
+        buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", null, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowNotNullable() {
-        buildPartialRow((long) 1, "foo", null, "id", "id", "SFO",false);
+        buildPartialRow((long) 1, "foo", null, "id", "id", "SFO", null, false);
     }
 
     @Test
     public void testBuildPartialRowLowercaseFields() {
-        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",true);
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO", null, true);
         row.getLong("id");
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowLowercaseFieldsFalse() {
-        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",false);
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO", null, false);
         row.getLong("id");
     }
 
     @Test
     public void testBuildPartialRowLowercaseFieldsKuduUpper() {
-        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", false);
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", null, false);
         row.getLong("ID");
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowLowercaseFieldsKuduUpperFail() {
-        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", true);
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", null, true);
         row.getLong("ID");
     }
 
     @Test
     public void testBuildPartialRowVarCharTooLong() {
-        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", true);
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", null, true);
         Assert.assertEquals("Kudu client should truncate VARCHAR value to expected length", "San", row.getVarchar("airport_code"));
     }
 
-    private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, Boolean lowercaseFields) {
+    @Test
+    public void testBuildPartialRowWithDate() {
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", today, true);
+        // Comparing string representations of dates, because java.sql.Date does not override
+        // java.util.Date.equals method and therefore compares milliseconds instead of
+        // comparing dates, even though java.sql.Date is supposed to ignore time
+        Assert.assertEquals(String.format("Expecting the date to be %s, but got %s", today.toString(), row.getDate("sql_date").toString()),
+                row.getDate("sql_date").toString(), today.toString());
+    }
+
+    private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields) {
         final Schema kuduSchema = new Schema(Arrays.asList(
-            new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
-            new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
-            new ColumnSchema.ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
-            new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
-            new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes(
-                new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
-            ).build(),
-            new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes(
-                    new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build()
-            ).build()));
+                new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
+                new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
+                new ColumnSchema.ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
+                new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
+                new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes(
+                        new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
+                ).build(),
+                new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes(
+                        new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build()
+                ).build(),
+                new ColumnSchema.ColumnSchemaBuilder("sql_date", Type.DATE).nullable(true).build()
+        ));
+
 
         final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
-            new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()),
-            new RecordField("name", RecordFieldType.STRING.getDataType()),
-            new RecordField("age", RecordFieldType.SHORT.getDataType()),
-            new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()),
-            new RecordField("score", RecordFieldType.LONG.getDataType()),
-            new RecordField("airport_code", RecordFieldType.STRING.getDataType())));
+                new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()),
+                new RecordField("name", RecordFieldType.STRING.getDataType()),
+                new RecordField("age", RecordFieldType.SHORT.getDataType()),
+                new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()),
+                new RecordField("score", RecordFieldType.LONG.getDataType()),
+                new RecordField("airport_code", RecordFieldType.STRING.getDataType()),
+                new RecordField("sql_date", RecordFieldType.DATE.getDataType())
+        ));
 
         Map<String, Object> values = new HashMap<>();
         PartialRow row = kuduSchema.newPartialRow();
@@ -494,13 +512,14 @@ public class TestPutKudu {
         values.put("updated_at", new Timestamp(System.currentTimeMillis()));
         values.put("score", 10000L);
         values.put("airport_code", airport_code);
+        values.put("sql_date", sql_date);
         processor.buildPartialRow(
-            kuduSchema,
-            row,
-            new MapRecord(schema, values),
-            schema.getFieldNames(),
+                kuduSchema,
+                row,
+                new MapRecord(schema, values),
+                schema.getFieldNames(),
                 true,
-            lowercaseFields
+                lowercaseFields
         );
         return row;
     }
@@ -555,17 +574,17 @@ public class TestPutKudu {
     private void testKuduPartialFailure(FlushMode flushMode, int batchSize) throws Exception {
         final int numFlowFiles = 4;
         final int numRecordsPerFlowFile = 3;
-        final ResultCode[][] flowFileResults = new ResultCode[][] {
-            new ResultCode[]{OK, OK, FAIL},
+        final ResultCode[][] flowFileResults = new ResultCode[][]{
+                new ResultCode[]{OK, OK, FAIL},
 
-            // The last operation will not be submitted to Kudu if flush mode is AUTO_FLUSH_SYNC
-            new ResultCode[]{OK, FAIL, OK},
+                // The last operation will not be submitted to Kudu if flush mode is AUTO_FLUSH_SYNC
+                new ResultCode[]{OK, FAIL, OK},
 
-            // Everything's okay
-            new ResultCode[]{OK, OK, OK},
+                // Everything's okay
+                new ResultCode[]{OK, OK, OK},
 
-            // The last operation will not be submitted due to an exception from apply() call
-            new ResultCode[]{OK, EXCEPTION, OK},
+                // The last operation will not be submitted due to an exception from apply() call
+                new ResultCode[]{OK, EXCEPTION, OK},
         };
 
         KuduSession session = mock(KuduSession.class);
@@ -596,10 +615,10 @@ public class TestPutKudu {
 
                     RowErrorsAndOverflowStatus pendingErrorResponse = mock(RowErrorsAndOverflowStatus.class);
                     RowError[] rowErrors = slice.stream()
-                        .flatMap(List::stream)
-                        .filter(OperationResponse::hasRowError)
-                        .map(OperationResponse::getRowError)
-                        .toArray(RowError[]::new);
+                            .flatMap(List::stream)
+                            .filter(OperationResponse::hasRowError)
+                            .map(OperationResponse::getRowError)
+                            .toArray(RowError[]::new);
                     when(pendingErrorResponse.getRowErrors()).thenReturn(rowErrors);
                     pendingErrorResponses.add(pendingErrorResponse);
 
@@ -665,10 +684,10 @@ public class TestPutKudu {
     }
 
     private void testKuduPartialFailure(FlushMode flushMode) throws Exception {
-      // Test against different batch sizes (up until the point where every record can be buffered at once)
-      for (int i = 1; i <= 11; i++) {
-          testKuduPartialFailure(flushMode, i);
-      }
+        // Test against different batch sizes (up until the point where every record can be buffered at once)
+        for (int i = 1; i <= 11; i++) {
+            testKuduPartialFailure(flushMode, i);
+        }
     }
 
     @Test