Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/21 06:30:25 UTC

[GitHub] [flink] TsReaper commented on a change in pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

TsReaper commented on a change in pull request #13707:
URL: https://github.com/apache/flink/pull/13707#discussion_r508990188



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapper.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class handles the close, endInput and other related logic of a {@link StreamOperator}.
+ * It also automatically propagates the end-input operation to the next wrapper that
+ * the {@link #outputEdges} points to, so we only need to call the head wrapper's
+ * {@link #endOperatorInput(int)} method.
+ */
+public class TableOperatorWrapper<OP extends StreamOperator<RowData>> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The factory to create the wrapped operator.
+	 */
+	private final StreamOperatorFactory<RowData> factory;
+
+	/**
+	 * the operator name for debugging.
+	 */
+	private final String operatorName;
+
+	/**
+	 * The type info of this wrapped operator's all inputs.
+	 *
+	 * <p>NOTE:The inputs of an operator may not all be in the multiple-input operator, e.g.
+	 * The multiple-input operator contains A and J, and A is one of the input of J,
+	 * and another input of J is not in the multiple-input operator.
+	 * <pre>
+	 * -------
+	 *        \
+	 *         J --
+	 *        /
+	 * -- A --
+	 * </pre>
+	 * For this example, `allInputTypes` contains two input types.
+	 */
+	private final List<TypeInformation<?>> allInputTypes;
+
+	/**
+	 * The type info of this wrapped operator's output.
+	 */
+	private final TypeInformation<?> outputType;
+
+	/**
+	 * Managed memory fraction in the multiple-input operator.
+	 */
+	private double managedMemoryFraction = -1;
+
+	/**
+	 * The input edges of this operator wrapper, the edges' target is current instance.
+	 */
+	private final List<Edge> inputEdges;
+
+	/**
+	 * The output edges of this operator wrapper, the edges' source is current instance.
+	 */
+	private final List<Edge> outputEdges;
+
+	/**
+	 * The wrapped operator, which will be generated by {@link #factory}.
+	 */
+	private transient OP wrapped;
+
+	private boolean closed;
+	private int endedInputCount;
+
+	public TableOperatorWrapper(
+			StreamOperatorFactory<RowData> factory,
+			String operatorName,
+			List<TypeInformation<?>> allInputTypes,
+			TypeInformation<?> outputType) {
+		this.factory = checkNotNull(factory);
+		this.operatorName = checkNotNull(operatorName);
+		this.outputType = checkNotNull(outputType);
+		this.allInputTypes = checkNotNull(allInputTypes);
+
+		this.inputEdges = new ArrayList<>();
+		this.outputEdges = new ArrayList<>();
+
+		this.endedInputCount = 0;
+	}
+
+	public void createOperator(StreamOperatorParameters<RowData> parameters) {
+		checkArgument(wrapped == null, "This operator has been initialized");
+		if (factory instanceof ProcessingTimeServiceAware) {
+			((ProcessingTimeServiceAware) factory)
+					.setProcessingTimeService(parameters.getProcessingTimeService());
+		}
+		wrapped = factory.createStreamOperator(parameters);
+	}
+
+	public void endOperatorInput(int inputId) throws Exception {
+		endedInputCount++;
+		if (wrapped instanceof BoundedOneInput) {
+			((BoundedOneInput) wrapped).endInput();
+			endOperatorInputForOutput();
+		} else if (wrapped instanceof BoundedMultiInput) {
+			((BoundedMultiInput) wrapped).endInput(inputId);
+			if (endedInputCount >= allInputTypes.size()) {
+				endOperatorInputForOutput();
+			}
+		} else {
+			// some batch operators do not extend from BoundedOneInput, such as BatchCalc
+			endOperatorInputForOutput();
+		}
+	}
+
+	private void endOperatorInputForOutput() throws Exception {

Review comment:
       `propagateEndOperatorInput`?
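    For illustration, a minimal sketch of how the renamed method could read (same body as `endOperatorInputForOutput` later in this diff; only the name and a clarifying comment change):
    ```java
    // Propagates the end-of-input notification to every downstream wrapper
    // reachable via outputEdges, so callers only need to invoke
    // endOperatorInput(int) on the head wrappers.
    private void propagateEndOperatorInput() throws Exception {
        for (Edge edge : outputEdges) {
            edge.target.endOperatorInput(edge.inputId);
        }
    }
    ```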

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator
+		if (!(transform instanceof UnionTransformation)) {
+			calcParallelismAndResource(transform);
+		}
+
+		final TableOperatorWrapper<?> wrapper;
+		if (visitedTransforms.containsKey(transform)) {
+			wrapper = visitedTransforms.get(transform);
+		} else {
+			wrapper = visitTransformation(transform);
+			visitedTransforms.put(transform, wrapper);
+		}
+		return wrapper;
+	}
+
+	private void calcParallelismAndResource(Transformation<?> transform) {
+		int currentParallelism = transform.getParallelism();
+		if (parallelism < 0) {
+			parallelism = currentParallelism;
+		} else {
+			checkState(
+					currentParallelism < 0 || parallelism == currentParallelism,
+					"Parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");
+		}
+
+		int currentMaxParallelism = transform.getMaxParallelism();
+		if (maxParallelism < 0) {
+			maxParallelism = currentMaxParallelism;
+		} else {
+			checkState(
+					currentMaxParallelism < 0 || maxParallelism == currentMaxParallelism,
+					"Max parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");
+		}
+
+		if (minResources == null) {
+			minResources = transform.getMinResources();
+			preferredResources = transform.getPreferredResources();
+			managedMemoryWeight = transform.getManagedMemoryOperatorScopeUseCaseWeights()
+					.getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+		} else {
+			minResources = minResources.merge(transform.getMinResources());
+			preferredResources = preferredResources.merge(transform.getPreferredResources());
+			managedMemoryWeight += transform.getManagedMemoryOperatorScopeUseCaseWeights()
+					.getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+		}
+	}
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	private TableOperatorWrapper<?> visitTransformation(Transformation<?> transform) {
+		if (transform instanceof OneInputTransformation) {
+			return visitOneInputTransformation((OneInputTransformation) transform);
+		} else if (transform instanceof TwoInputTransformation) {
+			return visitTwoInputTransformation((TwoInputTransformation) transform);
+		} else  if (transform instanceof UnionTransformation) {
+			return visitUnionTransformation((UnionTransformation) transform);
+		} else  {
+			throw new RuntimeException("Unsupported Transformation: " + transform);
+		}
+	}
+
+	private TableOperatorWrapper<?> visitOneInputTransformation(
+			OneInputTransformation<RowData, RowData> transform) {
+		Transformation<?> input = transform.getInputs().get(0);
+
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				transform.getOperatorFactory(),
+				genSubOperatorName(transform),
+				Collections.singletonList(transform.getInputType()),
+				transform.getOutputType()
+		);
+
+		int inputIdx = inputTransforms.indexOf(input);
+		if (inputIdx >= 0) {
+			orderedInputTransforms.add(input);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx], wrapper, 1));
+			headWrappers.add(wrapper);
+		} else {
+			TableOperatorWrapper<?> inputWrapper = visit(input);
+			wrapper.addInput(inputWrapper, 1);
+		}
+		return wrapper;
+	}
+
+	private TableOperatorWrapper<?> visitTwoInputTransformation(
+			TwoInputTransformation<RowData, RowData, RowData> transform) {
+		Transformation<?> input1 = transform.getInput1();
+		Transformation<?> input2 = transform.getInput2();
+		int inputIdx1 = inputTransforms.indexOf(input1);
+		int inputIdx2 = inputTransforms.indexOf(input2);
+
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				transform.getOperatorFactory(),
+				genSubOperatorName(transform),
+				Arrays.asList(transform.getInputType1(), transform.getInputType2()),
+				transform.getOutputType());
+
+		if (inputIdx1 >= 0 && inputIdx2 >= 0) {
+			orderedInputTransforms.add(input1);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx1], wrapper, 1));
+			orderedInputTransforms.add(input2);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx2], wrapper, 2));
+			headWrappers.add(wrapper);
+		} else if (inputIdx1 >= 0) {
+			TableOperatorWrapper<?> inputWrapper = visit(input2);
+			wrapper.addInput(inputWrapper, 2);
+			orderedInputTransforms.add(input1);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx1], wrapper, 1));
+			headWrappers.add(wrapper);
+		} else if (inputIdx2 >= 0) {
+			TableOperatorWrapper<?> inputWrapper = visit(input1);
+			wrapper.addInput(inputWrapper, 1);
+			orderedInputTransforms.add(input2);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx2], wrapper, 2));
+			headWrappers.add(wrapper);
+		} else {
+			TableOperatorWrapper<?> inputWrapper1 = visit(input1);
+			wrapper.addInput(inputWrapper1, 1);
+			TableOperatorWrapper<?> inputWrapper2 = visit(input2);
+			wrapper.addInput(inputWrapper2, 2);
+		}
+
+		return wrapper;
+	}
+
+	private TableOperatorWrapper<?> visitUnionTransformation(
+			UnionTransformation<RowData> transform) {
+		// use MapFunction to combine the input data
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				SimpleOperatorFactory.of(new UnionStreamOperator()),
+				genSubOperatorName(transform),
+				transform.getInputs().stream().map(Transformation::getOutputType).collect(Collectors.toList()),
+				transform.getOutputType());
+
+		int numberOfHeadInput = 0;
+		for (Transformation<?> input : transform.getInputs()) {
+			int inputIdx = inputTransforms.indexOf(input);
+			if (inputIdx >= 0) {
+				numberOfHeadInput ++;

Review comment:
       extra space

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator
+		if (!(transform instanceof UnionTransformation)) {
+			calcParallelismAndResource(transform);
+		}
+
+		final TableOperatorWrapper<?> wrapper;
+		if (visitedTransforms.containsKey(transform)) {
+			wrapper = visitedTransforms.get(transform);
+		} else {
+			wrapper = visitTransformation(transform);
+			visitedTransforms.put(transform, wrapper);
+		}
+		return wrapper;
+	}
+
+	private void calcParallelismAndResource(Transformation<?> transform) {
+		int currentParallelism = transform.getParallelism();
+		if (parallelism < 0) {
+			parallelism = currentParallelism;
+		} else {
+			checkState(
+					currentParallelism < 0 || parallelism == currentParallelism,
+					"Parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");

Review comment:
       Consider the following case:
   ```
   source1 (100 parallelism) -> calc -\
                                        -> union -> join -> ...
   source2 (50 parallelism)  -> calc -/
   ```
    If both source1 and source2 are chainable, both calc operators will be merged into the multiple-input node, but their parallelisms differ. The multiple-input creation algorithm works on `ExecNode`s, which carry no information about parallelism, so this problem cannot be avoided by that algorithm.
   
   What I would suggest is to set the parallelism to the maximum parallelism of the members.
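    A rough sketch of that suggestion, assuming `calcParallelismAndResource` keeps its current signature and simply tracks the maximum instead of asserting equality (illustration only, not tested):
    ```java
    private void calcParallelismAndResource(Transformation<?> transform) {
        // take the maximum parallelism among the member transformations
        // instead of requiring all of them to be identical
        parallelism = Math.max(parallelism, transform.getParallelism());

        int currentMaxParallelism = transform.getMaxParallelism();
        if (currentMaxParallelism > 0) {
            maxParallelism = Math.max(maxParallelism, currentMaxParallelism);
        }

        // resource and managed-memory handling stays as in the current diff
    }
    ```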

##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestOneInputStreamOperator.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * A {@link OneInputStreamOperator} for testing.
+ */
+public class TestOneInputStreamOperator extends AbstractStreamOperator<RowData>

Review comment:
       `TestOneInputStreamOperator` -> `TestingOneInputStreamOperator`

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator
+		if (!(transform instanceof UnionTransformation)) {
+			calcParallelismAndResource(transform);
+		}
+
+		final TableOperatorWrapper<?> wrapper;
+		if (visitedTransforms.containsKey(transform)) {
+			wrapper = visitedTransforms.get(transform);
+		} else {
+			wrapper = visitTransformation(transform);
+			visitedTransforms.put(transform, wrapper);
+		}

Review comment:
       `wrapper = visitedTransforms.computeIfAbsent(transform, t -> visitTransformation(t))`
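    With that, the whole branch collapses to something like the sketch below. One caveat: `visitTransformation` recursively calls `visit`, which inserts into `visitedTransforms` while the mapping function is still running; whether `computeIfAbsent` tolerates that depends on the `Map` implementation, so it would need a quick check before switching.
    ```java
    private TableOperatorWrapper<?> visit(Transformation<?> transform) {
        // ignore UnionTransformation because it's not a real operator
        if (!(transform instanceof UnionTransformation)) {
            calcParallelismAndResource(transform);
        }
        // reuse the wrapper if this transformation has already been visited
        return visitedTransforms.computeIfAbsent(transform, t -> visitTransformation(t));
    }
    ```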

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator
+		if (!(transform instanceof UnionTransformation)) {
+			calcParallelismAndResource(transform);
+		}
+
+		final TableOperatorWrapper<?> wrapper;
+		if (visitedTransforms.containsKey(transform)) {
+			wrapper = visitedTransforms.get(transform);
+		} else {
+			wrapper = visitTransformation(transform);
+			visitedTransforms.put(transform, wrapper);
+		}
+		return wrapper;
+	}
+
+	private void calcParallelismAndResource(Transformation<?> transform) {
+		int currentParallelism = transform.getParallelism();
+		if (parallelism < 0) {
+			parallelism = currentParallelism;
+		} else {
+			checkState(
+					currentParallelism < 0 || parallelism == currentParallelism,
+					"Parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");
+		}
+
+		int currentMaxParallelism = transform.getMaxParallelism();
+		if (maxParallelism < 0) {
+			maxParallelism = currentMaxParallelism;
+		} else {
+			checkState(
+					currentMaxParallelism < 0 || maxParallelism == currentMaxParallelism,
+					"Max parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");
+		}
+
+		if (minResources == null) {
+			minResources = transform.getMinResources();
+			preferredResources = transform.getPreferredResources();
+			managedMemoryWeight = transform.getManagedMemoryOperatorScopeUseCaseWeights()
+					.getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+		} else {
+			minResources = minResources.merge(transform.getMinResources());
+			preferredResources = preferredResources.merge(transform.getPreferredResources());
+			managedMemoryWeight += transform.getManagedMemoryOperatorScopeUseCaseWeights()
+					.getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+		}
+	}
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	private TableOperatorWrapper<?> visitTransformation(Transformation<?> transform) {
+		if (transform instanceof OneInputTransformation) {
+			return visitOneInputTransformation((OneInputTransformation) transform);
+		} else if (transform instanceof TwoInputTransformation) {
+			return visitTwoInputTransformation((TwoInputTransformation) transform);
+		} else  if (transform instanceof UnionTransformation) {
+			return visitUnionTransformation((UnionTransformation) transform);
+		} else  {
+			throw new RuntimeException("Unsupported Transformation: " + transform);
+		}
+	}
+
+	private TableOperatorWrapper<?> visitOneInputTransformation(
+			OneInputTransformation<RowData, RowData> transform) {
+		Transformation<?> input = transform.getInputs().get(0);
+
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				transform.getOperatorFactory(),
+				genSubOperatorName(transform),
+				Collections.singletonList(transform.getInputType()),
+				transform.getOutputType()
+		);
+
+		int inputIdx = inputTransforms.indexOf(input);
+		if (inputIdx >= 0) {
+			orderedInputTransforms.add(input);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx], wrapper, 1));
+			headWrappers.add(wrapper);
+		} else {
+			TableOperatorWrapper<?> inputWrapper = visit(input);
+			wrapper.addInput(inputWrapper, 1);
+		}
+		return wrapper;
+	}
+
+	private TableOperatorWrapper<?> visitTwoInputTransformation(
+			TwoInputTransformation<RowData, RowData, RowData> transform) {
+		Transformation<?> input1 = transform.getInput1();
+		Transformation<?> input2 = transform.getInput2();
+		int inputIdx1 = inputTransforms.indexOf(input1);
+		int inputIdx2 = inputTransforms.indexOf(input2);
+
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				transform.getOperatorFactory(),
+				genSubOperatorName(transform),
+				Arrays.asList(transform.getInputType1(), transform.getInputType2()),
+				transform.getOutputType());
+
+		if (inputIdx1 >= 0 && inputIdx2 >= 0) {
+			orderedInputTransforms.add(input1);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx1], wrapper, 1));
+			orderedInputTransforms.add(input2);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx2], wrapper, 2));
+			headWrappers.add(wrapper);

Review comment:
       extract into a function
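    For example, a hypothetical helper (name made up for illustration) that registers one head input of a wrapper:
    ```java
    // Remembers the reordered head transformation and the InputSpec that
    // routes it into the given input id of the wrapper.
    private void processHeadInput(
            Transformation<?> input,
            int inputIdx,
            TableOperatorWrapper<?> wrapper,
            int wrapperInputId) {
        orderedInputTransforms.add(input);
        inputSpecs.add(createInputSpec(readOrders[inputIdx], wrapper, wrapperInputId));
    }
    ```
    The branch above would then become `processHeadInput(input1, inputIdx1, wrapper, 1); processHeadInput(input2, inputIdx2, wrapper, 2); headWrappers.add(wrapper);`, and the one-input and union cases could reuse the same helper.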

##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestTwoInputStreamOperator.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link TwoInputStreamOperator} for testing.
+ */
+public class TestTwoInputStreamOperator extends AbstractStreamOperator<RowData>

Review comment:
       ditto

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapper.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class handles the close, endInput and other related logic of a {@link StreamOperator}.
+ * It also automatically propagates the end-input operation to the next wrapper that
+ * the {@link #outputEdges} points to, so we only need to call the head wrapper's
+ * {@link #endOperatorInput(int)} method.
+ */
+public class TableOperatorWrapper<OP extends StreamOperator<RowData>> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The factory to create the wrapped operator.
+	 */
+	private final StreamOperatorFactory<RowData> factory;
+
+	/**
+	 * the operator name for debugging.
+	 */
+	private final String operatorName;
+
+	/**
+	 * The type info of this wrapped operator's all inputs.
+	 *
+	 * <p>NOTE:The inputs of an operator may not all be in the multiple-input operator, e.g.
+	 * The multiple-input operator contains A and J, and A is one of the input of J,
+	 * and another input of J is not in the multiple-input operator.
+	 * <pre>
+	 * -------
+	 *        \
+	 *         J --
+	 *        /
+	 * -- A --
+	 * </pre>
+	 * For this example, `allInputTypes` contains two input types.
+	 */
+	private final List<TypeInformation<?>> allInputTypes;
+
+	/**
+	 * The type info of this wrapped operator's output.
+	 */
+	private final TypeInformation<?> outputType;
+
+	/**
+	 * Managed memory fraction in the multiple-input operator.
+	 */
+	private double managedMemoryFraction = -1;
+
+	/**
+	 * The input edges of this operator wrapper, the edges' target is current instance.
+	 */
+	private final List<Edge> inputEdges;
+
+	/**
+	 * The output edges of this operator wrapper, the edges' source is current instance.
+	 */
+	private final List<Edge> outputEdges;
+
+	/**
+	 * The wrapped operator, which will be generated by {@link #factory}.
+	 */
+	private transient OP wrapped;
+
+	private boolean closed;
+	private int endedInputCount;
+
+	public TableOperatorWrapper(
+			StreamOperatorFactory<RowData> factory,
+			String operatorName,
+			List<TypeInformation<?>> allInputTypes,
+			TypeInformation<?> outputType) {
+		this.factory = checkNotNull(factory);
+		this.operatorName = checkNotNull(operatorName);
+		this.outputType = checkNotNull(outputType);
+		this.allInputTypes = checkNotNull(allInputTypes);
+
+		this.inputEdges = new ArrayList<>();
+		this.outputEdges = new ArrayList<>();
+
+		this.endedInputCount = 0;
+	}
+
+	public void createOperator(StreamOperatorParameters<RowData> parameters) {
+		checkArgument(wrapped == null, "This operator has been initialized");
+		if (factory instanceof ProcessingTimeServiceAware) {
+			((ProcessingTimeServiceAware) factory)
+					.setProcessingTimeService(parameters.getProcessingTimeService());
+		}
+		wrapped = factory.createStreamOperator(parameters);
+	}
+
+	public void endOperatorInput(int inputId) throws Exception {
+		endedInputCount++;
+		if (wrapped instanceof BoundedOneInput) {
+			((BoundedOneInput) wrapped).endInput();
+			endOperatorInputForOutput();
+		} else if (wrapped instanceof BoundedMultiInput) {
+			((BoundedMultiInput) wrapped).endInput(inputId);
+			if (endedInputCount >= allInputTypes.size()) {
+				endOperatorInputForOutput();
+			}
+		} else {
+			// some batch operators do not extend from BoundedOneInput, such as BatchCalc
+			endOperatorInputForOutput();
+		}
+	}
+
+	private void endOperatorInputForOutput() throws Exception {
+		for (Edge edge : outputEdges) {
+			edge.target.endOperatorInput(edge.inputId);
+		}
+	}
+
+	public OP getStreamOperator() {
+		return checkNotNull(wrapped);
+	}
+
+	public List<TypeInformation<?>> getAllInputTypes() {
+		return allInputTypes;
+	}
+
+	public TypeInformation<?> getOutputType() {
+		return outputType;
+	}
+
+	public void addInput(
+			TableOperatorWrapper<?> input,
+			int inputId) {
+		Preconditions.checkArgument(inputId > 0 && inputId <= getAllInputTypes().size());
+		Edge edge = new Edge(input, this, inputId);
+		this.inputEdges.add(edge);
+		input.outputEdges.add(edge);
+	}
+
+	public void setManagedMemoryFraction(double managedMemoryFraction) {
+		this.managedMemoryFraction = managedMemoryFraction;
+	}
+
+	public double getManagedMemoryFraction() {
+		return managedMemoryFraction;
+	}
+
+	public List<Edge> getInputEdges() {
+		return inputEdges;
+	}
+
+	public List<TableOperatorWrapper<?>> getInputWrappers() {
+		return inputEdges.stream().map(Edge::getSource).collect(Collectors.toList());
+	}
+
+	public List<Edge> getOutputEdges() {
+		return outputEdges;
+	}
+
+	public List<TableOperatorWrapper<?>> getOutputWrappers() {
+		return outputEdges.stream().map(Edge::getTarget).collect(Collectors.toList());
+	}
+
+	/**
+	 * Checks if the wrapped operator has been closed.
+	 *
+	 * <p>Note that this method must be called in the task thread.
+	 */
+	public boolean isClosed() {
+		return closed;
+	}
+
+	public void close() throws Exception {
+		if (isClosed()) {
+			return;
+		}
+		closed = true;
+		wrapped.close();
+	}
+
+	public String getOperatorName() {
+		return operatorName;
+	}
+
+	@VisibleForTesting
+	public int getEndedInputCount() {
+		return endedInputCount;
+	}
+
+	@Override
+	public String toString() {
+		return operatorName;
+	}
+
+	/**
+	 * The edge connects two {@link TableOperatorWrapper}s.

Review comment:
       connects -> connecting

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator

Review comment:
       really -> real

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;

Review comment:
       Why not merge this into `InputSpec` as a transient member?
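    Sketching that idea (the actual fields of `InputSpec` are not shown in this excerpt, so the member and accessor below are hypothetical): `InputSpec` would keep the originating transformation as a transient member, and the generator could derive the ordered list from `inputSpecs` instead of maintaining a separate one.
    ```java
    // inside InputSpec (hypothetical addition)
    private final transient Transformation<?> inputTransform;

    public Transformation<?> getInputTransform() {
        return inputTransform;
    }

    // TableOperatorWrapperGenerator could then drop orderedInputTransforms:
    public List<Transformation<?>> getOrderedInputTransforms() {
        return inputSpecs.stream()
                .map(InputSpec::getInputTransform)
                .collect(Collectors.toList());
    }
    ```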




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org