Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/11 07:09:25 UTC

[GitHub] [flink] leonardBang commented on a change in pull request #14536: [FLINK-20812][Connector][Hbase] hbase in sql mode,can use 'properties.*' add Configuration parameter.

leonardBang commented on a change in pull request #14536:
URL: https://github.com/apache/flink/pull/14536#discussion_r554797775



##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
##########
@@ -114,4 +117,29 @@ public String asSummaryString() {
     public HBaseTableSchema getHBaseTableSchema() {
         return this.hbaseSchema;
     }
+
+    // get hbase table properties which start with prefix
+    public static Properties getSpecifyProperties(Map<String, String> tableOptions, String prefix) {

Review comment:
       How about renaming `getSpecifyProperties` to `getHBaseClientProperties`?

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
##########
@@ -114,4 +117,29 @@ public String asSummaryString() {
     public HBaseTableSchema getHBaseTableSchema() {
         return this.hbaseSchema;
     }
+
+    // get hbase table properties which start with prefix
+    public static Properties getSpecifyProperties(Map<String, String> tableOptions, String prefix) {
+        final Properties hbaseProperties = new Properties();
+
+        if (hasSpecifyProperties(tableOptions, prefix)) {
+            tableOptions.keySet().stream()
+                    .filter(key -> key.startsWith(prefix))
+                    .forEach(
+                            key -> {
+                                final String value = tableOptions.get(key);
+                                final String subKey = key.substring((prefix).length());
+                                hbaseProperties.put(subKey, value);
+                            });
+        }
+        return hbaseProperties;
+    }
+
+    /**
+     * Decides if the table options contains hbase client properties that start with prefix

Review comment:
    ```suggestion
     * Returns whether the table options contain HBase client properties or not.
    ```

##########
File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
##########
@@ -95,7 +95,7 @@ public void testJarSubmission() throws Exception {
         final JobPlanInfo planResponse = showPlan(planHandler, storedJarName, restfulGateway);
         // we're only interested in the core functionality so checking for a small detail is
         // sufficient
-        Assert.assertThat(planResponse.getJsonPlan(), containsString("TestProgram.java:30"));
+        Assert.assertThat(planResponse.getJsonPlan(), containsString("TestProgram.java:28"));

Review comment:
       ditto

##########
File path: flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
##########
@@ -117,7 +117,8 @@ public void join(
                     public boolean preVisit(Operator<?> visitable) {
                         if (visitable instanceof InnerJoinOperatorBase) {
                             Assert.assertEquals(
-                                    "Join at testJoinWith(NamesTest.java:93)", visitable.getName());

Review comment:
       ditto

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/table/descriptors/HBase.java
##########
@@ -122,8 +127,47 @@ public HBase writeBufferFlushInterval(String interval) {
         return this;
     }
 
+    /**
+     * Sets the configuration properties for Hbase Configuration. Resets previously set properties.

Review comment:
       Note: please use `HBase` rather than `Hbase` or `hbase` in code and comments, and please update the existing occurrences as well.

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/table/descriptors/HBase.java
##########
@@ -122,8 +127,47 @@ public HBase writeBufferFlushInterval(String interval) {
         return this;
     }
 
+    /**
+     * Sets the configuration properties for Hbase Configuration. Resets previously set properties.
+     *
+     * @param properties The configuration properties for Hbase Configuration.
+     */
+    public HBase properties(Properties properties) {
+        Preconditions.checkNotNull(properties);
+        if (this.hbaseProperties == null) {
+            this.hbaseProperties = new HashMap<>();
+        }
+        this.hbaseProperties.clear();
+        properties.forEach((k, v) -> this.hbaseProperties.put((String) k, (String) v));
+        return this;
+    }
+
+    /**
+     * Adds a configuration properties for Hbase Configuration.

Review comment:
       ```suggestion
        * Adds a configuration property for HBase Configuration.
    ```
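   As a side note, the set-vs-add semantics discussed in this hunk can be exercised with a small stand-alone sketch (the class below is a hypothetical stand-in, not the actual Flink descriptor): `properties(...)` clears anything set previously before copying in the new entries, while `property(...)` adds a single entry.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Properties;

   /** Hypothetical stand-in for the descriptor's property handling. */
   public class HBaseDescriptorSketch {
       private Map<String, String> hbaseProperties;

       /** Sets all configuration properties at once; resets previously set properties. */
       public HBaseDescriptorSketch properties(Properties properties) {
           if (this.hbaseProperties == null) {
               this.hbaseProperties = new HashMap<>();
           }
           this.hbaseProperties.clear();
           properties.forEach((k, v) -> this.hbaseProperties.put((String) k, (String) v));
           return this;
       }

       /** Adds a single configuration property. */
       public HBaseDescriptorSketch property(String key, String value) {
           if (this.hbaseProperties == null) {
               this.hbaseProperties = new HashMap<>();
           }
           this.hbaseProperties.put(key, value);
           return this;
       }

       /** Returns a defensive copy of the collected properties. */
       public Map<String, String> getProperties() {
           return this.hbaseProperties == null ? new HashMap<>() : new HashMap<>(this.hbaseProperties);
       }

       public static void main(String[] args) {
           Properties first = new Properties();
           first.setProperty("hbase.client.retries.number", "3");

           HBaseDescriptorSketch hbase =
                   new HBaseDescriptorSketch()
                           .properties(first)
                           .property("hbase.zookeeper.quorum", "zk1:2181");

           Properties second = new Properties();
           second.setProperty("hbase.rpc.timeout", "60000");
           hbase.properties(second); // resets: earlier entries are gone

           System.out.println(hbase.getProperties().size()); // 1
       }
   }
   ```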

##########
File path: flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
##########
@@ -114,4 +117,29 @@ public String asSummaryString() {
     public HBaseTableSchema getHBaseTableSchema() {
         return this.hbaseSchema;
     }
+
+    // get hbase table properties which start with prefix
+    public static Properties getSpecifyProperties(Map<String, String> tableOptions, String prefix) {
+        final Properties hbaseProperties = new Properties();
+
+        if (hasSpecifyProperties(tableOptions, prefix)) {
+            tableOptions.keySet().stream()
+                    .filter(key -> key.startsWith(prefix))
+                    .forEach(
+                            key -> {
+                                final String value = tableOptions.get(key);
+                                final String subKey = key.substring((prefix).length());
+                                hbaseProperties.put(subKey, value);
+                            });
+        }
+        return hbaseProperties;
+    }
+
+    /**
+     * Decides if the table options contains hbase client properties that start with prefix
+     * 'properties'.
+     */
+    private static boolean hasSpecifyProperties(Map<String, String> tableOptions, String prefix) {

Review comment:
       How about renaming it to `hasHBaseClientProperties` or `containsHBaseClientProperties`?
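   For context, the prefix handling under review can be exercised with a self-contained sketch (the class and option names below are illustrative, not part of the PR): an option key such as `properties.hbase.zookeeper.quorum` has the `properties.` prefix stripped before being handed to the HBase client configuration.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Properties;

   public class PrefixPropertiesDemo {
       // Extracts all options whose keys start with the given prefix,
       // stripping the prefix from each key (mirrors the method under review).
       public static Properties extractPrefixed(Map<String, String> tableOptions, String prefix) {
           final Properties props = new Properties();
           tableOptions.forEach((key, value) -> {
               if (key.startsWith(prefix)) {
                   props.put(key.substring(prefix.length()), value);
               }
           });
           return props;
       }

       public static void main(String[] args) {
           Map<String, String> options = new HashMap<>();
           options.put("connector", "hbase-1.4");
           options.put("properties.hbase.zookeeper.quorum", "zk1:2181");
           options.put("properties.hbase.security.authentication", "kerberos");

           Properties hbaseProps = extractPrefixed(options, "properties.");
           // Non-prefixed options such as 'connector' are ignored.
           System.out.println(hbaseProps.getProperty("hbase.zookeeper.quorum")); // zk1:2181
           System.out.println(hbaseProps.size()); // 2
       }
   }
   ```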
    

##########
File path: flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
##########
@@ -62,7 +62,7 @@ public boolean filter(String value) throws Exception {
                         })
                 .output(new DiscardingOutputFormat<String>());
         Plan plan = env.createProgramPlan();
-        testForName("Filter at testDefaultName(NamesTest.java:55)", plan);

Review comment:
       I don't think this PR would cause this test to fail, so why do we change it?
   If it's an existing bug, I suggest filing a JIRA issue and opening another PR to fix it.

##########
File path: flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1TableFactory.java
##########
@@ -50,15 +50,17 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
+import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_TABLE_NAME;
+import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
 import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
-import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_TABLE_NAME;
-import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
-import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL;
-import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS;
-import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE;
-import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_ZK_NODE_PARENT;
-import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_ZK_QUORUM;
+import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL;
+import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS;
+import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE;
+import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_ZK_NODE_PARENT;
+import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.AbstractHBaseValidator.CONNECTOR_PROPERTIES;

Review comment:
       
   I think the current import order is fine; if you want to optimize the imports, please keep them in the same style, i.e.:
   ```suggestion
   import static org.apache.flink.connector.hbase1.HBaseValidator.CONNECTOR_PROPERTIES;
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org