Posted to commits@flink.apache.org by ar...@apache.org on 2021/12/11 16:08:06 UTC

[flink] branch release-1.13 updated (8c3b96e -> 9d9842f)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8c3b96e  [FLINK-25096] Fixes empty exception history for JobInitializationException (#18036)
     new 2efb0df  [FLINK-24077][HBase/IT] Add check of row count after insert and wait explicitly for job to finish.
     new 9d9842f  [FLINK-24077][HBase/IT] use MiniClusterWithClientResource as @ClassRule.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connector/hbase2/HBaseConnectorITCase.java     | 106 ++++++++++++++-------
 1 file changed, 74 insertions(+), 32 deletions(-)

[flink] 02/02: [FLINK-24077][HBase/IT] use MiniClusterWithClientResource as @ClassRule.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d9842fb610a95d9f31ad432e2546d8ce61b106e
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Mon Dec 6 15:09:06 2021 +0100

    [FLINK-24077][HBase/IT] use MiniClusterWithClientResource as @ClassRule.
    
    When TableEnvironment is used in the ITCase, a Flink MiniCluster is started and stopped automatically in the background. Because the MiniCluster is shut down asynchronously, CollectResultFetcher can occasionally lose data due to a race condition and throw an unchecked java.lang.IllegalStateException that the test did not anticipate.
    
    The solution is to control the lifecycle of the MiniCluster manually in this test; MiniClusterWithClientResource is a good fit for that.
    
    (cherry picked from commit fca04c3aaf6346d61cf9fe022a7ac77ab4d66c91)
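
A minimal sketch of the pattern this commit applies, assuming a JUnit 4 test class (the class name SomeITCase is a hypothetical stand-in): the rule starts one shared MiniCluster before the first test method and tears it down after the last, so collect() never races against an asynchronous cluster shutdown.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.junit.ClassRule;

    public class SomeITCase {
        // JUnit invokes the rule's before()/after() once around the whole
        // class, so all test methods share one explicitly managed MiniCluster.
        @ClassRule
        public static final MiniClusterWithClientResource MINI_CLUSTER =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(new Configuration())
                                .build());
    }
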
---
 .../flink/connector/hbase2/HBaseConnectorITCase.java    | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index 9125f17..6e78ef1 100644
--- a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase.util.PlannerType;
 import org.apache.flink.connector.hbase2.source.AbstractTableInputFormat;
@@ -33,19 +34,21 @@ import org.apache.flink.connector.hbase2.source.HBaseRowDataInputFormat;
 import org.apache.flink.connector.hbase2.source.HBaseRowInputFormat;
 import org.apache.flink.connector.hbase2.source.HBaseTableSource;
 import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.descriptors.HBase;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
@@ -57,6 +60,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -77,6 +81,13 @@ import static org.junit.Assert.assertNull;
 @RunWith(Parameterized.class)
 public class HBaseConnectorITCase extends HBaseTestBase {
 
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(new Configuration())
+                            .build());
+
     @Parameterized.Parameter public PlannerType planner;
 
     @Parameterized.Parameter(1)
@@ -436,8 +447,6 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                                 + " AS h");
 
         TableResult tableResult2 = table.execute();
-        // wait to finish
-        tableResult2.getJobClient().get().getJobExecutionResult().get();
 
         List<Row> results = CollectionUtil.iteratorToList(tableResult2.collect());
 
@@ -529,8 +538,6 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                         + " AS h";
 
         TableResult tableResult3 = batchEnv.executeSql(query);
-        // wait to finish
-        tableResult3.getJobClient().get().getJobExecutionResult().get();
 
         List<String> result =
                 Lists.newArrayList(tableResult3.collect()).stream()

[flink] 01/02: [FLINK-24077][HBase/IT] Add check of row count after insert and wait explicitly for job to finish.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2efb0df1fb2d2a7bf081ae205c78698f3244c625
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Nov 23 11:49:57 2021 +0100

    [FLINK-24077][HBase/IT] Add check of row count after insert and wait explicitly for job to finish.
    
    (cherry picked from commit 7976be0f8675a8753a5bb7e7a44dda6b4a347247)
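
A rough sketch of the two guards this commit adds, assuming a TableEnvironment tEnv, a batch TableEnvironment batchEnv, and hypothetical table names "sink" and "source" (the real test uses the HBase test tables and JUnit's assertEquals): the INSERT job is awaited explicitly, and the row count is asserted before any row-by-row comparison.

    // Wait explicitly for the insert job to finish; TableResult.await()
    // blocks until the job reaches a terminal state.
    TableResult insertResult = tEnv.executeSql("INSERT INTO sink SELECT * FROM source");
    insertResult.await();

    // Check the row count first so a partial write fails fast, before the
    // detailed content comparison runs.
    Table countTable = batchEnv.sqlQuery("SELECT COUNT(h.rowkey) FROM sink AS h");
    assertEquals(Long.valueOf(8), countTable.execute().collect().next().getField(0));
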
---
 .../connector/hbase2/HBaseConnectorITCase.java     | 99 +++++++++++++++-------
 1 file changed, 67 insertions(+), 32 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index 33ab7c2..9125f17 100644
--- a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.descriptors.HBase;
@@ -47,6 +48,7 @@ import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -392,13 +394,33 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                         + " FROM "
                         + TEST_TABLE_1;
 
+        TableResult tableResult = tEnv.executeSql(query);
+
         // wait to finish
-        tEnv.executeSql(query).await();
+        tableResult.await();
+
+        assertEquals(
+                "Expected INSERT rowKind", RowKind.INSERT, tableResult.collect().next().getKind());
 
         // start a batch scan job to verify contents in HBase table
         TableEnvironment batchEnv = createBatchTableEnv();
         batchEnv.executeSql(table2DDL);
 
+        List<String> expected = new ArrayList<>();
+        expected.add("+I[1, 10, Hello-1, 100, 1.01, false, Welt-1]\n");
+        expected.add("+I[2, 20, Hello-2, 200, 2.02, true, Welt-2]\n");
+        expected.add("+I[3, 30, Hello-3, 300, 3.03, false, Welt-3]\n");
+        expected.add("+I[4, 40, null, 400, 4.04, true, Welt-4]\n");
+        expected.add("+I[5, 50, Hello-5, 500, 5.05, false, Welt-5]\n");
+        expected.add("+I[6, 60, Hello-6, 600, 6.06, true, Welt-6]\n");
+        expected.add("+I[7, 70, Hello-7, 700, 7.07, false, Welt-7]\n");
+        expected.add("+I[8, 80, null, 800, 8.08, true, Welt-8]\n");
+
+        Table countTable =
+                batchEnv.sqlQuery("SELECT COUNT(h.rowkey) FROM " + TEST_TABLE_2 + " AS h");
+
+        assertEquals(new Long(expected.size()), countTable.execute().collect().next().getField(0));
+
         Table table =
                 batchEnv.sqlQuery(
                         "SELECT "
@@ -412,18 +434,14 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                                 + "FROM "
                                 + TEST_TABLE_2
                                 + " AS h");
-        List<Row> results = CollectionUtil.iteratorToList(table.execute().collect());
-        String expected =
-                "+I[1, 10, Hello-1, 100, 1.01, false, Welt-1]\n"
-                        + "+I[2, 20, Hello-2, 200, 2.02, true, Welt-2]\n"
-                        + "+I[3, 30, Hello-3, 300, 3.03, false, Welt-3]\n"
-                        + "+I[4, 40, null, 400, 4.04, true, Welt-4]\n"
-                        + "+I[5, 50, Hello-5, 500, 5.05, false, Welt-5]\n"
-                        + "+I[6, 60, Hello-6, 600, 6.06, true, Welt-6]\n"
-                        + "+I[7, 70, Hello-7, 700, 7.07, false, Welt-7]\n"
-                        + "+I[8, 80, null, 800, 8.08, true, Welt-8]\n";
 
-        TestBaseUtils.compareResultAsText(results, expected);
+        TableResult tableResult2 = table.execute();
+        // wait to finish
+        tableResult2.getJobClient().get().getJobExecutionResult().get();
+
+        List<Row> results = CollectionUtil.iteratorToList(tableResult2.collect());
+
+        TestBaseUtils.compareResultAsText(results, String.join("", expected));
     }
 
     @Test
@@ -457,12 +475,42 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                         + " family4"
                         + " from "
                         + TEST_TABLE_1;
+
+        TableResult tableResult = tEnv.executeSql(insertStatement);
+
         // wait to finish
-        tEnv.executeSql(insertStatement).await();
+        tableResult.await();
+
+        assertEquals(
+                "Expected INSERT rowKind", RowKind.INSERT, tableResult.collect().next().getKind());
 
         // start a batch scan job to verify contents in HBase table
         TableEnvironment batchEnv = createBatchTableEnv();
         batchEnv.executeSql(table3DDL);
+
+        List<String> expected = new ArrayList<>();
+        expected.add(
+                "+I[1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
+        expected.add(
+                "+I[2, 20, Hello-2, 200, 2.02, true, Welt-2, 2019-08-18T19:01, 2019-08-18, 19:01, 12345678.0002]");
+        expected.add(
+                "+I[3, 30, Hello-3, 300, 3.03, false, Welt-3, 2019-08-18T19:02, 2019-08-18, 19:02, 12345678.0003]");
+        expected.add(
+                "+I[4, 40, null, 400, 4.04, true, Welt-4, 2019-08-18T19:03, 2019-08-18, 19:03, 12345678.0004]");
+        expected.add(
+                "+I[5, 50, Hello-5, 500, 5.05, false, Welt-5, 2019-08-19T19:10, 2019-08-19, 19:10, 12345678.0005]");
+        expected.add(
+                "+I[6, 60, Hello-6, 600, 6.06, true, Welt-6, 2019-08-19T19:20, 2019-08-19, 19:20, 12345678.0006]");
+        expected.add(
+                "+I[7, 70, Hello-7, 700, 7.07, false, Welt-7, 2019-08-19T19:30, 2019-08-19, 19:30, 12345678.0007]");
+        expected.add(
+                "+I[8, 80, null, 800, 8.08, true, Welt-8, 2019-08-19T19:40, 2019-08-19, 19:40, 12345678.0008]");
+
+        Table countTable =
+                batchEnv.sqlQuery("SELECT COUNT(h.rowkey) FROM " + TEST_TABLE_3 + " AS h");
+
+        assertEquals(new Long(expected.size()), countTable.execute().collect().next().getField(0));
+
         String query =
                 "SELECT "
                         + "  h.rowkey, "
@@ -479,30 +527,17 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                         + " FROM "
                         + TEST_TABLE_3
                         + " AS h";
-        Iterator<Row> collected = tEnv.executeSql(query).collect();
+
+        TableResult tableResult3 = batchEnv.executeSql(query);
+        // wait to finish
+        tableResult3.getJobClient().get().getJobExecutionResult().get();
+
         List<String> result =
-                Lists.newArrayList(collected).stream()
+                Lists.newArrayList(tableResult3.collect()).stream()
                         .map(Row::toString)
                         .sorted()
                         .collect(Collectors.toList());
 
-        List<String> expected = new ArrayList<>();
-        expected.add(
-                "+I[1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
-        expected.add(
-                "+I[2, 20, Hello-2, 200, 2.02, true, Welt-2, 2019-08-18T19:01, 2019-08-18, 19:01, 12345678.0002]");
-        expected.add(
-                "+I[3, 30, Hello-3, 300, 3.03, false, Welt-3, 2019-08-18T19:02, 2019-08-18, 19:02, 12345678.0003]");
-        expected.add(
-                "+I[4, 40, null, 400, 4.04, true, Welt-4, 2019-08-18T19:03, 2019-08-18, 19:03, 12345678.0004]");
-        expected.add(
-                "+I[5, 50, Hello-5, 500, 5.05, false, Welt-5, 2019-08-19T19:10, 2019-08-19, 19:10, 12345678.0005]");
-        expected.add(
-                "+I[6, 60, Hello-6, 600, 6.06, true, Welt-6, 2019-08-19T19:20, 2019-08-19, 19:20, 12345678.0006]");
-        expected.add(
-                "+I[7, 70, Hello-7, 700, 7.07, false, Welt-7, 2019-08-19T19:30, 2019-08-19, 19:30, 12345678.0007]");
-        expected.add(
-                "+I[8, 80, null, 800, 8.08, true, Welt-8, 2019-08-19T19:40, 2019-08-19, 19:40, 12345678.0008]");
         assertEquals(expected, result);
     }