You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/11/29 20:03:03 UTC
[bahir-flink] branch master updated: Add support "upsert part of
columns of a kudu table" (#70)
This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new cf8bf2b Add support "upsert part of columns of a kudu table" (#70)
cf8bf2b is described below
commit cf8bf2ba33364d03e08dad7490ce463adbb204ce
Author: Darcy <33...@qq.com>
AuthorDate: Sat Nov 30 04:02:56 2019 +0800
Add support "upsert part of columns of a kudu table" (#70)
Sometimes we don't want to upsert all columns of a kudu table.
So we need to support the function that upsert part of columns
of a kudu table.
---
.../flink/connectors/kudu/connector/KuduRow.java | 4 ++++
.../kudu/connector/writer/KuduWriter.java | 5 ++++-
.../connectors/kudu/batch/KuduOuputFormatTest.java | 2 ++
.../connectors/kudu/connector/KuduDatabase.java | 26 ++++++++++++++++++++--
.../connectors/kudu/streaming/KuduSinkTest.java | 3 ++-
5 files changed, 36 insertions(+), 4 deletions(-)
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
index af78361..78e6e6e 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduRow.java
@@ -41,6 +41,10 @@ public class KuduRow extends Row {
return super.getField(rowNames.get(name));
}
+ public boolean hasField(String name) {
+ return rowNames.get(name) != null;
+ }
+
public void setField(int pos, String name, Object value) {
super.setField(pos, value);
this.rowNames.put(name, pos);
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index f4e2a8a..57c0741 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -152,7 +152,10 @@ public class KuduWriter implements AutoCloseable {
table.getSchema().getColumns().forEach(column -> {
String columnName = column.getName();
- Object value = row.getField(column.getName());
+ if (!row.hasField(columnName)) {
+ return;
+ }
+ Object value = row.getField(columnName);
if (value == null) {
partialRow.setNull(columnName);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
index 963a8c0..f14eaa0 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java
@@ -71,6 +71,7 @@ class KuduOuputFormatTest extends KuduDatabase {
List<KuduRow> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
+ kuduRowsTest(rows);
cleanDatabase(tableInfo);
}
@@ -99,6 +100,7 @@ class KuduOuputFormatTest extends KuduDatabase {
List<KuduRow> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
+ kuduRowsTest(rows);
cleanDatabase(tableInfo);
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
index 3d02a1d..cda8c21 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduDatabase.java
@@ -56,14 +56,16 @@ public class KuduDatabase {
.addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
.addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
.addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
- .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
- .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+ .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).asNullable().build())
+ .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).asNullable().build())
.build();
}
protected static List<KuduRow> booksDataRow() {
return Arrays.stream(booksTableData)
.map(row -> {
+ Integer rowId = (Integer)row[0];
+ if (rowId % 2 == 1) {
KuduRow values = new KuduRow(5);
values.setField(0, "id", row[0]);
values.setField(1, "title", row[1]);
@@ -71,6 +73,13 @@ public class KuduDatabase {
values.setField(3, "price", row[3]);
values.setField(4, "quantity", row[4]);
return values;
+ } else {
+ KuduRow values = new KuduRow(3);
+ values.setField(0, "id", row[0]);
+ values.setField(1, "title", row[1]);
+ values.setField(2, "author", row[2]);
+ return values;
+ }
})
.collect(Collectors.toList());
}
@@ -126,4 +135,17 @@ public class KuduDatabase {
return rows;
}
+ protected void kuduRowsTest(List<KuduRow> rows) {
+ for (KuduRow row: rows) {
+ Integer rowId = (Integer)row.getField("id");
+ if (rowId % 2 == 1) {
+ Assertions.assertNotEquals(null, row.getField("price"));
+ Assertions.assertNotEquals(null, row.getField("quantity"));
+ }
+ else {
+ Assertions.assertNull(row.getField("price"));
+ Assertions.assertNull(row.getField("quantity"));
+ }
+ }
+ }
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index ea49a91..38d0115 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -88,7 +88,7 @@ class KuduSinkTest extends KuduDatabase {
List<KuduRow> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
-
+ kuduRowsTest(rows);
}
@Test
@@ -116,6 +116,7 @@ class KuduSinkTest extends KuduDatabase {
List<KuduRow> rows = readRows(tableInfo);
Assertions.assertEquals(5, rows.size());
+ kuduRowsTest(rows);
}