Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/24 07:33:10 UTC

[flink-table-store] branch master updated: [FLINK-26834] Introduce BlockingIterator to help testing

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 210e96f  [FLINK-26834] Introduce BlockingIterator to help testing
210e96f is described below

commit 210e96f8d2c46b3474df635c7602d9aedc9645d3
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Mar 24 15:33:05 2022 +0800

    [FLINK-26834] Introduce BlockingIterator to help testing
    
    This closes #60
---
 .../store/connector/ContinuousFileStoreITCase.java |  77 ++++++-------
 .../table/store/connector/FileStoreITCase.java     |  35 ++----
 .../store/connector/sink/LogStoreSinkITCase.java   |  17 +--
 .../table/store/file/utils/BlockingIterator.java   | 119 +++++++++++++++++++++
 .../flink/table/store/kafka/KafkaLogITCase.java    |   7 +-
 5 files changed, 175 insertions(+), 80 deletions(-)
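For context, the commit replaces the per-test collectFromUnbounded helpers with a
shared BlockingIterator, so collecting from an unbounded stream is bounded by a
timeout instead of hanging the test forever. A minimal sketch of the resulting
pattern, with sEnv, bEnv, and the table name as placeholders taken from the tests
below:

    // Wrap the unbounded iterator returned by collect(); collect(n) blocks
    // until n rows arrive or the default one-minute timeout expires.
    BlockingIterator<Row, Row> iterator =
            BlockingIterator.of(sEnv.executeSql("SELECT * FROM T1").collect());

    bEnv.executeSql("INSERT INTO T1 VALUES ('1', '2', '3')").await();
    assertThat(iterator.collect(1)).containsExactly(Row.of("1", "2", "3"));
    iterator.close();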

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index d2187d4..8c10ab9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -22,19 +22,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
 
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
@@ -68,80 +66,87 @@ public class ContinuousFileStoreITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testWithoutPrimaryKey() throws ExecutionException, InterruptedException {
+    public void testWithoutPrimaryKey() throws Exception {
         testSimple("T1");
     }
 
     @Test
-    public void testWithPrimaryKey() throws ExecutionException, InterruptedException {
+    public void testWithPrimaryKey() throws Exception {
         testSimple("T2");
     }
 
     @Test
-    public void testProjectionWithoutPrimaryKey() throws ExecutionException, InterruptedException {
+    public void testProjectionWithoutPrimaryKey() throws Exception {
         testProjection("T1");
     }
 
     @Test
-    public void testProjectionWithPrimaryKey() throws ExecutionException, InterruptedException {
+    public void testProjectionWithPrimaryKey() throws Exception {
         testProjection("T2");
     }
 
-    private void testSimple(String table) throws ExecutionException, InterruptedException {
-        CloseableIterator<Row> iterator = sEnv.executeSql("SELECT * FROM " + table).collect();
+    private void testSimple(String table)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(sEnv.executeSql("SELECT * FROM " + table).collect());
 
         bEnv.executeSql(
                         String.format(
                                 "INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table))
                 .await();
-        assertThat(collectFromUnbounded(iterator, 2))
+        assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
 
         bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", table)).await();
-        assertThat(collectFromUnbounded(iterator, 1))
-                .containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
     }
 
-    private void testProjection(String table) throws ExecutionException, InterruptedException {
-        CloseableIterator<Row> iterator = sEnv.executeSql("SELECT b, c FROM " + table).collect();
+    private void testProjection(String table)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(sEnv.executeSql("SELECT b, c FROM " + table).collect());
 
         bEnv.executeSql(
                         String.format(
                                 "INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table))
                 .await();
-        assertThat(collectFromUnbounded(iterator, 2))
+        assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("2", "3"), Row.of("5", "6"));
 
         bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", table)).await();
-        assertThat(collectFromUnbounded(iterator, 1)).containsExactlyInAnyOrder(Row.of("8", "9"));
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("8", "9"));
     }
 
     @Test
-    public void testContinuousLatest() throws ExecutionException, InterruptedException {
+    public void testContinuousLatest()
+            throws ExecutionException, InterruptedException, TimeoutException {
         bEnv.executeSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')").await();
 
-        CloseableIterator<Row> iterator =
-                sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */").collect();
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */")
+                                .collect());
 
         bEnv.executeSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')").await();
-        assertThat(collectFromUnbounded(iterator, 2))
+        assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
     }
 
     @Test
-    public void testIgnoreOverwrite() throws ExecutionException, InterruptedException {
-        CloseableIterator<Row> iterator = sEnv.executeSql("SELECT * FROM T1").collect();
+    public void testIgnoreOverwrite()
+            throws ExecutionException, InterruptedException, TimeoutException {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(sEnv.executeSql("SELECT * FROM T1").collect());
 
         bEnv.executeSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')").await();
-        assertThat(collectFromUnbounded(iterator, 2))
+        assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
 
         // should ignore this overwrite
         bEnv.executeSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')").await();
 
         bEnv.executeSql("INSERT INTO T1 VALUES ('9', '10', '11')").await();
-        assertThat(collectFromUnbounded(iterator, 1))
-                .containsExactlyInAnyOrder(Row.of("9", "10", "11"));
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9", "10", "11"));
     }
 
     @Test
@@ -174,24 +179,4 @@ public class ContinuousFileStoreITCase extends AbstractTestBase {
                 "File store continuous reading dose not support from_timestamp scan mode, "
                         + "you can add timestamp filters instead.");
     }
-
-    private List<Row> collectFromUnbounded(CloseableIterator<Row> iterator, int numElements) {
-        if (numElements == 0) {
-            return Collections.emptyList();
-        }
-
-        List<Row> result = new ArrayList<>();
-        while (iterator.hasNext()) {
-            result.add(iterator.next());
-
-            if (result.size() == numElements) {
-                return result;
-            }
-        }
-
-        throw new IllegalArgumentException(
-                String.format(
-                        "The stream ended before reaching the requested %d records. Only %d records were received.",
-                        numElements, result.size()));
-    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index ed4c493..07bc759 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.connector.sink.StoreSink;
 import org.apache.flink.table.store.connector.source.FileStoreSource;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -276,8 +277,13 @@ public class FileStoreITCase extends AbstractTestBase {
     private void innerTestContinuous() throws Exception {
         Assume.assumeFalse(isBatch);
 
-        CloseableIterator<RowData> iterator =
-                store.sourceBuilder().withContinuousMode(true).build(env).executeAndCollect();
+        BlockingIterator<RowData, Row> iterator =
+                BlockingIterator.of(
+                        store.sourceBuilder()
+                                .withContinuousMode(true)
+                                .build(env)
+                                .executeAndCollect(),
+                        CONVERTER::toExternal);
         Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
 
         sinkAndValidate(
@@ -296,7 +302,7 @@ public class FileStoreITCase extends AbstractTestBase {
     }
 
     private void sinkAndValidate(
-            List<RowData> src, CloseableIterator<RowData> iterator, Row... expected)
+            List<RowData> src, BlockingIterator<RowData, Row> iterator, Row... expected)
             throws Exception {
         if (isBatch) {
             throw new UnsupportedOperationException();
@@ -305,34 +311,13 @@ public class FileStoreITCase extends AbstractTestBase {
                 env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE));
         store.sinkBuilder().withInput(source).build();
         env.execute();
-        assertThat(collectFromUnbounded(iterator, expected.length))
-                .containsExactlyInAnyOrder(expected);
+        assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected);
     }
 
     private static RowData srcRow(RowKind kind, int v, String p, int k) {
         return wrap(GenericRowData.ofKind(kind, v, StringData.fromString(p), k));
     }
 
-    private List<Row> collectFromUnbounded(CloseableIterator<RowData> iterator, int numElements) {
-        if (numElements == 0) {
-            return Collections.emptyList();
-        }
-
-        List<Row> result = new ArrayList<>();
-        while (iterator.hasNext()) {
-            result.add(CONVERTER.toExternal(iterator.next()));
-
-            if (result.size() == numElements) {
-                return result;
-            }
-        }
-
-        throw new IllegalArgumentException(
-                String.format(
-                        "The stream ended before reaching the requested %d records. Only %d records were received.",
-                        numElements, result.size()));
-    }
-
     public static StreamExecutionEnvironment buildStreamEnv() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 15e7be3..a58f517 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
 import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
 import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
@@ -34,7 +35,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.List;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.store.connector.FileStoreITCase.CONVERTER;
@@ -159,12 +159,15 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
 
             assertThat(results).containsExactlyInAnyOrder(expected);
 
-            results =
-                    store.sourceBuilder().withContinuousMode(true)
-                            .withLogSourceProvider(sourceProvider).build(buildStreamEnv())
-                            .executeAndCollect(expected.length).stream()
-                            .map(CONVERTER::toExternal)
-                            .collect(Collectors.toList());
+            BlockingIterator<RowData, Row> iterator =
+                    BlockingIterator.of(
+                            store.sourceBuilder()
+                                    .withContinuousMode(true)
+                                    .withLogSourceProvider(sourceProvider)
+                                    .build(buildStreamEnv())
+                                    .executeAndCollect(),
+                            CONVERTER::toExternal);
+            results = iterator.collectAndClose(expected.length);
             assertThat(results).containsExactlyInAnyOrder(expected);
         } finally {
             factory.onDropTable(context, true);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
new file mode 100644
index 0000000..7e973aa
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+/** Wraps a blocking iterator to support collecting with a timeout. */
+public class BlockingIterator<IN, OUT> implements AutoCloseable {
+
+    /**
+     * A static cached {@link ExecutorService}. We don't limit the number of threads since the work
+     * inside is I/O-bound.
+     */
+    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
+
+    private final Iterator<IN> iterator;
+
+    private final Function<IN, OUT> converter;
+
+    public BlockingIterator(Iterator<IN> iterator, Function<IN, OUT> converter) {
+        this.iterator = iterator;
+        this.converter = converter;
+    }
+
+    public static <T> BlockingIterator<T, T> of(Iterator<T> iterator) {
+        return new BlockingIterator<>(iterator, t -> t);
+    }
+
+    public static <IN, OUT> BlockingIterator<IN, OUT> of(
+            Iterator<IN> iterator, Function<IN, OUT> converter) {
+        return new BlockingIterator<>(iterator, converter);
+    }
+
+    public List<OUT> collectAndClose(int limit) throws Exception {
+        try {
+            return collect(limit);
+        } finally {
+            close();
+        }
+    }
+
+    public List<OUT> collect() throws Exception {
+        return collect(Integer.MAX_VALUE);
+    }
+
+    public List<OUT> collect(int limit) throws TimeoutException {
+        return collect(limit, 1, TimeUnit.MINUTES);
+    }
+
+    public List<OUT> collect(int limit, long timeout, TimeUnit unit) throws TimeoutException {
+        Future<List<OUT>> future = EXECUTOR.submit(() -> doCollect(limit));
+        try {
+            return future.get(timeout, unit);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        } catch (TimeoutException e) {
+            future.cancel(true);
+            throw new TimeoutException(
+                    String.format("Cannot collect %s records in %s %s", limit, timeout, unit));
+        }
+    }
+
+    private List<OUT> doCollect(int limit) {
+        if (limit == 0) {
+            return Collections.emptyList();
+        }
+
+        List<OUT> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            result.add(converter.apply(iterator.next()));
+
+            if (result.size() == limit) {
+                return result;
+            }
+        }
+
+        if (limit != Integer.MAX_VALUE) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The stream ended before reaching the requested %d records. Only %d records were received.",
+                            limit, result.size()));
+        }
+
+        return result;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (this.iterator instanceof AutoCloseable) {
+            ((AutoCloseable) this.iterator).close();
+        }
+    }
+}
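
The class above runs each collect call on a shared cached thread pool and bounds
it with a Future.get timeout; on timeout the task is cancelled and a
TimeoutException naming the requested record count is thrown. A short usage
sketch against the API above (rowIterator and rowDataIterator are hypothetical
stand-ins for iterators such as those returned by executeAndCollect(), and
CONVERTER::toExternal mirrors its use in the connector tests):

    // Identity form: elements pass through unchanged.
    BlockingIterator<Row, Row> rows = BlockingIterator.of(rowIterator);
    List<Row> two = rows.collect(2);                        // default 1-minute timeout
    List<Row> ten = rows.collect(10, 30, TimeUnit.SECONDS); // explicit timeout

    // Converter form: map RowData to Row while collecting, then close the
    // underlying iterator in one call.
    BlockingIterator<RowData, Row> converted =
            BlockingIterator.of(rowDataIterator, CONVERTER::toExternal);
    List<Row> results = converted.collectAndClose(expected.length);

As in the removed helpers, collect(limit) throws if the stream ends before limit
records arrive, so a short stream surfaces as a test failure rather than a hang.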
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index ab05fa7..bb8a788 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
 import org.apache.flink.table.store.log.LogOptions.LogConsistency;
 import org.apache.flink.types.RowKind;
@@ -216,8 +217,10 @@ public class KafkaLogITCase extends KafkaTableTestBase {
 
     private List<RowData> collect(KafkaSource<RowData> source, int numRecord) throws Exception {
         List<RowData> records =
-                env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
-                        .executeAndCollect(numRecord);
+                BlockingIterator.of(
+                                env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+                                        .executeAndCollect())
+                        .collectAndClose(numRecord);
         records.sort(Comparator.comparingInt(o -> o.getInt(0)));
         return records;
     }