You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/06/19 07:19:23 UTC

[nifi] branch master updated: NIFI-7551 Add support for VARCHAR to Kudu NAR bundle - update Kudu dependencies to Kudu 1.12.0 - add VARCHAR to Kudu Lookup Service and Processor - add tests for VARCHAR columns

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new a1b245e  NIFI-7551 Add support for VARCHAR to Kudu NAR bundle  - update Kudu dependencies to Kudu 1.12.0  - add VARCHAR to Kudu Lookup Service and Processor  - add tests for VARCHAR columns
a1b245e is described below

commit a1b245e051245bb6c65e7b5ffc6ee982669b7ab7
Author: Greg Solovyev <fi...@gmail.com>
AuthorDate: Wed Jun 17 18:08:32 2020 -0700

    NIFI-7551 Add support for VARCHAR to Kudu NAR bundle
     - update Kudu dependencies to Kudu 1.12.0
     - add VARCHAR to Kudu Lookup Service and Processor
     - add tests for VARCHAR columns
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4347.
---
 .../nifi-kudu-controller-service/pom.xml           |  2 +-
 .../nifi/controller/kudu/KuduLookupService.java    |  1 +
 .../controller/kudu/ITestKuduLookupService.java    |  8 ++++++
 .../nifi-kudu-bundle/nifi-kudu-processors/pom.xml  |  2 +-
 .../processors/kudu/AbstractKuduProcessor.java     |  3 +++
 .../org/apache/nifi/processors/kudu/ITPutKudu.java |  9 +++++--
 .../apache/nifi/processors/kudu/TestPutKudu.java   | 31 +++++++++++++++-------
 7 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
index 2572bae..776f8f7 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
@@ -27,7 +27,7 @@
 
     <properties>
         <exclude.tests>None</exclude.tests>
-        <kudu.version>1.10.0</kudu.version>
+        <kudu.version>1.12.0</kudu.version>
     </properties>
     <build>
         <extensions>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
index bc59b12..406c6e9 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -321,6 +321,7 @@ public class KuduLookupService extends AbstractControllerService implements Reco
                     break;
                 case BINARY:
                 case STRING:
+                case VARCHAR:
                     fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()));
                     break;
                 case DOUBLE:
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java
index ae995af..93bf7d7 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/ITestKuduLookupService.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.kudu;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.CreateTableOptions;
@@ -99,6 +100,9 @@ public class ITestKuduLookupService {
         columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("varchar_3", Type.VARCHAR).typeAttributes(
+                new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build()
+        ).build());
         Schema schema = new Schema(columns);
 
         CreateTableOptions opts = new CreateTableOptions().setRangePartitionColumns(Collections.singletonList("string"));
@@ -120,6 +124,7 @@ public class ITestKuduLookupService {
         row.addInt("int32",3);
         row.addLong("int64",4L);
         row.addTimestamp("unixtime_micros", new Timestamp(nowMillis));
+        row.addVarchar("varchar_3", "SFO");
         session.apply(insert);
 
         insert = table.newInsert();
@@ -135,6 +140,7 @@ public class ITestKuduLookupService {
         row.addInt("int32",13);
         row.addLong("int64",14L);
         row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year
+        row.addVarchar("varchar_3", "SJC");
         session.apply(insert);
 
         session.close();
@@ -198,6 +204,7 @@ public class ITestKuduLookupService {
         map.put("int32",3);
         map.put("int64",4L);
         map.put("unixtime_micros", new Timestamp(nowMillis));
+        map.put("varchar_3", "SFO");
         Record result = kuduLookupService.lookup(map).get();
         validateRow1(result);
     }
@@ -229,6 +236,7 @@ public class ITestKuduLookupService {
         assertEquals(3, (int)result.getAsInt("int32"));
         assertEquals(4L, (long)result.getAsLong("int64"));
         assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros"));
+        assertEquals("SFO", result.getValue("varchar_3"));
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 5c1e752..1bddd07 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -26,7 +26,7 @@
 
     <properties>
         <exclude.tests>None</exclude.tests>
-        <kudu.version>1.10.0</kudu.version>
+        <kudu.version>1.12.0</kudu.version>
     </properties>
     <build>
         <extensions>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 36268d5..ea82d85 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -349,6 +349,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                         case DECIMAL:
                             row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
                             break;
+                        case VARCHAR:
+                            row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName));
+                            break;
                         default:
                             throw new IllegalStateException(String.format("unknown column type %s", colType));
                     }
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
index 1be21cb..7d6aa0a 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.kudu;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.CreateTableOptions;
@@ -103,6 +104,9 @@ public class ITPutKudu {
         List<ColumnSchema> columns = new ArrayList<>();
         columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", Type.STRING).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("varcharval", Type.VARCHAR).typeAttributes(
+                new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(256).build()
+        ).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build());
         columns.add(new ColumnSchema.ColumnSchemaBuilder("timestampval", Type.UNIXTIME_MICROS).build());
         Schema schema = new Schema(columns);
