You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/12/11 16:08:43 UTC

[flink] branch release-1.12 updated (85c39b4 -> b6ae903)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 85c39b4  [FLINK-25091][docs] Change ORC compression attribute reference error in FileSink doc
     new 8e3b291  [FLINK-24077][HBase/IT] Add check of row count after insert and wait explicitly for job to finish.
     new b6ae903  [FLINK-24077][HBase/IT] use MiniClusterWithClientResource as @ClassRule.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connector/hbase2/HBaseConnectorITCase.java     | 106 ++++++++++++++-------
 1 file changed, 74 insertions(+), 32 deletions(-)

[flink] 02/02: [FLINK-24077][HBase/IT] use MiniClusterWithClientResource as @ClassRule.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6ae90384b47a6274e2f746c76aa31387c2d5ee8
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Mon Dec 6 15:09:06 2021 +0100

    [FLINK-24077][HBase/IT] use MiniClusterWithClientResource as @ClassRule.
    
    While using TableEnvironment in the ITCase, a Flink MiniCluster is started and stopped automatically in the background. Because the shutdown of the MiniCluster is invoked asynchronously, CollectResultFetcher sometimes loses data due to a race condition, and an unchecked java.lang.IllegalStateException (a RuntimeException) is thrown that we were not aware of.
    
    The solution is to control the lifecycle of the MiniCluster manually in this test. MiniClusterWithClientResource, registered as a JUnit @ClassRule, is a good fit for this case.
    
    (cherry picked from commit fca04c3aaf6346d61cf9fe022a7ac77ab4d66c91)
---
 .../flink/connector/hbase2/HBaseConnectorITCase.java      | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index bda0fc1..1d111e8 100644
--- a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase.util.PlannerType;
 import org.apache.flink.connector.hbase2.source.AbstractTableInputFormat;
@@ -33,6 +34,7 @@ import org.apache.flink.connector.hbase2.source.HBaseRowDataInputFormat;
 import org.apache.flink.connector.hbase2.source.HBaseRowInputFormat;
 import org.apache.flink.connector.hbase2.source.HBaseTableSource;
 import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
@@ -46,6 +48,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.descriptors.HBase;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -57,6 +60,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -77,6 +81,13 @@ import static org.junit.Assert.assertNull;
 @RunWith(Parameterized.class)
 public class HBaseConnectorITCase extends HBaseTestBase {
 
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(new Configuration())
+                            .build());
+
     @Parameterized.Parameter public PlannerType planner;
 
     @Parameterized.Parameter(1)
@@ -436,8 +447,6 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                                 + " AS h");
 
         TableResult tableResult2 = table.execute();
-        // wait to finish
-        tableResult2.getJobClient().get().getJobExecutionResult().get();
 
         List<Row> results = CollectionUtil.iteratorToList(tableResult2.collect());
 
@@ -529,8 +538,6 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                         + " AS h";
 
         TableResult tableResult3 = batchEnv.executeSql(query);
-        // wait to finish
-        tableResult3.getJobClient().get().getJobExecutionResult().get();
 
         List<String> result =
                 Lists.newArrayList(tableResult3.collect()).stream()

[flink] 01/02: [FLINK-24077][HBase/IT] Add check of row count after insert and wait explicitly for job to finish.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e3b291fb5b04b8e774d603a517f2dfadc6095ed
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Nov 23 11:49:57 2021 +0100

    [FLINK-24077][HBase/IT] Add check of row count after insert and wait explicitly for job to finish.
    
    (cherry picked from commit 7976be0f8675a8753a5bb7e7a44dda6b4a347247)
---
 .../connector/hbase2/HBaseConnectorITCase.java     | 99 +++++++++++++++-------
 1 file changed, 67 insertions(+), 32 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index e58f53c..bda0fc1 100644
--- a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
@@ -47,6 +48,7 @@ import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -392,13 +394,33 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                         + " FROM "
                         + TEST_TABLE_1;
 
+        TableResult tableResult = tEnv.executeSql(query);
+
         // wait to finish
-        tEnv.executeSql(query).await();
+        tableResult.await();
+
+        assertEquals(
+                "Expected INSERT rowKind", RowKind.INSERT, tableResult.collect().next().getKind());
 
         // start a batch scan job to verify contents in HBase table
         TableEnvironment batchEnv = createBatchTableEnv();
         batchEnv.executeSql(table2DDL);
 
+        List<String> expected = new ArrayList<>();
+        expected.add("1,10,Hello-1,100,1.01,false,Welt-1\n");
+        expected.add("2,20,Hello-2,200,2.02,true,Welt-2\n");
+        expected.add("3,30,Hello-3,300,3.03,false,Welt-3\n");
+        expected.add("4,40,null,400,4.04,true,Welt-4\n");
+        expected.add("5,50,Hello-5,500,5.05,false,Welt-5\n");
+        expected.add("6,60,Hello-6,600,6.06,true,Welt-6\n");
+        expected.add("7,70,Hello-7,700,7.07,false,Welt-7\n");
+        expected.add("8,80,null,800,8.08,true,Welt-8\n");
+
+        Table countTable =
+                batchEnv.sqlQuery("SELECT COUNT(h.rowkey) FROM " + TEST_TABLE_2 + " AS h");
+
+        assertEquals(new Long(expected.size()), countTable.execute().collect().next().getField(0));
+
         Table table =
                 batchEnv.sqlQuery(
                         "SELECT "
@@ -412,18 +434,14 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                                 + "FROM "
                                 + TEST_TABLE_2
                                 + " AS h");
