You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/20 14:52:26 UTC

[GitHub] [flink] godfreyhe opened a new pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

godfreyhe opened a new pull request #13707:
URL: https://github.com/apache/flink/pull/13707


   
   
   ## What is the purpose of the change
   
   *Transformation is not serializable, while StreamOperatorFactory is. We need to introduce another class (named TableOperatorWrapper) to store the information of a Transformation, and introduce a utility class (named TableOperatorWrapper) to convert the Transformation DAG to TableOperatorWrapper DAG. This pr aims to support this.*
   
   
   ## Brief change log
   
     - *introduce TableOperatorWrapper to store the information of a Tranformation*
     - *Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG*
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added TableOperatorWrapperTest to test TableOperatorWrapper*
     - *Added TableOperatorWrapperGeneratorTest to test TableOperatorWrapperGenerator*
     - *Extended ExplainTest to verify the logic of BatchExecMultipleInputNode#translateToPlanInternal*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13707:
URL: https://github.com/apache/flink/pull/13707#issuecomment-712937023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7956",
       "triggerID" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da UNKNOWN
   * 21c8f136608b0ce47d5bdf3a4b6e27654f90b872 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7956) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13707:
URL: https://github.com/apache/flink/pull/13707#issuecomment-712912201


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da (Tue Oct 20 14:54:19 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13707:
URL: https://github.com/apache/flink/pull/13707#issuecomment-712937023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7956",
       "triggerID" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "triggerType" : "PUSH"
     }, {
       "hash" : "022b84ab6402143d4660e2f2348ea85e08d6649d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "713505230",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "713505230",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "022b84ab6402143d4660e2f2348ea85e08d6649d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "022b84ab6402143d4660e2f2348ea85e08d6649d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da UNKNOWN
   *  Unknown: [CANCELED](TBD) 
   * 022b84ab6402143d4660e2f2348ea85e08d6649d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13707:
URL: https://github.com/apache/flink/pull/13707#discussion_r509142238



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator
+		if (!(transform instanceof UnionTransformation)) {
+			calcParallelismAndResource(transform);
+		}
+
+		final TableOperatorWrapper<?> wrapper;
+		if (visitedTransforms.containsKey(transform)) {
+			wrapper = visitedTransforms.get(transform);
+		} else {
+			wrapper = visitTransformation(transform);
+			visitedTransforms.put(transform, wrapper);
+		}
+		return wrapper;
+	}
+
+	private void calcParallelismAndResource(Transformation<?> transform) {
+		int currentParallelism = transform.getParallelism();
+		if (parallelism < 0) {
+			parallelism = currentParallelism;
+		} else {
+			checkState(
+					currentParallelism < 0 || parallelism == currentParallelism,
+					"Parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");

Review comment:
       good catch, `set the parallelism to the maximum parallelism of the members` looks good to me. but we should add more strict validation for the plan when constructing multiple input node.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13707:
URL: https://github.com/apache/flink/pull/13707#issuecomment-712937023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7956",
       "triggerID" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "713505230",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "022b84ab6402143d4660e2f2348ea85e08d6649d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8005",
       "triggerID" : "713505230",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "022b84ab6402143d4660e2f2348ea85e08d6649d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8005",
       "triggerID" : "022b84ab6402143d4660e2f2348ea85e08d6649d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da UNKNOWN
   *  Unknown: [CANCELED](TBD) 
   * 022b84ab6402143d4660e2f2348ea85e08d6649d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8005) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #13707:
URL: https://github.com/apache/flink/pull/13707#discussion_r509103986



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;

Review comment:
       we should keep `InputSpec` clean. we can return `List<Pair<Transformation<?>, InputSpec>>` instead




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13707:
URL: https://github.com/apache/flink/pull/13707#issuecomment-712937023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da UNKNOWN
   * 21c8f136608b0ce47d5bdf3a4b6e27654f90b872 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13707:
URL: https://github.com/apache/flink/pull/13707#issuecomment-712937023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7956",
       "triggerID" : "21c8f136608b0ce47d5bdf3a4b6e27654f90b872",
       "triggerType" : "PUSH"
     }, {
       "hash" : "022b84ab6402143d4660e2f2348ea85e08d6649d",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8005",
       "triggerID" : "713505230",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "022b84ab6402143d4660e2f2348ea85e08d6649d",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8005",
       "triggerID" : "022b84ab6402143d4660e2f2348ea85e08d6649d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "713505230",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da UNKNOWN
   * 022b84ab6402143d4660e2f2348ea85e08d6649d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8005) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13707:
