You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/07/15 08:31:31 UTC

[flink] branch master updated: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c27fd8dc72c [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
c27fd8dc72c is described below

commit c27fd8dc72ceac7631b5f7482db1e9a14b339f68
Author: lincoln lee <li...@gmail.com>
AuthorDate: Mon May 16 12:30:38 2022 +0800

    [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
    
    This closes #19759
---
 .../generated/execution_config_configuration.html  |  6 +++
 .../table/api/config/ExecutionConfigOptions.java   | 25 ++++++++++++
 .../plan/nodes/exec/batch/BatchExecLookupJoin.java |  1 +
 .../nodes/exec/common/CommonExecLookupJoin.java    | 22 ++++++++--
 .../nodes/exec/stream/StreamExecLookupJoin.java    |  4 ++
 .../physical/stream/StreamPhysicalLookupJoin.scala |  3 +-
 .../factories/TestValuesRuntimeFunctions.java      | 11 ++++-
 .../testJoinTemporalTable.out                      |  1 +
 ...testJoinTemporalTableWithProjectionPushDown.out |  1 +
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala | 20 ++++++---
 .../operators/join/AsyncLookupJoinHarnessTest.java | 47 ++++++++++++++++++----
 11 files changed, 123 insertions(+), 18 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 364d9bf9d81..d2974df5ac6 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -14,6 +14,12 @@
             <td>Integer</td>
             <td>The max number of async i/o operation that the async lookup join can trigger.</td>
         </tr>
+        <tr>
+            <td><h5>table.exec.async-lookup.output-mode</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">ORDERED</td>
+            <td><p>Enum</p></td>
+            <td>Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the result, otherwise ORDERED will be still used.<br /><br />Possible values:<ul><li>"ORDERED"</li><li>"ALLOW_UNORDERED"</li></ul></td>
+        </tr>
         <tr>
             <td><h5>table.exec.async-lookup.timeout</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">3 min</td>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 9aa8642dee4..474fc4f9c24 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -323,6 +323,16 @@ public class ExecutionConfigOptions {
                     .withDescription(
                             "The async timeout for the asynchronous operation to complete.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<AsyncOutputMode> TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE =
+            key("table.exec.async-lookup.output-mode")
+                    .enumType(AsyncOutputMode.class)
+                    .defaultValue(AsyncOutputMode.ORDERED)
+                    .withDescription(
+                            "Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. "
+                                    + "If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not "
+                                    + "affect the correctness of the result, otherwise ORDERED will be still used.");
+
     // ------------------------------------------------------------------------
     //  MiniBatch Options
     // ------------------------------------------------------------------------
@@ -573,6 +583,21 @@ public class ExecutionConfigOptions {
         FORCE
     }
 
+    /** Output mode for async operations, equivalent to {@code AsyncDataStream.OutputMode}. */
+    @PublicEvolving
+    public enum AsyncOutputMode {
+
+        /** Ordered output mode, equivalent to {@code AsyncDataStream.OutputMode.ORDERED}. */
+        ORDERED,
+
+        /**
+         * Allow unordered output mode, will attempt to use {@code
+         * AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the
+         * result, otherwise ORDERED will still be used.
+         */
+        ALLOW_UNORDERED
+    }
+
     /** Determine if CAST operates using the legacy behaviour or the new one. */
     @Deprecated
     public enum LegacyCastBehaviour implements DescribedEnum {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
index c9f9180941f..7d02498013a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
@@ -59,6 +59,7 @@ public class BatchExecLookupJoin extends CommonExecLookupJoin implements BatchEx
                 lookupKeys,
                 projectionOnTemporalTable,
                 filterOnTemporalTable,
+                true,
                 Collections.singletonList(inputProperty),
                 outputType,
                 description);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index fb2b7619be2..a888cb292f7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -149,6 +149,8 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
             "projectionOnTemporalTable";
     public static final String FIELD_NAME_FILTER_ON_TEMPORAL_TABLE = "filterOnTemporalTable";
 
+    public static final String FIELD_NAME_INPUT_INSERT_ONLY = "inputInsertOnly";
+
     @JsonProperty(FIELD_NAME_JOIN_TYPE)
     private final FlinkJoinType joinType;
 
@@ -172,6 +174,9 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
     @JsonProperty(FIELD_NAME_JOIN_CONDITION)
     private final @Nullable RexNode joinCondition;
 
+    @JsonProperty(FIELD_NAME_INPUT_INSERT_ONLY)
+    private final boolean inputInsertOnly;
+
     protected CommonExecLookupJoin(
             int id,
             ExecNodeContext context,
@@ -183,6 +188,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
             Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
             @Nullable List<RexNode> projectionOnTemporalTable,
             @Nullable RexNode filterOnTemporalTable,
+            boolean inputInsertOnly,
             List<InputProperty> inputProperties,
             RowType outputType,
             String description) {
@@ -194,6 +200,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
         this.temporalTableSourceSpec = checkNotNull(temporalTableSourceSpec);
         this.projectionOnTemporalTable = projectionOnTemporalTable;
         this.filterOnTemporalTable = filterOnTemporalTable;
+        this.inputInsertOnly = inputInsertOnly;
     }
 
     public TemporalTableSourceSpec getTemporalTableSourceSpec() {
@@ -316,6 +323,8 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
                 config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY);
         long asyncTimeout =
                 config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT).toMillis();
+        ExecutionConfigOptions.AsyncOutputMode asyncOutputMode =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE);
 
         DataTypeFactory dataTypeFactory =
                 ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
@@ -388,10 +397,17 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
                             asyncBufferCapacity);
         }
 