-        List<Row> results = CollectionUtil.iteratorToList(table.execute().collect());
-        String expected =
-                "1,10,Hello-1,100,1.01,false,Welt-1\n"
-                        + "2,20,Hello-2,200,2.02,true,Welt-2\n"
-                        + "3,30,Hello-3,300,3.03,false,Welt-3\n"
-                        + "4,40,null,400,4.04,true,Welt-4\n"
-                        + "5,50,Hello-5,500,5.05,false,Welt-5\n"
-                        + "6,60,Hello-6,600,6.06,true,Welt-6\n"
-                        + "7,70,Hello-7,700,7.07,false,Welt-7\n"
-                        + "8,80,null,800,8.08,true,Welt-8\n";
 
-        TestBaseUtils.compareResultAsText(results, expected);
+        TableResult tableResult2 = table.execute();
+        // wait to finish
+        tableResult2.getJobClient().get().getJobExecutionResult().get();
+
+        List<Row> results = CollectionUtil.iteratorToList(tableResult2.collect());
+
+        TestBaseUtils.compareResultAsText(results, String.join("", expected));
     }
 
     @Test
@@ -457,12 +475,42 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                         + " family4"
                         + " from "
                         + TEST_TABLE_1;
+
+        TableResult tableResult = tEnv.executeSql(insertStatement);
+
         // wait to finish
-        tEnv.executeSql(insertStatement).await();
+        tableResult.await();
+
+        assertEquals(
+                "Expected INSERT rowKind", RowKind.INSERT, tableResult.collect().next().getKind());
 
         // start a batch scan job to verify contents in HBase table
         TableEnvironment batchEnv = createBatchTableEnv();
         batchEnv.executeSql(table3DDL);
+
+        List<String> expected = new ArrayList<>();
+        expected.add(
+                "1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001");
+        expected.add(
+                "2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002");
+        expected.add(
+                "3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
+        expected.add(
+                "4,40,null,400,4.04,true,Welt-4,2019-08-18T19:03,2019-08-18,19:03,12345678.0004");
+        expected.add(
+                "5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19T19:10,2019-08-19,19:10,12345678.0005");
+        expected.add(
+                "6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19T19:20,2019-08-19,19:20,12345678.0006");
+        expected.add(
+                "7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19T19:30,2019-08-19,19:30,12345678.0007");
+        expected.add(
+                "8,80,null,800,8.08,true,Welt-8,2019-08-19T19:40,2019-08-19,19:40,12345678.0008");
+
+        Table countTable =
+                batchEnv.sqlQuery("SELECT COUNT(h.rowkey) FROM " + TEST_TABLE_3 + " AS h");
+
+        assertEquals(new Long(expected.size()), countTable.execute().collect().next().getField(0));
+
         String query =
                 "SELECT "
                         + "  h.rowkey, "
@@ -479,30 +527,17 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                         + " FROM "
                         + TEST_TABLE_3
                         + " AS h";
-        Iterator<Row> collected = tEnv.executeSql(query).collect();
+
+        TableResult tableResult3 = batchEnv.executeSql(query);
+        // wait to finish
+        tableResult3.getJobClient().get().getJobExecutionResult().get();
+
         List<String> result =
-                Lists.newArrayList(collected).stream()
+                Lists.newArrayList(tableResult3.collect()).stream()
                         .map(Row::toString)
                         .sorted()
                         .collect(Collectors.toList());
 
-        List<String> expected = new ArrayList<>();
-        expected.add(
-                "1,10,Hello-1,100,1.01,false,Welt-1,2019-08-18T19:00,2019-08-18,19:00,12345678.0001");
-        expected.add(
-                "2,20,Hello-2,200,2.02,true,Welt-2,2019-08-18T19:01,2019-08-18,19:01,12345678.0002");
-        expected.add(
-                "3,30,Hello-3,300,3.03,false,Welt-3,2019-08-18T19:02,2019-08-18,19:02,12345678.0003");
-        expected.add(
-                "4,40,null,400,4.04,true,Welt-4,2019-08-18T19:03,2019-08-18,19:03,12345678.0004");
-        expected.add(
-                "5,50,Hello-5,500,5.05,false,Welt-5,2019-08-19T19:10,2019-08-19,19:10,12345678.0005");
-        expected.add(
-                "6,60,Hello-6,600,6.06,true,Welt-6,2019-08-19T19:20,2019-08-19,19:20,12345678.0006");
-        expected.add(
-                "7,70,Hello-7,700,7.07,false,Welt-7,2019-08-19T19:30,2019-08-19,19:30,12345678.0007");
-        expected.add(
-                "8,80,null,800,8.08,true,Welt-8,2019-08-19T19:40,2019-08-19,19:40,12345678.0008");
         assertEquals(expected, result);
     }