URL: https://github.com/apache/flink/pull/13707#issuecomment-712937023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f3f1c3c2277c20f9be0f6b2e490e5fa74807e9da UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe merged pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
godfreyhe merged pull request #13707:
URL: https://github.com/apache/flink/pull/13707


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on pull request #13707:
URL: https://github.com/apache/flink/pull/13707#issuecomment-713505230


   @flinkbot run azure


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] TsReaper commented on a change in pull request #13707: [FLINK-19737][table] Introduce TableOperatorWrapperGenerator to translate transformation DAG in a multiple-input node to TableOperatorWrapper DAG

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #13707:
URL: https://github.com/apache/flink/pull/13707#discussion_r508990188



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapper.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class handles the close, endInput and other related logic of a {@link StreamOperator}.
+ * It also automatically propagates the end-input operation to the next wrapper that
+ * the {@link #outputEdges} points to, so we only need to call the head wrapper's
+ * {@link #endOperatorInput(int)} method.
+ */
+public class TableOperatorWrapper<OP extends StreamOperator<RowData>> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The factory to create the wrapped operator.
+	 */
+	private final StreamOperatorFactory<RowData> factory;
+
+	/**
+	 * the operator name for debugging.
+	 */
+	private final String operatorName;
+
+	/**
+	 * The type info of this wrapped operator's all inputs.
+	 *
+	 * <p>NOTE:The inputs of an operator may not all be in the multiple-input operator, e.g.
+	 * The multiple-input operator contains A and J, and A is one of the input of J,
+	 * and another input of J is not in the multiple-input operator.
+	 * <pre>
+	 * -------
+	 *        \
+	 *         J --
+	 *        /
+	 * -- A --
+	 * </pre>
+	 * For this example, `allInputTypes` contains two input types.
+	 */
+	private final List<TypeInformation<?>> allInputTypes;
+
+	/**
+	 * The type info of this wrapped operator's output.
+	 */
+	private final TypeInformation<?> outputType;
+
+	/**
+	 * Managed memory fraction in the multiple-input operator.
+	 */
+	private double managedMemoryFraction = -1;
+
+	/**
+	 * The input edges of this operator wrapper, the edges' target is current instance.
+	 */
+	private final List<Edge> inputEdges;
+
+	/**
+	 * The output edges of this operator wrapper, the edges' source is current instance.
+	 */
+	private final List<Edge> outputEdges;
+
+	/**
+	 * The wrapped operator, which will be generated by {@link #factory}.
+	 */
+	private transient OP wrapped;
+
+	private boolean closed;
+	private int endedInputCount;
+
+	public TableOperatorWrapper(
+			StreamOperatorFactory<RowData> factory,
+			String operatorName,
+			List<TypeInformation<?>> allInputTypes,
+			TypeInformation<?> outputType) {
+		this.factory = checkNotNull(factory);
+		this.operatorName = checkNotNull(operatorName);
+		this.outputType = checkNotNull(outputType);
+		this.allInputTypes = checkNotNull(allInputTypes);
+
+		this.inputEdges = new ArrayList<>();
+		this.outputEdges = new ArrayList<>();
+
+		this.endedInputCount = 0;
+	}
+
+	public void createOperator(StreamOperatorParameters<RowData> parameters) {
+		checkArgument(wrapped == null, "This operator has been initialized");
+		if (factory instanceof ProcessingTimeServiceAware) {
+			((ProcessingTimeServiceAware) factory)
+					.setProcessingTimeService(parameters.getProcessingTimeService());
+		}
+		wrapped = factory.createStreamOperator(parameters);
+	}
+
+	public void endOperatorInput(int inputId) throws Exception {
+		endedInputCount++;
+		if (wrapped instanceof BoundedOneInput) {
+			((BoundedOneInput) wrapped).endInput();
+			endOperatorInputForOutput();
+		} else if (wrapped instanceof BoundedMultiInput) {
+			((BoundedMultiInput) wrapped).endInput(inputId);
+			if (endedInputCount >= allInputTypes.size()) {
+				endOperatorInputForOutput();
+			}
+		} else {
+			// some batch operators do not extend from BoundedOneInput, such as BatchCalc
+			endOperatorInputForOutput();
+		}
+	}
+
+	private void endOperatorInputForOutput() throws Exception {

Review comment:
       `propagateEndOperatorInput`?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator
+		if (!(transform instanceof UnionTransformation)) {
+			calcParallelismAndResource(transform);
+		}
+
+		final TableOperatorWrapper<?> wrapper;
+		if (visitedTransforms.containsKey(transform)) {
+			wrapper = visitedTransforms.get(transform);
+		} else {
+			wrapper = visitTransformation(transform);
+			visitedTransforms.put(transform, wrapper);
+		}
+		return wrapper;
+	}
+
+	private void calcParallelismAndResource(Transformation<?> transform) {
+		int currentParallelism = transform.getParallelism();
+		if (parallelism < 0) {
+			parallelism = currentParallelism;
+		} else {
+			checkState(
+					currentParallelism < 0 || parallelism == currentParallelism,
+					"Parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");
+		}
+
+		int currentMaxParallelism = transform.getMaxParallelism();
+		if (maxParallelism < 0) {
+			maxParallelism = currentMaxParallelism;
+		} else {
+			checkState(
+					currentMaxParallelism < 0 || maxParallelism == currentMaxParallelism,
+					"Max parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");
+		}
+
+		if (minResources == null) {
+			minResources = transform.getMinResources();
+			preferredResources = transform.getPreferredResources();
+			managedMemoryWeight = transform.getManagedMemoryOperatorScopeUseCaseWeights()
+					.getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+		} else {
+			minResources = minResources.merge(transform.getMinResources());
+			preferredResources = preferredResources.merge(transform.getPreferredResources());
+			managedMemoryWeight += transform.getManagedMemoryOperatorScopeUseCaseWeights()
+					.getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+		}
+	}
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	private TableOperatorWrapper<?> visitTransformation(Transformation<?> transform) {
+		if (transform instanceof OneInputTransformation) {
+			return visitOneInputTransformation((OneInputTransformation) transform);
+		} else if (transform instanceof TwoInputTransformation) {
+			return visitTwoInputTransformation((TwoInputTransformation) transform);
+		} else  if (transform instanceof UnionTransformation) {
+			return visitUnionTransformation((UnionTransformation) transform);
+		} else  {
+			throw new RuntimeException("Unsupported Transformation: " + transform);
+		}
+	}
+
+	private TableOperatorWrapper<?> visitOneInputTransformation(
+			OneInputTransformation<RowData, RowData> transform) {
+		Transformation<?> input = transform.getInputs().get(0);
+
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				transform.getOperatorFactory(),
+				genSubOperatorName(transform),
+				Collections.singletonList(transform.getInputType()),
+				transform.getOutputType()
+		);
+
+		int inputIdx = inputTransforms.indexOf(input);
+		if (inputIdx >= 0) {
+			orderedInputTransforms.add(input);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx], wrapper, 1));
+			headWrappers.add(wrapper);
+		} else {
+			TableOperatorWrapper<?> inputWrapper = visit(input);
+			wrapper.addInput(inputWrapper, 1);
+		}
+		return wrapper;
+	}
+
+	private TableOperatorWrapper<?> visitTwoInputTransformation(
+			TwoInputTransformation<RowData, RowData, RowData> transform) {
+		Transformation<?> input1 = transform.getInput1();
+		Transformation<?> input2 = transform.getInput2();
+		int inputIdx1 = inputTransforms.indexOf(input1);
+		int inputIdx2 = inputTransforms.indexOf(input2);
+
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				transform.getOperatorFactory(),
+				genSubOperatorName(transform),
+				Arrays.asList(transform.getInputType1(), transform.getInputType2()),
+				transform.getOutputType());
+
+		if (inputIdx1 >= 0 && inputIdx2 >= 0) {
+			orderedInputTransforms.add(input1);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx1], wrapper, 1));
+			orderedInputTransforms.add(input2);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx2], wrapper, 2));
+			headWrappers.add(wrapper);
+		} else if (inputIdx1 >= 0) {
+			TableOperatorWrapper<?> inputWrapper = visit(input2);
+			wrapper.addInput(inputWrapper, 2);
+			orderedInputTransforms.add(input1);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx1], wrapper, 1));
+			headWrappers.add(wrapper);
+		} else if (inputIdx2 >= 0) {
+			TableOperatorWrapper<?> inputWrapper = visit(input1);
+			wrapper.addInput(inputWrapper, 1);
+			orderedInputTransforms.add(input2);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx2], wrapper, 2));
+			headWrappers.add(wrapper);
+		} else {
+			TableOperatorWrapper<?> inputWrapper1 = visit(input1);
+			wrapper.addInput(inputWrapper1, 1);
+			TableOperatorWrapper<?> inputWrapper2 = visit(input2);
+			wrapper.addInput(inputWrapper2, 2);
+		}
+
+		return wrapper;
+	}
+
+	private TableOperatorWrapper<?> visitUnionTransformation(
+			UnionTransformation<RowData> transform) {
+		// use MapFunction to combine the input data
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				SimpleOperatorFactory.of(new UnionStreamOperator()),
+				genSubOperatorName(transform),
+				transform.getInputs().stream().map(Transformation::getOutputType).collect(Collectors.toList()),
+				transform.getOutputType());
+
+		int numberOfHeadInput = 0;
+		for (Transformation<?> input : transform.getInputs()) {
+			int inputIdx = inputTransforms.indexOf(input);
+			if (inputIdx >= 0) {
+				numberOfHeadInput ++;

Review comment:
       extra space

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator
+		if (!(transform instanceof UnionTransformation)) {
+			calcParallelismAndResource(transform);
+		}
+
+		final TableOperatorWrapper<?> wrapper;
+		if (visitedTransforms.containsKey(transform)) {
+			wrapper = visitedTransforms.get(transform);
+		} else {
+			wrapper = visitTransformation(transform);
+			visitedTransforms.put(transform, wrapper);
+		}
+		return wrapper;
+	}
+
+	private void calcParallelismAndResource(Transformation<?> transform) {
+		int currentParallelism = transform.getParallelism();
+		if (parallelism < 0) {
+			parallelism = currentParallelism;
+		} else {
+			checkState(
+					currentParallelism < 0 || parallelism == currentParallelism,
+					"Parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");

Review comment:
       Consider the following case:
   ```
   source1 (100 parallelism) -> calc -\
                                        -> union -> join -> ...
   source2 (50 parallelism)  -> calc -/
   ```
   If both source1 and 2 are chainable, both calc will be merged into the multiple input node. However their parallelism are different. Multiple input creation algorithm handles `ExecNode` which has no information about parallelism, so this problem cannot be avoid by that algorithm.
   
   What I would suggest is to set the parallelism to the maximum parallelism of the members.

##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestOneInputStreamOperator.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * A {@link OneInputStreamOperator} for testing.
+ */
+public class TestOneInputStreamOperator extends AbstractStreamOperator<RowData>

Review comment:
       `TestOneInputStreamOperator` -> `TestingOneInputStreamOperator`

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator
+		if (!(transform instanceof UnionTransformation)) {
+			calcParallelismAndResource(transform);
+		}
+
+		final TableOperatorWrapper<?> wrapper;
+		if (visitedTransforms.containsKey(transform)) {
+			wrapper = visitedTransforms.get(transform);
+		} else {
+			wrapper = visitTransformation(transform);
+			visitedTransforms.put(transform, wrapper);
+		}

Review comment:
       `wrapper = visitedTransforms.computeIfAbsent(transform, t -> visitTransformation(t))`

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator
+		if (!(transform instanceof UnionTransformation)) {
+			calcParallelismAndResource(transform);
+		}
+
+		final TableOperatorWrapper<?> wrapper;
+		if (visitedTransforms.containsKey(transform)) {
+			wrapper = visitedTransforms.get(transform);
+		} else {
+			wrapper = visitTransformation(transform);
+			visitedTransforms.put(transform, wrapper);
+		}
+		return wrapper;
+	}
+
+	private void calcParallelismAndResource(Transformation<?> transform) {
+		int currentParallelism = transform.getParallelism();
+		if (parallelism < 0) {
+			parallelism = currentParallelism;
+		} else {
+			checkState(
+					currentParallelism < 0 || parallelism == currentParallelism,
+					"Parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");
+		}
+
+		int currentMaxParallelism = transform.getMaxParallelism();
+		if (maxParallelism < 0) {
+			maxParallelism = currentMaxParallelism;
+		} else {
+			checkState(
+					currentMaxParallelism < 0 || maxParallelism == currentMaxParallelism,
+					"Max parallelism of a transformation in MultipleInputNode is different from others. This is a bug.");
+		}
+
+		if (minResources == null) {
+			minResources = transform.getMinResources();
+			preferredResources = transform.getPreferredResources();
+			managedMemoryWeight = transform.getManagedMemoryOperatorScopeUseCaseWeights()
+					.getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+		} else {
+			minResources = minResources.merge(transform.getMinResources());
+			preferredResources = preferredResources.merge(transform.getPreferredResources());
+			managedMemoryWeight += transform.getManagedMemoryOperatorScopeUseCaseWeights()
+					.getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0);
+		}
+	}
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	private TableOperatorWrapper<?> visitTransformation(Transformation<?> transform) {
+		if (transform instanceof OneInputTransformation) {
+			return visitOneInputTransformation((OneInputTransformation) transform);
+		} else if (transform instanceof TwoInputTransformation) {
+			return visitTwoInputTransformation((TwoInputTransformation) transform);
+		} else  if (transform instanceof UnionTransformation) {
+			return visitUnionTransformation((UnionTransformation) transform);
+		} else  {
+			throw new RuntimeException("Unsupported Transformation: " + transform);
+		}
+	}
+
+	private TableOperatorWrapper<?> visitOneInputTransformation(
+			OneInputTransformation<RowData, RowData> transform) {
+		Transformation<?> input = transform.getInputs().get(0);
+
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				transform.getOperatorFactory(),
+				genSubOperatorName(transform),
+				Collections.singletonList(transform.getInputType()),
+				transform.getOutputType()
+		);
+
+		int inputIdx = inputTransforms.indexOf(input);
+		if (inputIdx >= 0) {
+			orderedInputTransforms.add(input);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx], wrapper, 1));
+			headWrappers.add(wrapper);
+		} else {
+			TableOperatorWrapper<?> inputWrapper = visit(input);
+			wrapper.addInput(inputWrapper, 1);
+		}
+		return wrapper;
+	}
+
+	private TableOperatorWrapper<?> visitTwoInputTransformation(
+			TwoInputTransformation<RowData, RowData, RowData> transform) {
+		Transformation<?> input1 = transform.getInput1();
+		Transformation<?> input2 = transform.getInput2();
+		int inputIdx1 = inputTransforms.indexOf(input1);
+		int inputIdx2 = inputTransforms.indexOf(input2);
+
+		TableOperatorWrapper<?> wrapper = new TableOperatorWrapper<>(
+				transform.getOperatorFactory(),
+				genSubOperatorName(transform),
+				Arrays.asList(transform.getInputType1(), transform.getInputType2()),
+				transform.getOutputType());
+
+		if (inputIdx1 >= 0 && inputIdx2 >= 0) {
+			orderedInputTransforms.add(input1);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx1], wrapper, 1));
+			orderedInputTransforms.add(input2);
+			inputSpecs.add(createInputSpec(readOrders[inputIdx2], wrapper, 2));
+			headWrappers.add(wrapper);

