You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/24 07:33:10 UTC
[flink-table-store] branch master updated: [FLINK-26834] Introduce BlockingIterator to help testing
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 210e96f [FLINK-26834] Introduce BlockingIterator to help testing
210e96f is described below
commit 210e96f8d2c46b3474df635c7602d9aedc9645d3
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Mar 24 15:33:05 2022 +0800
[FLINK-26834] Introduce BlockingIterator to help testing
This closes #60
---
.../store/connector/ContinuousFileStoreITCase.java | 77 ++++++-------
.../table/store/connector/FileStoreITCase.java | 35 ++----
.../store/connector/sink/LogStoreSinkITCase.java | 17 +--
.../table/store/file/utils/BlockingIterator.java | 119 +++++++++++++++++++++
.../flink/table/store/kafka/KafkaLogITCase.java | 7 +-
5 files changed, 175 insertions(+), 80 deletions(-)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index d2187d4..8c10ab9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -22,19 +22,17 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
@@ -68,80 +66,87 @@ public class ContinuousFileStoreITCase extends AbstractTestBase {
}
@Test
- public void testWithoutPrimaryKey() throws ExecutionException, InterruptedException {
+ public void testWithoutPrimaryKey() throws Exception {
testSimple("T1");
}
@Test
- public void testWithPrimaryKey() throws ExecutionException, InterruptedException {
+ public void testWithPrimaryKey() throws Exception {
testSimple("T2");
}
@Test
- public void testProjectionWithoutPrimaryKey() throws ExecutionException, InterruptedException {
+ public void testProjectionWithoutPrimaryKey() throws Exception {
testProjection("T1");
}
@Test
- public void testProjectionWithPrimaryKey() throws ExecutionException, InterruptedException {
+ public void testProjectionWithPrimaryKey() throws Exception {
testProjection("T2");
}
- private void testSimple(String table) throws ExecutionException, InterruptedException {
- CloseableIterator<Row> iterator = sEnv.executeSql("SELECT * FROM " + table).collect();
+ private void testSimple(String table)
+ throws ExecutionException, InterruptedException, TimeoutException {
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(sEnv.executeSql("SELECT * FROM " + table).collect());
bEnv.executeSql(
String.format(
"INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table))
.await();
- assertThat(collectFromUnbounded(iterator, 2))
+ assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", table)).await();
- assertThat(collectFromUnbounded(iterator, 1))
- .containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
}
- private void testProjection(String table) throws ExecutionException, InterruptedException {
- CloseableIterator<Row> iterator = sEnv.executeSql("SELECT b, c FROM " + table).collect();
+ private void testProjection(String table)
+ throws ExecutionException, InterruptedException, TimeoutException {
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(sEnv.executeSql("SELECT b, c FROM " + table).collect());
bEnv.executeSql(
String.format(
"INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table))
.await();
- assertThat(collectFromUnbounded(iterator, 2))
+ assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("2", "3"), Row.of("5", "6"));
bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", table)).await();
- assertThat(collectFromUnbounded(iterator, 1)).containsExactlyInAnyOrder(Row.of("8", "9"));
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("8", "9"));
}
@Test
- public void testContinuousLatest() throws ExecutionException, InterruptedException {
+ public void testContinuousLatest()
+ throws ExecutionException, InterruptedException, TimeoutException {
bEnv.executeSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')").await();
- CloseableIterator<Row> iterator =
- sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */").collect();
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */")
+ .collect());
bEnv.executeSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')").await();
- assertThat(collectFromUnbounded(iterator, 2))
+ assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
}
@Test
- public void testIgnoreOverwrite() throws ExecutionException, InterruptedException {
- CloseableIterator<Row> iterator = sEnv.executeSql("SELECT * FROM T1").collect();
+ public void testIgnoreOverwrite()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(sEnv.executeSql("SELECT * FROM T1").collect());
bEnv.executeSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')").await();
- assertThat(collectFromUnbounded(iterator, 2))
+ assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
// should ignore this overwrite
bEnv.executeSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')").await();
bEnv.executeSql("INSERT INTO T1 VALUES ('9', '10', '11')").await();
- assertThat(collectFromUnbounded(iterator, 1))
- .containsExactlyInAnyOrder(Row.of("9", "10", "11"));
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9", "10", "11"));
}
@Test
@@ -174,24 +179,4 @@ public class ContinuousFileStoreITCase extends AbstractTestBase {
"File store continuous reading dose not support from_timestamp scan mode, "
+ "you can add timestamp filters instead.");
}
-
- private List<Row> collectFromUnbounded(CloseableIterator<Row> iterator, int numElements) {
- if (numElements == 0) {
- return Collections.emptyList();
- }
-
- List<Row> result = new ArrayList<>();
- while (iterator.hasNext()) {
- result.add(iterator.next());
-
- if (result.size() == numElements) {
- return result;
- }
- }
-
- throw new IllegalArgumentException(
- String.format(
- "The stream ended before reaching the requested %d records. Only %d records were received.",
- numElements, result.size()));
- }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index ed4c493..07bc759 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.connector.sink.StoreSink;
import org.apache.flink.table.store.connector.source.FileStoreSource;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -276,8 +277,13 @@ public class FileStoreITCase extends AbstractTestBase {
private void innerTestContinuous() throws Exception {
Assume.assumeFalse(isBatch);
- CloseableIterator<RowData> iterator =
- store.sourceBuilder().withContinuousMode(true).build(env).executeAndCollect();
+ BlockingIterator<RowData, Row> iterator =
+ BlockingIterator.of(
+ store.sourceBuilder()
+ .withContinuousMode(true)
+ .build(env)
+ .executeAndCollect(),
+ CONVERTER::toExternal);
Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
sinkAndValidate(
@@ -296,7 +302,7 @@ public class FileStoreITCase extends AbstractTestBase {
}
private void sinkAndValidate(
- List<RowData> src, CloseableIterator<RowData> iterator, Row... expected)
+ List<RowData> src, BlockingIterator<RowData, Row> iterator, Row... expected)
throws Exception {
if (isBatch) {
throw new UnsupportedOperationException();
@@ -305,34 +311,13 @@ public class FileStoreITCase extends AbstractTestBase {
env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE));
store.sinkBuilder().withInput(source).build();
env.execute();
- assertThat(collectFromUnbounded(iterator, expected.length))
- .containsExactlyInAnyOrder(expected);
+ assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected);
}
private static RowData srcRow(RowKind kind, int v, String p, int k) {
return wrap(GenericRowData.ofKind(kind, v, StringData.fromString(p), k));
}
- private List<Row> collectFromUnbounded(CloseableIterator<RowData> iterator, int numElements) {
- if (numElements == 0) {
- return Collections.emptyList();
- }
-
- List<Row> result = new ArrayList<>();
- while (iterator.hasNext()) {
- result.add(CONVERTER.toExternal(iterator.next()));
-
- if (result.size() == numElements) {
- return result;
- }
- }
-
- throw new IllegalArgumentException(
- String.format(
- "The stream ended before reaching the requested %d records. Only %d records were received.",
- numElements, result.size()));
- }
-
public static StreamExecutionEnvironment buildStreamEnv() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 15e7be3..a58f517 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
@@ -34,7 +35,6 @@ import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.table.store.connector.FileStoreITCase.CONVERTER;
@@ -159,12 +159,15 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
assertThat(results).containsExactlyInAnyOrder(expected);
- results =
- store.sourceBuilder().withContinuousMode(true)
- .withLogSourceProvider(sourceProvider).build(buildStreamEnv())
- .executeAndCollect(expected.length).stream()
- .map(CONVERTER::toExternal)
- .collect(Collectors.toList());
+ BlockingIterator<RowData, Row> iterator =
+ BlockingIterator.of(
+ store.sourceBuilder()
+ .withContinuousMode(true)
+ .withLogSourceProvider(sourceProvider)
+ .build(buildStreamEnv())
+ .executeAndCollect(),
+ CONVERTER::toExternal);
+ results = iterator.collectAndClose(expected.length);
assertThat(results).containsExactlyInAnyOrder(expected);
} finally {
factory.onDropTable(context, true);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
new file mode 100644
index 0000000..7e973aa
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+/** Provides the ability to bring timeout to blocking iterators. */
+public class BlockingIterator<IN, OUT> implements AutoCloseable {
+
+ /**
+ * A static cached {@link ExecutorService}. We don't limit the number of threads since the work
+ * inside is I/O type.
+ */
+ private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
+
+ private final Iterator<IN> iterator;
+
+ private final Function<IN, OUT> converter;
+
+ public BlockingIterator(Iterator<IN> iterator, Function<IN, OUT> converter) {
+ this.iterator = iterator;
+ this.converter = converter;
+ }
+
+ public static <T> BlockingIterator<T, T> of(Iterator<T> iterator) {
+ return new BlockingIterator<>(iterator, t -> t);
+ }
+
+ public static <IN, OUT> BlockingIterator<IN, OUT> of(
+ Iterator<IN> iterator, Function<IN, OUT> converter) {
+ return new BlockingIterator<>(iterator, converter);
+ }
+
+ public List<OUT> collectAndClose(int limit) throws Exception {
+ try {
+ return collect(limit);
+ } finally {
+ close();
+ }
+ }
+
+ public List<OUT> collect() throws Exception {
+ return collect(Integer.MAX_VALUE);
+ }
+
+ public List<OUT> collect(int limit) throws TimeoutException {
+ return collect(limit, 1, TimeUnit.MINUTES);
+ }
+
+ public List<OUT> collect(int limit, long timeout, TimeUnit unit) throws TimeoutException {
+ Future<List<OUT>> future = EXECUTOR.submit(() -> doCollect(limit));
+ try {
+ return future.get(timeout, unit);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (TimeoutException e) {
+ future.cancel(true);
+ throw new TimeoutException(
+ String.format("Cannot collect %s records in %s %s", limit, timeout, unit));
+ }
+ }
+
+ private List<OUT> doCollect(int limit) {
+ if (limit == 0) {
+ return Collections.emptyList();
+ }
+
+ List<OUT> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(converter.apply(iterator.next()));
+
+ if (result.size() == limit) {
+ return result;
+ }
+ }
+
+ if (limit != Integer.MAX_VALUE) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The stream ended before reaching the requested %d records. Only %d records were received.",
+ limit, result.size()));
+ }
+
+ return result;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (this.iterator instanceof AutoCloseable) {
+ ((AutoCloseable) this.iterator).close();
+ }
+ }
+}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index ab05fa7..bb8a788 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
import org.apache.flink.table.store.log.LogOptions.LogConsistency;
import org.apache.flink.types.RowKind;
@@ -216,8 +217,10 @@ public class KafkaLogITCase extends KafkaTableTestBase {
private List<RowData> collect(KafkaSource<RowData> source, int numRecord) throws Exception {
List<RowData> records =
- env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
- .executeAndCollect(numRecord);
+ BlockingIterator.of(
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+ .executeAndCollect())
+ .collectAndClose(numRecord);
records.sort(Comparator.comparingInt(o -> o.getInt(0)));
return records;
}