You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/25 17:19:10 UTC

[1/3] git commit: Fix race condition leading to erroneous "NoResourceAvailableException".

Repository: incubator-flink
Updated Branches:
  refs/heads/master c3c7e2d15 -> 4a4489936


Fix race condition leading to erroneous "NoResourceAvailableException".


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4a448993
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4a448993
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4a448993

Branch: refs/heads/master
Commit: 4a4489936530002d32f1d7e187d5583cc3e405cc
Parents: 73b5b3d
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 24 21:34:28 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 25 16:45:54 2014 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/scheduler/Scheduler.java | 27 +++++++++++++++++---
 1 file changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a448993/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 9ef30b8..c25c931 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -24,7 +24,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +60,8 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 	/** All tasks pending to be scheduled */
 	private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
 	
+	private final BlockingQueue<Instance> newlyAvailableInstances;
+	
 	
 	private int unconstrainedAssignments;
 	
@@ -72,6 +76,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 	
 	public Scheduler(ExecutorService executorService) {
 		this.executor = executorService;
+		this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
 	}
 	
 	
@@ -305,7 +310,13 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 		// in the set-with-available-instances
 		while (true) {
 			if (this.instancesWithAvailableResources.isEmpty()) {
-				return null;
+				// check if the asynchronous calls did not yet return the queues
+				Instance queuedInstance = this.newlyAvailableInstances.poll();
+				if (queuedInstance == null) {
+					return null;
+				} else {
+					this.instancesWithAvailableResources.add(queuedInstance);
+				}
 			}
 			
 			Iterator<Instance> locations = requestedLocations == null ? null : requestedLocations.iterator();
@@ -383,23 +394,31 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 		// 
 		// that leads with a high probability to deadlocks, when scheduling fast
 		
+		this.newlyAvailableInstances.add(instance);
+		
 		if (this.executor != null) {
 			this.executor.execute(new Runnable() {
 				@Override
 				public void run() {
-					handleNewSlot(instance);
+					handleNewSlot();
 				}
 			});
 		}
 		else {
 			// for tests, we use the synchronous variant
-			handleNewSlot(instance);
+			handleNewSlot();
 		}
 	}
 	
-	private void handleNewSlot(Instance instance) {
+	private void handleNewSlot() {
 		
 		synchronized (globalLock) {
+			Instance instance = this.newlyAvailableInstances.poll();
+			if (instance == null || !instance.hasResourcesAvailable()) {
+				// someone else took it
+				return;
+			}
+			
 			QueuedTask queued = taskQueue.peek();
 			
 			// the slot was properly released, we can allocate a new one from that instance


[2/3] git commit: Fix error with invalid config values for degree of parallelism.

Posted by se...@apache.org.
Fix error with invalid config values for degree of parallelism.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/73b5b3dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/73b5b3dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/73b5b3dd

Branch: refs/heads/master
Commit: 73b5b3dd81e2a146592d9623f44ceff3d8c035fa
Parents: 9cf2467
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 24 18:38:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 25 16:45:54 2014 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/compiler/PactCompiler.java   | 5 +++++
 .../main/java/org/apache/flink/compiler/dag/OptimizerNode.java  | 2 +-
 2 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73b5b3dd/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index c355560..174f8f3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -415,6 +415,11 @@ public class PactCompiler {
 		// determine the default parallelization degree
 		this.defaultDegreeOfParallelism = config.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
 			ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
+		if (defaultDegreeOfParallelism < 1) {
+			LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
+					+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + " is invalid. Ignoring and using a value of 1.");
+			this.defaultDegreeOfParallelism = 1;
+		}
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73b5b3dd/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
index deb24bb..89cc354 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
@@ -411,7 +411,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 */
 	public void setDegreeOfParallelism(int degreeOfParallelism) {
 		if (degreeOfParallelism < 1) {
-			throw new IllegalArgumentException();
+			throw new IllegalArgumentException("Degree of parallelism of " + degreeOfParallelism + " is invalid.");
 		}
 		this.degreeOfParallelism = degreeOfParallelism;
 	}


[3/3] git commit: Fix Bug with forced repartitioning in DOP changes.

Posted by se...@apache.org.
Fix Bug with forced repartitioning in DOP changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9cf24670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9cf24670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9cf24670

Branch: refs/heads/master
Commit: 9cf24670d8f88fb34b782d67b711d219909e90fd
Parents: c3c7e2d
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 24 18:37:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 25 16:45:54 2014 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/compiler/plan/Channel.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cf24670/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
index fec9c80..5fb03f5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
@@ -442,11 +442,12 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		case FORWARD:
 			throw new CompilerException("Cannot use FORWARD strategy between operations " +
 					"with different number of parallel instances.");
-		case NONE: // excluded by sanity check. lust here for verification check completion
+		case NONE: // excluded by sanity check. left here for verification check completion
 		case BROADCAST:
 		case PARTITION_HASH:
 		case PARTITION_RANGE:
 		case PARTITION_RANDOM:
+		case PARTITION_FORCED_REBALANCE:
 			return;
 		}
 		throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);