You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/11/14 21:14:20 UTC

[iceberg] branch master updated: Flink: Fix and enable TestFlinkIcebergSink#testTwoSinksInDisjointedDAG (#3514)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d102f5  Flink: Fix and enable TestFlinkIcebergSink#testTwoSinksInDisjointedDAG (#3514)
2d102f5 is described below

commit 2d102f56bd74f3f5cd5b1111636a4b093b4bceb2
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Sun Nov 14 13:14:08 2021 -0800

    Flink: Fix and enable TestFlinkIcebergSink#testTwoSinksInDisjointedDAG (#3514)
    
    The purpose of this test method is to verify that two Iceberg sinks with different UIDs can be constructed in the same job/DAG. The custom source BoundedTestSource is not needed for this purpose, so the test now builds its input streams with env.fromCollection instead.
---
 .../java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java    | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 99e8b75..04b0850 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -49,7 +49,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -265,7 +264,6 @@ public class TestFlinkIcebergSink {
   }
 
   @Test
-  @Ignore  // Ignored as one DAG completing first can cause an infinite checkpoint loop in the other and CI timeouts
   public void testTwoSinksInDisjointedDAG() throws Exception {
     Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
 
@@ -287,7 +285,7 @@ public class TestFlinkIcebergSink {
     env.getConfig().disableAutoGeneratedUIDs();
 
     List<Row> leftRows = createRows("left-");
-    DataStream<Row> leftStream = env.addSource(createBoundedSource(leftRows), ROW_TYPE_INFO)
+    DataStream<Row> leftStream = env.fromCollection(leftRows, ROW_TYPE_INFO)
         .name("leftCustomSource")
         .uid("leftCustomSource");
     FlinkSink.forRow(leftStream, SimpleDataUtil.FLINK_SCHEMA)
@@ -299,7 +297,7 @@ public class TestFlinkIcebergSink {
         .append();
 
     List<Row> rightRows = createRows("right-");
-    DataStream<Row> rightStream = env.addSource(createBoundedSource(rightRows), ROW_TYPE_INFO)
+    DataStream<Row> rightStream = env.fromCollection(rightRows, ROW_TYPE_INFO)
         .name("rightCustomSource")
         .uid("rightCustomSource");
     FlinkSink.forRow(rightStream, SimpleDataUtil.FLINK_SCHEMA)