Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/07 14:30:00 UTC

[GitHub] [flink] kl0u opened a new pull request #13556: Flink 19485 final

kl0u opened a new pull request #13556:
URL: https://github.com/apache/flink/pull/13556


   ## What is the purpose of the change
   
   Although `DataStream` is going to be the unified API for Batch and Streaming applications (see [FLIP-134](https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API)), some operations, _e.g._ Sinks, may need different runtime implementations depending on whether they are intended to run on bounded or unbounded data. This is not only a matter of optimisation but also of the exposed semantics, _i.e._ correctness.
   
   So far, DataStream had a 1-to-1 mapping between an API call and an operator. In a sense, the DataStream API was an "explicit" API. With this change, we decouple the API calls from the actual runtime implementations of the operations and thus allow an operation to have more than one runtime implementation, depending (for now) on the `execution.runtime-mode`.
   
   In this PR we introduce: the `StreamGraphTranslator` interface, the main new entity responsible for translating a `Transformation` into its runtime implementation; the framework with which a developer can write a `StreamGraphTranslator` for a new `Transformation` and wire it into the `StreamGraphGenerator`; and an example implementation for the `OneInputTransformation`.
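A minimal, self-contained sketch of the dispatch idea described above, under a simplified model: `Mode`, `Node`, `MapNode`, `Translator`, `SimpleTranslator`, `MapTranslator`, and `GraphGenerator` are hypothetical stand-ins, not Flink's actual classes, and a `List<String>` log replaces the real translation context. A translator is looked up by the node's class and asked for a batch or a streaming translation depending on the configured mode; the base class then applies shared configuration after either path, mirroring the shape of `BaseSimpleTransformationTranslator` in this PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the execution.runtime-mode setting.
enum Mode { BATCH, STREAMING }

// Hypothetical stand-in for a Transformation: just an id and a name.
class Node {
    final int id;
    final String name;
    Node(int id, String name) { this.id = id; this.name = name; }
}

class MapNode extends Node {
    MapNode(int id) { super(id, "map"); }
}

// Hypothetical stand-in for StreamGraphTranslator; "log" replaces the real Context.
interface Translator<T extends Node> {
    Collection<Integer> translateForBatch(T node, List<String> log);
    Collection<Integer> translateForStreaming(T node, List<String> log);
}

// Template method: mode-specific translation first, then shared configuration.
abstract class SimpleTranslator<T extends Node> implements Translator<T> {
    @Override
    public Collection<Integer> translateForBatch(T node, List<String> log) {
        Collection<Integer> ids = translateForBatchInternal(node, log);
        configure(node, log);
        return ids;
    }

    @Override
    public Collection<Integer> translateForStreaming(T node, List<String> log) {
        Collection<Integer> ids = translateForStreamingInternal(node, log);
        configure(node, log);
        return ids;
    }

    protected abstract Collection<Integer> translateForBatchInternal(T node, List<String> log);

    protected abstract Collection<Integer> translateForStreamingInternal(T node, List<String> log);

    // Placeholder for the shared settings (buffer timeout, UID, resources, ...).
    private void configure(T node, List<String> log) {
        log.add("configured " + node.id);
    }
}

class MapTranslator extends SimpleTranslator<MapNode> {
    @Override
    protected Collection<Integer> translateForBatchInternal(MapNode node, List<String> log) {
        log.add("batch operator for " + node.name);
        return Collections.singletonList(node.id);
    }

    @Override
    protected Collection<Integer> translateForStreamingInternal(MapNode node, List<String> log) {
        log.add("streaming operator for " + node.name);
        return Collections.singletonList(node.id);
    }
}

class GraphGenerator {
    // Registry keyed by the concrete node class, built once and then frozen.
    private static final Map<Class<? extends Node>, Translator<? extends Node>> TRANSLATORS;

    static {
        Map<Class<? extends Node>, Translator<? extends Node>> tmp = new HashMap<>();
        tmp.put(MapNode.class, new MapTranslator());
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    final List<String> log = new ArrayList<>();
    private final Mode mode;

    GraphGenerator(Mode mode) { this.mode = mode; }

    @SuppressWarnings("unchecked") // each key is registered with a translator for that exact class
    Collection<Integer> translate(Node node) {
        Translator<Node> translator = (Translator<Node>) TRANSLATORS.get(node.getClass());
        if (translator == null) {
            throw new IllegalArgumentException("No translator for " + node.getClass().getSimpleName());
        }
        return mode == Mode.BATCH
                ? translator.translateForBatch(node, log)
                : translator.translateForStreaming(node, log);
    }
}
```

The unchecked cast is confined to a single annotated method, in line with the review suggestions to add explicit suppressions rather than leave raw-type warnings scattered around.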
   
   ## Brief change log
   
   The changes are mainly in the `StreamGraphGenerator`, plus the newly added `StreamGraphTranslator`, `BaseSimpleTransformationTranslator`, and `OneInputTranslator`.
   
   ## Verifying this change
   
   With the addition of the new `OneInputTranslator`, all existing tests (including e2e tests) that use a `OneInputTransformation` also verify the changes in this PR.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   
   **NOTE**: I would also like to move `streaming.api.graph` to a new `streaming.runtime.graph` package, as this seems more "correct". However, moving the `StreamGraph` would break binary compatibility because of `LocalStreamEnvironment.execute(streamGraph)` and `RemoteStreamEnvironment.execute(streamGraph)`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r502328312



##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
##########
@@ -445,9 +443,10 @@ public void testParallelismOnLimitPushDown() {
 		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
 		ExecNode execNode = planner.translateToExecNodePlan(toScala(Collections.singletonList(relNode))).get(0);
 		@SuppressWarnings("unchecked")
-		Transformation transformation = execNode.translateToPlan(planner);
-		Assert.assertEquals(1, ((PartitionTransformation) ((OneInputTransformation) transformation).getInput())
-			.getInput().getParallelism());
+		Transformation transformation = (Transformation) ((Transformation) execNode.translateToPlan(planner).getInputs().get(0)).getInputs().get(0);

Review comment:
       (I know it was weird before as well)







[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * e2cca39eb37be468850c114f3d646aa300113664 UNKNOWN
   * 37ea3cc8e2dfcdb80f81952b653bce40b2326ed5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7351) 
   * 084ed6857fc97334bccba5046a90d3990749c752 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r502253641



##########
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##########
@@ -513,6 +514,12 @@ public long getBufferTimeout() {
 	 */
 	public abstract Collection<Transformation<?>> getTransitivePredecessors();
 
+	/**
+	 * Returns the {@link Transformation transformations} that are the
+	 * immediate predecessors of the current transformation in the transformation graph.
+	 */
+	public abstract List<Transformation<?>> getInputs();

Review comment:
       Very nitpicky: how about we unify the return types of `getTransitivePredecessors` and `getInputs`?
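To illustrate why the two return types could line up: once every node exposes its immediate inputs as a `List`, the transitive predecessors can be derived generically from them. The sketch uses a hypothetical `Vertex` class rather than Flink's `Transformation`; including the node itself in the result and deduplicating shared ancestors are choices made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical minimal DAG node; not Flink's Transformation.
class Vertex {
    final String name;
    private final List<Vertex> inputs;

    Vertex(String name, Vertex... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    // Immediate predecessors, analogous to the proposed getInputs().
    List<Vertex> getInputs() {
        return inputs;
    }

    // Every node reachable through inputs, including this node itself.
    // Shared ancestors (e.g. in a diamond) appear only once.
    List<Vertex> getTransitivePredecessors() {
        Set<Vertex> seen = new LinkedHashSet<>();
        Deque<Vertex> stack = new ArrayDeque<>();
        stack.push(this);
        while (!stack.isEmpty()) {
            Vertex current = stack.pop();
            if (seen.add(current)) {
                for (Vertex input : current.getInputs()) {
                    stack.push(input);
                }
            }
        }
        return new ArrayList<>(seen);
    }
}
```

Both methods then return `List<Vertex>`, so callers do not need to treat the direct and transitive views differently.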

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
##########
@@ -445,9 +443,10 @@ public void testParallelismOnLimitPushDown() {
 		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
 		ExecNode execNode = planner.translateToExecNodePlan(toScala(Collections.singletonList(relNode))).get(0);
 		@SuppressWarnings("unchecked")
-		Transformation transformation = execNode.translateToPlan(planner);
-		Assert.assertEquals(1, ((PartitionTransformation) ((OneInputTransformation) transformation).getInput())
-			.getInput().getParallelism());
+		Transformation transformation = (Transformation) ((Transformation) execNode.translateToPlan(planner).getInputs().get(0)).getInputs().get(0);

Review comment:
       This looks extremely weird.
   
   How about:
   ```java
   		@SuppressWarnings("unchecked")
   		ExecNode<PlannerBase, ?> execNode = (ExecNode<PlannerBase, ?>) planner.translateToExecNodePlan(toScala(
   			Collections.singletonList(relNode))).get(0);
   		Transformation<?> transformation = execNode.translateToPlan(planner).getInputs().get(0).getInputs().get(0);
   ```
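One way to make the intent of `getInputs().get(0).getInputs().get(0)` explicit is a small helper that walks the first input a given number of times and fails with a readable message when the graph is shallower than expected. This is a sketch with hypothetical `Step` and `Inputs` classes, not Flink's API; the usage below mirrors the test's shape (a source with parallelism 1, behind a partition, behind the translated node).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Transformation with a parallelism and immediate inputs.
class Step {
    final String name;
    final int parallelism;
    final List<Step> inputs;

    Step(String name, int parallelism, Step... inputs) {
        this.name = name;
        this.parallelism = parallelism;
        this.inputs = Arrays.asList(inputs);
    }
}

final class Inputs {
    private Inputs() {}

    // Follows the first input `depth` times. Fails with a descriptive message
    // instead of an IndexOutOfBoundsException when a node has no inputs.
    static Step firstInputAt(Step start, int depth) {
        if (depth < 0) {
            throw new IllegalArgumentException("depth must be >= 0: " + depth);
        }
        Step current = start;
        for (int i = 0; i < depth; i++) {
            if (current.inputs.isEmpty()) {
                throw new IllegalStateException("'" + current.name + "' has no inputs at depth " + i);
            }
            current = current.inputs.get(0);
        }
        return current;
    }
}
```

For example, `Inputs.firstInputAt(calc, 2).parallelism` reads as "the parallelism two hops upstream", which is what the original assertion checked.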

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -127,6 +129,16 @@
 
 	private RuntimeExecutionMode runtimeExecutionMode = RuntimeExecutionMode.STREAMING;
 
+	private boolean shouldExecuteInBatchMode;
+
+	private static final Map<Class<? extends Transformation>, StreamGraphTranslator<?, ? extends Transformation>> translatorMap;

Review comment:
       Could we add suppressions here?
   
   ```
   	@SuppressWarnings("rawtypes")
   	private static final Map<Class<? extends Transformation>, StreamGraphTranslator<?, ? extends Transformation>> translatorMap;
   
   	static {
   		@SuppressWarnings("rawtypes")
   		Map<Class<? extends Transformation>, StreamGraphTranslator<?, ? extends Transformation>> tmp = new HashMap<>();
   		tmp.put(OneInputTransformation.class, new OneInputTranslator<>());
   		translatorMap = Collections.unmodifiableMap(tmp);
   	}
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -276,10 +289,28 @@ private boolean isUnboundedSource(final Transformation<?> transformation) {
 		// call at least once to trigger exceptions about MissingTypeInfo
 		transform.getOutputType();
 
+		final StreamGraphTranslator<?, Transformation<?>> translator =

Review comment:
       add suppression







[GitHub] [flink] dawidwys commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r501796258



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/BaseSimpleTransformationTranslator.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link StreamGraphTranslator StreamGraphTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class BaseSimpleTransformationTranslator<OUT, T extends Transformation<OUT>>
+		implements StreamGraphTranslator<OUT, T> {
+
+	@Override
+	public Collection<Integer> translateForBatch(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForBatchInternal(transformation, context);
+		configure(transformation, context);
+		return transformedIds;
+	}
+
+	@Override
+	public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForStreamingInternal(transformation, context);
+		configure(transformation, context);
+		return transformedIds;
+	}
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH-style} execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the nodes corresponding to this transformation in the transformation graph.
+	 */
+	protected abstract Collection<Integer> translateForBatchInternal(final T transformation, final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING-style} execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the nodes corresponding to this transformation in the transformation graph.
+	 */
+	protected abstract Collection<Integer> translateForStreamingInternal(final T transformation, final Context context);
+
+	private void configure(final T transformation, final Context context) {
+		final StreamGraph streamGraph = context.getStreamGraph();
+		final int transformationId = transformation.getId();
+
+		if (transformation.getBufferTimeout() >= 0) {
+			streamGraph.setBufferTimeout(transformationId, transformation.getBufferTimeout());
+		} else {
+			streamGraph.setBufferTimeout(transformationId, context.getDefaultBufferTimeout());
+		}
+
+		if (transformation.getUid() != null) {
+			streamGraph.setTransformationUID(transformationId, transformation.getUid());
+		}
+		if (transformation.getUserProvidedNodeHash() != null) {
+			streamGraph.setTransformationUserHash(transformationId, transformation.getUserProvidedNodeHash());
+		}
+
+		if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
+			if (transformation instanceof PhysicalTransformation
+					&& transformation.getUserProvidedNodeHash() == null
+					&& transformation.getUid() == null) {
+				throw new IllegalStateException("Auto generated UIDs have been disabled " +
+						"but no UID or hash has been assigned to operator " + transformation.getName());
+			}
+		}
+
+		if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
+			streamGraph.setResources(transformationId, transformation.getMinResources(), transformation.getPreferredResources());
+		}
+
+		streamGraph.setManagedMemoryUseCaseWeights(

Review comment:
       Just a quick comment here. I think we might need to set different weights/use cases depending on the runtime mode. E.g. in the BATCH runtime we will declare `ManagedMemoryUseCase.BATCH_OP`, whereas in STREAMING we will declare `ManagedMemoryUseCase.ROCKS_DB`.
   
   I am still not entirely sure how this should all play together, but that's a concern I have.
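A sketch of the concern above: the declared managed-memory use case depends on the mode, so the decision fits naturally in the mode-specific translation path rather than in the shared `configure` step. `ExecMode`, `MemoryUseCase`, `ManagedMemoryWeights`, and the `usesKeyedState` flag are hypothetical; the enum constants only mirror the names used in the comment, and the assumption that only stateful streaming operators declare state-backend memory is made for illustration.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical runtime modes.
enum ExecMode { BATCH, STREAMING }

// Hypothetical mirror of the use cases named in the review comment.
enum MemoryUseCase { BATCH_OP, ROCKS_DB }

final class ManagedMemoryWeights {
    private ManagedMemoryWeights() {}

    // Returns the use-case weights an operator would declare in the given mode.
    // Assumption: in STREAMING only operators with keyed state need backend memory.
    static Map<MemoryUseCase, Integer> forMode(ExecMode mode, int weight, boolean usesKeyedState) {
        if (weight <= 0) {
            throw new IllegalArgumentException("weight must be positive: " + weight);
        }
        Map<MemoryUseCase, Integer> weights = new EnumMap<>(MemoryUseCase.class);
        switch (mode) {
            case BATCH:
                weights.put(MemoryUseCase.BATCH_OP, weight);
                break;
            case STREAMING:
                if (usesKeyedState) {
                    weights.put(MemoryUseCase.ROCKS_DB, weight);
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
        return Collections.unmodifiableMap(weights);
    }
}
```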







[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * 455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7302) 
   * e2cca39eb37be468850c114f3d646aa300113664 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * 40f3f7d289665256ea5a8a938cbbf4ded64396f1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7270) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * 455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7302) 
   





[GitHub] [flink] kl0u commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r503239589



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link TransformationTranslator TransformationTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class SimpleTransformationTranslator<OUT, T extends Transformation<OUT>>
+		implements TransformationTranslator<OUT, T> {
+
+	@Override
+	public Collection<Integer> translateForBatch(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForBatchInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	@Override
+	public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForStreamingInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for BATCH-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForBatchInternal(final T transformation, final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for STREAMING-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForStreamingInternal(final T transformation, final Context context);
+
+	private void configure(final T transformation, final Context context) {
+		final StreamGraph streamGraph = context.getStreamGraph();
+		final int transformationId = transformation.getId();
+
+		if (transformation.getBufferTimeout() >= 0) {
+			streamGraph.setBufferTimeout(transformationId, transformation.getBufferTimeout());
+		} else {
+			streamGraph.setBufferTimeout(transformationId, context.getDefaultBufferTimeout());
+		}
+
+		if (transformation.getUid() != null) {
+			streamGraph.setTransformationUID(transformationId, transformation.getUid());
+		}
+		if (transformation.getUserProvidedNodeHash() != null) {
+			streamGraph.setTransformationUserHash(transformationId, transformation.getUserProvidedNodeHash());
+		}
+
+		if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
+			if (transformation instanceof PhysicalTransformation
+					&& transformation.getUserProvidedNodeHash() == null
+					&& transformation.getUid() == null) {
+				throw new IllegalStateException("Auto generated UIDs have been disabled " +
+						"but no UID or hash has been assigned to operator " + transformation.getName());
+			}
+		}
+
+		if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
+			streamGraph.setResources(transformationId, transformation.getMinResources(), transformation.getPreferredResources());
+		}
+
+		final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
+		if (streamNode != null

Review comment:
       You mean throw an exception if one is set and the other is not, right?
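A sketch of the check being discussed: apply resources only when both the minimum and preferred values are set, silently skip when neither is, and fail fast when exactly one is. `Resources` and `ResourceCheck` are hypothetical names; the real code would operate on Flink's resource specs and the transformation's name.

```java
// Hypothetical resource spec; a CPU value stands in for the full specification.
final class Resources {
    final double cpus;

    Resources(double cpus) {
        this.cpus = cpus;
    }
}

final class ResourceCheck {
    private ResourceCheck() {}

    // true  -> both set, the caller should apply them
    // false -> neither set, nothing to do
    // throws when exactly one of the two is set
    static boolean shouldApply(Resources min, Resources preferred, String operatorName) {
        if ((min == null) != (preferred == null)) {
            throw new IllegalStateException("Operator " + operatorName
                    + " must set both minimum and preferred resources, or neither.");
        }
        return min != null;
    }
}
```

Compared with the current `if (min != null && preferred != null)` guard, this turns a silently ignored half-configuration into an explicit error.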







[GitHub] [flink] flinkbot commented on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704977057


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 40f3f7d289665256ea5a8a938cbbf4ded64396f1 (Wed Oct 07 14:31:43 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **Invalid pull request title: No valid Jira ID provided**
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot commented on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * 40f3f7d289665256ea5a8a938cbbf4ded64396f1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] kl0u closed pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
kl0u closed pull request #13556:
URL: https://github.com/apache/flink/pull/13556


   





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "40f3f7d289665256ea5a8a938cbbf4ded64396f1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7270",
       "triggerID" : "40f3f7d289665256ea5a8a938cbbf4ded64396f1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7302",
       "triggerID" : "455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2cca39eb37be468850c114f3d646aa300113664",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e2cca39eb37be468850c114f3d646aa300113664",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c39d77d637ed92a9db9680ff0f4a6f50783722d",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7337",
       "triggerID" : "4c39d77d637ed92a9db9680ff0f4a6f50783722d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "37ea3cc8e2dfcdb80f81952b653bce40b2326ed5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "37ea3cc8e2dfcdb80f81952b653bce40b2326ed5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e2cca39eb37be468850c114f3d646aa300113664 UNKNOWN
   * 4c39d77d637ed92a9db9680ff0f4a6f50783722d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7337) 
   * 37ea3cc8e2dfcdb80f81952b653bce40b2326ed5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "40f3f7d289665256ea5a8a938cbbf4ded64396f1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7270",
       "triggerID" : "40f3f7d289665256ea5a8a938cbbf4ded64396f1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7302",
       "triggerID" : "455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2cca39eb37be468850c114f3d646aa300113664",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e2cca39eb37be468850c114f3d646aa300113664",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c39d77d637ed92a9db9680ff0f4a6f50783722d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7337",
       "triggerID" : "4c39d77d637ed92a9db9680ff0f4a6f50783722d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "37ea3cc8e2dfcdb80f81952b653bce40b2326ed5",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7351",
       "triggerID" : "37ea3cc8e2dfcdb80f81952b653bce40b2326ed5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e2cca39eb37be468850c114f3d646aa300113664 UNKNOWN
   * 37ea3cc8e2dfcdb80f81952b653bce40b2326ed5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7351) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "40f3f7d289665256ea5a8a938cbbf4ded64396f1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7270",
       "triggerID" : "40f3f7d289665256ea5a8a938cbbf4ded64396f1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7302",
       "triggerID" : "455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2cca39eb37be468850c114f3d646aa300113664",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e2cca39eb37be468850c114f3d646aa300113664",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c39d77d637ed92a9db9680ff0f4a6f50783722d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4c39d77d637ed92a9db9680ff0f4a6f50783722d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7302) 
   * e2cca39eb37be468850c114f3d646aa300113664 UNKNOWN
   * 4c39d77d637ed92a9db9680ff0f4a6f50783722d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] kl0u commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r503288869



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link TransformationTranslator TransformationTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class SimpleTransformationTranslator<OUT, T extends Transformation<OUT>>
+		implements TransformationTranslator<OUT, T> {
+
+	@Override
+	public Collection<Integer> translateForBatch(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForBatchInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	@Override
+	public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForStreamingInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for BATCH-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForBatchInternal(final T transformation, final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for STREAMING-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForStreamingInternal(final T transformation, final Context context);
+
+	private void configure(final T transformation, final Context context) {
+		final StreamGraph streamGraph = context.getStreamGraph();
+		final int transformationId = transformation.getId();
+
+		if (transformation.getBufferTimeout() >= 0) {
+			streamGraph.setBufferTimeout(transformationId, transformation.getBufferTimeout());
+		} else {
+			streamGraph.setBufferTimeout(transformationId, context.getDefaultBufferTimeout());
+		}
+
+		if (transformation.getUid() != null) {
+			streamGraph.setTransformationUID(transformationId, transformation.getUid());
+		}
+		if (transformation.getUserProvidedNodeHash() != null) {
+			streamGraph.setTransformationUserHash(transformationId, transformation.getUserProvidedNodeHash());
+		}
+
+		if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
+			if (transformation instanceof PhysicalTransformation
+					&& transformation.getUserProvidedNodeHash() == null
+					&& transformation.getUid() == null) {
+				throw new IllegalStateException("Auto generated UIDs have been disabled " +
+						"but no UID or hash has been assigned to operator " + transformation.getName());
+			}
+		}
+
+		if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
+			streamGraph.setResources(transformationId, transformation.getMinResources(), transformation.getPreferredResources());
+		}
+
+		final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
+		if (streamNode != null

Review comment:
       I am wondering whether we want to fail, or simply give priority to what is set in `translateForBatch/Stream`. The reason is that I do not know what the Table API does with the existing transformations.
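For readers following the thread: the `configure` method in the quoted diff runs after the mode-specific `translateFor...Internal` call and applies the transformation-level settings to the graph. Below is a minimal sketch of two of those steps (the buffer-timeout fallback and the UID check when auto-generated UIDs are disabled). It uses hypothetical stand-in classes (`TransformationStub`, `GraphStub`, `ConfigureSketch`) rather than Flink's `Transformation` and `StreamGraph`, and it deliberately leaves out resource handling and the truncated `streamNode` check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a transformation; NOT Flink's Transformation class.
final class TransformationStub {
    final int id;
    final String name;
    final long bufferTimeout;      // a negative value means "not set by the user"
    final String uid;              // null if no UID was assigned
    final String userProvidedHash; // null if no hash was assigned
    final boolean physical;        // stands in for "instanceof PhysicalTransformation"

    TransformationStub(int id, String name, long bufferTimeout,
                       String uid, String userProvidedHash, boolean physical) {
        this.id = id;
        this.name = name;
        this.bufferTimeout = bufferTimeout;
        this.uid = uid;
        this.userProvidedHash = userProvidedHash;
        this.physical = physical;
    }
}

// Hypothetical stand-in for the per-node settings kept in a stream graph.
final class GraphStub {
    final Map<Integer, Long> bufferTimeouts = new HashMap<>();
    final Map<Integer, String> uids = new HashMap<>();
    final Map<Integer, String> userHashes = new HashMap<>();
    final boolean autoGeneratedUidsEnabled;

    GraphStub(boolean autoGeneratedUidsEnabled) {
        this.autoGeneratedUidsEnabled = autoGeneratedUidsEnabled;
    }
}

final class ConfigureSketch {
    static void configure(TransformationStub t, GraphStub graph, long defaultBufferTimeout) {
        // Use the transformation's own timeout if set, otherwise the context-wide default.
        graph.bufferTimeouts.put(t.id, t.bufferTimeout >= 0 ? t.bufferTimeout : defaultBufferTimeout);

        // Copy user-assigned identifiers, if any.
        if (t.uid != null) {
            graph.uids.put(t.id, t.uid);
        }
        if (t.userProvidedHash != null) {
            graph.userHashes.put(t.id, t.userProvidedHash);
        }

        // With auto-generated UIDs disabled, physical operators must carry a UID or a hash.
        if (!graph.autoGeneratedUidsEnabled && t.physical
                && t.uid == null && t.userProvidedHash == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled "
                    + "but no UID or hash has been assigned to operator " + t.name);
        }
    }
}
```

Non-physical transformations pass the UID check even without an identifier, matching the `instanceof PhysicalTransformation` condition in the diff.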







[GitHub] [flink] dawidwys commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r503244871



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
##########
@@ -0,0 +1,126 @@
+		final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
+		if (streamNode != null

Review comment:
       I mean the case where both `translateForBatch/Stream` and the `Transformation` itself define some requirements.







[GitHub] [flink] kl0u commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r503305189



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
##########
@@ -0,0 +1,126 @@
+		final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
+		if (streamNode != null

Review comment:
       I see, and I agree that this can be a point of concern. For now, I will leave it like this because `translateForBatch/Streaming` is new, so hopefully whoever touches it will know what they are doing. As a last resort, we use the `Transformation` setting as the default. We will revisit this in the future, as it may also change now that the `Translator` could become "thinner".
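Assuming the reading that the translator's value takes priority and the `Transformation` setting is only a fallback, that precedence, together with the stricter fail-on-conflict alternative raised earlier in the review, can be sketched as a small resolver. `SettingResolver` and `ConflictPolicy` are hypothetical names for illustration only, not Flink API.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical helper, NOT part of Flink: resolves one per-node setting that
// both the translator and the Transformation may specify.
final class SettingResolver {

    enum ConflictPolicy {
        /** The translator's value wins; the Transformation's value is only a default. */
        PREFER_TRANSLATOR,
        /** Stricter alternative: reject differing values from both sources. */
        FAIL_ON_CONFLICT
    }

    static <V> Optional<V> resolve(
            Optional<V> fromTranslator, Optional<V> fromTransformation, ConflictPolicy policy) {
        boolean conflict = fromTranslator.isPresent() && fromTransformation.isPresent()
                && !Objects.equals(fromTranslator.get(), fromTransformation.get());
        if (conflict && policy == ConflictPolicy.FAIL_ON_CONFLICT) {
            throw new IllegalStateException("Conflicting values: translator="
                    + fromTranslator.get() + ", transformation=" + fromTransformation.get());
        }
        // Translator first; fall back to the Transformation's setting (possibly empty).
        return fromTranslator.isPresent() ? fromTranslator : fromTransformation;
    }
}
```

Failing fast surfaces conflicts early, but it could break callers (such as the Table API concern mentioned above) that already set values on existing transformations; preferring the translator avoids that at the cost of silently overriding the `Transformation` setting.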







[GitHub] [flink] kl0u commented on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
kl0u commented on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-707134203


   Merging.





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982









[GitHub] [flink] kl0u commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r502396973



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/BaseSimpleTransformationTranslator.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link StreamGraphTranslator StreamGraphTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class BaseSimpleTransformationTranslator<OUT, T extends Transformation<OUT>>

Review comment:
       That was also the initial name I gave it, but the problem is that this will also be the base for Transformations like the SideOutput and the Partition, which do not have underlying operators.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/BaseSimpleTransformationTranslator.java
##########
@@ -0,0 +1,118 @@
+public abstract class BaseSimpleTransformationTranslator<OUT, T extends Transformation<OUT>>

Review comment:
       I went with "simple" as opposed to "composite", which would be the translators whose transformations have more than one sub-operation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] kl0u commented on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
kl0u commented on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-706399361


   Thanks for the reviews, @aljoscha and @dawidwys! I have integrated them and rebased. Let me know if you have any more comments; if not, I will merge.





[GitHub] [flink] aljoscha commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r501802045



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}
+ * to its runtime implementation depending on the execution mode it is being executed.
+ * Currently, the execution mode can be either
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING} or
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH}.

Review comment:
       I don't think we need to refer to the API-facing enum here; it's enough to know that we translate for either batch or streaming, especially since the enum technically also has `AUTOMATIC`.
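The point about `AUTOMATIC` can be made concrete with a minimal sketch. `Mode`, `Source`, and `ModeResolver` are hypothetical stand-ins, not Flink classes. The sketch assumes that `AUTOMATIC` is resolved from source boundedness before translation (the generator's `isUnboundedSource` check hints at this), so a translator only ever needs to distinguish batch from streaming.

```java
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Flink classes.
enum Mode { STREAMING, BATCH, AUTOMATIC }

interface Source {
    boolean isBounded();
}

final class ModeResolver {
    private ModeResolver() {}

    /**
     * Collapses the user-facing mode into the only question a translator cares about:
     * batch or streaming. Under AUTOMATIC, batch is chosen only if every source is bounded.
     */
    static boolean shouldExecuteInBatchMode(Mode mode, List<Source> sources) {
        switch (mode) {
            case BATCH:
                return true;
            case STREAMING:
                return false;
            case AUTOMATIC:
                return sources.stream().allMatch(Source::isBounded);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

Because `AUTOMATIC` never survives past this resolution step, documenting the translator in terms of "batch or streaming" is sufficient.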

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}

Review comment:
       Should be `{@link StreamGraphTranslator}` or `{@code StreamGraphTranslator}`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}
+ * to its runtime implementation depending on the execution mode it is being executed.

Review comment:
       ```suggestion
    * to its runtime implementation depending on the execution mode.
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}
+ * to its runtime implementation depending on the execution mode it is being executed.
+ * Currently, the execution mode can be either
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING} or
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH}.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public interface StreamGraphTranslator<OUT, T extends Transformation<OUT>> {
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH-style} execution.

Review comment:
       Same as above about `RuntimeExecutionMode`

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/BaseSimpleTransformationTranslator.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link StreamGraphTranslator StreamGraphTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class BaseSimpleTransformationTranslator<OUT, T extends Transformation<OUT>>

Review comment:
       Maybe `SingleOperatorTranslator` would be more concise. 

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}
+ * to its runtime implementation depending on the execution mode it is being executed.
+ * Currently, the execution mode can be either
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING} or
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH}.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public interface StreamGraphTranslator<OUT, T extends Transformation<OUT>> {
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH-style} execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the nodes corresponding to this transformation in the transformation graph.
+	 */
+	Collection<Integer> translateForBatch(
+			final T transformation,
+			final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING-style} execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the nodes corresponding to this transformation in the transformation graph.
+	 */
+	Collection<Integer> translateForStreaming(
+			final T transformation,
+			final Context context);
+
+	/**
+	 * A context giving the necessary information for the translation of a given transformation.
+	 */
+	interface Context {
+
+		/**
+		 * Returns the {@link StreamGraph} being created as the transformations
+		 * of a pipeline are translated to their runtime implementations.
+		 */
+		StreamGraph getStreamGraph();
+
+		/**
+		 * Returns the ids of the nodes in the {@link StreamGraph} corresponding to the
+		 * provided transformation.
+		 *
+		 * @param transformation the transformation whose nodes' ids we want.
+		 * @return The requested ids.
+		 */
+		Collection<Integer> getTransformationIds(final Transformation<?> transformation);

Review comment:
       Should this maybe be `getStreamNodeIds()`? It doesn't return transformation ids.
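To illustrate why these are stream node ids rather than transformation ids, here is a minimal sketch of such a context. `Transform`, `TranslationContext`, and `SimpleTranslationContext` are hypothetical simplifications, not Flink's types, and the sketch assumes, for illustration, that a union-like transformation without a node of its own simply reports the node ids of its inputs.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of a transformation; not Flink's Transformation.
final class Transform {
    final int id;
    final String name;

    Transform(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

interface TranslationContext {
    /** Returns the ids of the stream-graph nodes an already-translated transformation produced. */
    Collection<Integer> getStreamNodeIds(Transform transform);
}

final class SimpleTranslationContext implements TranslationContext {
    // Transformation id -> ids of the stream nodes it was translated into.
    private final Map<Integer, List<Integer>> alreadyTranslated = new HashMap<>();

    void record(Transform transform, List<Integer> streamNodeIds) {
        alreadyTranslated.put(transform.id, List.copyOf(streamNodeIds));
    }

    @Override
    public Collection<Integer> getStreamNodeIds(Transform transform) {
        List<Integer> ids = alreadyTranslated.get(transform.id);
        if (ids == null) {
            throw new IllegalStateException(transform.name + " has not been translated yet");
        }
        return ids;
    }
}
```

In this sketch, a union of `a` (node 1) and `b` (node 2) is recorded as `[1, 2]` even though the union's own transformation id is 3, which is exactly the mismatch the proposed name makes explicit.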







[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * e2cca39eb37be468850c114f3d646aa300113664 UNKNOWN
   * 37ea3cc8e2dfcdb80f81952b653bce40b2326ed5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7351) 
   * 084ed6857fc97334bccba5046a90d3990749c752 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7454) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * 40f3f7d289665256ea5a8a938cbbf4ded64396f1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7270) 
   * 455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef UNKNOWN
   





[GitHub] [flink] dawidwys commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r501796258



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/BaseSimpleTransformationTranslator.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link StreamGraphTranslator StreamGraphTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class BaseSimpleTransformationTranslator<OUT, T extends Transformation<OUT>>
+		implements StreamGraphTranslator<OUT, T> {
+
+	@Override
+	public Collection<Integer> translateForBatch(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForBatchInternal(transformation, context);
+		configure(transformation, context);
+		return transformedIds;
+	}
+
+	@Override
+	public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForStreamingInternal(transformation, context);
+		configure(transformation, context);
+		return transformedIds;
+	}
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH-style} execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the nodes corresponding to this transformation in the transformation graph.
+	 */
+	protected abstract Collection<Integer> translateForBatchInternal(final T transformation, final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING-style} execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the nodes corresponding to this transformation in the transformation graph.
+	 */
+	protected abstract Collection<Integer> translateForStreamingInternal(final T transformation, final Context context);
+
+	private void configure(final T transformation, final Context context) {
+		final StreamGraph streamGraph = context.getStreamGraph();
+		final int transformationId = transformation.getId();
+
+		if (transformation.getBufferTimeout() >= 0) {
+			streamGraph.setBufferTimeout(transformationId, transformation.getBufferTimeout());
+		} else {
+			streamGraph.setBufferTimeout(transformationId, context.getDefaultBufferTimeout());
+		}
+
+		if (transformation.getUid() != null) {
+			streamGraph.setTransformationUID(transformationId, transformation.getUid());
+		}
+		if (transformation.getUserProvidedNodeHash() != null) {
+			streamGraph.setTransformationUserHash(transformationId, transformation.getUserProvidedNodeHash());
+		}
+
+		if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
+			if (transformation instanceof PhysicalTransformation
+					&& transformation.getUserProvidedNodeHash() == null
+					&& transformation.getUid() == null) {
+				throw new IllegalStateException("Auto generated UIDs have been disabled " +
+						"but no UID or hash has been assigned to operator " + transformation.getName());
+			}
+		}
+
+		if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
+			streamGraph.setResources(transformationId, transformation.getMinResources(), transformation.getPreferredResources());
+		}
+
+		streamGraph.setManagedMemoryUseCaseWeights(

Review comment:
       Just a quick comment here: I think we might need to set different weights/use cases depending on the runtime mode. E.g. in the BATCH runtime we will declare `ManagedMemoryUseCase.BATCH_OP`, whereas for STREAMING we will declare `ManagedMemoryUseCase.ROCKS_DB`.
   
   I am still not entirely sure how this should all play together, but that's a concern I have.
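One way the concern could be addressed is to keep the shared configuration step but pass the mode into it. This is a hedged sketch only: `MemoryUseCase`, `NodeConfig`, and `ModeAwareTranslator` are hypothetical types that borrow the use-case names from the comment, a single `translateInternal` serves both modes for brevity, and the weight of `1` is an arbitrary placeholder rather than anything Flink uses.

```java
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical use cases borrowing the names from the comment; not Flink's enum.
enum MemoryUseCase { BATCH_OP, ROCKS_DB }

// Hypothetical per-node configuration standing in for what a stream graph stores.
final class NodeConfig {
    final Map<MemoryUseCase, Integer> managedMemoryWeights = new EnumMap<>(MemoryUseCase.class);
}

abstract class ModeAwareTranslator {

    final Collection<Integer> translateForBatch(int transformationId, NodeConfig node) {
        Collection<Integer> ids = translateInternal(transformationId);
        configure(node, true);
        return ids;
    }

    final Collection<Integer> translateForStreaming(int transformationId, NodeConfig node) {
        Collection<Integer> ids = translateInternal(transformationId);
        configure(node, false);
        return ids;
    }

    /** Adds the runtime node(s) for the transformation and returns their ids. */
    protected abstract Collection<Integer> translateInternal(int transformationId);

    // The configuration step stays shared, but the declared use case follows the mode.
    private void configure(NodeConfig node, boolean batch) {
        MemoryUseCase useCase = batch ? MemoryUseCase.BATCH_OP : MemoryUseCase.ROCKS_DB;
        node.managedMemoryWeights.put(useCase, 1); // placeholder weight
    }
}
```

The design choice here is to keep the template-method shape of `BaseSimpleTransformationTranslator` (translate, then configure) while letting only the mode-sensitive part of the configuration vary.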

##########
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##########
@@ -513,6 +514,12 @@ public long getBufferTimeout() {
 	 */
 	public abstract Collection<Transformation<?>> getTransitivePredecessors();
 
+	/**
+	 * Returns the {@link Transformation transformations} that are the
+	 * immediate predecessors of the current transformation in the transformation graph.
+	 */
+	public abstract List<Transformation<?>> getInputs();

Review comment:
       Very nitpicky: how about we unify the return types of `getTransitivePredecessors` (a `Collection`) and `getInputs` (a `List`)?
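A minimal sketch of what unified return types could look like, using a hypothetical `DagNode` class rather than Flink's `Transformation`: both accessors return `List`, and the transitive predecessors (which in this sketch include the node itself) are de-duplicated while keeping a deterministic depth-first order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical DAG node, not Flink's Transformation: both accessors return List.
final class DagNode {
    final String name;
    private final List<DagNode> inputs;

    DagNode(String name, List<DagNode> inputs) {
        this.name = name;
        this.inputs = List.copyOf(inputs);
    }

    /** Immediate predecessors, in declaration order. */
    List<DagNode> getInputs() {
        return inputs;
    }

    /** This node plus all transitive predecessors, de-duplicated, in depth-first order. */
    List<DagNode> getTransitivePredecessors() {
        Set<DagNode> seen = new LinkedHashSet<>();
        collect(this, seen);
        return new ArrayList<>(seen);
    }

    private static void collect(DagNode node, Set<DagNode> seen) {
        // Set.add returns false for an already-visited node, so shared inputs are walked once.
        if (seen.add(node)) {
            for (DagNode input : node.inputs) {
                collect(input, seen);
            }
        }
    }
}
```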

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
##########
@@ -445,9 +443,10 @@ public void testParallelismOnLimitPushDown() {
 		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
 		ExecNode execNode = planner.translateToExecNodePlan(toScala(Collections.singletonList(relNode))).get(0);
 		@SuppressWarnings("unchecked")
-		Transformation transformation = execNode.translateToPlan(planner);
-		Assert.assertEquals(1, ((PartitionTransformation) ((OneInputTransformation) transformation).getInput())
-			.getInput().getParallelism());
+		Transformation transformation = (Transformation) ((Transformation) execNode.translateToPlan(planner).getInputs().get(0)).getInputs().get(0);

Review comment:
       This looks extremely weird.
   
   How about:
   ```
   		@SuppressWarnings("unchecked")
   		ExecNode<PlannerBase, ?> execNode = (ExecNode<PlannerBase, ?>) planner.translateToExecNodePlan(toScala(
   			Collections.singletonList(relNode))).get(0);
   		Transformation<?> transformation = (execNode.translateToPlan(planner).getInputs().get(0)).getInputs().get(0);
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -127,6 +129,16 @@
 
 	private RuntimeExecutionMode runtimeExecutionMode = RuntimeExecutionMode.STREAMING;
 
+	private boolean shouldExecuteInBatchMode;
+
+	private static final Map<Class<? extends Transformation>, StreamGraphTranslator<?, ? extends Transformation>> translatorMap;

Review comment:
       Could we add suppressions here?
   
   ```
   	@SuppressWarnings("rawtypes")
   	private static final Map<Class<? extends Transformation>, StreamGraphTranslator<?, ? extends Transformation>> translatorMap;
   
   	static {
   		@SuppressWarnings("rawtypes")
   		Map<Class<? extends Transformation>, StreamGraphTranslator<?, ? extends Transformation>> tmp = new HashMap<>();
   		tmp.put(OneInputTransformation.class, new OneInputTranslator<>());
   		translatorMap = Collections.unmodifiableMap(tmp);
   	}
   ```
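A self-contained sketch of the suggested pattern, with hypothetical `Xform`, `Translator`, and `TranslatorRegistry` types standing in for the Flink classes: `rawtypes` is suppressed where a `Class` literal forces a raw key type, and `unchecked` is suppressed on the single cast performed at lookup time.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal stand-ins for Transformation and StreamGraphTranslator.
abstract class Xform<T> {}

final class OneInputXform<T> extends Xform<T> {}

interface Translator<OUT, X extends Xform<OUT>> {
    String translate(X xform);
}

final class OneInputXformTranslator<OUT> implements Translator<OUT, OneInputXform<OUT>> {
    @Override
    public String translate(OneInputXform<OUT> xform) {
        return "one-input";
    }
}

final class TranslatorRegistry {
    // Raw Xform is needed because a Class literal cannot carry type arguments.
    @SuppressWarnings("rawtypes")
    private static final Map<Class<? extends Xform>, Translator<?, ? extends Xform>> TRANSLATORS;

    static {
        @SuppressWarnings("rawtypes")
        Map<Class<? extends Xform>, Translator<?, ? extends Xform>> tmp = new HashMap<>();
        tmp.put(OneInputXform.class, new OneInputXformTranslator<>());
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    private TranslatorRegistry() {}

    // The unchecked cast is safe only while each key is registered with a matching translator.
    @SuppressWarnings("unchecked")
    static <T> Translator<T, Xform<T>> lookup(Xform<T> xform) {
        return (Translator<T, Xform<T>>) TRANSLATORS.get(xform.getClass());
    }
}
```

Keeping the annotations on the narrowest declarations (the field, the local, and the lookup method) means genuinely new raw-type or unchecked warnings elsewhere in the class are still reported.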

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -276,10 +289,28 @@ private boolean isUnboundedSource(final Transformation<?> transformation) {
 		// call at least once to trigger exceptions about MissingTypeInfo
 		transform.getOutputType();
 
+		final StreamGraphTranslator<?, Transformation<?>> translator =

Review comment:
       Add a suppression here as well.
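For context, a hedged sketch of how the looked-up translator might then be used: the unchecked cast gets a local `@SuppressWarnings`, the `shouldExecuteInBatchMode` flag picks the batch or streaming method, and transformations without a registered translator fall back to the existing code path. `Node`, `MapNode`, `ModeTranslator`, and `Generator` are hypothetical names, and the fallback behavior is an assumption of this sketch.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; not Flink's Transformation / StreamGraphTranslator.
class Node {}

final class MapNode extends Node {}

interface ModeTranslator<N extends Node> {
    Collection<Integer> translateForBatch(N node);

    Collection<Integer> translateForStreaming(N node);
}

final class Generator {
    private final Map<Class<?>, ModeTranslator<?>> translators = new HashMap<>();
    private final boolean shouldExecuteInBatchMode;

    Generator(boolean shouldExecuteInBatchMode) {
        this.shouldExecuteInBatchMode = shouldExecuteInBatchMode;
    }

    <N extends Node> void register(Class<N> type, ModeTranslator<N> translator) {
        translators.put(type, translator);
    }

    Collection<Integer> transform(Node node) {
        // Safe only because register() ties each class to a translator for that class.
        @SuppressWarnings("unchecked")
        final ModeTranslator<Node> translator =
                (ModeTranslator<Node>) translators.get(node.getClass());

        if (translator == null) {
            return legacyTransform(node); // no translator registered: keep the old code path
        }
        return shouldExecuteInBatchMode
                ? translator.translateForBatch(node)
                : translator.translateForStreaming(node);
    }

    private Collection<Integer> legacyTransform(Node node) {
        return List.of(-1); // placeholder id standing in for the untouched legacy translation
    }
}
```

Note that the lookup uses the exact runtime class, so a subclass of a registered type would take the fallback path unless it is registered itself.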

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
##########
@@ -445,9 +443,10 @@ public void testParallelismOnLimitPushDown() {
 		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
 		ExecNode execNode = planner.translateToExecNodePlan(toScala(Collections.singletonList(relNode))).get(0);
 		@SuppressWarnings("unchecked")
-		Transformation transformation = execNode.translateToPlan(planner);
-		Assert.assertEquals(1, ((PartitionTransformation) ((OneInputTransformation) transformation).getInput())
-			.getInput().getParallelism());
+		Transformation transformation = (Transformation) ((Transformation) execNode.translateToPlan(planner).getInputs().get(0)).getInputs().get(0);

Review comment:
       (I know it was weird before as well)







[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * 455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7302) 
   * e2cca39eb37be468850c114f3d646aa300113664 UNKNOWN
   * 4c39d77d637ed92a9db9680ff0f4a6f50783722d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7337) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * e2cca39eb37be468850c114f3d646aa300113664 UNKNOWN
   * 4c39d77d637ed92a9db9680ff0f4a6f50783722d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7337) 
   * 37ea3cc8e2dfcdb80f81952b653bce40b2326ed5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7351) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * e2cca39eb37be468850c114f3d646aa300113664 UNKNOWN
   * 4c39d77d637ed92a9db9680ff0f4a6f50783722d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7337) 
   





[GitHub] [flink] dawidwys commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r503244871



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link TransformationTranslator TransformationTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class SimpleTransformationTranslator<OUT, T extends Transformation<OUT>>
+		implements TransformationTranslator<OUT, T> {
+
+	@Override
+	public Collection<Integer> translateForBatch(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForBatchInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	@Override
+	public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForStreamingInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for BATCH-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForBatchInternal(final T transformation, final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for STREAMING-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForStreamingInternal(final T transformation, final Context context);
+
+	private void configure(final T transformation, final Context context) {
+		final StreamGraph streamGraph = context.getStreamGraph();
+		final int transformationId = transformation.getId();
+
+		if (transformation.getBufferTimeout() >= 0) {
+			streamGraph.setBufferTimeout(transformationId, transformation.getBufferTimeout());
+		} else {
+			streamGraph.setBufferTimeout(transformationId, context.getDefaultBufferTimeout());
+		}
+
+		if (transformation.getUid() != null) {
+			streamGraph.setTransformationUID(transformationId, transformation.getUid());
+		}
+		if (transformation.getUserProvidedNodeHash() != null) {
+			streamGraph.setTransformationUserHash(transformationId, transformation.getUserProvidedNodeHash());
+		}
+
+		if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
+			if (transformation instanceof PhysicalTransformation
+					&& transformation.getUserProvidedNodeHash() == null
+					&& transformation.getUid() == null) {
+				throw new IllegalStateException("Auto generated UIDs have been disabled " +
+						"but no UID or hash has been assigned to operator " + transformation.getName());
+			}
+		}
+
+		if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
+			streamGraph.setResources(transformationId, transformation.getMinResources(), transformation.getPreferredResources());
+		}
+
+		final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
+		if (streamNode != null

Review comment:
       I mean the case where both `translateForBatch`/`translateForStreaming` and the transformation itself define some requirements.
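For readers following this thread: the `SimpleTransformationTranslator` quoted above is a template method. Both public entry points null-check their arguments, delegate to a mode-specific `...Internal` hook, and then run the shared `configure` step. Below is a minimal, self-contained sketch of that layout; `Transformation`, `Context`, `SimpleTranslator` and `OneInputSketch` are simplified hypothetical stand-ins, not the Flink classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

// Sketch of the template-method layout of the translator base class quoted above.
// All nested types are hypothetical stand-ins, not Flink classes.
class TranslatorTemplateSketch {

    /** Hypothetical stand-in for a Transformation: only an id and a name. */
    static final class Transformation {
        final int id;
        final String name;

        Transformation(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Hypothetical translation context that records each step, in order. */
    static final class Context {
        final List<String> log = new ArrayList<>();
    }

    /** The two entry points: one per execution mode. */
    interface Translator<T extends Transformation> {
        Collection<Integer> translateForBatch(T transformation, Context context);

        Collection<Integer> translateForStreaming(T transformation, Context context);
    }

    /** Null checks, then the mode-specific hook, then configuration shared by both modes. */
    abstract static class SimpleTranslator<T extends Transformation> implements Translator<T> {

        @Override
        public Collection<Integer> translateForBatch(T transformation, Context context) {
            Objects.requireNonNull(transformation);
            Objects.requireNonNull(context);
            Collection<Integer> ids = translateForBatchInternal(transformation, context);
            configure(transformation, context);
            return ids;
        }

        @Override
        public Collection<Integer> translateForStreaming(T transformation, Context context) {
            Objects.requireNonNull(transformation);
            Objects.requireNonNull(context);
            Collection<Integer> ids = translateForStreamingInternal(transformation, context);
            configure(transformation, context);
            return ids;
        }

        protected abstract Collection<Integer> translateForBatchInternal(T transformation, Context context);

        protected abstract Collection<Integer> translateForStreamingInternal(T transformation, Context context);

        // Placeholder for the shared settings (buffer timeout, UID, resources, ...).
        private void configure(T transformation, Context context) {
            context.log.add("configure " + transformation.id);
        }
    }

    /** Hypothetical one-input translator that creates a single node in either mode. */
    static final class OneInputSketch extends SimpleTranslator<Transformation> {
        @Override
        protected Collection<Integer> translateForBatchInternal(Transformation t, Context c) {
            c.log.add("batch node " + t.id);
            return List.of(t.id);
        }

        @Override
        protected Collection<Integer> translateForStreamingInternal(Transformation t, Context c) {
            c.log.add("streaming node " + t.id);
            return List.of(t.id);
        }
    }

    public static void main(String[] args) {
        Context context = new Context();
        Collection<Integer> ids =
                new OneInputSketch().translateForBatch(new Transformation(7, "map"), context);
        System.out.println(ids + " " + context.log); // [7] [batch node 7, configure 7]
    }
}
```

Because `configure` runs after either hook, a requirement set inside `translateFor...Internal` and one set on the transformation both end up on the same node, which is the overlap discussed in this thread.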







[GitHub] [flink] dawidwys commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r503302860



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
##########
@@ -0,0 +1,126 @@
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link TransformationTranslator TransformationTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class SimpleTransformationTranslator<OUT, T extends Transformation<OUT>>
+		implements TransformationTranslator<OUT, T> {
+
+	@Override
+	public Collection<Integer> translateForBatch(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForBatchInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	@Override
+	public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForStreamingInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for BATCH-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForBatchInternal(final T transformation, final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for STREAMING-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForStreamingInternal(final T transformation, final Context context);
+
+	private void configure(final T transformation, final Context context) {
+		final StreamGraph streamGraph = context.getStreamGraph();
+		final int transformationId = transformation.getId();
+
+		if (transformation.getBufferTimeout() >= 0) {
+			streamGraph.setBufferTimeout(transformationId, transformation.getBufferTimeout());
+		} else {
+			streamGraph.setBufferTimeout(transformationId, context.getDefaultBufferTimeout());
+		}
+
+		if (transformation.getUid() != null) {
+			streamGraph.setTransformationUID(transformationId, transformation.getUid());
+		}
+		if (transformation.getUserProvidedNodeHash() != null) {
+			streamGraph.setTransformationUserHash(transformationId, transformation.getUserProvidedNodeHash());
+		}
+
+		if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
+			if (transformation instanceof PhysicalTransformation
+					&& transformation.getUserProvidedNodeHash() == null
+					&& transformation.getUid() == null) {
+				throw new IllegalStateException("Auto generated UIDs have been disabled " +
+						"but no UID or hash has been assigned to operator " + transformation.getName());
+			}
+		}
+
+		if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
+			streamGraph.setResources(transformationId, transformation.getMinResources(), transformation.getPreferredResources());
+		}
+
+		final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
+		if (streamNode != null

Review comment:
       However, I don't feel strongly about that. I just wanted to bring it up for discussion.
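Besides the resource handling under discussion, the `configure` step in the quoted hunk applies two rules worth spelling out: a negative per-transformation buffer timeout falls back to the context default, and, when auto-generated UIDs are disabled, every `PhysicalTransformation` must carry a UID or a user-provided hash. A hedged sketch with hypothetical stand-in types (`Graph` and this `Transformation` are not Flink's `StreamGraph`/`Transformation`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of two rules applied by configure() in the quoted hunk, using
// hypothetical stand-in types:
//  1. a negative per-transformation buffer timeout means "unset", so the default is used;
//  2. with auto-generated UIDs disabled, a physical transformation needs a UID or a user hash.
class ConfigureSketch {

    static final class Transformation {
        final int id;
        final String name;
        final long bufferTimeout; // negative = not set
        final String uid;         // may be null
        final String userHash;    // may be null
        final boolean physical;

        Transformation(int id, String name, long bufferTimeout, String uid, String userHash, boolean physical) {
            this.id = id;
            this.name = name;
            this.bufferTimeout = bufferTimeout;
            this.uid = uid;
            this.userHash = userHash;
            this.physical = physical;
        }
    }

    /** Hypothetical graph holding only the per-node settings touched here. */
    static final class Graph {
        final Map<Integer, Long> bufferTimeouts = new HashMap<>();
        final Map<Integer, String> uids = new HashMap<>();
        final Map<Integer, String> userHashes = new HashMap<>();
        boolean autoGeneratedUidsEnabled = true;
    }

    static void configure(Transformation t, Graph graph, long defaultBufferTimeout) {
        graph.bufferTimeouts.put(t.id, t.bufferTimeout >= 0 ? t.bufferTimeout : defaultBufferTimeout);
        if (t.uid != null) {
            graph.uids.put(t.id, t.uid);
        }
        if (t.userHash != null) {
            graph.userHashes.put(t.id, t.userHash);
        }
        if (!graph.autoGeneratedUidsEnabled && t.physical && t.uid == null && t.userHash == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled "
                    + "but no UID or hash has been assigned to operator " + t.name);
        }
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        configure(new Transformation(1, "map", -1L, null, null, true), graph, 100L);
        configure(new Transformation(2, "sink", 5L, "my-sink", null, true), graph, 100L);
        System.out.println(graph.bufferTimeouts); // {1=100, 2=5}

        graph.autoGeneratedUidsEnabled = false;
        try {
            configure(new Transformation(3, "filter", -1L, null, null, true), graph, 100L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```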







[GitHub] [flink] kl0u commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r502396973



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/BaseSimpleTransformationTranslator.java
##########
@@ -0,0 +1,118 @@
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link StreamGraphTranslator StreamGraphTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class BaseSimpleTransformationTranslator<OUT, T extends Transformation<OUT>>

Review comment:
       That was also the name I initially chose, but the problem is that this will also be the base for Transformations like the SideOutput and the Partition, which do not have underlying operators.
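To illustrate the point about operator-less transformations: a translator for a partition-like transformation may add no operator at all and only register a virtual id, which downstream edges then resolve to the real upstream node. The sketch below is hypothetical; `Graph`, `translateOperator` and `translatePartition` are illustrative names, not Flink's `StreamGraph` API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a partition-like translation registers only a virtual id,
// and edges pointing at it are resolved to the real upstream operator,
// carrying the partitioner along.
class VirtualTranslationSketch {

    static final class Graph {
        final Set<Integer> operatorNodes = new HashSet<>();
        // virtual id -> (upstream id, partitioner name)
        final Map<Integer, Map.Entry<Integer, String>> virtualNodes = new HashMap<>();
        final List<String> edges = new ArrayList<>();

        void addEdge(int from, int to) {
            String partitioner = null;
            // Walk through (possibly chained) virtual ids; the one closest to the
            // consumer decides the partitioner.
            while (virtualNodes.containsKey(from)) {
                Map.Entry<Integer, String> virtual = virtualNodes.get(from);
                if (partitioner == null) {
                    partitioner = virtual.getValue();
                }
                from = virtual.getKey();
            }
            edges.add(from + "->" + to + " (" + (partitioner == null ? "forward" : partitioner) + ")");
        }
    }

    /** Operator-backed translation: one real node plus edges from its inputs. */
    static Collection<Integer> translateOperator(Graph graph, int id, Collection<Integer> inputIds) {
        graph.operatorNodes.add(id);
        for (int inputId : inputIds) {
            graph.addEdge(inputId, id);
        }
        return List.of(id);
    }

    /** Operator-less translation: only a virtual id that downstream edges resolve. */
    static Collection<Integer> translatePartition(Graph graph, int id, int inputId, String partitioner) {
        graph.virtualNodes.put(id, Map.entry(inputId, partitioner));
        return List.of(id);
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        Collection<Integer> source = translateOperator(graph, 1, List.of());
        Collection<Integer> rebalanced =
                translatePartition(graph, 2, source.iterator().next(), "rebalance");
        translateOperator(graph, 3, rebalanced);
        System.out.println(graph.edges); // [1->3 (rebalance)]
    }
}
```

Both kinds of translation still return the ids a following transformation connects to, which is why a common base class can cover them even though only one of them creates an operator.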







[GitHub] [flink] dawidwys commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r503184860



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
##########
@@ -0,0 +1,126 @@
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link TransformationTranslator TransformationTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class SimpleTransformationTranslator<OUT, T extends Transformation<OUT>>
+		implements TransformationTranslator<OUT, T> {
+
+	@Override
+	public Collection<Integer> translateForBatch(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForBatchInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	@Override
+	public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForStreamingInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for BATCH-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForBatchInternal(final T transformation, final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for STREAMING-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForStreamingInternal(final T transformation, final Context context);
+
+	private void configure(final T transformation, final Context context) {
+		final StreamGraph streamGraph = context.getStreamGraph();
+		final int transformationId = transformation.getId();
+
+		if (transformation.getBufferTimeout() >= 0) {
+			streamGraph.setBufferTimeout(transformationId, transformation.getBufferTimeout());
+		} else {
+			streamGraph.setBufferTimeout(transformationId, context.getDefaultBufferTimeout());
+		}
+
+		if (transformation.getUid() != null) {
+			streamGraph.setTransformationUID(transformationId, transformation.getUid());
+		}
+		if (transformation.getUserProvidedNodeHash() != null) {
+			streamGraph.setTransformationUserHash(transformationId, transformation.getUserProvidedNodeHash());
+		}
+
+		if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
+			if (transformation instanceof PhysicalTransformation
+					&& transformation.getUserProvidedNodeHash() == null
+					&& transformation.getUid() == null) {
+				throw new IllegalStateException("Auto generated UIDs have been disabled " +
+						"but no UID or hash has been assigned to operator " + transformation.getName());
+			}
+		}
+
+		if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
+			streamGraph.setResources(transformationId, transformation.getMinResources(), transformation.getPreferredResources());
+		}
+
+		final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
+		if (streamNode != null

Review comment:
       nit: How about we throw an exception if both are set? Otherwise it might hide some problems, as we would be mixing different memory requirements.
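One way to read this proposal is a fail-fast check when both the translator and the transformation declare managed-memory requirements. A minimal sketch under stated assumptions: the method name `resolveManagedMemoryWeights`, the map-of-weights representation, and the use-case keys (`"sort"`, `"state"`) are illustrative only, and the truncated condition in the quoted hunk is not reproduced.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "throw if both are set": managed-memory weights may come
// from the translator or from the transformation, but not from both.
class ConflictingRequirementsSketch {

    static Map<String, Integer> resolveManagedMemoryWeights(
            Map<String, Integer> fromTranslator,
            Map<String, Integer> fromTransformation,
            String operatorName) {
        boolean translatorDeclared = !fromTranslator.isEmpty();
        boolean transformationDeclared = !fromTransformation.isEmpty();
        if (translatorDeclared && transformationDeclared) {
            // Fail fast instead of silently mixing two sets of memory requirements.
            throw new IllegalStateException("Managed memory requirements for operator "
                    + operatorName + " were declared by both the translator and the transformation");
        }
        // At most one side declared something; copy whichever it was (possibly empty).
        return new HashMap<>(translatorDeclared ? fromTranslator : fromTransformation);
    }

    public static void main(String[] args) {
        System.out.println(resolveManagedMemoryWeights(Map.of("sort", 1), Map.of(), "map")); // {sort=1}
        try {
            resolveManagedMemoryWeights(Map.of("sort", 1), Map.of("state", 1), "map");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The trade-off raised later in the thread remains: failing fast surfaces conflicts early, but rules out setups where combining both declarations would be intended.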







[GitHub] [flink] aljoscha commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r501802045



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}
+ * to its runtime implementation depending on the execution mode it is being executed.
+ * Currently, the execution mode can be either
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING} or
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH}.

Review comment:
       I think we don't need to refer to the API-facing enum here; it is enough to know that we translate either for batch or for streaming, especially since the enum technically also has `AUTOMATIC`.
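The distinction can be sketched as follows: the API-facing setting may be `AUTOMATIC`, but it is resolved before translation, so a translator only ever sees batch or streaming. The enums `ApiMode`/`TranslationMode`, the `LabelingTranslator`, and the rule that `AUTOMATIC` means batch only when all sources are bounded are assumptions made for illustration.

```java
import java.util.List;

// Hypothetical sketch: AUTOMATIC exists only at the API level; translators are
// dispatched with one of exactly two modes.
class ExecutionModeDispatchSketch {

    /** API-facing setting (illustrative copy, including AUTOMATIC). */
    enum ApiMode { STREAMING, BATCH, AUTOMATIC }

    /** The only two modes a translator needs to know about. */
    enum TranslationMode { STREAMING, BATCH }

    /** Assumption: AUTOMATIC picks BATCH only if every source is bounded. */
    static TranslationMode resolve(ApiMode mode, List<Boolean> sourcesBounded) {
        switch (mode) {
            case STREAMING:
                return TranslationMode.STREAMING;
            case BATCH:
                return TranslationMode.BATCH;
            case AUTOMATIC: {
                boolean allBounded = sourcesBounded.stream().allMatch(Boolean::booleanValue);
                return allBounded ? TranslationMode.BATCH : TranslationMode.STREAMING;
            }
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    interface Translator {
        String translateForBatch(String transformation);

        String translateForStreaming(String transformation);
    }

    /** Hypothetical translator that just labels its output with the mode. */
    static final class LabelingTranslator implements Translator {
        @Override
        public String translateForBatch(String transformation) {
            return "batch:" + transformation;
        }

        @Override
        public String translateForStreaming(String transformation) {
            return "streaming:" + transformation;
        }
    }

    static String translate(Translator translator, String transformation, TranslationMode mode) {
        return mode == TranslationMode.BATCH
                ? translator.translateForBatch(transformation)
                : translator.translateForStreaming(transformation);
    }

    public static void main(String[] args) {
        TranslationMode mode = resolve(ApiMode.AUTOMATIC, List.of(true, true));
        System.out.println(translate(new LabelingTranslator(), "map", mode)); // batch:map
    }
}
```

With an empty source list, `allMatch` returns `true`, so this sketch would pick batch; a real resolver may treat that case differently.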

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}

Review comment:
       Should be `{@link StreamGraphTranslator}` or `{@code StreamGraphTranslator}`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}
+ * to its runtime implementation depending on the execution mode it is being executed.

Review comment:
       ```suggestion
    * to its runtime implementation depending on the execution mode.
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}
+ * to its runtime implementation depending on the execution mode it is being executed.
+ * Currently, the execution mode can be either
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING} or
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH}.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public interface StreamGraphTranslator<OUT, T extends Transformation<OUT>> {
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH-style} execution.

Review comment:
       Same as above about `RuntimeExecutionMode`
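   For context, the interface under review exposes one translation method per execution mode, so some caller has to choose between them. The PR description says this depends on `execution.runtime-mode`, and that caller is presumably the `StreamGraphGenerator`. Below is a minimal sketch of that dispatch. It assumes simplified stand-in types: `Mode`, `Translator` and `ModeDispatch` are hypothetical and are not Flink classes.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for the execution mode; NOT Flink's RuntimeExecutionMode.
enum Mode { BATCH, STREAMING }

// Hypothetical, simplified translator: one entry point per execution mode,
// mirroring translateForBatch/translateForStreaming in the interface above.
interface Translator<T> {
    Collection<Integer> translateForBatch(T transformation);
    Collection<Integer> translateForStreaming(T transformation);
}

final class ModeDispatch {
    private ModeDispatch() {}

    // Picks the mode-specific translation method for the given mode.
    static <T> Collection<Integer> translate(Translator<T> translator, T transformation, Mode mode) {
        switch (mode) {
            case BATCH:
                return translator.translateForBatch(transformation);
            case STREAMING:
                return translator.translateForStreaming(transformation);
            default:
                throw new IllegalArgumentException("Unsupported mode: " + mode);
        }
    }
}

final class TranslatorDispatchDemo {
    public static void main(String[] args) {
        Translator<Integer> t = new Translator<Integer>() {
            @Override
            public Collection<Integer> translateForBatch(Integer id) {
                return Collections.singleton(id * 10); // pretend batch produces a different node
            }

            @Override
            public Collection<Integer> translateForStreaming(Integer id) {
                return Collections.singleton(id);
            }
        };
        System.out.println(ModeDispatch.translate(t, 7, Mode.BATCH));     // [70]
        System.out.println(ModeDispatch.translate(t, 7, Mode.STREAMING)); // [7]
    }
}
```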

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/BaseSimpleTransformationTranslator.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link StreamGraphTranslator StreamGraphTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class BaseSimpleTransformationTranslator<OUT, T extends Transformation<OUT>>

Review comment:
       Maybe `SingleOperatorTranslator` would be more concise. 
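   Whatever name is chosen, the base class has a template-method layout, which the later revision of the diff in this thread shows. Each public per-mode entry point checks its arguments, delegates to a mode-specific hook, and then runs a shared `configure` step. Below is a minimal sketch of that shape. It uses hypothetical, simplified types and borrows the suggested name only for illustration; it is not the PR's code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified template-method base class; not Flink's code.
abstract class SingleOperatorTranslator<T> {
    // Records which ids went through the shared configuration step (for illustration).
    final List<Integer> configured = new ArrayList<>();

    final Collection<Integer> translateForBatch(T transformation) {
        Objects.requireNonNull(transformation);
        Collection<Integer> ids = translateForBatchInternal(transformation);
        configure(transformation);
        return ids;
    }

    final Collection<Integer> translateForStreaming(T transformation) {
        Objects.requireNonNull(transformation);
        Collection<Integer> ids = translateForStreamingInternal(transformation);
        configure(transformation);
        return ids;
    }

    // Mode-specific hooks implemented by subclasses.
    protected abstract Collection<Integer> translateForBatchInternal(T transformation);

    protected abstract Collection<Integer> translateForStreamingInternal(T transformation);

    // Supplies the id used by the shared step (stands in for Transformation#getId).
    protected abstract int idOf(T transformation);

    // Shared post-processing; the real class sets buffer timeout, uid, resources, etc.
    private void configure(T transformation) {
        configured.add(idOf(transformation));
    }
}

// Example subclass: the same single node in both modes.
final class IdentityTranslator extends SingleOperatorTranslator<Integer> {
    @Override
    protected Collection<Integer> translateForBatchInternal(Integer id) {
        return Collections.singleton(id);
    }

    @Override
    protected Collection<Integer> translateForStreamingInternal(Integer id) {
        return Collections.singleton(id);
    }

    @Override
    protected int idOf(Integer id) {
        return id;
    }
}

final class SingleOperatorDemo {
    public static void main(String[] args) {
        IdentityTranslator tr = new IdentityTranslator();
        System.out.println(tr.translateForBatch(4));     // [4]
        System.out.println(tr.translateForStreaming(5)); // [5]
        System.out.println(tr.configured);               // [4, 5]
    }
}
```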

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link Transformation}
+ * to its runtime implementation depending on the execution mode it is being executed.
+ * Currently, the execution mode can be either
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING} or
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH}.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public interface StreamGraphTranslator<OUT, T extends Transformation<OUT>> {
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH-style} execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the nodes corresponding to this transformation in the transformation graph.
+	 */
+	Collection<Integer> translateForBatch(
+			final T transformation,
+			final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for
+	 * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING STREAMING-style} execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the nodes corresponding to this transformation in the transformation graph.
+	 */
+	Collection<Integer> translateForStreaming(
+			final T transformation,
+			final Context context);
+
+	/**
+	 * A context giving the necessary information for the translation of a given transformation.
+	 */
+	interface Context {
+
+		/**
+		 * Returns the {@link StreamGraph} being created as the transformations
+		 * of a pipeline are translated to their runtime implementations.
+		 */
+		StreamGraph getStreamGraph();
+
+		/**
+		 * Returns the ids of the nodes in the {@link StreamGraph} corresponding to the
+		 * provided transformation.
+		 *
+		 * @param transformation the transformation whose nodes' ids we want.
+		 * @return The requested ids.
+		 */
+		Collection<Integer> getTransformationIds(final Transformation<?> transformation);

Review comment:
       Should maybe be `getStreamNodeIds()`, because it doesn't return transformation ids.
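   To illustrate the naming point: one transformation id can map to one or several `StreamNode` ids. A transformation that adds no node of its own and forwards its inputs' node ids is one example. For that reason, a name like `getStreamNodeIds()` describes the return value more accurately. Below is a hypothetical, simplified sketch; `NodeIdContext` is not Flink's `Context`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified context: maps a transformation id to the ids of the
// stream-graph nodes generated for it. Not Flink's actual Context.
final class NodeIdContext {
    private final Map<Integer, List<Integer>> nodeIdsByTransformation = new HashMap<>();

    // Records that translating `transformationId` produced (or resolved to) `streamNodeId`.
    void register(int transformationId, int streamNodeId) {
        nodeIdsByTransformation
                .computeIfAbsent(transformationId, k -> new ArrayList<>())
                .add(streamNodeId);
    }

    // Named after what it returns: stream node ids, not transformation ids.
    Collection<Integer> getStreamNodeIds(int transformationId) {
        List<Integer> ids = nodeIdsByTransformation.get(transformationId);
        return ids == null ? Collections.<Integer>emptyList() : Collections.unmodifiableList(ids);
    }
}

final class NodeIdContextDemo {
    public static void main(String[] args) {
        NodeIdContext ctx = new NodeIdContext();
        // Transformation 5 resolves to two upstream nodes.
        ctx.register(5, 2);
        ctx.register(5, 3);
        System.out.println(ctx.getStreamNodeIds(5)); // [2, 3]
        System.out.println(ctx.getStreamNodeIds(9)); // []
    }
}
```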

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -782,16 +797,33 @@ private boolean isUnboundedSource(final Transformation<?> transformation) {
 		for (int i = 0; i < allInputIds.size(); i++) {
 			Collection<Integer> inputIds = allInputIds.get(i);
 			for (Integer inputId: inputIds) {
-				streamGraph.addEdge(inputId,
-					transform.getId(),
-					i + 1
-				);
+				streamGraph.addEdge(inputId, transform.getId(), i + 1);
 			}
 		}
 
 		return Collections.singleton(transform.getId());
 	}
 
+	/**
+	 * Returns a list of lists containing the ids of the nodes in the transformation graph
+	 * that correspond to the provided transformations. Each transformation may have multiple nodes.

Review comment:
       Maybe add here that `Transformations` will be translated if they are not already translated.
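   The behaviour the reviewer asks to document amounts to memoized, depth-first translation. Looking up an input's node ids first translates that input if it has not been translated yet; otherwise the cached result is reused. Below is a minimal sketch using assumed, simplified types. `Node`, `LazyTranslator` and `inputNodeIds` are hypothetical names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a transformation: an id plus its inputs.
final class Node {
    final int id;
    final List<Node> inputs;

    Node(int id, List<Node> inputs) {
        this.id = id;
        this.inputs = inputs;
    }
}

final class LazyTranslator {
    // Cache of already-translated nodes -> their stream node ids.
    private final Map<Node, Collection<Integer>> alreadyTranslated = new HashMap<>();
    int translations = 0; // counts actual translations, to make the memoization visible

    // Returns the node ids of `node`, translating it (and, recursively, its inputs) on first use.
    Collection<Integer> translate(Node node) {
        Collection<Integer> cached = alreadyTranslated.get(node);
        if (cached != null) {
            return cached;
        }
        for (Node input : node.inputs) {
            translate(input); // inputs must exist before edges to them can be added
        }
        translations++;
        Collection<Integer> ids = Collections.singleton(node.id);
        alreadyTranslated.put(node, ids);
        return ids;
    }

    // One collection of node ids per given input, translating inputs that are not yet translated.
    List<Collection<Integer>> inputNodeIds(List<Node> inputs) {
        List<Collection<Integer>> all = new ArrayList<>();
        for (Node input : inputs) {
            all.add(translate(input));
        }
        return all;
    }
}

final class LazyTranslatorDemo {
    public static void main(String[] args) {
        Node source = new Node(1, Collections.<Node>emptyList());
        Node map = new Node(2, Collections.singletonList(source));
        LazyTranslator translator = new LazyTranslator();
        System.out.println(translator.inputNodeIds(Arrays.asList(map, source))); // [[2], [1]]
        System.out.println(translator.translations); // 2: the source was translated only once
    }
}
```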

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTranslator.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.graph.BaseSimpleTransformationTranslator;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TransformationTranslator} for the {@link OneInputTransformation}.
+ *
+ * @param <IN> The type of the elements in the input {@code Transformation} of the transformation to translate.
+ * @param <OUT> The type of the elements that result from the provided {@code OneInputTransformation}.
+ */
+@Internal
+public class OneInputTranslator<IN, OUT> extends BaseSimpleTransformationTranslator<OUT, OneInputTransformation<IN, OUT>> {

Review comment:
       Could be `OneInputTransformationTranslator`. It fits the naming scheme better, but I do realise that it's a bit long... 😅




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * 40f3f7d289665256ea5a8a938cbbf4ded64396f1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7270) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13556:
URL: https://github.com/apache/flink/pull/13556#issuecomment-704987982


   ## CI report:
   
   * 40f3f7d289665256ea5a8a938cbbf4ded64396f1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7270) 
   * 455e21d91670cb0e2ea2a5cb3330d8b1b67a11ef Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7302) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys commented on a change in pull request #13556: [FLINK-19485] Decouple runtime operator implementation from DataStream operations

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r503302582



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link TransformationTranslator TransformationTranslators} who translate
+ * {@link Transformation Transformations} that have a single operator in their runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class SimpleTransformationTranslator<OUT, T extends Transformation<OUT>>
+		implements TransformationTranslator<OUT, T> {
+
+	@Override
+	public Collection<Integer> translateForBatch(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForBatchInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	@Override
+	public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
+		checkNotNull(transformation);
+		checkNotNull(context);
+
+		final Collection<Integer> transformedIds =
+				translateForStreamingInternal(transformation, context);
+		configure(transformation, context);
+
+		return transformedIds;
+	}
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for BATCH-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForBatchInternal(final T transformation, final Context context);
+
+	/**
+	 * Translates a given {@link Transformation} to its runtime implementation for STREAMING-style execution.
+	 *
+	 * @param transformation The transformation to be translated.
+	 * @param context The translation context.
+	 * @return The ids of the "last" {@link StreamNode StreamNodes} in the transformation graph corresponding
+	 * to this transformation. These will be the nodes that a potential following transformation will need to
+	 * connect to.
+	 */
+	protected abstract Collection<Integer> translateForStreamingInternal(final T transformation, final Context context);
+
+	private void configure(final T transformation, final Context context) {
+		final StreamGraph streamGraph = context.getStreamGraph();
+		final int transformationId = transformation.getId();
+
+		if (transformation.getBufferTimeout() >= 0) {
+			streamGraph.setBufferTimeout(transformationId, transformation.getBufferTimeout());
+		} else {
+			streamGraph.setBufferTimeout(transformationId, context.getDefaultBufferTimeout());
+		}
+
+		if (transformation.getUid() != null) {
+			streamGraph.setTransformationUID(transformationId, transformation.getUid());
+		}
+		if (transformation.getUserProvidedNodeHash() != null) {
+			streamGraph.setTransformationUserHash(transformationId, transformation.getUserProvidedNodeHash());
+		}
+
+		if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
+			if (transformation instanceof PhysicalTransformation
+					&& transformation.getUserProvidedNodeHash() == null
+					&& transformation.getUid() == null) {
+				throw new IllegalStateException("Auto generated UIDs have been disabled " +
+						"but no UID or hash has been assigned to operator " + transformation.getName());
+			}
+		}
+
+		if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
+			streamGraph.setResources(transformationId, transformation.getMinResources(), transformation.getPreferredResources());
+		}
+
+		final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
+		if (streamNode != null

Review comment:
       I can't tell for sure, and that is partly what prompted my comment: as it stands, it will be hard to track which setting takes precedence and why.
   
   By throwing an exception, we in a way defer that decision to whoever sees the exception.
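   Below is a minimal sketch of the "fail instead of guessing" option. It assumes a hypothetical helper: `SettingMerge.mergeOrFail` is not part of the PR or of Flink. If a node already carries a value and the transformation supplies a different one, the helper throws instead of silently choosing a winner.

```java
// Hypothetical helper illustrating the "throw on conflict" option; not Flink code.
final class SettingMerge {
    private SettingMerge() {}

    /**
     * Merges a value already present on a node with one coming from the transformation.
     * If both are set and differ, it throws instead of silently picking one, so whoever
     * hits the exception decides which setting should win.
     */
    static <V> V mergeOrFail(String name, V existing, V incoming) {
        if (existing == null) {
            return incoming;
        }
        if (incoming == null || existing.equals(incoming)) {
            return existing;
        }
        throw new IllegalStateException(
                "Conflicting values for " + name + ": " + existing + " vs " + incoming);
    }
}

final class SettingMergeDemo {
    public static void main(String[] args) {
        System.out.println(SettingMerge.mergeOrFail("weight", null, 3)); // 3
        System.out.println(SettingMerge.mergeOrFail("weight", 3, 3));    // 3
        try {
            SettingMerge.mergeOrFail("weight", 3, 5);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Conflicting values for weight: 3 vs 5
        }
    }
}
```

   The trade-off is the one described in the comment. The precedence rule becomes explicit and easy to trace, but every conflict turns into a user-facing error.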