-        // force ORDERED output mode currently, optimize it to UNORDERED
-        // when the downstream do not need orderness
         return new AsyncWaitOperatorFactory<>(
-                asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED);
+                asyncFunc, asyncTimeout, asyncBufferCapacity, convert(asyncOutputMode));
+    }
+
+    private AsyncDataStream.OutputMode convert(
+            ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+        boolean unorderedAllowed =
+                asyncOutputMode == ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED;
+        return inputInsertOnly && unorderedAllowed
+                ? AsyncDataStream.OutputMode.UNORDERED
+                : AsyncDataStream.OutputMode.ORDERED;
     }
 
     private StreamOperatorFactory<RowData> createSyncLookupJoin(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
index 89549588827..f54b06e7c60 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
@@ -57,6 +57,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
             Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
             @Nullable List<RexNode> projectionOnTemporalTable,
             @Nullable RexNode filterOnTemporalTable,
+            boolean inputInsertOnly,
             InputProperty inputProperty,
             RowType outputType,
             String description) {
@@ -70,6 +71,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
                 lookupKeys,
                 projectionOnTemporalTable,
                 filterOnTemporalTable,
+                inputInsertOnly,
                 Collections.singletonList(inputProperty),
                 outputType,
                 description);
@@ -89,6 +91,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
                     List<RexNode> projectionOnTemporalTable,
             @JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE) @Nullable
                     RexNode filterOnTemporalTable,
+            @JsonProperty(FIELD_NAME_INPUT_INSERT_ONLY) @Nullable Boolean inputInsertOnly,
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
             @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
@@ -102,6 +105,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
                 lookupKeys,
                 projectionOnTemporalTable,
                 filterOnTemporalTable,
+                inputInsertOnly,
                 inputProperties,
                 outputType,
                 description);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
index 3d797f74d6a..d2d03841b4f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
-import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil}
+import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, FlinkRexUtil, JoinTypeUtil}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 
@@ -83,6 +83,7 @@ class StreamPhysicalLookupJoin(
       allLookupKeys.map(item => (Int.box(item._1), item._2)).asJava,
       projectionOnTemporalTable,
       filterOnTemporalTable,
+      ChangelogPlanUtils.inputInsertOnly(this),
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index 66621dc06e0..284d8602984 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -594,18 +595,21 @@ final class TestValuesRuntimeFunctions {
 
         private static final long serialVersionUID = 1L;
         private final Map<Row, List<Row>> mapping;
+        private final Random random;
         private transient boolean isOpenCalled = false;
         private transient ExecutorService executor;
 
         protected AsyncTestValueLookupFunction(Map<Row, List<Row>> mapping) {
             this.mapping = mapping;
+            this.random = new Random();
         }
 
         @Override
         public void open(FunctionContext context) throws Exception {
             RESOURCE_COUNTER.incrementAndGet();
             isOpenCalled = true;
-            executor = Executors.newSingleThreadExecutor();
+            // use two threads so that async lookup results can complete out of order
+            executor = Executors.newFixedThreadPool(2);
         }
 
         public void eval(CompletableFuture<Collection<Row>> resultFuture, Object... inputs) {
@@ -619,6 +623,11 @@ final class TestValuesRuntimeFunctions {
             }
             CompletableFuture.supplyAsync(
                             () -> {
+                                try {
+                                    Thread.sleep(random.nextInt(5));
+                                } catch (InterruptedException e) {
+                                    throw new RuntimeException(e);
+                                }
                                 List<Row> list = mapping.get(key);
                                 if (list == null) {
                                     return Collections.<Row>emptyList();
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
index cba8ae4894a..b232637a236 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
@@ -258,6 +258,7 @@
     },
     "projectionOnTemporalTable" : null,
     "filterOnTemporalTable" : null,
+    "inputInsertOnly" : true,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
index c3012c79471..fc980f3b3c0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
@@ -262,6 +262,7 @@
       "type" : "INT"
     } ],
     "filterOnTemporalTable" : null,
+    "inputInsertOnly" : true,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 5bb390596ea..94858311ed6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -20,6 +20,8 @@ package org.apache.flink.table.planner.runtime.stream.sql
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions.AsyncOutputMode
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
@@ -41,7 +43,8 @@ import scala.collection.JavaConversions._
 class AsyncLookupJoinITCase(
     legacyTableSource: Boolean,
     backend: StateBackendMode,
-    objectReuse: Boolean)
+    objectReuse: Boolean,
+    asyncOutputMode: AsyncOutputMode)
   extends StreamingWithStateTestBase(backend) {
 
   val data = List(
@@ -62,6 +65,8 @@ class AsyncLookupJoinITCase(
       env.getConfig.disableObjectReuse()
     }
 
+    tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE, asyncOutputMode)
+
     createScanTable("src", data)
     createLookupTable("user_table", userData)
   }
@@ -303,13 +308,16 @@ class AsyncLookupJoinITCase(
 }
 
 object AsyncLookupJoinITCase {
-  @Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}")
+  @Parameterized.Parameters(
+    name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, AsyncOutputMode={3}")
   def parameters(): JCollection[Array[Object]] = {
     Seq[Array[AnyRef]](
-      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE),
-      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE)
+      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED),
+      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ALLOW_UNORDERED),
+      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED)
     )
   }
 }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java
