You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/11/29 20:03:03 UTC

[bahir-flink] branch master updated: Add support "upsert part of columns of a kudu table" (#70)

This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cf8bf2b  Add support "upsert part of columns of a kudu table" (#70)
cf8bf2b is described below

commit cf8bf2ba33364d03e08dad7490ce463adbb204ce
Author: Darcy <33...@qq.com>
AuthorDate: Sat Nov 30 04:02:56 2019 +0800

    Add support "upsert part of columns of a kudu table" (#70)
    
    Sometimes we don't want to upsert all columns of a Kudu table,
    so we need to support upserting only a subset of the columns
    of a Kudu table.
---
 .../flink/connectors/kudu/connector/KuduRow.java   |  4 ++++
 .../kudu/connector/writer/KuduWriter.java          |  5 ++++-
 .../connectors/kudu/batch/KuduOuputFormatTest.java |  2 ++
 .../connectors/kudu/connector/KuduDatabase.java    | 26 ++++++++++++++++++++--
 .../connectors/kudu/streaming/KuduSinkTest.java    |  3 ++-
 5 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
index af78361..78e6e6e 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
@@ -41,6 +41,10 @@ public class KuduRow extends Row {
         return super.getField(rowNames.get(name));
     }
 
+    public boolean hasField(String name) { // reports whether rowNames holds a non-null position for this column name
+        return rowNames.get(name) != null;
+    }
+
     public void setField(int pos, String name, Object value) {
         super.setField(pos, value);
         this.rowNames.put(name, pos);
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index f4e2a8a..57c0741 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -152,7 +152,10 @@ public class KuduWriter implements AutoCloseable {
 
         table.getSchema().getColumns().forEach(column -> {
             String columnName = column.getName();
-            Object value = row.getField(column.getName());
+            if (!row.hasField(columnName)) {
+                return;
+            }
+            Object value = row.getField(columnName);
 
             if (value == null) {
                 partialRow.setNull(columnName);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
index 963a8c0..f14eaa0 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
@@ -71,6 +71,7 @@ class KuduOuputFormatTest extends KuduDatabase {
 
         List<KuduRow> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
+        kuduRowsTest(rows);
 
         cleanDatabase(tableInfo);
     }
@@ -99,6 +100,7 @@ class KuduOuputFormatTest extends KuduDatabase {
 
         List<KuduRow> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
+        kuduRowsTest(rows);
 
         cleanDatabase(tableInfo);
     }
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
index 3d02a1d..cda8c21 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
@@ -56,14 +56,16 @@ public class KuduDatabase {
                 .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
                 .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
                 .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
-                .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
-                .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+                .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).asNullable().build())
+                .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).asNullable().build())
                 .build();
     }
 
     protected static List<KuduRow> booksDataRow() {
         return Arrays.stream(booksTableData)
                 .map(row -> {
+                    Integer rowId = (Integer)row[0];
+                    if (rowId % 2 == 1) {
                         KuduRow values = new KuduRow(5);
                         values.setField(0, "id", row[0]);
                         values.setField(1, "title", row[1]);
@@ -71,6 +73,13 @@ public class KuduDatabase {
                         values.setField(3, "price", row[3]);
                         values.setField(4, "quantity", row[4]);
                         return values;
+                    } else {
+                        KuduRow values = new KuduRow(3);
+                        values.setField(0, "id", row[0]);
+                        values.setField(1, "title", row[1]);
+                        values.setField(2, "author", row[2]);
+                        return values;
+                    }
                 })
                 .collect(Collectors.toList());
     }
@@ -126,4 +135,17 @@ public class KuduDatabase {
         return rows;
     }
 
+    protected void kuduRowsTest(List<KuduRow> rows) { // checks the partial-upsert result of booksDataRow()
+        for (KuduRow row: rows) {
+            Integer rowId = (Integer)row.getField("id");
+            if (rowId % 2 == 1) { // odd ids were built with all five columns set
+                Assertions.assertNotNull(row.getField("price"));
+                Assertions.assertNotNull(row.getField("quantity"));
+            }
+            else { // even ids were built without price/quantity, so they must read back as null
+                Assertions.assertNull(row.getField("price"));
+                Assertions.assertNull(row.getField("quantity"));
+            }
+        }
+    }
 }
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index ea49a91..38d0115 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -88,7 +88,7 @@ class KuduSinkTest extends KuduDatabase {
 
         List<KuduRow> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
-
+        kuduRowsTest(rows);
     }
 
     @Test
@@ -116,6 +116,7 @@ class KuduSinkTest extends KuduDatabase {
 
         List<KuduRow> rows = readRows(tableInfo);
         Assertions.assertEquals(5, rows.size());
+        kuduRowsTest(rows);
     }