Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/15 09:59:27 UTC

[GitHub] [flink] dawidwys opened a new pull request #13647: [FLINK-19640] Enable sorting inputs for batch

dawidwys opened a new pull request #13647:
URL: https://github.com/apache/flink/pull/13647


   ## What is the purpose of the change
   
   This PR adds feature flags for enabling/disabling sorted inputs and the
   special state backend and timer service used by the BATCH runtime
   execution mode. These options are enabled by default in BATCH runtime
   execution mode.
   
   ## Verifying this change
   
   Added tests in `StreamGraphGeneratorBatchExecutionTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -> will be documented along with the RuntimeExecutionMode
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699",
       "triggerID" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d7b0469116070f12c0c2c3c15fb034978250e4b1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693) 
   * fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] kl0u commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r505564569



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
##########
@@ -51,7 +54,26 @@
 	protected Collection<Integer> translateForBatchInternal(
 			final AbstractMultipleInputTransformation<OUT> transformation,
 			final Context context) {
-		return translateInternal(transformation, context);
+		boolean isKeyed = transformation instanceof KeyedMultipleInputTransformation;
+		boolean isInputSelectable = isInputSelectable(transformation);

Review comment:
       Why not write it as follows:
   
   ```
   Collection<Integer> ids = translateInternal(transformation, context);
   if (isKeyed && !isInputSelectable) {
   	transformation.setChainingStrategy(ChainingStrategy.HEAD);
   	BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
   }
   ```
   
   This way the `if (...)` condition is checked only once. The same applies to the other translators.
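
The control flow suggested above can be sketched in isolation. This is a minimal, self-contained illustration and not the PR's code: `FakeTransformation`, `FakeContext`, and both `translate...` methods are hypothetical stand-ins for the Flink classes named in the diff, and the `headOfChain` flag only mimics setting `ChainingStrategy.HEAD`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins only; none of these types are Flink classes.
final class TranslateOnceSketch {

    /** Stand-in for a transformation, reduced to what the check needs. */
    static final class FakeTransformation {
        final int id;
        final boolean keyed;
        final boolean inputSelectable;
        boolean headOfChain; // mimics setChainingStrategy(ChainingStrategy.HEAD)

        FakeTransformation(int id, boolean keyed, boolean inputSelectable) {
            this.id = id;
            this.keyed = keyed;
            this.inputSelectable = inputSelectable;
        }
    }

    /** Stand-in for the translation context; records nodes marked for sorted inputs. */
    static final class FakeContext {
        final List<Integer> nodesWithSortedInputs = new ArrayList<>();
    }

    /** Shared translation used by both modes (assumed to return the node ids). */
    static Collection<Integer> translateInternal(FakeTransformation t, FakeContext context) {
        return Collections.singletonList(t.id);
    }

    /** Batch translation: translate once, then apply the batch-only settings in one place. */
    static Collection<Integer> translateForBatch(FakeTransformation t, FakeContext context) {
        Collection<Integer> ids = translateInternal(t, context);
        if (t.keyed && !t.inputSelectable) {
            t.headOfChain = true;
            context.nodesWithSortedInputs.add(t.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        FakeContext context = new FakeContext();
        translateForBatch(new FakeTransformation(1, true, false), context);
        translateForBatch(new FakeTransformation(2, true, true), context);
        translateForBatch(new FakeTransformation(3, false, false), context);
        System.out.println(context.nodesWithSortedInputs); // prints [1]
    }
}
```

Only the keyed, non-input-selectable transformation is marked, and the condition appears exactly once per translator.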

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -336,15 +341,30 @@ public void setUserHash(String userHash) {
 		this.userHash = userHash;
 	}
 
-	@VisibleForTesting
 	public void setSortedInputs(boolean sortedInputs) {
 		this.sortedInputs = sortedInputs;
 	}
 
-	boolean getSortedInputs() {
+	public boolean getSortedInputs() {
 		return sortedInputs;
 	}
 
+	public void setStateBackend(StateBackend stateBackend) {

Review comment:
       From what I understand, the flow is that we set the batch `StateBackend` and the `timerService` on the `StreamNode` in the translator so that the `StreamJobGraphGenerator` can pick them up. Why not set the state backend and the timer service at the `StreamGraph` level (e.g. in `StreamGraphGenerator.configureStreamGraph()`), from where the `StreamJobGraphGenerator` can pick them up?
   
   This seems likely to reduce the changes in the `StreamNode` and the `StreamJobGraphGenerator`.
   
   WDYT @dawidwys?
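
The alternative raised in this comment, keeping the setting on the graph rather than on each node, might look roughly like the sketch below. It is an illustration under stated assumptions, not Flink code: `FakeStreamGraph`, `configureForBatch`, and `generate` are hypothetical names, and the state backend and timer service are reduced to plain strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins only; none of these types are Flink classes.
final class GraphLevelSettingsSketch {

    /** Stand-in for the stream graph: settings live here, not on individual nodes. */
    static final class FakeStreamGraph {
        String stateBackend = "default";
        String timerService = "default";
        final Map<Integer, String> nodes = new LinkedHashMap<>();
    }

    /** Stand-in for a one-time configure step applied to the whole graph. */
    static void configureForBatch(FakeStreamGraph graph) {
        graph.stateBackend = "batch";
        graph.timerService = "batch";
    }

    /** Stand-in for the job graph generator: reads the graph-level settings for every node. */
    static Map<Integer, String> generate(FakeStreamGraph graph) {
        Map<Integer, String> perVertex = new LinkedHashMap<>();
        for (Integer id : graph.nodes.keySet()) {
            perVertex.put(id, graph.stateBackend + "/" + graph.timerService);
        }
        return perVertex;
    }

    public static void main(String[] args) {
        FakeStreamGraph graph = new FakeStreamGraph();
        graph.nodes.put(1, "source");
        graph.nodes.put(2, "keyed-process");
        configureForBatch(graph);
        System.out.println(generate(graph));
    }
}
```

In this shape the nodes carry no backend or timer fields at all, which is the reduction in `StreamNode` changes the comment is asking about.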







[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699",
       "triggerID" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7735",
       "triggerID" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bf5fdddb8d47ae1379a6a472d67496dae9f031a8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7741",
       "triggerID" : "bf5fdddb8d47ae1379a6a472d67496dae9f031a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699) 
   * c25c53d285406037a409fc0c95fba9eaff59b98a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7735) 
   * bf5fdddb8d47ae1379a6a472d67496dae9f031a8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7741) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r505434321



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java
##########
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
+import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for generating correct properties for sorting inputs in {@link RuntimeExecutionMode#BATCH} runtime mode.
+ */
+public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {

Review comment:
       I would probably call this an `ITCase` since we test the interplay of a lot of parts. But I'm also ok to leave it as is because we currently don't have unit tests for individual `TransformTranslators`, which could be used to test this behaviour.







[GitHub] [flink] dawidwys commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r506144173



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -336,12 +336,11 @@ public void setUserHash(String userHash) {
 		this.userHash = userHash;
 	}
 
-	@VisibleForTesting
 	public void setSortedInputs(boolean sortedInputs) {
 		this.sortedInputs = sortedInputs;
 	}
 
-	boolean getSortedInputs() {
+	public boolean getSortedInputs() {

Review comment:
       I made this change because I found it strange that `setSortedInputs` (which is the more concerning method) is `public` while the corresponding getter has default (package-private) visibility.
   
   The setter must be `public` because it is used in `BatchExecutionUtils` in the `org.apache.flink.streaming.runtime.translators` package.







[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699",
       "triggerID" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7735",
       "triggerID" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bf5fdddb8d47ae1379a6a472d67496dae9f031a8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bf5fdddb8d47ae1379a6a472d67496dae9f031a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699) 
   * c25c53d285406037a409fc0c95fba9eaff59b98a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7735) 
   * bf5fdddb8d47ae1379a6a472d67496dae9f031a8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699",
       "triggerID" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7735",
       "triggerID" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bf5fdddb8d47ae1379a6a472d67496dae9f031a8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7741",
       "triggerID" : "bf5fdddb8d47ae1379a6a472d67496dae9f031a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c25c53d285406037a409fc0c95fba9eaff59b98a Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7735) 
   * bf5fdddb8d47ae1379a6a472d67496dae9f031a8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7741) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709064089


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit b72a8e18f9a4419b43a5bfbaa7b65d4eed3c62a7 (Thu Oct 15 10:01:18 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] dawidwys commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r506141452



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -792,6 +792,12 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
 			});
 		config.configure(configuration, classLoader);
 		checkpointCfg.configure(configuration);
+		configuration.getOptional(ExecutionOptions.SORT_INPUTS).ifPresent(

Review comment:
       I feel rather strongly about having an option to disable these features.
   
   First of all, I think it is good to have feature flags. Secondly, it makes it possible to e.g. use the `BATCH` execution mode (scheduling + shuffles) with `InputSelectable`.
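
As a rough illustration of the feature-flag behaviour argued for here, the sketch below resolves whether inputs should be sorted from the execution mode and an optional explicit setting. It does not use Flink's configuration API: `Mode`, `sortInputsEnabled`, and the `Optional<Boolean>` flag are hypothetical, and treating the flag as having no effect outside `BATCH` is an assumption made for the example.

```java
import java.util.Optional;

// Hypothetical sketch; this is not Flink's ExecutionOptions or configuration API.
final class SortInputsFlagSketch {

    enum Mode { STREAMING, BATCH }

    /**
     * Resolves the flag: on by default in BATCH, unless explicitly disabled.
     * Assumption for this sketch: the flag is ignored in STREAMING mode.
     */
    static boolean sortInputsEnabled(Mode mode, Optional<Boolean> explicitFlag) {
        if (mode != Mode.BATCH) {
            return false;
        }
        return explicitFlag.orElse(true);
    }

    public static void main(String[] args) {
        // Default BATCH behaviour: sorting is enabled.
        System.out.println(sortInputsEnabled(Mode.BATCH, Optional.empty()));
        // Explicit opt-out, e.g. to keep using an input-selectable operator in BATCH mode.
        System.out.println(sortInputsEnabled(Mode.BATCH, Optional.of(false)));
    }
}
```

The explicit opt-out is what would keep the `BATCH` scheduling and shuffles available to operators that cannot work with sorted inputs.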







[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709064089


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit bf5fdddb8d47ae1379a6a472d67496dae9f031a8 (Fri Oct 16 10:57:27 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d85a53422831da189f70548103872742f65b9a8d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d7b0469116070f12c0c2c3c15fb034978250e4b1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693) 
   * fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699",
       "triggerID" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699",
       "triggerID" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699) 
   * c25c53d285406037a409fc0c95fba9eaff59b98a UNKNOWN
   



[GitHub] [flink] dawidwys merged pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
dawidwys merged pull request #13647:
URL: https://github.com/apache/flink/pull/13647


   





[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d85a53422831da189f70548103872742f65b9a8d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676) 
   * d7b0469116070f12c0c2c3c15fb034978250e4b1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693) 
   



[GitHub] [flink] kl0u commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r506179130



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -792,6 +792,12 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
 			});
 		config.configure(configuration, classLoader);
 		checkpointCfg.configure(configuration);
+		configuration.getOptional(ExecutionOptions.SORT_INPUTS).ifPresent(

Review comment:
       OK, now I see that the batch state backend and the timer service are activated by default.







[GitHub] [flink] kl0u commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r506181718



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -239,13 +261,28 @@ private void configureStreamGraph(final StreamGraph graph) {
 			graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
 			graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
 			setDefaultBufferTimeout(-1);
+			setBatchStateBackendAndTimerService(graph);
 		} else {
 			graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
 			graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
 			graph.setScheduleMode(ScheduleMode.EAGER);
 		}
 	}
 
+	private void setBatchStateBackendAndTimerService(StreamGraph graph) {
+		boolean useStateBackend = configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
+		boolean sortInputs = configuration.get(ExecutionOptions.SORT_INPUTS);
+		checkState(
+			!useStateBackend || sortInputs,
+			"Batch state backend requires the sorted inputs to be enabled!");
+
+		if (useStateBackend) {
+			LOG.debug("Using BATCH execution state backend.");

Review comment:
       Nit: Can't we move https://github.com/apache/flink/pull/13647/files#diff-54c8fe1971ffb5aa55b3f829f43aa02c7765b62c397f0c943b4049a4fd1e3a62R253 to the `else {}` block in lines 266...? I find that clearer than writing a value and then overwriting it.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -239,13 +261,28 @@ private void configureStreamGraph(final StreamGraph graph) {
 			graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
 			graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
 			setDefaultBufferTimeout(-1);
+			setBatchStateBackendAndTimerService(graph);
 		} else {
 			graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
 			graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
 			graph.setScheduleMode(ScheduleMode.EAGER);
 		}
 	}
 
+	private void setBatchStateBackendAndTimerService(StreamGraph graph) {
+		boolean useStateBackend = configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
+		boolean sortInputs = configuration.get(ExecutionOptions.SORT_INPUTS);
+		checkState(
+			!useStateBackend || sortInputs,
+			"Batch state backend requires the sorted inputs to be enabled!");
+
+		if (useStateBackend) {
+			LOG.debug("Using BATCH execution state backend.");

Review comment:
       "Using BATCH execution state backend _and timer service_."

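The precondition in the quoted hunk, `!useStateBackend || sortInputs`, encodes the implication "batch state backend implies sorted inputs". A minimal standalone sketch of that rule follows. `BatchOptionsCheck`, `validate` and `isValid` are hypothetical names used only for illustration, and a plain `IllegalStateException` stands in for Flink's `Preconditions.checkState`.

```java
public class BatchOptionsCheck {

    /**
     * Throws if the batch state backend is requested without sorted inputs.
     * Hypothetical stand-in for the checkState(...) call in the quoted diff.
     */
    public static void validate(boolean useStateBackend, boolean sortInputs) {
        // Negation of (!useStateBackend || sortInputs): the only invalid
        // combination is "state backend on, sorting off".
        if (useStateBackend && !sortInputs) {
            throw new IllegalStateException(
                "Batch state backend requires the sorted inputs to be enabled!");
        }
    }

    /** Returns true if the given combination passes validation. */
    public static boolean isValid(boolean useStateBackend, boolean sortInputs) {
        try {
            validate(useStateBackend, sortInputs);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        // Print the full truth table of the rule.
        for (boolean backend : values) {
            for (boolean sort : values) {
                System.out.println(
                    "useStateBackend=" + backend
                        + ", sortInputs=" + sort
                        + " -> valid=" + isValid(backend, sort));
            }
        }
    }
}
```

Of the four combinations, only "state backend enabled, sorting disabled" is rejected, which matches the error message in the diff.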






[GitHub] [flink] kl0u commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r506132003



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -792,6 +792,12 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
 			});
 		config.configure(configuration, classLoader);
 		checkpointCfg.configure(configuration);
+		configuration.getOptional(ExecutionOptions.SORT_INPUTS).ifPresent(

Review comment:
       Do we want to expose these to the user, even if it is a power user? I lean more towards setting these in the `StreamGraphGenerator` depending on the selected execution mode (see https://github.com/apache/flink/pull/13656). This would also remove the need to pass the configuration to the generator.
   		

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -336,12 +336,11 @@ public void setUserHash(String userHash) {
 		this.userHash = userHash;
 	}
 
-	@VisibleForTesting
 	public void setSortedInputs(boolean sortedInputs) {
 		this.sortedInputs = sortedInputs;
 	}
 
-	boolean getSortedInputs() {
+	public boolean getSortedInputs() {

Review comment:
       I think this change is not needed anymore.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -239,13 +259,28 @@ private void configureStreamGraph(final StreamGraph graph) {
 			graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
 			graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
 			setDefaultBufferTimeout(-1);
+			setBatchStateBackend(graph);
 		} else {
 			graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
 			graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
 			graph.setScheduleMode(ScheduleMode.EAGER);
 		}
 	}
 
+	private void setBatchStateBackend(StreamGraph graph) {

Review comment:
       Given that we also set the `timerService`, maybe give it a more descriptive name?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
##########
@@ -51,7 +51,12 @@
 	protected Collection<Integer> translateForBatchInternal(
 			final AbstractMultipleInputTransformation<OUT> transformation,
 			final Context context) {
-		return translateInternal(transformation, context);
+		Collection<Integer> ids = translateInternal(transformation, context);

Review comment:
       In some places you use the name `ids` and in other translators `nodeIds`. What about making them uniform?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -112,7 +117,7 @@
 	private final ExecutionConfig executionConfig;
 
 	private final CheckpointConfig checkpointConfig;
-
+	private final ReadableConfig configuration;

Review comment:
       What about leaving some empty lines above and below?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators;
+
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A utility class for applying sorting inputs.
+ */
+class BatchExecutionUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(BatchExecutionUtils.class);
+
+	static void applySortingInputs(
+			int transformationId,
+			TransformationTranslator.Context context) {
+		StreamNode node = context.getStreamGraph().getStreamNode(transformationId);
+		boolean sortInputs = context.getGraphGeneratorConfig().get(ExecutionOptions.SORT_INPUTS);
+		boolean isInputSelectable = isInputSelectable(node);
+
+		adjustChainingStrategy(node);
+
+		checkState(
+			!isInputSelectable || !sortInputs,
+			"Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator."
+		);
+
+		if (sortInputs) {
+			LOG.debug("Enabling sorting inputs for an operator {}.", node);
+			node.setSortedInputs(true);
+			Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights = new HashMap<>();
+			operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.BATCH_OP, 1);
+			node.setManagedMemoryUseCaseWeights(
+				operatorScopeUseCaseWeights,
+				Collections.emptySet()
+			);
+		}
+	}
+
+	@SuppressWarnings("rawtypes")
+	private static boolean isInputSelectable(StreamNode node) {
+		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+		Class<? extends StreamOperator> operatorClass = node.getOperatorFactory()
+			.getStreamOperatorClass(classLoader);
+		return InputSelectable.class.isAssignableFrom(operatorClass);
+	}
+
+	private static void adjustChainingStrategy(StreamNode node) {
+		StreamOperatorFactory<?> operatorFactory = node.getOperatorFactory();
+		ChainingStrategy currentChainingStrategy = operatorFactory.getChainingStrategy();
+		switch (currentChainingStrategy) {
+			case ALWAYS:
+			case HEAD_WITH_SOURCES:
+				LOG.debug(
+					"Setting chaining strategy for an operator {}, because of the BATCH execution mode.",

Review comment:
       What about "Setting chaining strategy to HEAD for operator {}, because of the BATCH execution mode."?
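
The switch in the quoted diff is cut off, so the following is only a sketch of the adjustment the suggested log message describes: operators whose strategy is `ALWAYS` or `HEAD_WITH_SOURCES` are moved to `HEAD`. The `Strategy` enum is a local stand-in for Flink's `ChainingStrategy`, `ChainingAdjustment` and `adjustForBatch` are hypothetical names, and leaving all other strategies unchanged is an assumption, since that part of the diff is not shown.

```java
public class ChainingAdjustment {

    /** Local stand-in for Flink's ChainingStrategy (illustration only). */
    public enum Strategy { ALWAYS, NEVER, HEAD, HEAD_WITH_SOURCES }

    /**
     * Returns the chaining strategy to use in BATCH mode.
     * ALWAYS and HEAD_WITH_SOURCES become HEAD, as the review comment suggests.
     */
    public static Strategy adjustForBatch(Strategy current) {
        switch (current) {
            case ALWAYS:
            case HEAD_WITH_SOURCES:
                return Strategy.HEAD;
            default:
                // Assumption: the remaining strategies are kept as they are;
                // the quoted diff ends before this branch.
                return current;
        }
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + adjustForBatch(s));
        }
    }
}
```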







[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d85a53422831da189f70548103872742f65b9a8d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676) 
   * d7b0469116070f12c0c2c3c15fb034978250e4b1 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699",
       "triggerID" : "fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7735",
       "triggerID" : "c25c53d285406037a409fc0c95fba9eaff59b98a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fe46fe2aa40e2b29e0c68aa37e9c99fe9f713e07 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7699) 
   * c25c53d285406037a409fc0c95fba9eaff59b98a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7735) 
   



[GitHub] [flink] dawidwys commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r506181447



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -792,6 +792,12 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
 			});
 		config.configure(configuration, classLoader);
 		checkpointCfg.configure(configuration);
+		configuration.getOptional(ExecutionOptions.SORT_INPUTS).ifPresent(

Review comment:
       Yes, the options take effect only if the `RuntimeExecutionMode.BATCH` is enabled.







[GitHub] [flink] kl0u commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r506177083



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -792,6 +792,12 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
 			});
 		config.configure(configuration, classLoader);
 		checkpointCfg.configure(configuration);
+		configuration.getOptional(ExecutionOptions.SORT_INPUTS).ifPresent(

Review comment:
       The way it is now, though, the user has to explicitly state that he/she wants the BATCH state backend and timer service, right? Not the other way around. I would expect that, even with the options available, once the `runtime-mode` is decided to be BATCH, the default behaviour would be to use the batch state backend and timer service.
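
The behaviour asked for here (enabled by default in BATCH mode, overridable by the user, without effect otherwise) can be sketched as follows. This is not the PR's code: `BatchDefaults`, `Mode` and `resolveSortInputs` are hypothetical names, `Mode` stands in for `RuntimeExecutionMode`, and the `Optional<Boolean>` models what `configuration.getOptional(ExecutionOptions.SORT_INPUTS)` would return.

```java
import java.util.Optional;

public class BatchDefaults {

    /** Local stand-in for Flink's RuntimeExecutionMode (illustration only). */
    public enum Mode { STREAMING, BATCH }

    /**
     * Resolves whether inputs should be sorted.
     * Outside BATCH mode the option has no effect; in BATCH mode an explicit
     * user setting wins, and sorting is enabled when nothing was set.
     */
    public static boolean resolveSortInputs(Mode mode, Optional<Boolean> userSetting) {
        if (mode != Mode.BATCH) {
            return false; // the option is ignored outside BATCH mode
        }
        return userSetting.orElse(true); // enabled by default for BATCH
    }

    public static void main(String[] args) {
        System.out.println(resolveSortInputs(Mode.BATCH, Optional.empty()));
        System.out.println(resolveSortInputs(Mode.BATCH, Optional.of(false)));
        System.out.println(resolveSortInputs(Mode.STREAMING, Optional.of(true)));
    }
}
```

With this shape a user only has to touch the option to opt out, which is the direction the comment argues for.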







[GitHub] [flink] flinkbot edited a comment on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7676",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693",
       "triggerID" : "d7b0469116070f12c0c2c3c15fb034978250e4b1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d7b0469116070f12c0c2c3c15fb034978250e4b1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7693) 
   



[GitHub] [flink] flinkbot commented on pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13647:
URL: https://github.com/apache/flink/pull/13647#issuecomment-709113863


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d85a53422831da189f70548103872742f65b9a8d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d85a53422831da189f70548103872742f65b9a8d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d85a53422831da189f70548103872742f65b9a8d UNKNOWN
   



[GitHub] [flink] dawidwys commented on a change in pull request #13647: [FLINK-19640] Enable sorting inputs for batch

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r505639934



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -336,15 +341,30 @@ public void setUserHash(String userHash) {
 		this.userHash = userHash;
 	}
 
-	@VisibleForTesting
 	public void setSortedInputs(boolean sortedInputs) {
 		this.sortedInputs = sortedInputs;
 	}
 
-	boolean getSortedInputs() {
+	public boolean getSortedInputs() {
 		return sortedInputs;
 	}
 
+	public void setStateBackend(StateBackend stateBackend) {

Review comment:
       After an offline sync and some more thinking, I concluded that it is actually dangerous to set the state backend at a per-node level. I will revert to setting the state backend only at the StreamGraph level and, for now, throw an exception if there is an InputSelectable operator.
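
A rough sketch of the "throw an exception if there is an InputSelectable operator" check, using `Class.isAssignableFrom` in the same way as the `isInputSelectable` helper quoted elsewhere in this thread. The interfaces and operator classes below are hypothetical stand-ins for Flink's `StreamOperator` and `InputSelectable`, and `checkSortingSupported` is an illustrative name, not a method from the PR.

```java
public class InputSelectableCheck {

    /** Hypothetical stand-in for Flink's StreamOperator. */
    public interface Operator {}

    /** Hypothetical stand-in for Flink's InputSelectable. */
    public interface Selectable {}

    public static class PlainOperator implements Operator {}

    public static class SelectingOperator implements Operator, Selectable {}

    /** True if the operator class implements the Selectable marker interface. */
    public static boolean isInputSelectable(Class<? extends Operator> operatorClass) {
        return Selectable.class.isAssignableFrom(operatorClass);
    }

    /** Rejects the combination of sorted inputs and an input-selecting operator. */
    public static void checkSortingSupported(
            Class<? extends Operator> operatorClass, boolean sortInputs) {
        if (sortInputs && isInputSelectable(operatorClass)) {
            throw new IllegalStateException(
                "Sorting inputs is not supported for an InputSelectable operator.");
        }
    }

    public static void main(String[] args) {
        checkSortingSupported(PlainOperator.class, true); // passes
        try {
            checkSortingSupported(SelectingOperator.class, true);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```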



