You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "lincoln-lil (via GitHub)" <gi...@apache.org> on 2023/03/24 13:15:28 UTC

[GitHub] [flink] lincoln-lil commented on a diff in pull request #22014: [FLINK-18996][table-runtime] Emit join failed record realtime

lincoln-lil commented on code in PR #22014:
URL: https://github.com/apache/flink/pull/22014#discussion_r1147543075


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -542,6 +542,18 @@ public class ExecutionConfigOptions {
                                     + "In Flink 1.15.x the pattern was wrongly defined as '<id>_<type>_<version>_<transformation>' "
                                     + "which would prevent migrations in the future.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Duration>
+            TABLE_EXEC_INTERVAL_JOIN_MIN_CLEAN_UP_INTERVAL_MILLIS =

Review Comment:
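   Suggest renaming this to "TABLE_EXEC_INTERVAL_JOIN_MIN_CLEAN_UP_INTERVAL" (i.e. dropping the "_MILLIS" suffix, since the option is declared as a Duration).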



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -542,6 +542,18 @@ public class ExecutionConfigOptions {
                                     + "In Flink 1.15.x the pattern was wrongly defined as '<id>_<type>_<version>_<transformation>' "
                                     + "which would prevent migrations in the future.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Duration>
+            TABLE_EXEC_INTERVAL_JOIN_MIN_CLEAN_UP_INTERVAL_MILLIS =
+                    key("table.exec.interval-join.min-cleanup-interval")
+                            .durationType()
+                            .defaultValue(Duration.ofMillis(0))
+                            .withDescription(
+                                    "Specifies a minimum time interval for how long cleanup unmatched records in the interval join operator. "
+                                            + "Before Flink 1.18, the default value of this param was the half of interval duration. "
+                                            + "NOTE: This option greater than 0 will cause records disorder and may cause downstream operator discard these records e.g. window operator. "

Review Comment:
   'records disorder' may cause some confusion. How about changing it to: "Note: Setting this option to a value greater than 0 will cause unmatched records in outer joins to be output later than the watermark, leading to possible discarding of these records by downstream watermark-dependent operators, such as window operators. The default value is 0, which means it will clean up unmatched records immediately."?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java:
##########
@@ -347,6 +355,7 @@ private TwoInputTransformation<RowData, RowData, RowData> createProcTimeJoin(
             IntervalJoinFunction joinFunction,
             JoinSpec joinSpec,
             IntervalJoinSpec.WindowBounds windowBounds,
+            long minCleanUpInterval,

Review Comment:
   nit: -> minCleanUpIntervalMillis



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java:
##########
@@ -66,6 +66,8 @@
 
 import java.util.List;
 
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_INTERVAL_JOIN_MIN_CLEAN_UP_INTERVAL_MILLIS;

Review Comment:
   nit: unnecessary static import



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoin.java:
##########
@@ -35,6 +35,7 @@ public RowTimeIntervalJoin(
             long leftLowerBound,
             long leftUpperBound,
             long allowedLateness,
+            long minCleanUpInterval,

Review Comment:
   ditto (nit: rename `minCleanUpInterval` -> `minCleanUpIntervalMillis`)



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/ProcTimeIntervalJoin.java:
##########
@@ -31,10 +31,19 @@ public ProcTimeIntervalJoin(
             FlinkJoinType joinType,
             long leftLowerBound,
             long leftUpperBound,
+            long minCleanUpInterval,

Review Comment:
   ditto (nit: rename `minCleanUpInterval` -> `minCleanUpIntervalMillis`)



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java:
##########
@@ -357,6 +366,7 @@ private TwoInputTransformation<RowData, RowData, RowData> createProcTimeJoin(
                         joinSpec.getJoinType(),
                         windowBounds.getLeftLowerBound(),
                         windowBounds.getLeftUpperBound(),
+                        minCleanUpInterval,

Review Comment:
   ditto (nit: rename `minCleanUpInterval` -> `minCleanUpIntervalMillis`)



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java:
##########
@@ -87,13 +87,14 @@ abstract class TimeIntervalJoin extends KeyedCoProcessFunction<RowData, RowData,
             long leftLowerBound,
             long leftUpperBound,
             long allowedLateness,
+            long minCleanUpInterval,

Review Comment:
   ditto (nit: rename `minCleanUpInterval` -> `minCleanUpIntervalMillis`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org