Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2017/01/20 15:24:22 UTC

[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/3182

    [FLINK-5473] Limit MaxParallelism to 1 for non-parallel operators and improve choice of max parallelism without explicit configuration

    This PR limits the maximum parallelism for non-parallel operators to 1.
    
    Furthermore, it improves the default behaviour when the user has not explicitly specified a maximum parallelism. In particular, the maximum parallelism can now be derived from savepoints, so users who migrate from Flink 1.1 to Flink 1.2 can keep their jobs unchanged.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink recover-max-para

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3182
    
----
commit 20d0a3fc88c85a71b692c5408fc7b2fd33da8ff2
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-01-16T13:31:22Z

    [FLINK-5473] Limit max parallelism to 1 for non-parallel operators

commit f6081f319b7f2ef8615557743d906bc4585445f7
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-01-16T17:41:37Z

    [FLINK-5473] Better default behaviours for unspecified maximum parallelism

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3182#discussion_r97293150
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
    @@ -230,9 +237,46 @@ public int getParallelism() {
     		return parallelism;
     	}
     
    +	/**
    +	 * Returns the effective max parallelism. This value is determined in the following order of priority:
    +	 * <p>
    +	 * (maxParallelismConfigured) overrides (maxParallelismOverride) override (max(128, roundUp(parallelism)) / default)
    --- End diff --
    
    `maxParallelismOverride` => `maxParallelismDerived`?
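
For readers following along, the priority order in the quoted Javadoc can be sketched roughly as below. This is a stand-alone illustration, not the code from the PR: the class and method names are hypothetical, `VALUE_NOT_SET = -1` is taken from the diff quoted elsewhere in this thread, and reading `roundUp` as "round up to the next power of two" is an assumption.

```java
/** Hypothetical stand-alone sketch; not the actual ExecutionJobVertex code. */
final class MaxParallelismSketch {

    /** Marker for "no value", as in the quoted diff. */
    static final int VALUE_NOT_SET = -1;

    /** Lower bound of the default, taken from the quoted Javadoc. */
    static final int DEFAULT_LOWER_BOUND = 128;

    /** Assumed meaning of roundUp: next power of two >= x (valid for x up to 2^30). */
    static int roundUpToPowerOfTwo(int x) {
        if (x <= 1) {
            return 1;
        }
        return Integer.highestOneBit(x - 1) << 1;
    }

    /** Default used when nothing was configured or derived: max(128, roundUp(parallelism)). */
    static int defaultMaxParallelism(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + parallelism);
        }
        return Math.max(DEFAULT_LOWER_BOUND, roundUpToPowerOfTwo(parallelism));
    }

    /** A configured value overrides a derived value, which overrides the default. */
    static int effectiveMaxParallelism(int configured, int derived, int parallelism) {
        if (configured != VALUE_NOT_SET) {
            return configured;
        }
        if (derived != VALUE_NOT_SET) {
            return derived;
        }
        return defaultMaxParallelism(parallelism);
    }

    public static void main(String[] args) {
        System.out.println(effectiveMaxParallelism(64, 256, 10));                      // 64
        System.out.println(effectiveMaxParallelism(VALUE_NOT_SET, 256, 10));           // 256
        System.out.println(effectiveMaxParallelism(VALUE_NOT_SET, VALUE_NOT_SET, 10)); // 128
    }
}
```

In this reading, a value derived from a savepoint only takes effect when the user has configured nothing, which matches the intent stated in the PR description.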



[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3182#discussion_r97280170
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -45,29 +45,37 @@
     
     	private final Logger logger;
     	private final Map<JobVertexID, ExecutionJobVertex> tasks;
    -	private final CompletedCheckpoint latest;
    +	private final Map<JobVertexID, TaskState> taskStates;
     	private final boolean allowNonRestoredState;
     
     	public StateAssignmentOperation(
     			Logger logger,
     			Map<JobVertexID, ExecutionJobVertex> tasks,
    -			CompletedCheckpoint latest,
    +			Map<JobVertexID, TaskState> taskStates,
     			boolean allowNonRestoredState) {
     
     		this.logger = logger;
     		this.tasks = tasks;
    -		this.latest = latest;
    +		this.taskStates = taskStates;
     		this.allowNonRestoredState = allowNonRestoredState;
    --- End diff --
    
    `Precondition` checks could be helpful here.
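
A minimal sketch of what such checks could look like in the constructor. To stay self-contained it uses the JDK's `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions` utility; the class name and the generic type parameters are hypothetical placeholders for the Flink types in the diff, and the `Logger` parameter is omitted for brevity.

```java
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical stand-in for StateAssignmentOperation. K, V and S are placeholders
 * for JobVertexID, ExecutionJobVertex and TaskState from the quoted diff.
 */
final class StateAssignmentSketch<K, V, S> {

    private final Map<K, V> tasks;
    private final Map<K, S> taskStates;
    private final boolean allowNonRestoredState;

    StateAssignmentSketch(
            Map<K, V> tasks,
            Map<K, S> taskStates,
            boolean allowNonRestoredState) {

        // Fail fast at construction time instead of with a late NullPointerException
        // somewhere inside the state assignment logic.
        this.tasks = Objects.requireNonNull(tasks, "tasks must not be null");
        this.taskStates = Objects.requireNonNull(taskStates, "taskStates must not be null");
        this.allowNonRestoredState = allowNonRestoredState;
    }

    int numberOfTasks() {
        return tasks.size();
    }

    int numberOfTaskStates() {
        return taskStates.size();
    }

    boolean allowsNonRestoredState() {
        return allowNonRestoredState;
    }
}
```

Checking in the constructor keeps the failure close to the caller that passed the bad argument.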



[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3182#discussion_r97282806
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
    @@ -212,6 +206,19 @@ public ExecutionJobVertex(
     		finishedSubtasks = new boolean[parallelism];
     	}
     
    +	public void setMaxParallelismDerived(int maxParallelism) {
    +
    +		Preconditions.checkState(VALUE_NOT_SET == maxParallelismConfigured,
    +				"Attempt to override a configured max parallelism. Configured: " + maxParallelismConfigured
    +						+ ", argument: " + maxParallelism);
    +
    +		Preconditions.checkArgument(maxParallelism > 0
    +						&& maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
    +				"Overriding max parallelism is not in valid bounds: " + maxParallelism);
    --- End diff --
    
    Maybe we could add the valid bounds here.
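
One possible shape of this suggestion, as a stand-alone sketch rather than the code that was eventually committed. The class and method names are hypothetical, a plain `IllegalArgumentException` stands in for `Preconditions.checkArgument`, and the upper bound of `1 << 15` is an assumption used for illustration only.

```java
/** Hypothetical stand-alone sketch of a bounds check that reports the valid range. */
final class MaxParallelismBoundsSketch {

    /** Assumed value; the real constant lives in KeyGroupRangeAssignment. */
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;

    /** Returns the argument if it is within (0, UPPER_BOUND_MAX_PARALLELISM], else throws. */
    static int checkMaxParallelism(int maxParallelism) {
        if (maxParallelism <= 0 || maxParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            // Include the valid range so the message is actionable on its own.
            throw new IllegalArgumentException(
                    "Overriding max parallelism is not in valid bounds (1.."
                            + UPPER_BOUND_MAX_PARALLELISM + "), found: " + maxParallelism);
        }
        return maxParallelism;
    }
}
```

With the range in the message, a user who hits the error does not need to look up the constant to see which values are accepted.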



[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3182#discussion_r97281937
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -45,29 +45,37 @@
     
     	private final Logger logger;
     	private final Map<JobVertexID, ExecutionJobVertex> tasks;
    -	private final CompletedCheckpoint latest;
    +	private final Map<JobVertexID, TaskState> taskStates;
     	private final boolean allowNonRestoredState;
     
     	public StateAssignmentOperation(
     			Logger logger,
     			Map<JobVertexID, ExecutionJobVertex> tasks,
    -			CompletedCheckpoint latest,
    +			Map<JobVertexID, TaskState> taskStates,
     			boolean allowNonRestoredState) {
     
     		this.logger = logger;
     		this.tasks = tasks;
    -		this.latest = latest;
    +		this.taskStates = taskStates;
     		this.allowNonRestoredState = allowNonRestoredState;
     	}
     
     	public boolean assignStates() throws Exception {
    --- End diff --
    
    This method seems a bit lengthy. Maybe we could split it up.



[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3182#discussion_r97337263
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
    @@ -57,47 +57,51 @@
     
     	/** Use the same log for all ExecutionGraph classes */
     	private static final Logger LOG = ExecutionGraph.LOG;
    -	
    -	private final SerializableObject stateMonitor = new SerializableObject();
    +
    +	public static final int VALUE_NOT_SET = -1;
    +
    +	private final Object stateMonitor = new Object();
     	
     	private final ExecutionGraph graph;
     	
     	private final JobVertex jobVertex;
     	
     	private final ExecutionVertex[] taskVertices;
     
    -	private IntermediateResult[] producedDataSets;
    +	private final IntermediateResult[] producedDataSets;
     	
     	private final List<IntermediateResult> inputs;
     	
     	private final int parallelism;
     
    -	private final int maxParallelism;
    -	
     	private final boolean[] finishedSubtasks;
    -			
    -	private volatile int numSubtasksInFinalState;
    -	
    +
     	private final SlotSharingGroup slotSharingGroup;
    -	
    +
     	private final CoLocationGroup coLocationGroup;
    -	
    +
     	private final InputSplit[] inputSplits;
     
    +	private final int maxParallelismConfigured;
    +
    +	private int maxParallelismDerived;
    +
    +	private volatile int numSubtasksInFinalState;
    +
     	/**
     	 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
     	 * serialize the same information multiple times in order to create the
     	 * TaskDeploymentDescriptors.
     	 */
    -	private final SerializedValue<TaskInformation> serializedTaskInformation;
    +	private SerializedValue<TaskInformation> serializedTaskInformation;
     
     	private InputSplitAssigner splitAssigner;
     	
     	public ExecutionJobVertex(
     		ExecutionGraph graph,
     		JobVertex jobVertex,
     		int defaultParallelism,
    -		Time timeout) throws JobException, IOException {
    +		Time timeout) throws JobException {
    --- End diff --
    
    You are right, but I kept the indentation to avoid formatting changes.



[GitHub] flink issue #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-parallel o...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3182
  
    I've merged the PR to the `release-1.2` branch.



[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3182#discussion_r97286408
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -94,6 +95,9 @@ public OperatorChain(StreamTask<OUT, OP> containingTask) {
     		try {
     			for (int i = 0; i < outEdgesInOrder.size(); i++) {
     				StreamEdge outEdge = outEdgesInOrder.get(i);
    +
    +
    +
    --- End diff --
    
    One line break would probably be enough here.



[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3182#discussion_r97282598
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
    @@ -57,47 +57,51 @@
     
     	/** Use the same log for all ExecutionGraph classes */
     	private static final Logger LOG = ExecutionGraph.LOG;
    -	
    -	private final SerializableObject stateMonitor = new SerializableObject();
    +
    +	public static final int VALUE_NOT_SET = -1;
    +
    +	private final Object stateMonitor = new Object();
     	
     	private final ExecutionGraph graph;
     	
     	private final JobVertex jobVertex;
     	
     	private final ExecutionVertex[] taskVertices;
     
    -	private IntermediateResult[] producedDataSets;
    +	private final IntermediateResult[] producedDataSets;
     	
     	private final List<IntermediateResult> inputs;
     	
     	private final int parallelism;
     
    -	private final int maxParallelism;
    -	
     	private final boolean[] finishedSubtasks;
    -			
    -	private volatile int numSubtasksInFinalState;
    -	
    +
     	private final SlotSharingGroup slotSharingGroup;
    -	
    +
     	private final CoLocationGroup coLocationGroup;
    -	
    +
     	private final InputSplit[] inputSplits;
     
    +	private final int maxParallelismConfigured;
    +
    +	private int maxParallelismDerived;
    +
    +	private volatile int numSubtasksInFinalState;
    +
     	/**
     	 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
     	 * serialize the same information multiple times in order to create the
     	 * TaskDeploymentDescriptors.
     	 */
    -	private final SerializedValue<TaskInformation> serializedTaskInformation;
    +	private SerializedValue<TaskInformation> serializedTaskInformation;
     
     	private InputSplitAssigner splitAssigner;
     	
     	public ExecutionJobVertex(
     		ExecutionGraph graph,
     		JobVertex jobVertex,
     		int defaultParallelism,
    -		Time timeout) throws JobException, IOException {
    +		Time timeout) throws JobException {
    --- End diff --
    
    Method declaration parameters which are broken into multiple lines are usually indented twice.



[GitHub] flink issue #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-parallel o...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3182
  
    Thanks for the review, @tillrohrmann! I followed all of your suggestions, except for the indentation formatting.



[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3182#discussion_r97283204
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---
    @@ -599,7 +602,24 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
     		boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();
     
     		for (IntermediateResultPartition partition : resultPartitions.values()) {
    -			producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling));
    +
    +			List<List<ExecutionEdge>> consumers = partition.getConsumers();
    +
    +			if(consumers.isEmpty()) {
    --- End diff --
    
    whitespace missing between `if` and `(`



[GitHub] flink issue #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-parallel o...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3182
  
    Rebased.



[GitHub] flink issue #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-parallel o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3182
  
    Changes look good. Travis passed. Merging this PR. Thanks for your work @StefanRRichter :-)



[GitHub] flink issue #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-parallel o...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3182
  
    cc @uce 



[GitHub] flink pull request #3182: [FLINK-5473] Limit MaxParallelism to 1 for non-par...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3182

