You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/07 02:33:51 UTC

[flink] 03/03: [FLINK-28709][table] Introduce ExecutionOrderEnforcerOperator to ensure the source with dynamic filtering is executed after the DynamicFilteringData is collected

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0859789e7733c73a21e600ec0d595ead730c59d
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Thu Jul 28 18:28:25 2022 +0800

    [FLINK-28709][table] Introduce ExecutionOrderEnforcerOperator to ensure the source with dynamic filtering is executed after the DynamicFilteringData is collected
    
    This closes #20374
---
 .../ExecutionOrderEnforcerOperator.java            | 71 ++++++++++++++++++++++
 .../ExecutionOrderEnforcerOperatorFactory.java     | 47 ++++++++++++++
 2 files changed, 118 insertions(+)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java
new file mode 100644
index 00000000000..caa74ed3713
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A two-input operator that forwards every record from either input to its output unchanged. One
+ * input is a source and the other is the dependent upstream; the operator enforces that the
+ * source is executed after the dependent input is finished. Typically the dependent input should
+ * not send anything.
+ *
+ * <p>The operator must be chained with the source, which is generally ensured by the {@link
+ * ExecutionOrderEnforcerOperatorFactory}. If chaining is explicitly disabled, the enforcer cannot
+ * work as expected.
+ *
+ * <p>The operator is used only for dynamic filtering at present.
+ */
+public class ExecutionOrderEnforcerOperator<IN> extends AbstractStreamOperatorV2<IN>
+        implements MultipleInputStreamOperator<IN> {
+
+    public ExecutionOrderEnforcerOperator(StreamOperatorParameters<IN> parameters) {
+        super(parameters, 2);
+    }
+
+    @Override
+    public List<Input> getInputs() {
+        return Arrays.asList(
+                new ForwardingInput<>(this, 1, output), new ForwardingInput<>(this, 2, output));
+    }
+
+    private static class ForwardingInput<IN> extends AbstractInput<IN, IN> {
+        private final Output<StreamRecord<IN>> output;
+
+        public ForwardingInput(
+                AbstractStreamOperatorV2<IN> owner, int inputId, Output<StreamRecord<IN>> output) {
+            super(owner, inputId);
+            this.output = output;
+        }
+
+        @Override
+        public void processElement(StreamRecord<IN> element) throws Exception {
+            output.collect(element);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java
new file mode 100644
index 00000000000..291d16d2a46
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+/**
+ * The factory class for {@link ExecutionOrderEnforcerOperator}. Its chaining strategy is always
+ * {@link ChainingStrategy#HEAD_WITH_SOURCES}, so that the operator can be chained with its source.
+ */
+public class ExecutionOrderEnforcerOperatorFactory<IN> extends AbstractStreamOperatorFactory<IN> {
+
+    @Override
+    public <T extends StreamOperator<IN>> T createStreamOperator(
+            StreamOperatorParameters<IN> parameters) {
+        return (T) new ExecutionOrderEnforcerOperator<>(parameters);
+    }
+
+    @Override
+    public ChainingStrategy getChainingStrategy() {
+        return ChainingStrategy.HEAD_WITH_SOURCES;
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return ExecutionOrderEnforcerOperator.class;
+    }
+}