Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/15 14:04:20 UTC

[GitHub] [flink] kl0u commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

kl0u commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r505564569



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
##########
@@ -51,7 +54,26 @@
 	protected Collection<Integer> translateForBatchInternal(
 			final AbstractMultipleInputTransformation<OUT> transformation,
 			final Context context) {
-		return translateInternal(transformation, context);
+		boolean isKeyed = transformation instanceof KeyedMultipleInputTransformation;
+		boolean isInputSelectable = isInputSelectable(transformation);

Review comment:
       Why not write it like this:
   
   ```
   Collection<Integer> ids = translateInternal(transformation, context);
   if (isKeyed && !isInputSelectable) {
   	transformation.setChainingStrategy(ChainingStrategy.HEAD);
   	BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
   }
   return ids;
   ```
   
   This way the `if (...)` condition is evaluated only once. The same applies to the other translators.
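
A minimal, self-contained sketch of that flow, in case it helps the discussion. Every type here (`Transformation`, `Context`, `ChainingStrategy`, `applySortingInputs`) is a hypothetical stand-in written for illustration, not the real Flink class or the code in this PR. It only shows the shape: translate once, then apply the batch-specific adjustments behind a single check.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not the real Flink classes.
public class TranslateOnceSketch {

    enum ChainingStrategy { ALWAYS, HEAD }

    /** Stand-in for a multi-input transformation. */
    static class Transformation {
        final int id;
        final boolean keyed;
        final boolean inputSelectable;
        ChainingStrategy chainingStrategy = ChainingStrategy.ALWAYS;

        Transformation(int id, boolean keyed, boolean inputSelectable) {
            this.id = id;
            this.keyed = keyed;
            this.inputSelectable = inputSelectable;
        }
    }

    /** Stand-in for the translation context; records which nodes get sorted inputs. */
    static class Context {
        final List<Integer> sortedInputNodes = new ArrayList<>();
    }

    /** Shared translation used by both the streaming and the batch path. */
    static Collection<Integer> translateInternal(Transformation t, Context ctx) {
        return Collections.singletonList(t.id);
    }

    /** Assumed helper that marks a node as requiring sorted inputs. */
    static void applySortingInputs(int id, Context ctx) {
        ctx.sortedInputNodes.add(id);
    }

    /** Batch path: translate first, then adjust behind one condition. */
    static Collection<Integer> translateForBatch(Transformation t, Context ctx) {
        Collection<Integer> ids = translateInternal(t, ctx);
        if (t.keyed && !t.inputSelectable) {
            t.chainingStrategy = ChainingStrategy.HEAD;
            applySortingInputs(t.id, ctx);
        }
        return ids;
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        Transformation keyed = new Transformation(7, true, false);
        System.out.println(translateForBatch(keyed, ctx));
        System.out.println(keyed.chainingStrategy + " " + ctx.sortedInputNodes);
    }
}
```

With this shape, the non-keyed and input-selectable cases fall through to the plain `translateInternal` result without a second branch.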

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -336,15 +341,30 @@ public void setUserHash(String userHash) {
 		this.userHash = userHash;
 	}
 
-	@VisibleForTesting
 	public void setSortedInputs(boolean sortedInputs) {
 		this.sortedInputs = sortedInputs;
 	}
 
-	boolean getSortedInputs() {
+	public boolean getSortedInputs() {
 		return sortedInputs;
 	}
 
+	public void setStateBackend(StateBackend stateBackend) {

Review comment:
       From what I understand, the flow is that the translator sets the batch `StateBackend` and the `timerService` on the `StreamNode` so that the `StreamJobGraphGenerator` can pick them up. Why not set the state backend and the timer service at the `StreamGraph` level (e.g. in `StreamGraphGenerator.configureStreamGraph()`), from where the `StreamJobGraphGenerator` can pick them up?
   
   This would likely reduce the changes needed in `StreamNode` and `StreamJobGraphGenerator`.
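
To make the alternative concrete, here is a rough sketch of graph-level configuration. All names (`Graph`, `Node`, `configureGraph`, `describeVertices`) and the string values are hypothetical stand-ins chosen for illustration; strings take the place of the real state backend and timer service types, and nothing here is the actual Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are not the real Flink classes.
public class GraphLevelConfigSketch {

    /** Node with no state backend or timer service fields of its own. */
    static class Node {
        final int id;

        Node(int id) {
            this.id = id;
        }
    }

    /** Graph that owns the job-wide settings. Strings stand in for the real types. */
    static class Graph {
        final Map<Integer, Node> nodes = new LinkedHashMap<>();
        String stateBackend;
        String timerService;

        void addNode(int id) {
            nodes.put(id, new Node(id));
        }
    }

    /** Plays the role of a configure step: settings are applied once, on the graph. */
    static void configureGraph(Graph graph, boolean batch) {
        graph.stateBackend = batch ? "batch-state-backend" : "default-state-backend";
        graph.timerService = batch ? "batch-timer-service" : "default-timer-service";
    }

    /** Plays the role of the job graph generator: reads the graph-level values for each node. */
    static Map<Integer, String> describeVertices(Graph graph) {
        Map<Integer, String> out = new LinkedHashMap<>();
        for (Node n : graph.nodes.values()) {
            out.put(n.id, graph.stateBackend + "/" + graph.timerService);
        }
        return out;
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        graph.addNode(1);
        graph.addNode(2);
        configureGraph(graph, true);
        System.out.println(describeVertices(graph));
    }
}
```

In this sketch the node type needs no new setters; whether that carries over to the real classes depends on details not shown in this thread.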
   
   WDYT @dawidwys?



