Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/21 07:36:29 UTC

[GitHub] [flink] lincoln-lil opened a new pull request, #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

lincoln-lil opened a new pull request, #20324:
URL: https://github.com/apache/flink/pull/20324

   ## What is the purpose of the change
   This is a follow-up implementation for FLINK-28566: it introduces a new lookup join operator (sync mode only) with state to eliminate the non-determinism.
   
   ## Brief change log
   * Add KeyedLookupJoinWrapper to process insert/update-after (+I/+U) messages differently from delete/update-before (-D/-U) messages
   * Add ListenableCollector to offer a callback when an original record is collected
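The change log does not spell out how the two message groups are handled. One plausible reading, consistent with the goal of eliminating non-determinism, is sketched below in plain Java: accumulate messages (+I/+U) query the dimension table and remember the result per join key, while retract messages (-D/-U) replay the remembered result instead of querying again, so a retraction cancels exactly what was emitted earlier even if the dimension table changed in between. This is a hypothetical simplification, not the KeyedLookupJoinWrapper code: `Kind`, `MaterializedLookupJoin` and the `lookup` function are made up for illustration, a `HashMap` stands in for Flink keyed state (which the PR bounds with a TTL derived from `IDLE_STATE_RETENTION`), only inner-join output is produced, and at most one live input row per join key is assumed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for Flink's RowKind, using the same short strings. */
enum Kind {
    INSERT("+I"),
    UPDATE_BEFORE("-U"),
    UPDATE_AFTER("+U"),
    DELETE("-D");

    final String shortString;

    Kind(String shortString) {
        this.shortString = shortString;
    }

    /** +I and +U add data; -U and -D retract previously emitted data. */
    boolean isAccumulate() {
        return this == INSERT || this == UPDATE_AFTER;
    }
}

/** Simplified keyed lookup join; the map stands in for per-key Flink state. */
class MaterializedLookupJoin {
    private final Function<String, List<String>> lookup; // queries the dimension table
    private final Map<String, List<String>> state = new HashMap<>();

    MaterializedLookupJoin(Function<String, List<String>> lookup) {
        this.lookup = lookup;
    }

    /** Returns joined rows formatted as "kind(key,right)" (inner join only). */
    List<String> process(Kind kind, String key) {
        List<String> rights;
        if (kind.isAccumulate()) {
            // +I/+U: query the external table and remember what was joined
            rights = lookup.apply(key);
            state.put(key, rights);
        } else {
            // -D/-U: replay the remembered result instead of querying again
            List<String> remembered = state.remove(key);
            rights = remembered != null ? remembered : List.of();
        }
        List<String> out = new ArrayList<>();
        for (String right : rights) {
            out.add(kind.shortString + "(" + key + "," + right + ")");
        }
        return out;
    }
}
```

For example, after `+I` for key `k1` joins value `v1` and the dimension table is then changed to `v2`, a `-D` for `k1` still emits `-D(k1,v1)`; a fresh lookup at that point would have produced `v2` and left the downstream result inconsistent.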
   
   ## Verifying this change
   Covered by the newly added KeyedLookupJoinHarnessTest and the existing LookupJoinITCase and AsyncLookupJoinITCase.
   
   ## Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lincoln-lil commented on pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #20324:
URL: https://github.com/apache/flink/pull/20324#issuecomment-1207212735

   Rebased onto the latest master branch.


[GitHub] [flink] lincoln-lil commented on a diff in pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20324:
URL: https://github.com/apache/flink/pull/20324#discussion_r939875933


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -143,6 +154,8 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
 
     public static final String LOOKUP_JOIN_TRANSFORMATION = "lookup-join";
 
+    public static final String LOOKUP_JOIN_WITH_STATE_TRANSFORMATION = "lookup-join-with-state";

Review Comment:
   Changed to 'lookup-join-materialize', omitting the word 'with'.



[GitHub] [flink] lincoln-lil commented on a diff in pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20324:
URL: https://github.com/apache/flink/pull/20324#discussion_r939876601


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -329,8 +342,66 @@ private Transformation<RowData> createSyncLookupJoinWithState(
                         isLeftOuterJoin,
                         isObjectReuseEnabled);
 
-        // TODO then wrapper it into a keyed lookup function with state FLINK-28568
-        throw new UnsupportedOperationException("to be supported");
+        RowType rightRowType =
+                getRightRowType(
+                        getProjectionRowTypeOnTemporalTable(relBuilder), tableSourceRowType);
+
+        KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+                new KeyedLookupJoinWrapper(
+                        (LookupJoinRunner) processFunction,
+                        StateConfigUtil.createTtlConfig(
+                                config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
+                        InternalSerializers.create(rightRowType),
+                        lookupKeyContainsPrimaryKey);
+
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+        List<Integer> refKeys =
+                allLookupKeys.values().stream()
+                        .filter(key -> key instanceof LookupJoinUtil.FieldRefLookupKey)
+                        .map(key -> ((LookupJoinUtil.FieldRefLookupKey) key).index)
+                        .collect(Collectors.toList());
+        RowDataKeySelector keySelector;
+
+        // use single parallelism for empty key shuffle
+        boolean singleParallelism = refKeys.isEmpty();
+        if (singleParallelism) {
+            // all lookup keys are constants, then use an empty key selector
+            keySelector = EmptyRowDataKeySelector.INSTANCE;
+        } else {
+            // make it a deterministic asc order
+            Collections.sort(refKeys);
+            keySelector =
+                    KeySelectorUtil.getRowDataSelector(
+                            classLoader,
+                            refKeys.stream().mapToInt(Integer::intValue).toArray(),
+                            InternalTypeInfo.of(inputRowType));
+        }
+        final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
+                new KeyGroupStreamPartitioner<>(
+                        keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
+        Transformation<RowData> partitionedTransform =
+                new PartitionTransformation<>(inputTransformation, partitioner);
+        if (singleParallelism) {
+            setSingletonParallelism(partitionedTransform);
+        } else {
+            partitionedTransform.setParallelism(inputTransformation.getParallelism());
+        }
+
+        OneInputTransformation<RowData, RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        partitionedTransform,
+                        createTransformationMeta(LOOKUP_JOIN_WITH_STATE_TRANSFORMATION, config),
+                        operator,
+                        InternalTypeInfo.of(resultRowType),
+                        partitionedTransform.getParallelism());
+        transform.setStateKeySelector(keySelector);
+        transform.setStateKeyType(keySelector.getProducedType());
+        if (singleParallelism) {
+            setSingletonParallelism(transform);
+        }
+        return transform;
     }
 
     private LogicalType getLookupKeyLogicalType(

Review Comment:
   Yes, this method newly added on master is unused; I'll remove it.
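The diff above derives the shuffle key from the field-reference lookup keys and falls back to a single subtask when all lookup keys are constants. A plain-Java sketch of just that decision follows; `KeyPlan` is a hypothetical stand-in for the `RowDataKeySelector` and transformation setup (the real code applies the result through `KeySelectorUtil.getRowDataSelector` or `EmptyRowDataKeySelector.INSTANCE`, and through `setSingletonParallelism`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical summary of the shuffle-key and parallelism decision. */
final class KeyPlan {
    final int[] keyFields; // sorted input-field indices used as the shuffle key
    final boolean singleParallelism; // true when every lookup key is a constant

    private KeyPlan(int[] keyFields, boolean singleParallelism) {
        this.keyFields = keyFields;
        this.singleParallelism = singleParallelism;
    }

    static KeyPlan of(List<Integer> refKeys) {
        if (refKeys.isEmpty()) {
            // empty key: every record has the same key, so one subtask handles all of them
            return new KeyPlan(new int[0], true);
        }
        List<Integer> sorted = new ArrayList<>(refKeys); // do not mutate the caller's list
        Collections.sort(sorted); // deterministic ascending order, independent of map iteration
        return new KeyPlan(sorted.stream().mapToInt(Integer::intValue).toArray(), false);
    }

    /** Parallelism for both the partition and the operator transformation. */
    int parallelism(int inputParallelism) {
        return singleParallelism ? 1 : inputParallelism;
    }
}
```

Sorting matters because the key indices come from a map; without a fixed order, two plans for the same query could build differently ordered keys.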



[GitHub] [flink] lincoln-lil commented on a diff in pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20324:
URL: https://github.com/apache/flink/pull/20324#discussion_r939777681


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java:
##########
@@ -156,4 +173,43 @@ public void testLegacyTableSourceException() {
                                 ValidationException.class,
                                 "TemporalTableSourceSpec can not be serialized."));
     }
+
+    @Test
+    public void testAggAndLeftJoinWithTryResolveMode() {
+        tEnv.getConfig()
+                .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+                        OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+        util.verifyJsonPlan(
+                "INSERT INTO Sink1 "
+                        + "SELECT T.a, D.name, D.age "
+                        + "FROM (SELECT max(a) a, count(c) c, PROCTIME() proctime FROM MyTable GROUP BY b) T "
+                        + "LEFT JOIN LookupTable "
+                        + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
+    }
+
+    @Test
+    public void testAggAndAllConstantLookupKeyWithTryResolveMode() {

Review Comment:
   ok



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -268,6 +268,35 @@ class AsyncLookupJoinITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
+  @Test
+  def testAggAndAsyncLeftJoinWithTryResolveMode(): Unit = {

Review Comment:
   I've added more cases in LookupJoinITCase



[GitHub] [flink] godfreyhe closed pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism
URL: https://github.com/apache/flink/pull/20324


[GitHub] [flink] lincoln-lil commented on pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #20324:
URL: https://github.com/apache/flink/pull/20324#issuecomment-1208787436

   @lincoln-lil run azure


[GitHub] [flink] lincoln-lil commented on pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #20324:
URL: https://github.com/apache/flink/pull/20324#issuecomment-1208829686

   An unrelated failure in an ES sink test case: https://issues.apache.org/jira/browse/FLINK-28877


[GitHub] [flink] godfreyhe commented on pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on PR #20324:
URL: https://github.com/apache/flink/pull/20324#issuecomment-1208864940

   > An unrelated failure in an ES sink test case: https://issues.apache.org/jira/browse/FLINK-28877
   
   I will merge it


[GitHub] [flink] godfreyhe commented on a diff in pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20324:
URL: https://github.com/apache/flink/pull/20324#discussion_r939773566


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java:
##########
@@ -156,4 +173,43 @@ public void testLegacyTableSourceException() {
                                 ValidationException.class,
                                 "TemporalTableSourceSpec can not be serialized."));
     }
+
+    @Test
+    public void testAggAndLeftJoinWithTryResolveMode() {
+        tEnv.getConfig()
+                .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+                        OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+        util.verifyJsonPlan(
+                "INSERT INTO Sink1 "
+                        + "SELECT T.a, D.name, D.age "
+                        + "FROM (SELECT max(a) a, count(c) c, PROCTIME() proctime FROM MyTable GROUP BY b) T "
+                        + "LEFT JOIN LookupTable "
+                        + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
+    }
+
+    @Test
+    public void testAggAndAllConstantLookupKeyWithTryResolveMode() {

Review Comment:
   This method should be moved to LookupJoinTest



[GitHub] [flink] lincoln-lil commented on pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #20324:
URL: https://github.com/apache/flink/pull/20324#issuecomment-1208042809

   Rebased onto the latest master to get rid of the failing YARN IT case.


[GitHub] [flink] godfreyhe commented on a diff in pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20324:
URL: https://github.com/apache/flink/pull/20324#discussion_r939599197


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -329,8 +340,59 @@ private Transformation<RowData> createSyncLookupJoinWithState(
                         isLeftOuterJoin,
                         isObjectReuseEnabled);
 
-        // TODO then wrapper it into a keyed lookup function with state FLINK-28568
-        throw new UnsupportedOperationException("to be supported");
+        KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+                new KeyedLookupJoinWrapper(
+                        (LookupJoinRunner) processFunction,
+                        StateConfigUtil.createTtlConfig(
+                                config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
+                        InternalSerializers.create(tableSourceRowType),
+                        lookupKeyContainsPrimaryKey);
+
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+        List<Integer> refKeys =
+                allLookupKeys.entrySet().stream()

Review Comment:
   Using `allLookupKeys.values()` avoids the `key.getValue()` calls.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/ListenableCollector.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.collector;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Optional;
+
+/**
+ * A listenable collector for lookup join that can be called when an original record was collected.
+ */
+@Internal
+public abstract class ListenableCollector<T> extends TableFunctionCollector<T> {
+    private CollectListener<T> collectListener;

Review Comment:
   we should mark it as `@Nullable`
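The listener is optional: it stays unset until one is registered, so the field is `@Nullable` and callers read it through an `Optional`. A minimal plain-Java sketch of that pattern follows. `SketchCollectListener`, `SketchListenableCollector` and `BufferingCollector` are simplified, hypothetical stand-ins (the real `ListenableCollector<T>` extends `TableFunctionCollector<T>`, and a later revision of the file imports `javax.annotation.Nullable`); here `@Nullable` appears only as a comment so the example needs no extra dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Callback invoked for every original record that is collected. */
interface SketchCollectListener<T> {
    void onCollect(T record);
}

/** Hypothetical, simplified listenable collector. */
abstract class SketchListenableCollector<T> {
    // @Nullable: stays null until setCollectListener is called
    private SketchCollectListener<T> collectListener;

    void setCollectListener(SketchCollectListener<T> collectListener) { // argument may be null
        this.collectListener = collectListener;
    }

    Optional<SketchCollectListener<T>> getCollectListener() {
        return Optional.ofNullable(collectListener);
    }

    abstract void collect(T record);
}

/** Concrete collector that buffers records and notifies the listener, if one is set. */
class BufferingCollector extends SketchListenableCollector<String> {
    final List<String> output = new ArrayList<>();

    @Override
    void collect(String record) {
        output.add(record);
        getCollectListener().ifPresent(listener -> listener.onCollect(record));
    }
}
```

Because the subclass fixes the type parameter (`String` here), the listener receives a typed record and no cast is needed at the call site.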



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -329,8 +340,59 @@ private Transformation<RowData> createSyncLookupJoinWithState(
                         isLeftOuterJoin,
                         isObjectReuseEnabled);
 
-        // TODO then wrapper it into a keyed lookup function with state FLINK-28568
-        throw new UnsupportedOperationException("to be supported");
+        KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+                new KeyedLookupJoinWrapper(
+                        (LookupJoinRunner) processFunction,
+                        StateConfigUtil.createTtlConfig(
+                                config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
+                        InternalSerializers.create(tableSourceRowType),
+                        lookupKeyContainsPrimaryKey);
+
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+        List<Integer> refKeys =
+                allLookupKeys.entrySet().stream()
+                        .filter(

Review Comment:
   directly filter the `FieldRefLookupKey`s using `filter(key -> (key.getValue() instanceof LookupJoinUtil.FieldRefLookupKey))` ?
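With `values()` and a positive `instanceof` filter, the extraction reads as below. It mirrors the shape of the later revision of the diff (`allLookupKeys.values().stream().filter(key -> key instanceof LookupJoinUtil.FieldRefLookupKey)`), but `LookupKeySketch`, `FieldRefKey`, `ConstantKey` and `RefKeys` are hypothetical stand-ins so the sketch runs without Flink. Filtering on the same type that the following `map` casts to keeps the filter and the cast in agreement.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-ins for the lookup key classes. */
interface LookupKeySketch {}

final class FieldRefKey implements LookupKeySketch {
    final int index; // position of the referenced field in the input row

    FieldRefKey(int index) {
        this.index = index;
    }
}

final class ConstantKey implements LookupKeySketch {
    final Object value;

    ConstantKey(Object value) {
        this.value = value;
    }
}

final class RefKeys {
    private RefKeys() {}

    /** Input-field indices of all field-reference keys, in the map's iteration order. */
    static List<Integer> of(Map<Integer, LookupKeySketch> allLookupKeys) {
        return allLookupKeys.values().stream()
                // filter on the type the next step casts to, not on "is not a constant"
                .filter(key -> key instanceof FieldRefKey)
                .map(key -> ((FieldRefKey) key).index)
                .collect(Collectors.toList());
    }
}
```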



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java:
##########
@@ -238,13 +239,16 @@ public void flatMap(RowData value, Collector<RowData> out) throws Exception {
      * The {@link TestingFetcherCollector} is a simple implementation of {@link
      * TableFunctionCollector} which combines left and right into a JoinedRowData.
      */
-    public static final class TestingFetcherCollector extends TableFunctionCollector {
+    public static final class TestingFetcherCollector extends ListenableCollector {

Review Comment:
   The type should be `ListenableCollector<RowData>` ?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -268,6 +268,35 @@ class AsyncLookupJoinITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
+  @Test
+  def testAggAndAsyncLeftJoinWithTryResolveMode(): Unit = {

Review Comment:
   Do we have an IT case that verifies this change? This PR aims to support sync lookup join.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -329,8 +340,59 @@ private Transformation<RowData> createSyncLookupJoinWithState(
                         isLeftOuterJoin,
                         isObjectReuseEnabled);
 
-        // TODO then wrapper it into a keyed lookup function with state FLINK-28568
-        throw new UnsupportedOperationException("to be supported");
+        KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+                new KeyedLookupJoinWrapper(
+                        (LookupJoinRunner) processFunction,
+                        StateConfigUtil.createTtlConfig(
+                                config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
+                        InternalSerializers.create(tableSourceRowType),
+                        lookupKeyContainsPrimaryKey);
+
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+        List<Integer> refKeys =
+                allLookupKeys.entrySet().stream()
+                        .filter(
+                                key ->
+                                        !(key.getValue()
+                                                instanceof LookupJoinUtil.ConstantLookupKey))
+                        .map(key -> ((LookupJoinUtil.FieldRefLookupKey) key.getValue()).index)
+                        .collect(Collectors.toList());
+        RowDataKeySelector keySelector;
+
+        int parallelism = inputTransformation.getParallelism();
+        if (refKeys.isEmpty()) {
+            // all lookup keys are constants, then use an empty key selector
+            keySelector = EmptyRowDataKeySelector.INSTANCE;
+            // single parallelism for empty key shuffle
+            parallelism = 1;
+        } else {
+            // make it a deterministic asc order
+            Collections.sort(refKeys);
+            keySelector =
+                    KeySelectorUtil.getRowDataSelector(
+                            classLoader,
+                            refKeys.stream().mapToInt(Integer::intValue).toArray(),
+                            InternalTypeInfo.of(inputRowType));
+        }
+        final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
+                new KeyGroupStreamPartitioner<>(
+                        keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
+        Transformation<RowData> partitionedTransform =
+                new PartitionTransformation<>(inputTransformation, partitioner);
+        partitionedTransform.setParallelism(parallelism);
+
+        OneInputTransformation<RowData, RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        partitionedTransform,
+                        createTransformationMeta(LOOKUP_JOIN_TRANSFORMATION, config),

Review Comment:
   We should define another operator name for this Transformation, because LOOKUP_JOIN_TRANSFORMATION is already used for the join Transformation.



[GitHub] [flink] godfreyhe commented on a diff in pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20324:
URL: https://github.com/apache/flink/pull/20324#discussion_r939866279


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -541,16 +632,9 @@ private ProcessFunction<RowData, RowData> createSyncLookupJoinFunction(
                         isObjectReuseEnabled);
 
         Optional<RelDataType> temporalTableOutputType =
-                projectionOnTemporalTable != null
-                        ? Optional.of(
-                                RexUtil.createStructType(
-                                        unwrapTypeFactory(relBuilder), projectionOnTemporalTable))
-                        : Optional.empty();
-        RowType rightRowType =
-                projectionOnTemporalTable != null
-                        ? (RowType) toLogicalType(temporalTableOutputType.get())
-                        : tableSourceRowType;
-        GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
+                getProjectionRowTypeOnTemporalTable(relBuilder);

Review Comment:
   We can call the `getProjectionRowTypeOnTemporalTable` method inside the `if (projectionOnTemporalTable != null)` branch, and then the implementation of `getProjectionRowTypeOnTemporalTable` can be simplified.
   
   There is also some duplicated code in the `createAsyncLookupJoin` method.
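The suggestion amounts to building the projected type only when a projection exists and otherwise falling back to the source row type, in one helper that both the sync and async paths could reuse. A hedged sketch with hypothetical names (`RightTypeResolver`, `projectionRowType`, `rightRowType`), where `String`s stand in for the planner's Calcite and Flink row types:

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper; Strings stand in for the planner's row types. */
final class RightTypeResolver {
    private RightTypeResolver() {}

    /** Builds a projected type only if there is a projection; empty otherwise. */
    static Optional<String> projectionRowType(List<String> projection) {
        if (projection == null) {
            return Optional.empty();
        }
        return Optional.of("ROW<" + String.join(", ", projection) + ">");
    }

    /** Right side of the join: the projected type if present, else the source row type. */
    static String rightRowType(Optional<String> projected, String sourceRowType) {
        return projected.orElse(sourceRowType);
    }
}
```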



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java:
##########
@@ -238,13 +239,16 @@ public void flatMap(RowData value, Collector<RowData> out) throws Exception {
      * The {@link TestingFetcherCollector} is a simple implementation of {@link
      * TableFunctionCollector} which combines left and right into a JoinedRowData.
      */
-    public static final class TestingFetcherCollector extends TableFunctionCollector {
+    public static final class TestingFetcherCollector extends ListenableCollector<RowData> {
         private static final long serialVersionUID = -312754413938303160L;
 
         @Override
-        public void collect(Object record) {
+        public void collect(RowData record) {
             RowData left = (RowData) getInput();
-            RowData right = (RowData) record;
+            RowData right = record;
+            getCollectListener()
+                    .ifPresent(listener -> ((CollectListener) listener).onCollect(record));

Review Comment:
   The cast is not needed here.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -143,6 +154,8 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
 
     public static final String LOOKUP_JOIN_TRANSFORMATION = "lookup-join";
 
+    public static final String LOOKUP_JOIN_WITH_STATE_TRANSFORMATION = "lookup-join-with-state";

Review Comment:
   How about `lookup-join-with-materialize`?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/ListenableCollector.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.collector;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/**
+ * A listenable collector for lookup join that can be called when an original record was collected.
+ */
+@Internal
+public abstract class ListenableCollector<T> extends TableFunctionCollector<T> {
+    @Nullable private CollectListener<T> collectListener;
+
+    public void setCollectListener(CollectListener<T> collectListener) {

Review Comment:
   nit: @Nullable CollectListener<T> collectListener



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -329,8 +342,66 @@ private Transformation<RowData> createSyncLookupJoinWithState(
                         isLeftOuterJoin,
                         isObjectReuseEnabled);
 
-        // TODO then wrapper it into a keyed lookup function with state FLINK-28568
-        throw new UnsupportedOperationException("to be supported");
+        RowType rightRowType =
+                getRightRowType(
+                        getProjectionRowTypeOnTemporalTable(relBuilder), tableSourceRowType);
+
+        KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+                new KeyedLookupJoinWrapper(
+                        (LookupJoinRunner) processFunction,
+                        StateConfigUtil.createTtlConfig(
+                                config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
+                        InternalSerializers.create(rightRowType),
+                        lookupKeyContainsPrimaryKey);
+
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+        List<Integer> refKeys =
+                allLookupKeys.values().stream()
+                        .filter(key -> key instanceof LookupJoinUtil.FieldRefLookupKey)
+                        .map(key -> ((LookupJoinUtil.FieldRefLookupKey) key).index)
+                        .collect(Collectors.toList());
+        RowDataKeySelector keySelector;
+
+        // use single parallelism for empty key shuffle
+        boolean singleParallelism = refKeys.isEmpty();
+        if (singleParallelism) {
+            // all lookup keys are constants, then use an empty key selector
+            keySelector = EmptyRowDataKeySelector.INSTANCE;
+        } else {
+            // make it a deterministic asc order
+            Collections.sort(refKeys);
+            keySelector =
+                    KeySelectorUtil.getRowDataSelector(
+                            classLoader,
+                            refKeys.stream().mapToInt(Integer::intValue).toArray(),
+                            InternalTypeInfo.of(inputRowType));
+        }
+        final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
+                new KeyGroupStreamPartitioner<>(
+                        keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
+        Transformation<RowData> partitionedTransform =
+                new PartitionTransformation<>(inputTransformation, partitioner);
+        if (singleParallelism) {
+            setSingletonParallelism(partitionedTransform);
+        } else {
+            partitionedTransform.setParallelism(inputTransformation.getParallelism());
+        }
+
+        OneInputTransformation<RowData, RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        partitionedTransform,
+                        createTransformationMeta(LOOKUP_JOIN_WITH_STATE_TRANSFORMATION, config),
+                        operator,
+                        InternalTypeInfo.of(resultRowType),
+                        partitionedTransform.getParallelism());
+        transform.setStateKeySelector(keySelector);
+        transform.setStateKeyType(keySelector.getProducedType());
+        if (singleParallelism) {
+            setSingletonParallelism(transform);
+        }
+        return transform;
     }
 
     private LogicalType getLookupKeyLogicalType(

Review Comment:
   this method can be removed
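The key-extraction step in the revised diff above can be shown on its own, outside the planner. This is a minimal sketch that uses simplified, hypothetical stand-ins (`LookupKey`, `ConstantLookupKey`, `FieldRefLookupKey`, `RefKeyPlanner`) instead of the real `LookupJoinUtil` classes. It keeps only field-reference keys, sorts their indices so the key layout is deterministic, and reports whether every key is a constant. That all-constant case is the one the diff handles with `EmptyRowDataKeySelector` and singleton parallelism.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical, simplified stand-in for LookupJoinUtil.LookupKey. */
abstract class LookupKey {}

/** A lookup key bound to a literal value. */
final class ConstantLookupKey extends LookupKey {
    final Object value;

    ConstantLookupKey(Object value) {
        this.value = value;
    }
}

/** A lookup key that refers to a field of the left (input) row. */
final class FieldRefLookupKey extends LookupKey {
    final int index;

    FieldRefLookupKey(int index) {
        this.index = index;
    }
}

final class RefKeyPlanner {
    /** Sorted input-field indices used as the shuffle and state key; empty if all keys are constants. */
    static List<Integer> refKeys(Map<Integer, LookupKey> allLookupKeys) {
        List<Integer> refKeys =
                allLookupKeys.values().stream()
                        // positive instanceof check, so the cast below cannot fail
                        .filter(key -> key instanceof FieldRefLookupKey)
                        .map(key -> ((FieldRefLookupKey) key).index)
                        .collect(Collectors.toCollection(ArrayList::new));
        // ascending order, so the same query always yields the same key layout
        Collections.sort(refKeys);
        return refKeys;
    }

    /** No field-reference keys means one shared (empty) key, hence a single subtask. */
    static boolean needsSingletonParallelism(List<Integer> refKeys) {
        return refKeys.isEmpty();
    }
}
```

Collecting into an `ArrayList` makes the in-place sort safe. `Collectors.toList()` does not guarantee a mutable list, although in practice it returns one.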





[GitHub] [flink] lincoln-lil commented on a diff in pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20324:
URL: https://github.com/apache/flink/pull/20324#discussion_r939876782


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -541,16 +632,9 @@ private ProcessFunction<RowData, RowData> createSyncLookupJoinFunction(
                         isObjectReuseEnabled);
 
         Optional<RelDataType> temporalTableOutputType =
-                projectionOnTemporalTable != null
-                        ? Optional.of(
-                                RexUtil.createStructType(
-                                        unwrapTypeFactory(relBuilder), projectionOnTemporalTable))
-                        : Optional.empty();
-        RowType rightRowType =
-                projectionOnTemporalTable != null
-                        ? (RowType) toLogicalType(temporalTableOutputType.get())
-                        : tableSourceRowType;
-        GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
+                getProjectionRowTypeOnTemporalTable(relBuilder);

Review Comment:
   done
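This refactoring replaces the inline ternaries with `getProjectionRowTypeOnTemporalTable(relBuilder)`, and the revised method adds `getRightRowType(...)`. The sketch below expresses the same rule with `Optional`: use the projected type if one exists, otherwise use the source type. It assumes a hypothetical `SimpleRowType` (field names only) in place of Calcite's `RelDataType` and Flink's `RowType`. The revised operator also builds its state serializer from `rightRowType` rather than `tableSourceRowType`. Presumably this keeps the serializer consistent with the projected rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for a row type: only the ordered field names. */
final class SimpleRowType {
    final List<String> fieldNames;

    SimpleRowType(List<String> fieldNames) {
        this.fieldNames = List.copyOf(fieldNames);
    }
}

final class RightRowTypes {
    /** Analogue of the projection helper: empty when the temporal table has no projection. */
    static Optional<SimpleRowType> projectionRowType(List<Integer> projection, SimpleRowType source) {
        if (projection == null) {
            return Optional.empty();
        }
        List<String> projected = new ArrayList<>();
        for (int idx : projection) {
            projected.add(source.fieldNames.get(idx));
        }
        return Optional.of(new SimpleRowType(projected));
    }

    /** Prefer the projected type; otherwise fall back to the full table source type. */
    static SimpleRowType rightRowType(Optional<SimpleRowType> projectionType, SimpleRowType tableSourceRowType) {
        return projectionType.orElse(tableSourceRowType);
    }
}
```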





[GitHub] [flink] flinkbot commented on pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20324:
URL: https://github.com/apache/flink/pull/20324#issuecomment-1191161353

   ## CI report:
   
   * 9752663fcd7a68aa9425dad88465af1cf678c3cb UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] lincoln-lil commented on a diff in pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20324:
URL: https://github.com/apache/flink/pull/20324#discussion_r939629582


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -329,8 +340,59 @@ private Transformation<RowData> createSyncLookupJoinWithState(
                         isLeftOuterJoin,
                         isObjectReuseEnabled);
 
-        // TODO then wrapper it into a keyed lookup function with state FLINK-28568
-        throw new UnsupportedOperationException("to be supported");
+        KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+                new KeyedLookupJoinWrapper(
+                        (LookupJoinRunner) processFunction,
+                        StateConfigUtil.createTtlConfig(
+                                config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
+                        InternalSerializers.create(tableSourceRowType),
+                        lookupKeyContainsPrimaryKey);
+
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+        List<Integer> refKeys =
+                allLookupKeys.entrySet().stream()
+                        .filter(
+                                key ->
+                                        !(key.getValue()
+                                                instanceof LookupJoinUtil.ConstantLookupKey))
+                        .map(key -> ((LookupJoinUtil.FieldRefLookupKey) key.getValue()).index)
+                        .collect(Collectors.toList());
+        RowDataKeySelector keySelector;
+
+        int parallelism = inputTransformation.getParallelism();
+        if (refKeys.isEmpty()) {
+            // all lookup keys are constants, then use an empty key selector
+            keySelector = EmptyRowDataKeySelector.INSTANCE;
+            // single parallelism for empty key shuffle
+            parallelism = 1;
+        } else {
+            // make it a deterministic asc order
+            Collections.sort(refKeys);
+            keySelector =
+                    KeySelectorUtil.getRowDataSelector(
+                            classLoader,
+                            refKeys.stream().mapToInt(Integer::intValue).toArray(),
+                            InternalTypeInfo.of(inputRowType));
+        }
+        final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
+                new KeyGroupStreamPartitioner<>(
+                        keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
+        Transformation<RowData> partitionedTransform =
+                new PartitionTransformation<>(inputTransformation, partitioner);
+        partitionedTransform.setParallelism(parallelism);
+
+        OneInputTransformation<RowData, RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        partitionedTransform,
+                        createTransformationMeta(LOOKUP_JOIN_TRANSFORMATION, config),

Review Comment:
   ok
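The partitioning step in this diff routes each record by its lookup key. The sketch below imitates Flink's two-step key-group assignment to show why an all-constant key runs with parallelism 1. Every record carries the same (empty) key, so every record reaches the same subtask, and any other subtasks would sit idle. The sketch is illustrative only. `KeyGroupRouting` is hypothetical, and a plain `Math.floorMod` of `hashCode()` stands in for the murmur hash Flink applies, so the concrete key-group numbers differ from Flink's. The subtask formula follows `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup`, and 128 is the value of `DEFAULT_LOWER_BOUND_MAX_PARALLELISM`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

final class KeyGroupRouting {
    // Value of KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM (1 << 7).
    static final int MAX_PARALLELISM = 128;

    /** Key -> key group. Simplified: Flink murmur-hashes key.hashCode() first; floorMod is a stand-in. */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    /** Key group -> subtask index, using the keyGroup * parallelism / maxParallelism formula. */
    static int subtask(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Distinct subtask indices reached by the given record keys. */
    static Set<Integer> subtasksUsed(List<?> keys, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (Object key : keys) {
            used.add(subtask(keyGroup(key, MAX_PARALLELISM), MAX_PARALLELISM, parallelism));
        }
        return used;
    }
}
```

The diff passes the same `keySelector` to both the partitioner and `setStateKeySelector`, so routing and state access agree on the key.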



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -268,6 +268,35 @@ class AsyncLookupJoinITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
+  @Test
+  def testAggAndAsyncLeftJoinWithTryResolveMode(): Unit = {

Review Comment:
   this case covers the change: the legacy source provides both sync and async lookup functions, so the join can fall back to the sync lookup function with state.
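The fallback described in this comment can be written as a small decision rule. The sketch below is hypothetical: `LookupMode`, `LookupModeChooser`, and the rejection of an async-only source are assumptions, not the planner's code. It captures only the point made here. Because the stateful lookup join is sync-only, a source that offers both function kinds is planned with its sync function whenever state is required.

```java
/** Hypothetical planning outcomes for a lookup join. */
enum LookupMode { SYNC, ASYNC, SYNC_WITH_STATE }

final class LookupModeChooser {
    /**
     * Hypothetical decision rule: the keyed lookup join with state is sync-only,
     * so when state is required the sync function is used even if async was preferred.
     */
    static LookupMode choose(boolean hasSync, boolean hasAsync, boolean preferAsync, boolean requiresState) {
        if (requiresState) {
            if (!hasSync) {
                // assumption: this sketch simply rejects an async-only source here
                throw new UnsupportedOperationException("stateful lookup join needs a sync lookup function");
            }
            return LookupMode.SYNC_WITH_STATE;
        }
        if (hasAsync && (preferAsync || !hasSync)) {
            return LookupMode.ASYNC;
        }
        if (hasSync) {
            return LookupMode.SYNC;
        }
        throw new IllegalArgumentException("source provides no lookup function");
    }
}
```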



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -329,8 +340,59 @@ private Transformation<RowData> createSyncLookupJoinWithState(
                         isLeftOuterJoin,
                         isObjectReuseEnabled);
 
-        // TODO then wrapper it into a keyed lookup function with state FLINK-28568
-        throw new UnsupportedOperationException("to be supported");
+        KeyedLookupJoinWrapper keyedLookupJoinWrapper =
+                new KeyedLookupJoinWrapper(
+                        (LookupJoinRunner) processFunction,
+                        StateConfigUtil.createTtlConfig(
+                                config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
+                        InternalSerializers.create(tableSourceRowType),
+                        lookupKeyContainsPrimaryKey);
+
+        KeyedProcessOperator<RowData, RowData, RowData> operator =
+                new KeyedProcessOperator<>(keyedLookupJoinWrapper);
+
+        List<Integer> refKeys =
+                allLookupKeys.entrySet().stream()
+                        .filter(

Review Comment:
   yes