index d3a2952be75..6c70e6b54cf 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java
@@ -51,6 +51,8 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,6 +61,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -71,11 +74,19 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
 /** Harness tests for {@link LookupJoinRunner} and {@link LookupJoinWithCalcRunner}. */
+@RunWith(Parameterized.class)
 public class AsyncLookupJoinHarnessTest {
 
     private static final int ASYNC_BUFFER_CAPACITY = 100;
     private static final int ASYNC_TIMEOUT_MS = 3000;
 
+    @Parameterized.Parameter public boolean orderedResult;
+
+    @Parameterized.Parameters(name = "ordered result = {0}")
+    public static Object[] parameters() {
+        return new Object[][] {{true}, {false}};
+    }
+
     private final TypeSerializer<RowData> inSerializer =
             new RowDataSerializer(
                     DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType());
@@ -130,7 +141,7 @@ public class AsyncLookupJoinHarnessTest {
         expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
         expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
 
-        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        checkResult(expectedOutput, testHarness.getOutput());
     }
 
     @Test
@@ -159,7 +170,7 @@ public class AsyncLookupJoinHarnessTest {
         expectedOutput.add(insertRecord(3, "c", 3, "Jackson"));
         expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
 
-        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        checkResult(expectedOutput, testHarness.getOutput());
     }
 
     @Test
@@ -191,7 +202,7 @@ public class AsyncLookupJoinHarnessTest {
         expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
         expectedOutput.add(insertRecord(5, "e", null, null));
 
-        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        checkResult(expectedOutput, testHarness.getOutput());
     }
 
     @Test
@@ -222,11 +233,19 @@ public class AsyncLookupJoinHarnessTest {
         expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
         expectedOutput.add(insertRecord(5, "e", null, null));
 
-        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        checkResult(expectedOutput, testHarness.getOutput());
     }
 
     // ---------------------------------------------------------------------------------
 
+    private void checkResult(Collection<Object> expectedOutput, Collection<Object> actualOutput) {
+        if (!orderedResult) {
+            assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
+            return;
+        }
+        assertor.assertOutputEquals("output wrong.", expectedOutput, actualOutput);
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(
             JoinType joinType, FilterOnTable filterOnTable) throws Exception {
@@ -258,7 +277,9 @@ public class AsyncLookupJoinHarnessTest {
                         joinRunner,
                         ASYNC_TIMEOUT_MS,
                         ASYNC_BUFFER_CAPACITY,
-                        AsyncDataStream.OutputMode.ORDERED),
+                        orderedResult
+                                ? AsyncDataStream.OutputMode.ORDERED
+                                : AsyncDataStream.OutputMode.UNORDERED),
                 inSerializer);
     }
 
@@ -319,6 +340,8 @@ public class AsyncLookupJoinHarnessTest {
 
         private static final Map<Integer, List<RowData>> data = new HashMap<>();
 
+        private final Random random = new Random();
+
         static {
             data.put(1, Collections.singletonList(GenericRowData.of(1, fromString("Julian"))));
             data.put(
@@ -334,7 +357,8 @@ public class AsyncLookupJoinHarnessTest {
         @Override
         public void open(Configuration parameters) throws Exception {
             super.open(parameters);
-            this.executor = Executors.newSingleThreadExecutor();
+            // use two threads so that async lookup results can complete out of order
+            this.executor = Executors.newFixedThreadPool(2);
         }
 
         @Override
@@ -342,7 +366,16 @@ public class AsyncLookupJoinHarnessTest {
                 throws Exception {
             int id = input.getInt(0);
             CompletableFuture.supplyAsync(
-                            (Supplier<Collection<RowData>>) () -> data.get(id), executor)
+                            (Supplier<Collection<RowData>>)
+                                    () -> {
+                                        try {
+                                            Thread.sleep(random.nextInt(5));
+                                        } catch (InterruptedException e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                        return data.get(id);
+                                    },
+                            executor)
                     .thenAcceptAsync(resultFuture::complete, executor);
         }