Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/18 07:29:17 UTC

[GitHub] [flink] lincoln-lil opened a new pull request, #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

lincoln-lil opened a new pull request, #19759:
URL: https://github.com/apache/flink/pull/19759

   ## What is the purpose of the change
   Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions so that users can configure the result order of the async lookup join if needed.
   * ORDERED: the default (same behavior as now).
   * ALLOW_UNORDERED: if users allow unordered results and this does not break correctness (i.e., the lookup join's input is insert-only), the operator switches to `AsyncDataStream.OutputMode.UNORDERED`, which usually yields higher throughput.
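The insert-only restriction matters because an updating input carries changelog rows whose relative order is significant. The sketch below is a hypothetical illustration, not Flink code: it assumes a simplified upsert-style sink (+I/+U overwrite the key, -U deletes it) and a local `Kind` enum standing in for Flink's `RowKind`. It shows that if the async lookups for a -U/+U pair complete in the opposite order, the updated row is lost.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical demo: why unordered async output is only safe for insert-only input. */
public class UnorderedSafetyDemo {

    /** Simplified stand-in for Flink's RowKind (DELETE omitted). */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

    /** One changelog row: its kind, key and value. */
    record Change(Kind kind, String key, int value) {}

    /** Upsert-style materialization: +I/+U overwrite the key, -U deletes it. */
    static Map<String, Integer> materialize(List<Change> changes) {
        Map<String, Integer> table = new HashMap<>();
        for (Change c : changes) {
            if (c.kind() == Kind.UPDATE_BEFORE) {
                table.remove(c.key());
            } else {
                table.put(c.key(), c.value());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        Change insert = new Change(Kind.INSERT, "k", 1);
        Change before = new Change(Kind.UPDATE_BEFORE, "k", 1);
        Change after = new Change(Kind.UPDATE_AFTER, "k", 2);

        // Input order preserved (what ORDERED mode guarantees): prints {k=2}
        System.out.println(materialize(List.of(insert, before, after)));
        // -U and +U emitted in completion order, reversed: prints {} (row lost)
        System.out.println(materialize(List.of(insert, after, before)));
    }
}
```

With an insert-only input there is no such pair to reorder: every row is independent, so emitting results in completion order only permutes the output rows.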
   
   ## Brief change log
   * add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
   * add inputInsertOnly attribute to exec lookup join node
   * update existing tests to ensure unordered mode is covered
   
   ## Verifying this change
   AsyncLookupJoinHarnessTest, AsyncLookupJoinITCase and xxxJsonPlanTest
   
   ## Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lincoln-lil commented on pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #19759:
URL: https://github.com/apache/flink/pull/19759#issuecomment-1130907243

   @flinkbot  run azure




[GitHub] [flink] lincoln-lil commented on pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #19759:
URL: https://github.com/apache/flink/pull/19759#issuecomment-1185072872

   @flinkbot run azure




[GitHub] [flink] lincoln-lil commented on pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #19759:
URL: https://github.com/apache/flink/pull/19759#issuecomment-1184042723

   @flinkbot run azure
   




[GitHub] [flink] lincoln-lil commented on a diff in pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19759:
URL: https://github.com/apache/flink/pull/19759#discussion_r920650717


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java:
##########
@@ -46,7 +46,7 @@ public BatchExecLookupJoin(
             Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
             @Nullable List<RexNode> projectionOnTemporalTable,
             @Nullable RexNode filterOnTemporalTable,
-            InputProperty inputProperty,
+            @Nullable InputProperty inputProperty,

Review Comment:
   To be honest, I can't remember why I made that change; it seems it can never be null on the only instance-creation path now. I'll revert this 'nullable' change.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -303,13 +308,15 @@ class AsyncLookupJoinITCase(
 }
 
 object AsyncLookupJoinITCase {
-  @Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}")
+  @Parameterized.Parameters(
+    name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, AsyncOutputMode={3}")
   def parameters(): JCollection[Array[Object]] = {
     Seq[Array[AnyRef]](
-      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE),
-      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE)
+      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED),
+      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED)

Review Comment:
   ok



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -382,10 +391,17 @@ private StreamOperatorFactory<RowData> createAsyncLookupJoin(
                             asyncBufferCapacity);
         }
 
-        // force ORDERED output mode currently, optimize it to UNORDERED
-        // when the downstream do not need orderness
         return new AsyncWaitOperatorFactory<>(
-                asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED);
+                asyncFunc, asyncTimeout, asyncBufferCapacity, convert(asyncOutputMode));
+    }
+
+    private AsyncDataStream.OutputMode convert(
+            ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+        if (inputInsertOnly
+                && asyncOutputMode == ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED) {
+            return AsyncDataStream.OutputMode.UNORDERED;
+        }
+        return AsyncDataStream.OutputMode.ORDERED;

Review Comment:
   Yes, AsyncLookupJoinITCase#testAggAndAsyncLeftJoinTemporalTable covers the case you mentioned.





[GitHub] [flink] godfreyhe closed pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
URL: https://github.com/apache/flink/pull/19759




[GitHub] [flink] flinkbot commented on pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19759:
URL: https://github.com/apache/flink/pull/19759#issuecomment-1129671736

   ## CI report:
   
   * 54c4e41d5623b384bbff136a7c30c7fabe0030b7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] lincoln-lil commented on pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #19759:
URL: https://github.com/apache/flink/pull/19759#issuecomment-1131268448

   @flinkbot run azure




[GitHub] [flink] lincoln-lil commented on pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #19759:
URL: https://github.com/apache/flink/pull/19759#issuecomment-1184258332

   @flinkbot  run azure




[GitHub] [flink] godfreyhe commented on a diff in pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #19759:
URL: https://github.com/apache/flink/pull/19759#discussion_r920094045


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -303,13 +308,15 @@ class AsyncLookupJoinITCase(
 }
 
 object AsyncLookupJoinITCase {
-  @Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}")
+  @Parameterized.Parameters(
+    name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, AsyncOutputMode={3}")
   def parameters(): JCollection[Array[Object]] = {
     Seq[Array[AnyRef]](
-      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE),
-      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE)
+      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED),
+      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED)

Review Comment:
   add case: `ObjectReuse=false` and `AsyncOutputMode=AsyncOutputMode.ALLOW_UNORDERED`
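The gap can be checked mechanically. This is a hypothetical helper, not part of the PR: it hand-transcribes the five parameter rows from the diff above into a local `Row` record (backends reduced to plain strings, and a local `AsyncOutputMode` enum mirroring the config values) and lists every (ObjectReuse, AsyncOutputMode) pair that no row exercises.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical coverage check over the AsyncLookupJoinITCase parameter rows. */
public class ParameterCoverageCheck {

    /** Local mirror of the two config values used in the test matrix. */
    enum AsyncOutputMode { ORDERED, ALLOW_UNORDERED }

    /** LegacyTableSource, StateBackend, ObjectReuse, AsyncOutputMode. */
    record Row(boolean legacySource, String backend, boolean objectReuse, AsyncOutputMode mode) {}

    // The five rows from the diff, transcribed by hand.
    static final List<Row> ROWS = List.of(
            new Row(true, "HEAP", true, AsyncOutputMode.ALLOW_UNORDERED),
            new Row(true, "ROCKSDB", false, AsyncOutputMode.ORDERED),
            new Row(false, "HEAP", false, AsyncOutputMode.ORDERED),
            new Row(false, "HEAP", true, AsyncOutputMode.ORDERED),
            new Row(false, "ROCKSDB", true, AsyncOutputMode.ALLOW_UNORDERED));

    /** Returns every (ObjectReuse, AsyncOutputMode) pair not exercised by any row. */
    static List<String> missingPairs(List<Row> rows) {
        List<String> missing = new ArrayList<>();
        for (boolean reuse : new boolean[] {true, false}) {
            for (AsyncOutputMode mode : AsyncOutputMode.values()) {
                boolean covered = rows.stream()
                        .anyMatch(r -> r.objectReuse() == reuse && r.mode() == mode);
                if (!covered) {
                    missing.add("ObjectReuse=" + reuse + ", AsyncOutputMode=" + mode);
                }
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Prints [ObjectReuse=false, AsyncOutputMode=ALLOW_UNORDERED]
        System.out.println(missingPairs(ROWS));
    }
}
```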



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -382,10 +391,17 @@ private StreamOperatorFactory<RowData> createAsyncLookupJoin(
                             asyncBufferCapacity);
         }
 
-        // force ORDERED output mode currently, optimize it to UNORDERED
-        // when the downstream do not need orderness
         return new AsyncWaitOperatorFactory<>(
-                asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED);
+                asyncFunc, asyncTimeout, asyncBufferCapacity, convert(asyncOutputMode));
+    }
+
+    private AsyncDataStream.OutputMode convert(
+            ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+        if (inputInsertOnly
+                && asyncOutputMode == ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED) {
+            return AsyncDataStream.OutputMode.UNORDERED;
+        }
+        return AsyncDataStream.OutputMode.ORDERED;

Review Comment:
   Do we have any case that covers inputInsertOnly == false and asyncOutputMode == ALLOW_UNORDERED?
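For reference, the rule in the `convert` method quoted above can be tabulated over all four input combinations. The sketch is a hypothetical, self-contained mirror of that method: the two local enums stand in for `ExecutionConfigOptions.AsyncOutputMode` and `AsyncDataStream.OutputMode` and are not the Flink classes. The combination asked about here (inputInsertOnly == false with ALLOW_UNORDERED) resolves to ORDERED; only insert-only input with ALLOW_UNORDERED yields UNORDERED.

```java
/** Hypothetical mirror of CommonExecLookupJoin#convert as shown in the diff. */
public class AsyncOutputModeResolution {

    /** Stand-in for the user-facing config value. */
    enum AsyncOutputMode { ORDERED, ALLOW_UNORDERED }

    /** Stand-in for the operator-level output mode. */
    enum OutputMode { ORDERED, UNORDERED }

    /** UNORDERED only if the user allows it AND the lookup join input is insert-only. */
    static OutputMode convert(boolean inputInsertOnly, AsyncOutputMode asyncOutputMode) {
        if (inputInsertOnly && asyncOutputMode == AsyncOutputMode.ALLOW_UNORDERED) {
            return OutputMode.UNORDERED;
        }
        return OutputMode.ORDERED;
    }

    public static void main(String[] args) {
        // Enumerate every (inputInsertOnly, asyncOutputMode) combination.
        for (boolean insertOnly : new boolean[] {true, false}) {
            for (AsyncOutputMode mode : AsyncOutputMode.values()) {
                System.out.printf(
                        "inputInsertOnly=%-5s %-15s -> %s%n",
                        insertOnly, mode, convert(insertOnly, mode));
            }
        }
    }
}
```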



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java:
##########
@@ -46,7 +46,7 @@ public BatchExecLookupJoin(
             Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
             @Nullable List<RexNode> projectionOnTemporalTable,
             @Nullable RexNode filterOnTemporalTable,
-            InputProperty inputProperty,
+            @Nullable InputProperty inputProperty,

Review Comment:
   can `inputProperty` be null ?


