You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/07/15 08:31:31 UTC
[flink] branch master updated: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c27fd8dc72c [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
c27fd8dc72c is described below
commit c27fd8dc72ceac7631b5f7482db1e9a14b339f68
Author: lincoln lee <li...@gmail.com>
AuthorDate: Mon May 16 12:30:38 2022 +0800
[FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
This closes #19759
---
.../generated/execution_config_configuration.html | 6 +++
.../table/api/config/ExecutionConfigOptions.java | 25 ++++++++++++
.../plan/nodes/exec/batch/BatchExecLookupJoin.java | 1 +
.../nodes/exec/common/CommonExecLookupJoin.java | 22 ++++++++--
.../nodes/exec/stream/StreamExecLookupJoin.java | 4 ++
.../physical/stream/StreamPhysicalLookupJoin.scala | 3 +-
.../factories/TestValuesRuntimeFunctions.java | 11 ++++-
.../testJoinTemporalTable.out | 1 +
...testJoinTemporalTableWithProjectionPushDown.out | 1 +
.../runtime/stream/sql/AsyncLookupJoinITCase.scala | 20 ++++++---
.../operators/join/AsyncLookupJoinHarnessTest.java | 47 ++++++++++++++++++----
11 files changed, 123 insertions(+), 18 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 364d9bf9d81..d2974df5ac6 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -14,6 +14,12 @@
<td>Integer</td>
<td>The max number of async i/o operation that the async lookup join can trigger.</td>
</tr>
+ <tr>
+ <td><h5>table.exec.async-lookup.output-mode</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+ <td style="word-wrap: break-word;">ORDERED</td>
+ <td><p>Enum</p></td>
+ <td>Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the result, otherwise ORDERED will be still used.<br /><br />Possible values:<ul><li>"ORDERED"</li><li>"ALLOW_UNORDERED"</li></ul></td>
+ </tr>
<tr>
<td><h5>table.exec.async-lookup.timeout</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">3 min</td>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 9aa8642dee4..474fc4f9c24 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -323,6 +323,16 @@ public class ExecutionConfigOptions {
.withDescription(
"The async timeout for the asynchronous operation to complete.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+ public static final ConfigOption<AsyncOutputMode> TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE =
+ key("table.exec.async-lookup.output-mode")
+ .enumType(AsyncOutputMode.class)
+ .defaultValue(AsyncOutputMode.ORDERED)
+ .withDescription(
+ "Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. "
+ + "If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not "
+ + "affect the correctness of the result, otherwise ORDERED will be still used.");
+
// ------------------------------------------------------------------------
// MiniBatch Options
// ------------------------------------------------------------------------
@@ -573,6 +583,21 @@ public class ExecutionConfigOptions {
FORCE
}
+ /** Output mode for asynchronous operations, equivalent to {@see AsyncDataStream.OutputMode}. */
+ @PublicEvolving
+ public enum AsyncOutputMode {
+
+ /** Ordered output mode, equivalent to {@see AsyncDataStream.OutputMode.ORDERED}. */
+ ORDERED,
+
+ /**
+ * Allow unordered output mode, will attempt to use {@see
+ * AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the
+ * result, otherwise ORDERED will be still used.
+ */
+ ALLOW_UNORDERED
+ }
+
/** Determine if CAST operates using the legacy behaviour or the new one. */
@Deprecated
public enum LegacyCastBehaviour implements DescribedEnum {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
index c9f9180941f..7d02498013a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
@@ -59,6 +59,7 @@ public class BatchExecLookupJoin extends CommonExecLookupJoin implements BatchEx
lookupKeys,
projectionOnTemporalTable,
filterOnTemporalTable,
+ true,
Collections.singletonList(inputProperty),
outputType,
description);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index fb2b7619be2..a888cb292f7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -149,6 +149,8 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
"projectionOnTemporalTable";
public static final String FIELD_NAME_FILTER_ON_TEMPORAL_TABLE = "filterOnTemporalTable";
+ public static final String FIELD_NAME_INPUT_INSERT_ONLY = "inputInsertOnly";
+
@JsonProperty(FIELD_NAME_JOIN_TYPE)
private final FlinkJoinType joinType;
@@ -172,6 +174,9 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_JOIN_CONDITION)
private final @Nullable RexNode joinCondition;
+ @JsonProperty(FIELD_NAME_INPUT_INSERT_ONLY)
+ private final boolean inputInsertOnly;
+
protected CommonExecLookupJoin(
int id,
ExecNodeContext context,
@@ -183,6 +188,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
@Nullable List<RexNode> projectionOnTemporalTable,
@Nullable RexNode filterOnTemporalTable,
+ boolean inputInsertOnly,
List<InputProperty> inputProperties,
RowType outputType,
String description) {
@@ -194,6 +200,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
this.temporalTableSourceSpec = checkNotNull(temporalTableSourceSpec);
this.projectionOnTemporalTable = projectionOnTemporalTable;
this.filterOnTemporalTable = filterOnTemporalTable;
+ this.inputInsertOnly = inputInsertOnly;
}
public TemporalTableSourceSpec getTemporalTableSourceSpec() {
@@ -316,6 +323,8 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY);
long asyncTimeout =
config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT).toMillis();
+ ExecutionConfigOptions.AsyncOutputMode asyncOutputMode =
+ config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE);
DataTypeFactory dataTypeFactory =
ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
@@ -388,10 +397,17 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
asyncBufferCapacity);
}
- // force ORDERED output mode currently, optimize it to UNORDERED
- // when the downstream do not need orderness
return new AsyncWaitOperatorFactory<>(
- asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED);
+ asyncFunc, asyncTimeout, asyncBufferCapacity, convert(asyncOutputMode));
+ }
+
+ private AsyncDataStream.OutputMode convert(
+ ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+ if (inputInsertOnly
+ && asyncOutputMode == ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED) {
+ return AsyncDataStream.OutputMode.UNORDERED;
+ }
+ return AsyncDataStream.OutputMode.ORDERED;
}
private StreamOperatorFactory<RowData> createSyncLookupJoin(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
index 89549588827..f54b06e7c60 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
@@ -57,6 +57,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
@Nullable List<RexNode> projectionOnTemporalTable,
@Nullable RexNode filterOnTemporalTable,
+ boolean inputInsertOnly,
InputProperty inputProperty,
RowType outputType,
String description) {
@@ -70,6 +71,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
lookupKeys,
projectionOnTemporalTable,
filterOnTemporalTable,
+ inputInsertOnly,
Collections.singletonList(inputProperty),
outputType,
description);
@@ -89,6 +91,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
List<RexNode> projectionOnTemporalTable,
@JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE) @Nullable
RexNode filterOnTemporalTable,
+ @JsonProperty(FIELD_NAME_INPUT_INSERT_ONLY) @Nullable Boolean inputInsertOnly,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
@@ -102,6 +105,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
lookupKeys,
projectionOnTemporalTable,
filterOnTemporalTable,
+ inputInsertOnly,
inputProperties,
outputType,
description);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
index 3d797f74d6a..d2d03841b4f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
-import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil}
+import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, FlinkRexUtil, JoinTypeUtil}
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
@@ -83,6 +83,7 @@ class StreamPhysicalLookupJoin(
allLookupKeys.map(item => (Int.box(item._1), item._2)).asJava,
projectionOnTemporalTable,
filterOnTemporalTable,
+ ChangelogPlanUtils.inputInsertOnly(this),
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index 66621dc06e0..284d8602984 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -594,18 +595,21 @@ final class TestValuesRuntimeFunctions {
private static final long serialVersionUID = 1L;
private final Map<Row, List<Row>> mapping;
+ private final Random random;
private transient boolean isOpenCalled = false;
private transient ExecutorService executor;
protected AsyncTestValueLookupFunction(Map<Row, List<Row>> mapping) {
this.mapping = mapping;
+ this.random = new Random();
}
@Override
public void open(FunctionContext context) throws Exception {
RESOURCE_COUNTER.incrementAndGet();
isOpenCalled = true;
- executor = Executors.newSingleThreadExecutor();
+ // generate unordered result for async lookup
+ executor = Executors.newFixedThreadPool(2);
}
public void eval(CompletableFuture<Collection<Row>> resultFuture, Object... inputs) {
@@ -619,6 +623,11 @@ final class TestValuesRuntimeFunctions {
}
CompletableFuture.supplyAsync(
() -> {
+ try {
+ Thread.sleep(random.nextInt(5));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
List<Row> list = mapping.get(key);
if (list == null) {
return Collections.<Row>emptyList();
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
index cba8ae4894a..b232637a236 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
@@ -258,6 +258,7 @@
},
"projectionOnTemporalTable" : null,
"filterOnTemporalTable" : null,
+ "inputInsertOnly" : true,
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
index c3012c79471..fc980f3b3c0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
@@ -262,6 +262,7 @@
"type" : "INT"
} ],
"filterOnTemporalTable" : null,
+ "inputInsertOnly" : true,
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 5bb390596ea..94858311ed6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -20,6 +20,8 @@ package org.apache.flink.table.planner.runtime.stream.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions.AsyncOutputMode
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
@@ -41,7 +43,8 @@ import scala.collection.JavaConversions._
class AsyncLookupJoinITCase(
legacyTableSource: Boolean,
backend: StateBackendMode,
- objectReuse: Boolean)
+ objectReuse: Boolean,
+ asyncOutputMode: AsyncOutputMode)
extends StreamingWithStateTestBase(backend) {
val data = List(
@@ -62,6 +65,8 @@ class AsyncLookupJoinITCase(
env.getConfig.disableObjectReuse()
}
+ tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE, asyncOutputMode)
+
createScanTable("src", data)
createLookupTable("user_table", userData)
}
@@ -303,13 +308,16 @@ class AsyncLookupJoinITCase(
}
object AsyncLookupJoinITCase {
- @Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}")
+ @Parameterized.Parameters(
+ name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, AsyncOutputMode={3}")
def parameters(): JCollection[Array[Object]] = {
Seq[Array[AnyRef]](
- Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE),
- Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE),
- Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE),
- Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE)
+ Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED),
+ Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
+ Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
+ Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ORDERED),
+ Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ALLOW_UNORDERED),
+ Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED)
)
}
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java
index d3a2952be75..6c70e6b54cf 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java
@@ -51,6 +51,8 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
@@ -59,6 +61,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -71,11 +74,19 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
/** Harness tests for {@link LookupJoinRunner} and {@link LookupJoinWithCalcRunner}. */
+@RunWith(Parameterized.class)
public class AsyncLookupJoinHarnessTest {
private static final int ASYNC_BUFFER_CAPACITY = 100;
private static final int ASYNC_TIMEOUT_MS = 3000;
+ @Parameterized.Parameter public boolean orderedResult;
+
+ @Parameterized.Parameters(name = "ordered result = {0}")
+ public static Object[] parameters() {
+ return new Object[][] {new Object[] {true}, new Object[] {false}};
+ }
+
private final TypeSerializer<RowData> inSerializer =
new RowDataSerializer(
DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType());
@@ -130,7 +141,7 @@ public class AsyncLookupJoinHarnessTest {
expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
- assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ checkResult(expectedOutput, testHarness.getOutput());
}
@Test
@@ -159,7 +170,7 @@ public class AsyncLookupJoinHarnessTest {
expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
- assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ checkResult(expectedOutput, testHarness.getOutput());
}
@Test
@@ -191,7 +202,7 @@ public class AsyncLookupJoinHarnessTest {
expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
expectedOutput.add(insertRecord(5, "e", null, null));
- assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ checkResult(expectedOutput, testHarness.getOutput());
}
@Test
@@ -222,11 +233,19 @@ public class AsyncLookupJoinHarnessTest {
expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
expectedOutput.add(insertRecord(5, "e", null, null));
- assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ checkResult(expectedOutput, testHarness.getOutput());
}
// ---------------------------------------------------------------------------------
+ private void checkResult(Collection<Object> expectedOutput, Collection<Object> actualOutput) {
+ if (orderedResult) {
+ assertor.assertOutputEquals("output wrong.", expectedOutput, actualOutput);
+ } else {
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
+ }
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(
JoinType joinType, FilterOnTable filterOnTable) throws Exception {
@@ -258,7 +277,9 @@ public class AsyncLookupJoinHarnessTest {
joinRunner,
ASYNC_TIMEOUT_MS,
ASYNC_BUFFER_CAPACITY,
- AsyncDataStream.OutputMode.ORDERED),
+ orderedResult
+ ? AsyncDataStream.OutputMode.ORDERED
+ : AsyncDataStream.OutputMode.UNORDERED),
inSerializer);
}
@@ -319,6 +340,8 @@ public class AsyncLookupJoinHarnessTest {
private static final Map<Integer, List<RowData>> data = new HashMap<>();
+ private final Random random = new Random();
+
static {
data.put(1, Collections.singletonList(GenericRowData.of(1, fromString("Julian"))));
data.put(
@@ -334,7 +357,8 @@ public class AsyncLookupJoinHarnessTest {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- this.executor = Executors.newSingleThreadExecutor();
+ // generate unordered result for async lookup
+ this.executor = Executors.newFixedThreadPool(2);
}
@Override
@@ -342,7 +366,16 @@ public class AsyncLookupJoinHarnessTest {
throws Exception {
int id = input.getInt(0);
CompletableFuture.supplyAsync(
- (Supplier<Collection<RowData>>) () -> data.get(id), executor)
+ (Supplier<Collection<RowData>>)
+ () -> {
+ try {
+ Thread.sleep(random.nextInt(5));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return data.get(id);
+ },
+ executor)
.thenAcceptAsync(resultFuture::complete, executor);
}