Posted to commits@flink.apache.org by lz...@apache.org on 2020/09/17 07:20:28 UTC

[flink] branch release-1.11 updated: [FLINK-18604][hbase] HBase ConnectorDescriptor can not work in Table API

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new e69b262  [FLINK-18604][hbase] HBase ConnectorDescriptor can not work in Table API
e69b262 is described below

commit e69b26242d69df19d90962a1bf472d16bfb58cc1
Author: fangliang <56...@qq.com>
AuthorDate: Thu Sep 17 15:19:51 2020 +0800

    [FLINK-18604][hbase] HBase ConnectorDescriptor can not work in Table API
    
    This closes #13398
---
 .../org/apache/flink/table/descriptors/HBase.java  |  2 +-
 .../connector/hbase/HBaseConnectorITCase.java      | 31 +++++++++++++++++++
 .../flink/connector/hbase/HBaseDescriptorTest.java | 35 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 1 deletion(-)
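
In short: the HBase descriptor's constructor told the Table API that a format is needed (the third constructor argument was true), so registering the descriptor demanded a format description that the HBase connector never uses. The change below flips that flag to false and adds tests for both the working Table API path and the rejection of a superfluous format. As a minimal sketch (not part of this commit, assuming Flink 1.11 with flink-connector-hbase on the classpath; the table name, ZooKeeper quorum and schema are placeholders mirroring the new IT case), the usage this enables looks like:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.descriptors.HBase;
    import org.apache.flink.table.descriptors.Schema;

    public class HBaseDescriptorExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build());

            // Register an HBase-backed table without any .withFormat(...) call,
            // which is what this fix makes possible.
            tEnv.connect(new HBase()
                    .version("1.4.3")
                    .tableName("testTable")             // placeholder table name
                    .zookeeperQuorum("localhost:2181")) // placeholder quorum
                .withSchema(new Schema()
                    .field("rowkey", DataTypes.INT())
                    .field("family1", DataTypes.ROW(
                            DataTypes.FIELD("col1", DataTypes.INT()))))
                .createTemporaryTable("hTable");

            tEnv.sqlQuery("SELECT * FROM hTable").execute().print();
        }
    }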

diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/table/descriptors/HBase.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/table/descriptors/HBase.java
index 9dcdff4..471ec73 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/table/descriptors/HBase.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/table/descriptors/HBase.java
@@ -41,7 +41,7 @@ public class HBase extends ConnectorDescriptor {
 	private DescriptorProperties properties = new DescriptorProperties();
 
 	public HBase() {
-		super(CONNECTOR_TYPE_VALUE_HBASE, 1, true);
+		super(CONNECTOR_TYPE_VALUE_HBASE, 1, false);
 	}
 
 	/**
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
index 9ab36cc..ff231f8 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.connector.hbase.util.HBaseTestBase;
 import org.apache.flink.connector.hbase.util.PlannerType;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -39,6 +40,8 @@ import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.descriptors.HBase;
+import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil;
 import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
@@ -245,6 +248,34 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 	}
 
 	@Test
+	public void testTableSourceWithTableAPI() throws Exception {
+		TableEnvironment tEnv = createBatchTableEnv();
+		tEnv.connect(new HBase()
+			.version("1.4.3")
+			.tableName(TEST_TABLE_1)
+			.zookeeperQuorum(getZookeeperQuorum()))
+			.withSchema(new Schema()
+				.field("rowkey", DataTypes.INT())
+				.field("family2", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.STRING()), DataTypes.FIELD("col2", DataTypes.BIGINT())))
+				.field("family3", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.DOUBLE()), DataTypes.FIELD("col2", DataTypes.BOOLEAN()), DataTypes.FIELD("col3", DataTypes.STRING())))
+				.field("family1", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.INT()))))
+			.createTemporaryTable("hTable");
+		Table table = tEnv.sqlQuery("SELECT * FROM hTable AS h");
+		List<Row> results = collectBatchResult(table);
+		String expected =
+			"1,Hello-1,100,1.01,false,Welt-1,10\n" +
+				"2,Hello-2,200,2.02,true,Welt-2,20\n" +
+				"3,Hello-3,300,3.03,false,Welt-3,30\n" +
+				"4,null,400,4.04,true,Welt-4,40\n" +
+				"5,Hello-5,500,5.05,false,Welt-5,50\n" +
+				"6,Hello-6,600,6.06,true,Welt-6,60\n" +
+				"7,Hello-7,700,7.07,false,Welt-7,70\n" +
+				"8,null,800,8.08,true,Welt-8,80\n";
+
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	@Test
 	public void testTableSourceReadAsByteArray() throws Exception {
 		TableEnvironment tEnv = createBatchTableEnv();
 
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDescriptorTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDescriptorTest.java
index 427a091..d715ffb 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDescriptorTest.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDescriptorTest.java
@@ -18,13 +18,21 @@
 
 package org.apache.flink.connector.hbase;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.Registration;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.descriptors.ConnectTableDescriptor;
 import org.apache.flink.table.descriptors.Descriptor;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.DescriptorTestBase;
 import org.apache.flink.table.descriptors.DescriptorValidator;
+import org.apache.flink.table.descriptors.FormatDescriptor;
 import org.apache.flink.table.descriptors.HBase;
 import org.apache.flink.table.descriptors.HBaseValidator;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.StreamTableDescriptor;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -33,6 +41,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Test case for {@link HBase} descriptor.
@@ -117,4 +126,30 @@ public class HBaseDescriptorTest extends DescriptorTestBase {
 			Assert.assertTrue("The case#" + i + " didn't get the expected error", caughtExpectedException);
 		}
 	}
+
+	@Test
+	public void testFormatNeed(){
+		String expected = "The connector org.apache.flink.table.descriptors.HBase does not require a format description but org.apache.flink.connector.hbase.HBaseDescriptorTest$1 found.";
+		AtomicReference<CatalogTableImpl> reference = new AtomicReference<>();
+		HBase hBase = new HBase();
+		Registration registration = (path, table) -> reference.set((CatalogTableImpl) table);
+		ConnectTableDescriptor descriptor = new StreamTableDescriptor(
+			registration, hBase)
+			.withFormat(new FormatDescriptor("myFormat", 1) {
+				@Override
+				protected Map<String, String> toFormatProperties() {
+					return new HashMap<>();
+				}
+			})
+			.withSchema(new Schema()
+				.field("f0", DataTypes.INT())
+				.rowtime(new Rowtime().timestampsFromField("f0")));
+		String actual = null;
+		try {
+			descriptor.toProperties();
+		} catch (Exception e) {
+			actual = e.getMessage();
+		}
+		Assert.assertEquals(expected, actual);
+	}
 }