You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/09/17 07:20:28 UTC
[flink] branch release-1.11 updated: [FLINK-18604][hbase] HBase
ConnectorDescriptor can not work in Table API
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new e69b262 [FLINK-18604][hbase] HBase ConnectorDescriptor can not work in Table API
e69b262 is described below
commit e69b26242d69df19d90962a1bf472d16bfb58cc1
Author: fangliang <56...@qq.com>
AuthorDate: Thu Sep 17 15:19:51 2020 +0800
[FLINK-18604][hbase] HBase ConnectorDescriptor can not work in Table API
This closes #13398
---
.../org/apache/flink/table/descriptors/HBase.java | 2 +-
.../connector/hbase/HBaseConnectorITCase.java | 31 +++++++++++++++++++
.../flink/connector/hbase/HBaseDescriptorTest.java | 35 ++++++++++++++++++++++
3 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/table/descriptors/HBase.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/table/descriptors/HBase.java
index 9dcdff4..471ec73 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/table/descriptors/HBase.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/table/descriptors/HBase.java
@@ -41,7 +41,7 @@ public class HBase extends ConnectorDescriptor {
private DescriptorProperties properties = new DescriptorProperties();
public HBase() {
- super(CONNECTOR_TYPE_VALUE_HBASE, 1, true);
+ super(CONNECTOR_TYPE_VALUE_HBASE, 1, false);
}
/**
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
index 9ab36cc..ff231f8 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.connector.hbase.util.HBaseTestBase;
import org.apache.flink.connector.hbase.util.PlannerType;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
@@ -39,6 +40,8 @@ import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.descriptors.HBase;
+import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
@@ -245,6 +248,34 @@ public class HBaseConnectorITCase extends HBaseTestBase {
}
@Test
+ public void testTableSourceWithTableAPI() throws Exception {
+ TableEnvironment tEnv = createBatchTableEnv();
+ tEnv.connect(new HBase()
+ .version("1.4.3")
+ .tableName(TEST_TABLE_1)
+ .zookeeperQuorum(getZookeeperQuorum()))
+ .withSchema(new Schema()
+ .field("rowkey", DataTypes.INT())
+ .field("family2", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.STRING()), DataTypes.FIELD("col2", DataTypes.BIGINT())))
+ .field("family3", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.DOUBLE()), DataTypes.FIELD("col2", DataTypes.BOOLEAN()), DataTypes.FIELD("col3", DataTypes.STRING())))
+ .field("family1", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.INT()))))
+ .createTemporaryTable("hTable");
+ Table table = tEnv.sqlQuery("SELECT * FROM hTable AS h");
+ List<Row> results = collectBatchResult(table);
+ String expected =
+ "1,Hello-1,100,1.01,false,Welt-1,10\n" +
+ "2,Hello-2,200,2.02,true,Welt-2,20\n" +
+ "3,Hello-3,300,3.03,false,Welt-3,30\n" +
+ "4,null,400,4.04,true,Welt-4,40\n" +
+ "5,Hello-5,500,5.05,false,Welt-5,50\n" +
+ "6,Hello-6,600,6.06,true,Welt-6,60\n" +
+ "7,Hello-7,700,7.07,false,Welt-7,70\n" +
+ "8,null,800,8.08,true,Welt-8,80\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
public void testTableSourceReadAsByteArray() throws Exception {
TableEnvironment tEnv = createBatchTableEnv();
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDescriptorTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDescriptorTest.java
index 427a091..d715ffb 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDescriptorTest.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDescriptorTest.java
@@ -18,13 +18,21 @@
package org.apache.flink.connector.hbase;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.Registration;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.descriptors.ConnectTableDescriptor;
import org.apache.flink.table.descriptors.Descriptor;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.DescriptorTestBase;
import org.apache.flink.table.descriptors.DescriptorValidator;
+import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.HBase;
import org.apache.flink.table.descriptors.HBaseValidator;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.junit.Assert;
import org.junit.Test;
@@ -33,6 +41,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Test case for {@link HBase} descriptor.
@@ -117,4 +126,30 @@ public class HBaseDescriptorTest extends DescriptorTestBase {
Assert.assertTrue("The case#" + i + " didn't get the expected error", caughtExpectedException);
}
}
+
+ @Test
+ public void testFormatNeed(){
+ String expected = "The connector org.apache.flink.table.descriptors.HBase does not require a format description but org.apache.flink.connector.hbase.HBaseDescriptorTest$1 found.";
+ AtomicReference<CatalogTableImpl> reference = new AtomicReference<>();
+ HBase hBase = new HBase();
+ Registration registration = (path, table) -> reference.set((CatalogTableImpl) table);
+ ConnectTableDescriptor descriptor = new StreamTableDescriptor(
+ registration, hBase)
+ .withFormat(new FormatDescriptor("myFormat", 1) {
+ @Override
+ protected Map<String, String> toFormatProperties() {
+ return new HashMap<>();
+ }
+ })
+ .withSchema(new Schema()
+ .field("f0", DataTypes.INT())
+ .rowtime(new Rowtime().timestampsFromField("f0")));
+ String actual = null;
+ try {
+ descriptor.toProperties();
+ } catch (Exception e) {
+ actual = e.getMessage();
+ }
+ Assert.assertEquals(expected, actual);
+ }
}