Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/02 02:09:07 UTC
[flink] 03/03: [FLINK-13290][hbase] Enable blink planner for integration tests of HBase
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5d981ebc16ce6a1089e021fcb7a634ebe0167be5
Author: Jark Wu <im...@gmail.com>
AuthorDate: Tue Jul 30 18:17:39 2019 +0800
[FLINK-13290][hbase] Enable blink planner for integration tests of HBase
This commit merges HBaseLookupFunctionITCase and HBaseSinkITCase into HBaseConnectorITCase.
Keeping these IT cases in one class saves significant HBase mini-cluster initialization time.
This closes #9275
---
flink-connectors/flink-hbase/pom.xml | 27 +-
.../flink/addons/hbase/HBaseTableSource.java | 5 +-
.../flink/addons/hbase/HBaseConnectorITCase.java | 601 +++++++++++++--------
.../addons/hbase/HBaseLookupFunctionITCase.java | 203 -------
.../apache/flink/addons/hbase/HBaseSinkITCase.java | 186 -------
.../flink/addons/hbase/util/HBaseTestBase.java | 150 +++++
.../HBaseTestingClusterAutoStarter.java} | 11 +-
.../flink/addons/hbase/util/PlannerType.java | 27 +
8 files changed, 571 insertions(+), 639 deletions(-)
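
For context on the motivation: the javadoc removed below notes that starting the HBaseTestingCluster takes about half a minute, so every IT case class that boots its own cluster pays that cost again. A minimal sketch of the shared-fixture idea, using only plain HBase test utilities and JUnit (independent of the Flink helper classes in this diff):

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class SharedClusterSketchITCase {

        // One mini cluster for the whole class; every @Test reuses it.
        private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

        @BeforeClass
        public static void startClusterOnce() throws Exception {
            TEST_UTIL.startMiniCluster(); // expensive (~30s), paid once per class
        }

        @AfterClass
        public static void stopCluster() throws Exception {
            TEST_UTIL.shutdownMiniCluster();
        }

        @Test
        public void caseA() throws Exception { /* runs against TEST_UTIL */ }

        @Test
        public void caseB() throws Exception { /* runs against the same cluster */ }
    }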
diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml
index a9d3c22..eba6028 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -99,14 +99,6 @@ under the License.
<scope>provided</scope>
<optional>true</optional>
</dependency>
- <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- <optional>true</optional>
- </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -299,6 +291,25 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index 3fff5fa..b1e7161 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -180,7 +180,10 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
- throw new UnsupportedOperationException("HBase table can not convert to DataStream currently.");
+ HBaseTableSchema projectedSchema = hbaseSchema.getProjectedHBaseTableSchema(projectFields);
+ return execEnv
+ .createInput(new HBaseRowInputFormat(conf, tableName, projectedSchema), getReturnType())
+ .name(explainSource());
}
@VisibleForTesting
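
The change above is what makes the streaming tests possible: getDataStream used to throw UnsupportedOperationException and now creates a bounded stream from HBaseRowInputFormat over the projected schema. A hedged usage sketch (table and column names borrowed from the tests below):

    import org.apache.flink.addons.hbase.HBaseTableSource;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class HBaseStreamScanSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Configuration conf = HBaseConfiguration.create(); // picks up hbase-site.xml
            HBaseTableSource source = new HBaseTableSource(conf, "testTable1");
            source.addColumn("family1", "col1", Integer.class);

            // Now a bounded scan of the HBase table instead of an exception.
            DataStream<Row> rows = source.getDataStream(env);
            rows.print();
            env.execute("hbase-scan-sketch");
        }
    }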
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index bd992c5..6a0909e 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -1,6 +1,4 @@
/*
- * Copyright The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,212 +18,138 @@
package org.apache.flink.addons.hbase;
+import org.apache.flink.addons.hbase.util.HBaseTestBase;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-
+import java.util.Map;
+
+import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.junit.Assert.assertEquals;
/**
- * This class contains integrations tests for multiple HBase connectors:
- * - TableInputFormat
- * - HBaseTableSource
- *
- * <p>These tests are located in a single test file to avoided unnecessary initializations of the
- * HBaseTestingCluster which takes about half a minute.
- *
+ * IT cases for HBase connector (including HBaseTableSource and HBaseTableSink).
*/
-public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
-
- private static final String TEST_TABLE = "testTable";
-
- private static final String FAMILY1 = "family1";
- private static final String F1COL1 = "col1";
-
- private static final String FAMILY2 = "family2";
- private static final String F2COL1 = "col1";
- private static final String F2COL2 = "col2";
-
- private static final String FAMILY3 = "family3";
- private static final String F3COL1 = "col1";
- private static final String F3COL2 = "col2";
- private static final String F3COL3 = "col3";
-
- @BeforeClass
- public static void activateHBaseCluster() throws IOException {
- registerHBaseMiniClusterInClasspath();
- prepareTable();
- LimitNetworkBuffersTestEnvironment.setAsContext();
- }
-
- @AfterClass
- public static void resetExecutionEnvironmentFactory() {
- LimitNetworkBuffersTestEnvironment.unsetAsContext();
- }
-
- private static void prepareTable() throws IOException {
-
- // create a table
- TableName tableName = TableName.valueOf(TEST_TABLE);
- // column families
- byte[][] families = new byte[][]{
- Bytes.toBytes(FAMILY1),
- Bytes.toBytes(FAMILY2),
- Bytes.toBytes(FAMILY3)
- };
- // split keys
- byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) };
- createTable(tableName, families, splitKeys);
-
- // get the HTable instance
- HTable table = openTable(tableName);
- List<Put> puts = new ArrayList<>();
- // add some data
- puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
- puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
- puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
- puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
- puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
- puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
- puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
- puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
-
- // append rows to table
- table.put(puts);
- table.close();
- }
-
- private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
- Put put = new Put(Bytes.toBytes(rowKey));
- // family 1
- put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
- // family 2
- if (f2c1 != null) {
- put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
- }
- put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
- // family 3
- put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
- put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
- put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
-
- return put;
- }
+public class HBaseConnectorITCase extends HBaseTestBase {
- // ######## HBaseTableSource tests ############
+ // -------------------------------------------------------------------------------------
+ // HBaseTableSource tests
+ // -------------------------------------------------------------------------------------
@Test
public void testTableSourceFullScan() throws Exception {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
- HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+ TableEnvironment tEnv = createBatchTableEnv();
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- tableEnv.registerTableSource("hTable", hbaseTable);
-
- Table result = tableEnv.sqlQuery(
- "SELECT " +
- " h.family1.col1, " +
- " h.family2.col1, " +
- " h.family2.col2, " +
- " h.family3.col1, " +
- " h.family3.col2, " +
- " h.family3.col3 " +
- "FROM hTable AS h"
- );
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
-
+ tEnv.registerTableSource("hTable", hbaseTable);
+
+ Table table = tEnv.sqlQuery("SELECT " +
+ " h.family1.col1, " +
+ " h.family2.col1, " +
+ " h.family2.col2, " +
+ " h.family3.col1, " +
+ " h.family3.col2, " +
+ " h.family3.col3 " +
+ "FROM hTable AS h");
+
+ List<Row> results = collectBatchResult(table);
String expected =
"10,Hello-1,100,1.01,false,Welt-1\n" +
- "20,Hello-2,200,2.02,true,Welt-2\n" +
- "30,Hello-3,300,3.03,false,Welt-3\n" +
- "40,null,400,4.04,true,Welt-4\n" +
- "50,Hello-5,500,5.05,false,Welt-5\n" +
- "60,Hello-6,600,6.06,true,Welt-6\n" +
- "70,Hello-7,700,7.07,false,Welt-7\n" +
- "80,null,800,8.08,true,Welt-8\n";
+ "20,Hello-2,200,2.02,true,Welt-2\n" +
+ "30,Hello-3,300,3.03,false,Welt-3\n" +
+ "40,null,400,4.04,true,Welt-4\n" +
+ "50,Hello-5,500,5.05,false,Welt-5\n" +
+ "60,Hello-6,600,6.06,true,Welt-6\n" +
+ "70,Hello-7,700,7.07,false,Welt-7\n" +
+ "80,null,800,8.08,true,Welt-8\n";
TestBaseUtils.compareResultAsText(results, expected);
}
@Test
public void testTableSourceProjection() throws Exception {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
- HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+ TableEnvironment tEnv = createBatchTableEnv();
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- tableEnv.registerTableSource("hTable", hbaseTable);
+ tEnv.registerTableSource("hTable", hbaseTable);
- Table result = tableEnv.sqlQuery(
- "SELECT " +
- " h.family1.col1, " +
- " h.family3.col1, " +
- " h.family3.col2, " +
- " h.family3.col3 " +
- "FROM hTable AS h"
- );
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
+ Table table = tEnv.sqlQuery("SELECT " +
+ " h.family1.col1, " +
+ " h.family3.col1, " +
+ " h.family3.col2, " +
+ " h.family3.col3 " +
+ "FROM hTable AS h");
+ List<Row> results = collectBatchResult(table);
String expected =
"10,1.01,false,Welt-1\n" +
- "20,2.02,true,Welt-2\n" +
- "30,3.03,false,Welt-3\n" +
- "40,4.04,true,Welt-4\n" +
- "50,5.05,false,Welt-5\n" +
- "60,6.06,true,Welt-6\n" +
- "70,7.07,false,Welt-7\n" +
- "80,8.08,true,Welt-8\n";
+ "20,2.02,true,Welt-2\n" +
+ "30,3.03,false,Welt-3\n" +
+ "40,4.04,true,Welt-4\n" +
+ "50,5.05,false,Welt-5\n" +
+ "60,6.06,true,Welt-6\n" +
+ "70,7.07,false,Welt-7\n" +
+ "80,8.08,true,Welt-8\n";
TestBaseUtils.compareResultAsText(results, expected);
}
@Test
public void testTableSourceFieldOrder() throws Exception {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
- HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+ TableEnvironment tEnv = createBatchTableEnv();
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
// shuffle order of column registration
hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
@@ -233,68 +157,319 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- tableEnv.registerTableSource("hTable", hbaseTable);
+ tEnv.registerTableSource("hTable", hbaseTable);
- Table result = tableEnv.sqlQuery(
- "SELECT * FROM hTable AS h"
- );
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
+ Table table = tEnv.sqlQuery("SELECT * FROM hTable AS h");
+ List<Row> results = collectBatchResult(table);
String expected =
"Hello-1,100,1.01,false,Welt-1,10\n" +
- "Hello-2,200,2.02,true,Welt-2,20\n" +
- "Hello-3,300,3.03,false,Welt-3,30\n" +
- "null,400,4.04,true,Welt-4,40\n" +
- "Hello-5,500,5.05,false,Welt-5,50\n" +
- "Hello-6,600,6.06,true,Welt-6,60\n" +
- "Hello-7,700,7.07,false,Welt-7,70\n" +
- "null,800,8.08,true,Welt-8,80\n";
+ "Hello-2,200,2.02,true,Welt-2,20\n" +
+ "Hello-3,300,3.03,false,Welt-3,30\n" +
+ "null,400,4.04,true,Welt-4,40\n" +
+ "Hello-5,500,5.05,false,Welt-5,50\n" +
+ "Hello-6,600,6.06,true,Welt-6,60\n" +
+ "Hello-7,700,7.07,false,Welt-7,70\n" +
+ "null,800,8.08,true,Welt-8,80\n";
TestBaseUtils.compareResultAsText(results, expected);
}
@Test
public void testTableSourceReadAsByteArray() throws Exception {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
+ TableEnvironment tEnv = createBatchTableEnv();
// fetch row2 from the table till the end
- HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1);
hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
+ tEnv.registerTableSource("hTable", hbaseTable);
+ tEnv.registerFunction("toUTF8", new ToUTF8());
+ tEnv.registerFunction("toLong", new ToLong());
- tableEnv.registerTableSource("hTable", hbaseTable);
- tableEnv.registerFunction("toUTF8", new ToUTF8());
- tableEnv.registerFunction("toLong", new ToLong());
-
- Table result = tableEnv.sqlQuery(
+ Table table = tEnv.sqlQuery(
"SELECT " +
" toUTF8(h.family2.col1), " +
" toLong(h.family2.col2) " +
"FROM hTable AS h"
);
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
+ List<Row> results = collectBatchResult(table);
String expected =
"Hello-1,100\n" +
- "Hello-2,200\n" +
- "Hello-3,300\n" +
- "null,400\n" +
- "Hello-5,500\n" +
- "Hello-6,600\n" +
- "Hello-7,700\n" +
- "null,800\n";
+ "Hello-2,200\n" +
+ "Hello-3,300\n" +
+ "null,400\n" +
+ "Hello-5,500\n" +
+ "Hello-6,600\n" +
+ "Hello-7,700\n" +
+ "null,800\n";
TestBaseUtils.compareResultAsText(results, expected);
}
+ @Test
+ public void testTableInputFormat() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple1<Integer>> result = env
+ .createInput(new InputFormatForTestTable())
+ .reduce((ReduceFunction<Tuple1<Integer>>) (v1, v2) -> Tuple1.of(v1.f0 + v2.f0));
+
+ List<Tuple1<Integer>> resultSet = result.collect();
+
+ assertEquals(1, resultSet.size());
+ assertEquals(360, (int) resultSet.get(0).f0);
+ }
+
+ // -------------------------------------------------------------------------------------
+ // HBaseTableSink tests
+ // -------------------------------------------------------------------------------------
+
+ // prepare a source collection.
+ private static final List<Row> testData1 = new ArrayList<>();
+ private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
+ new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
+ new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
+
+ static {
+ testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+ testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+ testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+ testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
+ testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+ testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+ testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+ testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
+ }
+
+ @Test
+ public void testTableSink() throws Exception {
+ HBaseTableSchema schema = new HBaseTableSchema();
+ schema.addColumn(FAMILY1, F1COL1, Integer.class);
+ schema.addColumn(FAMILY2, F2COL1, String.class);
+ schema.addColumn(FAMILY2, F2COL2, Long.class);
+ schema.setRowKey("rk", Integer.class);
+ schema.addColumn(FAMILY3, F3COL1, Double.class);
+ schema.addColumn(FAMILY3, F3COL2, Boolean.class);
+ schema.addColumn(FAMILY3, F3COL3, String.class);
+
+ Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put("connector.type", "hbase");
+ tableProperties.put("connector.version", "1.4.3");
+ tableProperties.put("connector.property-version", "1");
+ tableProperties.put("connector.table-name", TEST_TABLE_2);
+ tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum());
+ tableProperties.put("connector.zookeeper.znode.parent", "/hbase");
+ DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema());
+ descriptorProperties.putProperties(tableProperties);
+ TableSink tableSink = TableFactoryService
+ .find(HBaseTableFactory.class, descriptorProperties.asMap())
+ .createTableSink(descriptorProperties.asMap());
+
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+ DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
+ tEnv.registerDataStream("src", ds);
+ tEnv.registerTableSink("hbase", tableSink);
+
+ String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
+ tEnv.sqlUpdate(query);
+
+ // wait to finish
+ tEnv.execute("HBase Job");
+
+ // start a batch scan job to verify contents in HBase table
+ TableEnvironment batchTableEnv = createBatchTableEnv();
+
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_2);
+ hbaseTable.setRowKey("rowkey", Integer.class);
+ hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+ hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+ hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+ hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+ hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+ hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+ batchTableEnv.registerTableSource("hTable", hbaseTable);
+
+ Table table = batchTableEnv.sqlQuery(
+ "SELECT " +
+ " h.rowkey, " +
+ " h.family1.col1, " +
+ " h.family2.col1, " +
+ " h.family2.col2, " +
+ " h.family3.col1, " +
+ " h.family3.col2, " +
+ " h.family3.col3 " +
+ "FROM hTable AS h"
+ );
+
+ List<Row> results = collectBatchResult(table);
+ String expected =
+ "1,10,Hello-1,100,1.01,false,Welt-1\n" +
+ "2,20,Hello-2,200,2.02,true,Welt-2\n" +
+ "3,30,Hello-3,300,3.03,false,Welt-3\n" +
+ "4,40,,400,4.04,true,Welt-4\n" +
+ "5,50,Hello-5,500,5.05,false,Welt-5\n" +
+ "6,60,Hello-6,600,6.06,true,Welt-6\n" +
+ "7,70,Hello-7,700,7.07,false,Welt-7\n" +
+ "8,80,,800,8.08,true,Welt-8\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+
+ // -------------------------------------------------------------------------------------
+ // HBase lookup source tests
+ // -------------------------------------------------------------------------------------
+
+ // prepare a source collection.
+ private static final List<Row> testData2 = new ArrayList<>();
+ private static final RowTypeInfo testTypeInfo2 = new RowTypeInfo(
+ new TypeInformation[]{Types.INT, Types.LONG, Types.STRING},
+ new String[]{"a", "b", "c"});
+
+ static {
+ testData2.add(Row.of(1, 1L, "Hi"));
+ testData2.add(Row.of(2, 2L, "Hello"));
+ testData2.add(Row.of(3, 2L, "Hello world"));
+ testData2.add(Row.of(3, 3L, "Hello world!"));
+ }
+
+ @Test
+ public void testHBaseLookupFunction() throws Exception {
+ StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
+ StreamITCase.clear();
+
+ // prepare a source table
+ DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2);
+ Table in = streamTableEnv.fromDataStream(ds, "a, b, c");
+ streamTableEnv.registerTable("src", in);
+
+ Map<String, String> tableProperties = hbaseTableProperties();
+ TableSource source = TableFactoryService
+ .find(HBaseTableFactory.class, tableProperties)
+ .createTableSource(tableProperties);
+
+ streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY}));
+
+ // perform a temporal table join query
+ String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM src, LATERAL TABLE(hbaseLookup(a))";
+ Table result = streamTableEnv.sqlQuery(sqlQuery);
+
+ DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
+ resultSet.addSink(new StreamITCase.StringSink<>());
+
+ streamEnv.execute();
+
+ List<String> expected = new ArrayList<>();
+ expected.add("1,10,Welt-1");
+ expected.add("2,20,Welt-2");
+ expected.add("3,30,Welt-3");
+ expected.add("3,30,Welt-3");
+
+ StreamITCase.compareWithList(expected);
+ }
+
+ @Test
+ public void testHBaseLookupTableSource() throws Exception {
+ if (OLD_PLANNER.equals(planner)) {
+ // lookup table source is only supported in blink planner, skip for old planner
+ return;
+ }
+ StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
+ StreamITCase.clear();
+
+ // prepare a source table
+ String srcTableName = "src";
+ DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2);
+ Table in = streamTableEnv.fromDataStream(ds, "a, b, c, proc.proctime");
+ streamTableEnv.registerTable(srcTableName, in);
+
+ Map<String, String> tableProperties = hbaseTableProperties();
+ TableSource source = TableFactoryService
+ .find(HBaseTableFactory.class, tableProperties)
+ .createTableSource(tableProperties);
+ streamTableEnv.registerTableSource("hbaseLookup", source);
+ // perform a temporal table join query
+ String query = "SELECT a,family1.col1, family3.col3 FROM src " +
+ "JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rk";
+ Table result = streamTableEnv.sqlQuery(query);
+
+ DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
+ resultSet.addSink(new StreamITCase.StringSink<>());
+
+ streamEnv.execute();
+
+ List<String> expected = new ArrayList<>();
+ expected.add("1,10,Welt-1");
+ expected.add("2,20,Welt-2");
+ expected.add("3,30,Welt-3");
+ expected.add("3,30,Welt-3");
+
+ StreamITCase.compareWithList(expected);
+ }
+
+ private static Map<String, String> hbaseTableProperties() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
+ properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
+ properties.put(CONNECTOR_PROPERTY_VERSION, "1");
+ properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
+ // get zk quorum from "hbase-site.xml" in classpath
+ String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
+ properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
+ // schema
+ String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
+ TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
+ TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
+ TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
+ TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};
+
+ DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
+ descriptorProperties.putTableSchema(SCHEMA, tableSchema);
+ descriptorProperties.putProperties(properties);
+ return descriptorProperties.asMap();
+ }
+
+ // ------------------------------- Utilities -------------------------------------------------
+
+ /**
+ * Creates a batch {@link TableEnvironment} depending on the {@link #planner} context.
+ */
+ private TableEnvironment createBatchTableEnv() {
+ if (OLD_PLANNER.equals(planner)) {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ return BatchTableEnvironment.create(env, new TableConfig());
+ } else {
+ return TableEnvironment.create(batchSettings);
+ }
+ }
+
+ /**
+ * Collects the batch result depending on the {@link #planner} context.
+ */
+ private List<Row> collectBatchResult(Table table) throws Exception {
+ TableImpl tableImpl = (TableImpl) table;
+ if (OLD_PLANNER.equals(planner)) {
+ BatchTableEnvironment batchTableEnv = (BatchTableEnvironment) tableImpl.getTableEnvironment();
+ DataSet<Row> resultSet = batchTableEnv.toDataSet(table, Row.class);
+ return resultSet.collect();
+ } else {
+ return JavaScalaConversionUtil.toJava(TableUtil.collect(tableImpl));
+ }
+ }
+
/**
* A {@link ScalarFunction} that maps byte arrays to UTF-8 strings.
*/
public static class ToUTF8 extends ScalarFunction {
+ private static final long serialVersionUID = 1L;
public String eval(byte[] bytes) {
return Bytes.toString(bytes);
@@ -305,15 +480,18 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
* A {@link ScalarFunction} that maps byte array to longs.
*/
public static class ToLong extends ScalarFunction {
+ private static final long serialVersionUID = 1L;
public long eval(byte[] bytes) {
return Bytes.toLong(bytes);
}
}
- // ######## TableInputFormat tests ############
-
- class InputFormatForTestTable extends TableInputFormat<Tuple1<Integer>> {
+ /**
+ * A {@link TableInputFormat} for testing.
+ */
+ public static class InputFormatForTestTable extends TableInputFormat<Tuple1<Integer>> {
+ private static final long serialVersionUID = 1L;
@Override
protected Scan getScanner() {
@@ -322,7 +500,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
@Override
protected String getTableName() {
- return TEST_TABLE;
+ return TEST_TABLE_1;
}
@Override
@@ -331,51 +509,4 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
}
}
- @Test
- public void testTableInputFormat() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- DataSet<Tuple1<Integer>> result = env
- .createInput(new InputFormatForTestTable())
- .reduce(new ReduceFunction<Tuple1<Integer>>(){
-
- @Override
- public Tuple1<Integer> reduce(Tuple1<Integer> v1, Tuple1<Integer> v2) throws Exception {
- return Tuple1.of(v1.f0 + v2.f0);
- }
- });
-
- List<Tuple1<Integer>> resultSet = result.collect();
-
- assertEquals(1, resultSet.size());
- assertEquals(360, (int) resultSet.get(0).f0);
- }
-
- /**
- * Allows the tests to use {@link ExecutionEnvironment#getExecutionEnvironment()} but with a
- * configuration that limits the maximum memory used for network buffers since the current
- * defaults are too high for Travis-CI.
- */
- private abstract static class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment {
-
- public static void setAsContext() {
- Configuration config = new Configuration();
- // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
- config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
- final LocalEnvironment le = new LocalEnvironment(config);
-
- initializeContextEnvironment(new ExecutionEnvironmentFactory() {
- @Override
- public ExecutionEnvironment createExecutionEnvironment() {
- return le;
- }
- });
- }
-
- public static void unsetAsContext() {
- resetContextEnvironment();
- }
- }
-
}
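
One detail in the merged IT case worth noting: testHBaseLookupTableSource only runs on the blink planner because lookup table sources are not supported by the old planner, and its join needs a processing-time attribute on the probe side. The query shape, as used in the test above:

    // "src" carries a proctime attribute ("proc"); "hbaseLookup" is the
    // registered HBase table source and "rk" its row key column.
    String lookupJoin =
        "SELECT a, family1.col1, family3.col3 FROM src " +
        "JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc AS h " +
        "ON src.a = h.rk";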
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
deleted file mode 100644
index 731cf31..0000000
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.runtime.utils.StreamITCase;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-
-/**
- * IT case Test HBaseLookupFunction.
- */
-public class HBaseLookupFunctionITCase extends HBaseTestingClusterAutostarter {
- private static final String ROWKEY = "rk";
- private static final String FAMILY1 = "family1";
- private static final String F1COL1 = "col1";
-
- private static final String FAMILY2 = "family2";
- private static final String F2COL1 = "col1";
- private static final String F2COL2 = "col2";
-
- private static final String FAMILY3 = "family3";
- private static final String F3COL1 = "col1";
- private static final String F3COL2 = "col2";
- private static final String F3COL3 = "col3";
-
- private static final String HTABLE_NAME = "testSrcHBaseTable1";
-
- // prepare a source collection.
- private static final List<Row> testData1 = new ArrayList<>();
-
- static {
- testData1.add(Row.of(1, 1L, "Hi"));
- testData1.add(Row.of(2, 2L, "Hello"));
- testData1.add(Row.of(3, 2L, "Hello world"));
- testData1.add(Row.of(3, 3L, "Hello world!"));
- }
-
- private static final TypeInformation<?>[] testTypes1 = {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
- private static final String[] testColumns1 = {"a", "b", "c"};
- private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(testTypes1, testColumns1);
-
- @BeforeClass
- public static void activateHBaseCluster() throws IOException {
- registerHBaseMiniClusterInClasspath();
- prepareHBaseTableWithData();
- }
-
- private static void prepareHBaseTableWithData() throws IOException {
- // create a table
- TableName tableName = TableName.valueOf(HTABLE_NAME);
- // column families
- byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
- // split keys
- byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
- createTable(tableName, families, splitKeys);
-
- // get the HTable instance
- HTable table = openTable(tableName);
- List<Put> puts = new ArrayList<>();
- // add some data
- puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
- puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
- puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
- puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
- puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
- puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
- puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
- puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
-
- // append rows to table
- table.put(puts);
- table.close();
- }
-
- private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
- Put put = new Put(Bytes.toBytes(rowKey));
- // family 1
- put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
- // family 2
- if (f2c1 != null) {
- put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
- }
- put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
- // family 3
- put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
- put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
- put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
-
- return put;
- }
-
- private static Map<String, String> hbaseTableProperties() {
- Map<String, String> properties = new HashMap<>();
- properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
- properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
- properties.put(CONNECTOR_PROPERTY_VERSION, "1");
- properties.put(CONNECTOR_TABLE_NAME, HTABLE_NAME);
- // get zk quorum from "hbase-site.xml" in classpath
- String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
- properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
- // schema
- String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
- TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
- TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
- TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
- TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};
-
- DescriptorProperties descriptorProperties = new DescriptorProperties(true);
- TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
- descriptorProperties.putTableSchema(SCHEMA, tableSchema);
- descriptorProperties.putProperties(properties);
- return descriptorProperties.asMap();
- }
-
- @Test
- public void testHBaseLookupFunction() throws Exception {
- StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- streamEnv.setParallelism(4);
- StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv);
- StreamITCase.clear();
-
- // prepare a source table
- String srcTableName = "testStreamSrcTable1";
- DataStream<Row> ds = streamEnv.fromCollection(testData1).returns(testTypeInfo1);
- Table in = streamTableEnv.fromDataStream(ds, String.join(",", testColumns1));
- streamTableEnv.registerTable(srcTableName, in);
-
- Map<String, String> tableProperties = hbaseTableProperties();
- TableSource source = TableFactoryService
- .find(HBaseTableFactory.class, tableProperties)
- .createTableSource(tableProperties);
-
- streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY}));
-
- // perform a temporal table join query
- String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM testStreamSrcTable1, LATERAL TABLE(hbaseLookup(a))";
- Table result = streamTableEnv.sqlQuery(sqlQuery);
-
- DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
- resultSet.addSink(new StreamITCase.StringSink<>());
-
- streamEnv.execute();
-
- List<String> expected = new ArrayList<>();
- expected.add("1,10,Welt-1");
- expected.add("2,20,Welt-2");
- expected.add("3,30,Welt-3");
- expected.add("3,30,Welt-3");
-
- StreamITCase.compareWithList(expected);
- }
-}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
deleted file mode 100644
index ef1445d..0000000
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.runtime.utils.StreamITCase;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-
-/**
- * IT case Test for {@link HBaseUpsertTableSink}.
- */
-public class HBaseSinkITCase extends HBaseTestingClusterAutostarter {
- private static final long serialVersionUID = 1L;
-
- private static final String TEST_TABLE = "testTable";
-
- private static final String FAMILY1 = "family1";
- private static final String F1COL1 = "col1";
-
- private static final String FAMILY2 = "family2";
- private static final String F2COL1 = "col1";
- private static final String F2COL2 = "col2";
-
- private static final String FAMILY3 = "family3";
- private static final String F3COL1 = "col1";
- private static final String F3COL2 = "col2";
- private static final String F3COL3 = "col3";
-
- // prepare a source collection.
- private static final List<Row> testData1 = new ArrayList<>();
- private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
- new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
- new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
-
- static {
- testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
- testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
- testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
- testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
- testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
- testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
- testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
- testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
- }
-
- @BeforeClass
- public static void activateHBaseCluster() throws IOException {
- registerHBaseMiniClusterInClasspath();
- createHBaseTable();
- }
-
- private static void createHBaseTable() {
- // create a table
- TableName tableName = TableName.valueOf(TEST_TABLE);
- // column families
- byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
- // split keys
- byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
- createTable(tableName, families, splitKeys);
- }
-
- @Test
- public void testTableSink() throws Exception {
- HBaseTableSchema schema = new HBaseTableSchema();
- schema.addColumn(FAMILY1, F1COL1, Integer.class);
- schema.addColumn(FAMILY2, F2COL1, String.class);
- schema.addColumn(FAMILY2, F2COL2, Long.class);
- schema.setRowKey("rk", Integer.class);
- schema.addColumn(FAMILY3, F3COL1, Double.class);
- schema.addColumn(FAMILY3, F3COL2, Boolean.class);
- schema.addColumn(FAMILY3, F3COL3, String.class);
-
- Map<String, String> tableProperties = new HashMap<>();
- tableProperties.put("connector.type", "hbase");
- tableProperties.put("connector.version", "1.4.3");
- tableProperties.put("connector.property-version", "1");
- tableProperties.put("connector.table-name", TEST_TABLE);
- tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum());
- tableProperties.put("connector.zookeeper.znode.parent", "/hbase");
- DescriptorProperties descriptorProperties = new DescriptorProperties(true);
- descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema());
- descriptorProperties.putProperties(tableProperties);
- TableSink tableSink = TableFactoryService
- .find(HBaseTableFactory.class, descriptorProperties.asMap())
- .createTableSink(descriptorProperties.asMap());
-
- StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- execEnv.setParallelism(4);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv);
- StreamITCase.clear();
-
- DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
- tEnv.registerDataStream("src", ds);
- tEnv.registerTableSink("hbase", tableSink);
-
- String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
- tEnv.sqlUpdate(query);
-
- // wait to finish
- tEnv.execute("HBase Job");
-
- // start a batch scan job to verify contents in HBase table
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
- BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
-
- HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
- hbaseTable.setRowKey("rowkey", Integer.class);
- hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
- hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
- hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
- hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
- hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
- hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
- tableEnv.registerTableSource("hTable", hbaseTable);
-
- Table result = tableEnv.sqlQuery(
- "SELECT " +
- " h.rowkey, " +
- " h.family1.col1, " +
- " h.family2.col1, " +
- " h.family2.col2, " +
- " h.family3.col1, " +
- " h.family3.col2, " +
- " h.family3.col3 " +
- "FROM hTable AS h"
- );
- DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- List<Row> results = resultSet.collect();
-
- String expected =
- "1,10,Hello-1,100,1.01,false,Welt-1\n" +
- "2,20,Hello-2,200,2.02,true,Welt-2\n" +
- "3,30,Hello-3,300,3.03,false,Welt-3\n" +
- "4,40,,400,4.04,true,Welt-4\n" +
- "5,50,Hello-5,500,5.05,false,Welt-5\n" +
- "6,60,Hello-6,600,6.06,true,Welt-6\n" +
- "7,70,Hello-7,700,7.07,false,Welt-7\n" +
- "8,80,,800,8.08,true,Welt-8\n";
-
- TestBaseUtils.compareResultAsText(results, expected);
- }
-}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
new file mode 100644
index 0000000..32baf0c
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.addons.hbase.util.PlannerType.BLINK_PLANNER;
+import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
+
+/**
+ * Abstract IT case class for HBase.
+ */
+@RunWith(Parameterized.class)
+public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
+
+ protected static final String TEST_TABLE_1 = "testTable1";
+ protected static final String TEST_TABLE_2 = "testTable2";
+
+ protected static final String ROWKEY = "rk";
+ protected static final String FAMILY1 = "family1";
+ protected static final String F1COL1 = "col1";
+
+ protected static final String FAMILY2 = "family2";
+ protected static final String F2COL1 = "col1";
+ protected static final String F2COL2 = "col2";
+
+ protected static final String FAMILY3 = "family3";
+ protected static final String F3COL1 = "col1";
+ protected static final String F3COL2 = "col2";
+ protected static final String F3COL3 = "col3";
+
+ private static final byte[][] FAMILIES = new byte[][]{
+ Bytes.toBytes(FAMILY1),
+ Bytes.toBytes(FAMILY2),
+ Bytes.toBytes(FAMILY3)
+ };
+
+ private static final byte[][] SPLIT_KEYS = new byte[][]{Bytes.toBytes(4)};
+
+ @Parameterized.Parameter
+ public PlannerType planner;
+ protected EnvironmentSettings streamSettings;
+ protected EnvironmentSettings batchSettings;
+
+ @Parameterized.Parameters(name = "planner = {0}")
+ public static PlannerType[] getPlanner() {
+ return new PlannerType[]{BLINK_PLANNER, OLD_PLANNER};
+ }
+
+ @BeforeClass
+ public static void activateHBaseCluster() throws IOException {
+ registerHBaseMiniClusterInClasspath();
+ prepareTables();
+ }
+
+ @Before
+ public void before() {
+ EnvironmentSettings.Builder streamBuilder = EnvironmentSettings.newInstance().inStreamingMode();
+ EnvironmentSettings.Builder batchBuilder = EnvironmentSettings.newInstance().inBatchMode();
+ if (BLINK_PLANNER.equals(planner)) {
+ this.streamSettings = streamBuilder.useBlinkPlanner().build();
+ this.batchSettings = batchBuilder.useBlinkPlanner().build();
+ } else if (OLD_PLANNER.equals(planner)) {
+ this.streamSettings = streamBuilder.useOldPlanner().build();
+ this.batchSettings = batchBuilder.useOldPlanner().build();
+ } else {
+ throw new IllegalArgumentException("Unsupported planner name " + planner);
+ }
+ }
+
+ private static void prepareTables() throws IOException {
+ createHBaseTable1();
+ createHBaseTable2();
+ }
+
+ private static void createHBaseTable1() throws IOException {
+ // create a table
+ TableName tableName = TableName.valueOf(TEST_TABLE_1);
+ createTable(tableName, FAMILIES, SPLIT_KEYS);
+
+ // get the HTable instance
+ HTable table = openTable(tableName);
+ List<Put> puts = new ArrayList<>();
+ // add some data
+ puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+ puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+ puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+ puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
+ puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+ puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+ puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+ puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+
+ // append rows to table
+ table.put(puts);
+ table.close();
+ }
+
+ private static void createHBaseTable2() {
+ // create a table
+ TableName tableName = TableName.valueOf(TEST_TABLE_2);
+ createTable(tableName, FAMILIES, SPLIT_KEYS);
+ }
+
+ private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+ Put put = new Put(Bytes.toBytes(rowKey));
+ // family 1
+ put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
+ // family 2
+ if (f2c1 != null) {
+ put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
+ }
+ put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
+ // family 3
+ put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
+ put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
+ put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
+
+ return put;
+ }
+}
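
HBaseTestBase is what makes every subclass run twice: JUnit's Parameterized runner instantiates the test class once per value returned from getPlanner() and injects it into the public planner field. A self-contained sketch of that mechanism:

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class ParameterizedPlannerSketch {

        public enum PlannerType { BLINK_PLANNER, OLD_PLANNER }

        // Injected by the runner, once per value from getPlanner().
        @Parameterized.Parameter
        public PlannerType planner;

        @Parameterized.Parameters(name = "planner = {0}")
        public static PlannerType[] getPlanner() {
            return new PlannerType[]{PlannerType.BLINK_PLANNER, PlannerType.OLD_PLANNER};
        }

        @Test
        public void runsOncePerPlanner() {
            System.out.println("running with planner = " + planner);
        }
    }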
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java
similarity index 97%
rename from flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
rename to flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java
index 905f80d..b515c2b 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java
@@ -18,9 +18,9 @@
* limitations under the License.
*/
-package org.apache.flink.addons.hbase;
+package org.apache.flink.addons.hbase.util;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,7 +42,6 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
@@ -74,9 +73,9 @@ import static org.junit.Assert.fail;
//
// https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
//
-public class HBaseTestingClusterAutostarter extends TestLogger implements Serializable {
+public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
- private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutostarter.class);
+ private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutoStarter.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static HBaseAdmin admin = null;
@@ -160,7 +159,7 @@ public class HBaseTestingClusterAutostarter extends TestLogger implements Serial
/**
* Returns the zookeeper quorum value containing the right port number (varies per run).
*/
- static String getZookeeperQuorum() {
+ protected static String getZookeeperQuorum() {
return "localhost:" + TEST_UTIL.getZkCluster().getClientPort();
}
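
The base-class switch above (TestLogger + Serializable to AbstractTestBase) is also why HBaseConnectorITCase could drop its hand-rolled LimitNetworkBuffersTestEnvironment: AbstractTestBase from flink-test-utils manages a shared Flink mini cluster as a class rule with test-friendly resource settings, so getExecutionEnvironment() resolves to it. A hedged sketch, assuming that behavior:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.test.util.AbstractTestBase;
    import org.junit.Test;

    public class MiniClusterSketchITCase extends AbstractTestBase {

        @Test
        public void usesSharedFlinkMiniCluster() throws Exception {
            // Resolves to the mini cluster managed by AbstractTestBase's class
            // rule (assumption: memory/network limits are preconfigured there).
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).print();
        }
    }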
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java
new file mode 100644
index 0000000..cd1bf39
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+/**
+ * Planner type to use.
+ */
+public enum PlannerType {
+ BLINK_PLANNER,
+ OLD_PLANNER
+}