You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/07 02:33:51 UTC
[flink] 03/03: [FLINK-28709][table] Introduce ExecutionOrderEnforcerOperator to ensure the source with dynamic filtering is executed after the DynamicFilteringData is collected
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b0859789e7733c73a21e600ec0d595ead730c59d
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Thu Jul 28 18:28:25 2022 +0800
[FLINK-28709][table] Introduce ExecutionOrderEnforcerOperator to ensure the source with dynamic filtering is executed after the DynamicFilteringData is collected
This closes #20374
---
.../ExecutionOrderEnforcerOperator.java | 71 ++++++++++++++++++++++
.../ExecutionOrderEnforcerOperatorFactory.java | 47 ++++++++++++++
2 files changed, 118 insertions(+)
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java
new file mode 100644
index 00000000000..caa74ed3713
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * ExecutionOrderEnforcerOperator has two inputs, one of which is a source, and the other is the
+ * dependent upstream. It enforces that the input source is executed after the dependent input is
+ * finished. Everything passed from the inputs is forwarded to the output, though typically the
+ * dependent input should not send anything.
+ *
+ * <p>The operator must be chained with the source, which is generally ensured by the {@link
+ * ExecutionOrderEnforcerOperatorFactory}. If chaining is explicitly disabled, the enforcer can not
+ * work as expected.
+ *
+ * <p>The operator is used only for dynamic filtering at present.
+ */
+public class ExecutionOrderEnforcerOperator<IN> extends AbstractStreamOperatorV2<IN>
+ implements MultipleInputStreamOperator<IN> {
+
+ public ExecutionOrderEnforcerOperator(StreamOperatorParameters<IN> parameters) {
+ super(parameters, 2);
+ }
+
+ @Override
+ public List<Input> getInputs() {
+ return Arrays.asList(
+ new ForwardingInput<>(this, 1, output), new ForwardingInput<>(this, 2, output));
+ }
+
+ private static class ForwardingInput<IN> extends AbstractInput<IN, IN> {
+ private final Output<StreamRecord<IN>> output;
+
+ public ForwardingInput(
+ AbstractStreamOperatorV2<IN> owner, int inputId, Output<StreamRecord<IN>> output) {
+ super(owner, inputId);
+ this.output = output;
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ output.collect(element);
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java
new file mode 100644
index 00000000000..291d16d2a46
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+/**
+ * The factory class for {@link ExecutionOrderEnforcerOperator}. This is a simple operator factory
+ * whose chaining strategy is always ChainingStrategy.HEAD_WITH_SOURCES.
+ */
+public class ExecutionOrderEnforcerOperatorFactory<IN> extends AbstractStreamOperatorFactory<IN> {
+
+ @Override
+ public <T extends StreamOperator<IN>> T createStreamOperator(
+ StreamOperatorParameters<IN> parameters) {
+ return (T) new ExecutionOrderEnforcerOperator<>(parameters);
+ }
+
+ @Override
+ public ChainingStrategy getChainingStrategy() {
+ return ChainingStrategy.HEAD_WITH_SOURCES;
+ }
+
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+ return ExecutionOrderEnforcerOperator.class;
+ }
+}