Posted to issues@flink.apache.org by "ferenc-csaky (via GitHub)" <gi...@apache.org> on 2023/09/27 17:38:45 UTC

[GitHub] [flink-connector-hbase] ferenc-csaky commented on a diff in pull request #21: [FLINK-33164] Support write option sink.ignore-null-value

ferenc-csaky commented on code in PR #21:
URL: https://github.com/apache/flink-connector-hbase/pull/21#discussion_r1338968517


##########
flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java:
##########
@@ -260,6 +260,19 @@ public void testBufferFlushOptions() {
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testSinkIgnoreNullValueOptions() {
+        Map<String, String> options = getAllOptions();
+        options.put("sink.ignore-null-value", "true");
+
+        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING()));
+
+        DynamicTableSink sink = createTableSink(schema, options);
+        HBaseWriteOptions expected = HBaseWriteOptions.builder().setIgnoreNullValue(true).build();
+        HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions();
+        assertEquals(expected.isIgnoreNullValue(), actual.isIgnoreNullValue());

Review Comment:
   I think instantiating a dummy expected object can be avoided; we can simply use `assertThat(actual.isIgnoreNullValue()).isTrue()`.



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java:
##########
@@ -31,12 +31,7 @@
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
+import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.*;

Review Comment:
   I suggest avoiding wildcard imports.



##########
flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java:
##########
@@ -99,7 +104,57 @@ void convertToReusedRowTest() {
                         "+I(2,+I(20),+I(Hello-2,200),+I(2.02,true,Welt-2))");
     }
 
-    private HBaseSerde createHBaseSerde() {
+    @Test
+    public void writeIgnoreNullValueTest() {
+        HBaseSerde serde = createHBaseSerde(false);
+        Put m1 = serde.createPutMutation(prepareRowData());
+        assert m1 != null;
+        Assert.assertEquals(10, Bytes.toInt(m1.getRow()));

Review Comment:
   Please use `AssertJ`'s `assertThat` for all kinds of asserts. The community agreed that it should be the standard going forward as part of the [JUnit5 migration](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit#heading=h.n2pt1kpwyals). It is not applied completely to this repo yet, but IMO it makes sense to conform with that if we touch some tests.

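   For illustration, the fluent style being suggested reads like the sketch below. The `BooleanAssert` class here is a hypothetical stand-in so the snippet compiles on its own; real test code would instead import `org.assertj.core.api.Assertions.assertThat` from the AssertJ dependency.

   ```java
   // Minimal sketch of the AssertJ-style fluent assertion suggested above.
   // BooleanAssert is a stand-in for illustration only; actual tests would
   // use org.assertj.core.api.Assertions.assertThat.
   public class FluentAssertSketch {

       static final class BooleanAssert {
           private final boolean actual;

           BooleanAssert(boolean actual) {
               this.actual = actual;
           }

           // Fails with a descriptive message instead of a bare `assert`.
           BooleanAssert isTrue() {
               if (!actual) {
                   throw new AssertionError("Expecting value to be true but was false");
               }
               return this;
           }
       }

       static BooleanAssert assertThat(boolean actual) {
           return new BooleanAssert(actual);
       }

       public static void main(String[] args) {
           boolean ignoreNullValue = true; // e.g. the value of actual.isIgnoreNullValue()
           // Reads as a sentence and gives a useful failure message:
           assertThat(ignoreNullValue).isTrue();
           System.out.println("assertion passed");
       }
   }
   ```

   Compared to `assert m1 != null;`, this style always runs (plain Java `assert` is disabled unless the JVM gets `-ea`) and reports what was expected versus what was found.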


##########
flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java:
##########
@@ -42,18 +42,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_TTL;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_MAX_RETRIES;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.NULL_STRING_LITERAL;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
+import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.*;

Review Comment:
   I suggest avoiding wildcard imports.



##########
flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java:
##########
@@ -256,6 +256,19 @@ public void testBufferFlushOptions() {
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testSinkIgnoreNullValueOptions() {
+        Map<String, String> options = getAllOptions();
+        options.put("sink.ignore-null-value", "true");
+
+        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING()));
+
+        DynamicTableSink sink = createTableSink(schema, options);
+        HBaseWriteOptions expected = HBaseWriteOptions.builder().setIgnoreNullValue(true).build();
+        HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions();
+        assertEquals(expected.isIgnoreNullValue(), actual.isIgnoreNullValue());

Review Comment:
   I think instantiating a dummy expected object can be avoided; we can simply use `assertThat(actual.isIgnoreNullValue()).isTrue()`.



##########
flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java:
##########
@@ -42,18 +42,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_TTL;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_MAX_RETRIES;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.NULL_STRING_LITERAL;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
+import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.*;

Review Comment:
   I suggest avoiding wildcard imports.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org