Review comment:
       extract into a function

##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/multipleinput/TestTwoInputStreamOperator.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link TwoInputStreamOperator} for testing.
+ */
+public class TestTwoInputStreamOperator extends AbstractStreamOperator<RowData>

Review comment:
       ditto

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapper.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class handles the close, endInput and other related logic of a {@link StreamOperator}.
+ * It also automatically propagates the end-input operation to the next wrapper that
+ * the {@link #outputEdges} points to, so we only need to call the head wrapper's
+ * {@link #endOperatorInput(int)} method.
+ */
+public class TableOperatorWrapper<OP extends StreamOperator<RowData>> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The factory to create the wrapped operator.
+	 */
+	private final StreamOperatorFactory<RowData> factory;
+
+	/**
+	 * the operator name for debugging.
+	 */
+	private final String operatorName;
+
+	/**
+	 * The type info of this wrapped operator's all inputs.
+	 *
+	 * <p>NOTE:The inputs of an operator may not all be in the multiple-input operator, e.g.
+	 * The multiple-input operator contains A and J, and A is one of the input of J,
+	 * and another input of J is not in the multiple-input operator.
+	 * <pre>
+	 * -------
+	 *        \
+	 *         J --
+	 *        /
+	 * -- A --
+	 * </pre>
+	 * For this example, `allInputTypes` contains two input types.
+	 */
+	private final List<TypeInformation<?>> allInputTypes;
+
+	/**
+	 * The type info of this wrapped operator's output.
+	 */
+	private final TypeInformation<?> outputType;
+
+	/**
+	 * Managed memory fraction in the multiple-input operator.
+	 */
+	private double managedMemoryFraction = -1;
+
+	/**
+	 * The input edges of this operator wrapper, the edges' target is current instance.
+	 */
+	private final List<Edge> inputEdges;
+
+	/**
+	 * The output edges of this operator wrapper, the edges' source is current instance.
+	 */
+	private final List<Edge> outputEdges;
+
+	/**
+	 * The wrapped operator, which will be generated by {@link #factory}.
+	 */
+	private transient OP wrapped;
+
+	private boolean closed;
+	private int endedInputCount;
+
+	public TableOperatorWrapper(
+			StreamOperatorFactory<RowData> factory,
+			String operatorName,
+			List<TypeInformation<?>> allInputTypes,
+			TypeInformation<?> outputType) {
+		this.factory = checkNotNull(factory);
+		this.operatorName = checkNotNull(operatorName);
+		this.outputType = checkNotNull(outputType);
+		this.allInputTypes = checkNotNull(allInputTypes);
+
+		this.inputEdges = new ArrayList<>();
+		this.outputEdges = new ArrayList<>();
+
+		this.endedInputCount = 0;
+	}
+
+	public void createOperator(StreamOperatorParameters<RowData> parameters) {
+		checkArgument(wrapped == null, "This operator has been initialized");
+		if (factory instanceof ProcessingTimeServiceAware) {
+			((ProcessingTimeServiceAware) factory)
+					.setProcessingTimeService(parameters.getProcessingTimeService());
+		}
+		wrapped = factory.createStreamOperator(parameters);
+	}
+
+	public void endOperatorInput(int inputId) throws Exception {
+		endedInputCount++;
+		if (wrapped instanceof BoundedOneInput) {
+			((BoundedOneInput) wrapped).endInput();
+			endOperatorInputForOutput();
+		} else if (wrapped instanceof BoundedMultiInput) {
+			((BoundedMultiInput) wrapped).endInput(inputId);
+			if (endedInputCount >= allInputTypes.size()) {
+				endOperatorInputForOutput();
+			}
+		} else {
+			// some batch operators do not extend from BoundedOneInput, such as BatchCalc
+			endOperatorInputForOutput();
+		}
+	}
+
+	private void endOperatorInputForOutput() throws Exception {
+		for (Edge edge : outputEdges) {
+			edge.target.endOperatorInput(edge.inputId);
+		}
+	}
+
+	public OP getStreamOperator() {
+		return checkNotNull(wrapped);
+	}
+
+	public List<TypeInformation<?>> getAllInputTypes() {
+		return allInputTypes;
+	}
+
+	public TypeInformation<?> getOutputType() {
+		return outputType;
+	}
+
+	public void addInput(
+			TableOperatorWrapper<?> input,
+			int inputId) {
+		Preconditions.checkArgument(inputId > 0 && inputId <= getAllInputTypes().size());
+		Edge edge = new Edge(input, this, inputId);
+		this.inputEdges.add(edge);
+		input.outputEdges.add(edge);
+	}
+
+	public void setManagedMemoryFraction(double managedMemoryFraction) {
+		this.managedMemoryFraction = managedMemoryFraction;
+	}
+
+	public double getManagedMemoryFraction() {
+		return managedMemoryFraction;
+	}
+
+	public List<Edge> getInputEdges() {
+		return inputEdges;
+	}
+
+	public List<TableOperatorWrapper<?>> getInputWrappers() {
+		return inputEdges.stream().map(Edge::getSource).collect(Collectors.toList());
+	}
+
+	public List<Edge> getOutputEdges() {
+		return outputEdges;
+	}
+
+	public List<TableOperatorWrapper<?>> getOutputWrappers() {
+		return outputEdges.stream().map(Edge::getTarget).collect(Collectors.toList());
+	}
+
+	/**
+	 * Checks if the wrapped operator has been closed.
+	 *
+	 * <p>Note that this method must be called in the task thread.
+	 */
+	public boolean isClosed() {
+		return closed;
+	}
+
+	public void close() throws Exception {
+		if (isClosed()) {
+			return;
+		}
+		closed = true;
+		wrapped.close();
+	}
+
+	public String getOperatorName() {
+		return operatorName;
+	}
+
+	@VisibleForTesting
+	public int getEndedInputCount() {
+		return endedInputCount;
+	}
+
+	@Override
+	public String toString() {
+		return operatorName;
+	}
+
+	/**
+	 * The edge connects two {@link TableOperatorWrapper}s.

Review comment:
       connects -> connecting

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;
+
+	/**
+	 * The input specs which order corresponds to the order of {@link #orderedInputTransforms}.
+	 */
+	private final List<InputSpec> inputSpecs;
+
+	/**
+	 * The head (leaf) operator wrappers of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final List<TableOperatorWrapper<?>> headWrappers;
+
+	/**
+	 * The tail (root) operator wrapper of the operator-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private TableOperatorWrapper<?> tailWrapper;
+
+	/**
+	 * Map the visited transformation to its generated TableOperatorWrapper.
+	 */
+	private final Map<Transformation<?>, TableOperatorWrapper<?>> visitedTransforms;
+	/**
+	 * The identifier for each sub operator in {@link MultipleInputStreamOperator}.
+	 */
+	private int identifierOfSubOp = 0;
+
+	private int parallelism;
+	private int maxParallelism;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
+	/**
+	 * managed memory weight for batch operator.
+	 */
+	private int managedMemoryWeight;
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform) {
+		this(inputTransforms, tailTransform, new int[inputTransforms.size()]);
+	}
+
+	public TableOperatorWrapperGenerator(
+			List<Transformation<?>> inputTransforms,
+			Transformation<?> tailTransform,
+			int[] readOrders) {
+		this.inputTransforms = inputTransforms;
+		this.tailTransform = tailTransform;
+		this.readOrders = readOrders;
+		this.inputSpecs = new ArrayList<>();
+		this.headWrappers = new ArrayList<>();
+		this.orderedInputTransforms = new ArrayList<>();
+		this.visitedTransforms = new IdentityHashMap<>();
+
+		this.parallelism = -1;
+		this.maxParallelism = -1;
+	}
+
+	public void generate() {
+		tailWrapper = visit(tailTransform);
+		checkState(orderedInputTransforms.size() == inputTransforms.size());
+		checkState(orderedInputTransforms.size() == inputSpecs.size());
+		calculateManagedMemoryFraction();
+	}
+
+	public List<Transformation<?>> getOrderedInputTransforms() {
+		return orderedInputTransforms;
+	}
+
+	public List<InputSpec> getInputSpecs() {
+		return inputSpecs;
+	}
+
+	public List<TableOperatorWrapper<?>> getHeadWrappers() {
+		return headWrappers;
+	}
+
+	public TableOperatorWrapper<?> getTailWrapper() {
+		return tailWrapper;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	public ResourceSpec getMinResources() {
+		return minResources;
+	}
+
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
+	}
+
+	public int getManagedMemoryWeight() {
+		return managedMemoryWeight;
+	}
+
+	private TableOperatorWrapper<?> visit(Transformation<?> transform) {
+		// ignore UnionTransformation because it's not a really operator

Review comment:
       really -> real

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGenerator.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.multipleinput;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A generator that generates a {@link TableOperatorWrapper} graph from a graph of {@link Transformation}.
+ */
+public class TableOperatorWrapperGenerator {
+
+	/**
+	 * Original input transformations for {@link MultipleInputStreamOperator}.
+	 */
+	private final List<Transformation<?>> inputTransforms;
+
+	/**
+	 * The tail (root) transformation of the transformation-graph in {@link MultipleInputStreamOperator}.
+	 */
+	private final Transformation<?> tailTransform;
+
+	/**
+	 * The read order corresponding to each transformation in {@link #inputTransforms}.
+	 */
+	private final int[] readOrders;
+
+	/**
+	 * Reordered input transformations which order corresponds to the order of {@link #inputSpecs}.
+	 */
+	private final List<Transformation<?>> orderedInputTransforms;

Review comment:
       Why not merge this into `InputSpec` to be a transient member?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org