@@ -115,6 +119,7 @@ public class ITPutKudu {
         readerFactory = new MockRecordParser();
         readerFactory.addSchemaField("id", RecordFieldType.INT);
         readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
+        readerFactory.addSchemaField("varcharval", RecordFieldType.STRING);
         readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
         readerFactory.addSchemaField("timestampVal", RecordFieldType.TIMESTAMP);
         // Add two extra columns to test handleSchemaDrift = true.
@@ -122,7 +127,7 @@ public class ITPutKudu {
         readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);
 
         for (int i = 0; i < numOfRecord; i++) {
-            readerFactory.addRecord(i, "val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i);
+            readerFactory.addRecord(i, "val_" + i, "varchar_val_" + i, 1000 + i, NOW, 100.88 + i, 100.88 + i);
         }
 
         testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -188,7 +193,7 @@ public class ITPutKudu {
         KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
 
         // Verify the extra field was added.
-        Assert.assertEquals(6, kuduTable.getSchema().getColumnCount());
+        Assert.assertEquals(7, kuduTable.getSchema().getColumnCount());
         Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
         Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
 
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 97e3e3d..da42ec8 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -417,49 +417,55 @@ public class TestPutKudu {
 
     @Test
     public void testBuildRow() {
-        buildPartialRow((long) 1, "foo", (short) 10, "id", "id", false);
+        buildPartialRow((long) 1, "foo", (short) 10, "id", "id", "SFO",false);
     }
 
     @Test
     public void testBuildPartialRowNullable() {
-        buildPartialRow((long) 1, null, (short) 10, "id", "id",  false);
+        buildPartialRow((long) 1, null, (short) 10, "id", "id", null, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowNullPrimaryKey() {
-        buildPartialRow(null, "foo", (short) 10, "id", "id",  false);
+        buildPartialRow(null, "foo", (short) 10, "id", "id", "SFO", false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowNotNullable() {
-        buildPartialRow((long) 1, "foo", null, "id", "id", false);
+        buildPartialRow((long) 1, "foo", null, "id", "id", "SFO",false);
     }
 
     @Test
     public void testBuildPartialRowLowercaseFields() {
-        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", true);
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",true);
         row.getLong("id");
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowLowercaseFieldsFalse() {
-        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", false);
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "SFO",false);
         row.getLong("id");
     }
 
     @Test
     public void testBuildPartialRowLowercaseFieldsKuduUpper() {
-        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", false);
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", false);
         row.getLong("ID");
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowLowercaseFieldsKuduUpperFail() {
-        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", true);
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", "SFO", true);
         row.getLong("ID");
     }
 
-    private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, Boolean lowercaseFields) {
+    @Test
+    public void testBuildPartialRowVarCharTooLong() {
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", "San Francisco", true);
+        Assert.assertEquals("Kudu client should truncate VARCHAR value to expected length", "San", row.getVarchar("airport_code"));
+    }
+
+    private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, String airport_code, Boolean lowercaseFields) {
         final Schema kuduSchema = new Schema(Arrays.asList(
             new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
             new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
@@ -467,6 +473,9 @@ public class TestPutKudu {
             new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
             new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes(
                 new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
+            ).build(),
+            new ColumnSchema.ColumnSchemaBuilder("airport_code", Type.VARCHAR).nullable(true).typeAttributes(
+                    new ColumnTypeAttributes.ColumnTypeAttributesBuilder().length(3).build()
             ).build()));
 
         final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
@@ -474,7 +483,8 @@ public class TestPutKudu {
             new RecordField("name", RecordFieldType.STRING.getDataType()),
             new RecordField("age", RecordFieldType.SHORT.getDataType()),
             new RecordField("updated_at", RecordFieldType.TIMESTAMP.getDataType()),
-            new RecordField("score", RecordFieldType.LONG.getDataType())));
+            new RecordField("score", RecordFieldType.LONG.getDataType()),
+            new RecordField("airport_code", RecordFieldType.STRING.getDataType())));
 
         Map<String, Object> values = new HashMap<>();
         PartialRow row = kuduSchema.newPartialRow();
@@ -483,6 +493,7 @@ public class TestPutKudu {
         values.put("age", age);
         values.put("updated_at", new Timestamp(System.currentTimeMillis()));
         values.put("score", 10000L);
+        values.put("airport_code", airport_code);
         processor.buildPartialRow(
             kuduSchema,
             row,