You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/23 23:49:38 UTC

[1/6] git commit: [FLINK-1114] Move scala-style checks to scala projects, change paths to style config, to allow isolated building of individual projects.

Repository: incubator-flink
Updated Branches:
  refs/heads/master b904b0041 -> 4a91be2e4


[FLINK-1114] Move scala-style checks to scala projects, change paths to style config, to allow isolated building of individual projects.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cf80d862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cf80d862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cf80d862

Branch: refs/heads/master
Commit: cf80d8627728a719c3ef2be8f31ef77077b9d200
Parents: b904b00
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 19:24:21 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 19:24:21 2014 +0200

----------------------------------------------------------------------
 .../java/graph/TransitiveClosureNaive.java      |  2 +-
 flink-examples/flink-scala-examples/pom.xml     | 36 +++++++++++---
 flink-scala/pom.xml                             | 24 ++++++++++
 pom.xml                                         | 49 ++++++--------------
 4 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf80d862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
index c3e3e66..c7f2185 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -80,7 +80,7 @@ public class TransitiveClosureNaive implements ProgramDescription {
 				.coGroup(nextPaths)
 				.where(0).equalTo(0)
 				.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-					Set prevSet = new HashSet<Tuple2<Long,Long>>();
+					Set<Tuple2<Long,Long>> prevSet = new HashSet<Tuple2<Long,Long>>();
 					@Override
 					public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
 						for (Tuple2<Long,Long> prev : prevPaths) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf80d862/flink-examples/flink-scala-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml
index 79053d6..a3eba40 100644
--- a/flink-examples/flink-scala-examples/pom.xml
+++ b/flink-examples/flink-scala-examples/pom.xml
@@ -146,6 +146,30 @@ under the License.
 			</plugin>
 
 			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+			
+			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
 				<executions>
@@ -237,7 +261,6 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
-					   -->
 
 					<execution>
 						<id>WordCount</id>
@@ -257,10 +280,11 @@ under the License.
 		
 							<includes>
 								<include>**/wordcount/WordCount*.class</include>
+								<include>**/wordcount/util/WordCountData.class</include>
 							</includes>
 						</configuration>
 					</execution>
-					<!--
+					
 					<execution>
 						<id>ConnectedComponents</id>
 						<phase>package</phase>
@@ -275,13 +299,11 @@ under the License.
 								</manifestEntries>
 							</archive>
 							<includes>
-								<include>**/graph/ConnectedComponents*.class</include>
+								DOES NOT WORK <include>**/graph/ConnectedComponents*.class</include>
 							</includes>
 						</configuration>
 					</execution>
 					
-					-->
-
 					<execution>
 						<id>TransitiveClosureNaive</id>
 						<phase>package</phase>
@@ -300,10 +322,12 @@ under the License.
 
 							<includes>
 								<include>**/wordcount/TransitiveClosureNaive*.class</include>
+								  DOES NOT WORK <include>**/java/graph/util/ConnectedComponentsData.class</include>
 							</includes>
 						</configuration>
 					</execution>
-
+					-->
+					
 				</executions>
 			</plugin>
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf80d862/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index dfd9419..c0a156d 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -199,6 +199,30 @@ under the License.
 				</executions>
 			</plugin>
 			
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+			
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf80d862/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a339702..f18b4c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,20 +84,20 @@ under the License.
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
 			<version>3.1</version>
+			</dependency>
+	
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>${slf4j.version}</version>
+		</dependency>
+	
+		<dependency>
+			<groupId>ch.qos.logback</groupId>
+			<artifactId>logback-classic</artifactId>
+			<version>1.0.13</version>
+			<scope>runtime</scope>
 		</dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>${slf4j.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-            <version>1.0.13</version>
-            <scope>runtime</scope>
-        </dependency>
 
 		<dependency>
 			<groupId>org.slf4j</groupId>
@@ -559,29 +559,6 @@ under the License.
 				</configuration>
 			</plugin>
 			<plugin>
-            	<groupId>org.scalastyle</groupId>
-           		<artifactId>scalastyle-maven-plugin</artifactId>
-            	<version>0.5.0</version>
-            	<configuration>
-              		<verbose>false</verbose>
-              		<failOnViolation>true</failOnViolation>
-              		<includeTestSourceDirectory>true</includeTestSourceDirectory>
-              		<failOnWarning>false</failOnWarning>
-              		<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-              		<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-              		<configLocation>tools/maven/scalastyle-config.xml</configLocation>
-              		<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-              		<outputEncoding>UTF-8</outputEncoding>
-            	</configuration>
-            	<executions>
-              		<execution>
-                		<goals>
-                  			<goal>check</goal>
-                		</goals>
-              		</execution>
-            	</executions>
-          </plugin>
-			<plugin>
 				<!-- just define the Java version to be used for compiling and plugins -->
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-compiler-plugin</artifactId>


[4/6] git commit: Improve robustness of task manager test

Posted by se...@apache.org.
Improve robustness of task manager test


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6bd4d2c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6bd4d2c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6bd4d2c4

Branch: refs/heads/master
Commit: 6bd4d2c41d8830b21003916eb3a4c35957b0593b
Parents: b87f2fa
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 20:02:49 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 20:02:49 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/taskmanager/TaskManagerTest.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bd4d2c4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 822f809..f5d401d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -255,17 +255,17 @@ public class TaskManagerTest {
 			Task t1 = tasks.get(eid1);
 			Task t2 = tasks.get(eid2);
 			
-			// wait until the tasks are done
+			// wait until the tasks are done. rare thread races may cause the tasks to be done before
+			// we get to the check, so we need to guard the check
 			if (t1 != null) {
 				t1.getEnvironment().getExecutingThread().join();
+				assertEquals(ExecutionState.FINISHED, t1.getExecutionState());
 			}
 			if (t2 != null) {
 				t2.getEnvironment().getExecutingThread().join();
+				assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
 			}
 			
-			assertEquals(ExecutionState.FINISHED, t1.getExecutionState());
-			assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
-			
 			tasks = tm.getAllRunningTasks();
 			assertEquals(0, tasks.size());
 			


[3/6] git commit: Improve error message when scheduler cannot find a slot for immediate scheduling.

Posted by se...@apache.org.
Improve error message when scheduler cannot find a slot for immediate scheduling.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b87f2fa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b87f2fa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b87f2fa2

Branch: refs/heads/master
Commit: b87f2fa2a06841973c5aaf424e8984bda24a9276
Parents: 1ddec93
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 19:54:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 19:54:01 2014 +0200

----------------------------------------------------------------------
 .../scheduler/NoResourceAvailableException.java | 11 +++++++++--
 .../runtime/jobmanager/scheduler/Scheduler.java | 20 +++++++++++++++++---
 2 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b87f2fa2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index c1c3f94..11fec72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -23,16 +23,23 @@ import org.apache.flink.runtime.JobException;
 public class NoResourceAvailableException extends JobException {
 
 	private static final long serialVersionUID = -2249953165298717803L;
+	
+	private static final String BASE_MESSAGE = "Not enough free slots available to run the job. "
+			+ "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.";
 
 	public NoResourceAvailableException() {
-		super("Not enough free slots available to run the job. "
-				+ "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
+		super(BASE_MESSAGE);
 	}
 	
 	public NoResourceAvailableException(ScheduledUnit unit) {
 		super("No resource available to schedule unit " + unit
 				+ ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
 	}
+	
+	NoResourceAvailableException(int numInstances, int numSlotsTotal) {
+		super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d", 
+				BASE_MESSAGE, numInstances, numSlotsTotal));
+	}
 
 	public NoResourceAvailableException(String message) {
 		super(message);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b87f2fa2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index a3b8471..9ef30b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -108,6 +108,20 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 		return count;
 	}
 	
+	public int getTotalNumberOfSlots() {
+		int count = 0;
+		
+		synchronized (globalLock) {
+			for (Instance instance : allInstances) {
+				if (instance.isAlive()) {
+					count += instance.getTotalNumberOfSlots();
+				}
+			}
+		}
+		
+		return count;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Scheduling
 	// --------------------------------------------------------------------------------------------
@@ -198,7 +212,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 						if (slotFromGroup == null) {
 							// both null
 							if (constraint == null || constraint.isUnassigned()) {
-								throw new NoResourceAvailableException();
+								throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
 							} else {
 								throw new NoResourceAvailableException("Could not allocate a slot on instance " + 
 											constraint.getLocation() + ", as required by the co-location constraint.");
@@ -271,7 +285,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 					return future;
 				}
 				else {
-					throw new NoResourceAvailableException(task);
+					throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
 				}
 			}
 		}
@@ -439,7 +453,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
 			throw new RuntimeException(locality.name());
 		}
 	}
-
+	
 	// --------------------------------------------------------------------------------------------
 	//  Instance Availability
 	// --------------------------------------------------------------------------------------------


[6/6] git commit: Tasks are marked correctly as failed (not canceled), when the taskmanager kills them during shutdown or reset.

Posted by se...@apache.org.
Tasks are marked correctly as failed (not canceled), when the taskmanager kills them during shutdown or reset.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4a91be2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4a91be2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4a91be2e

Branch: refs/heads/master
Commit: 4a91be2e431a029601a45d4d049e47418c4ab5f7
Parents: ab0b3a3
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 21:45:50 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 22:26:25 2014 +0200

----------------------------------------------------------------------
 .../runtime/execution/RuntimeEnvironment.java   |  17 +--
 .../bufferprovider/GlobalBufferPool.java        |   4 +
 .../apache/flink/runtime/taskmanager/Task.java  |  56 ++++++++-
 .../flink/runtime/taskmanager/TaskManager.java  |  17 +--
 .../runtime/jobgraph/JobManagerTestUtils.java   |  13 +-
 .../jobmanager/TaskManagerFailsITCase.java      | 117 ++++++++++++++++++
 .../TaskManagerFailsWithSlotSharingITCase.java  | 122 +++++++++++++++++++
 .../jobmanager/tasks/BlockingReceiver.java      |  39 ++++++
 8 files changed, 365 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 6b60174..e052ccf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -225,7 +225,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	@Override
 	public void run() {
 		// quick fail in case the task was cancelled while the tread was started
-		if (owner.isCanceled()) {
+		if (owner.isCanceledOrFailed()) {
 			owner.cancelingDone();
 			return;
 		}
@@ -235,7 +235,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			this.invokable.invoke();
 
 			// Make sure, we enter the catch block when the task has been canceled
-			if (this.owner.isCanceled()) {
+			if (this.owner.isCanceledOrFailed()) {
 				throw new CancelTaskException();
 			}
 			
@@ -251,7 +251,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			// Now we wait until all output channels have written out their data and are closed
 			waitForOutputChannelsToBeClosed();
 			
-			if (this.owner.isCanceled()) {
+			if (this.owner.isCanceledOrFailed()) {
 				throw new CancelTaskException();
 			}
 			
@@ -262,7 +262,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		}
 		catch (Throwable t) {
 			
-			if (!this.owner.isCanceled()) {
+			if (!this.owner.isCanceledOrFailed()) {
 
 				// Perform clean up when the task failed and has been not canceled by the user
 				try {
@@ -275,10 +275,13 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			// Release all resources that may currently be allocated by the individual channels
 			releaseAllChannelResources();
 
-			if (this.owner.isCanceled() || t instanceof CancelTaskException) {
+			// if we are already set as cancelled or failed (when failure is triggered externally),
+			// mark that the thread is done.
+			if (this.owner.isCanceledOrFailed() || t instanceof CancelTaskException) {
 				this.owner.cancelingDone();
 			}
 			else {
+				// failure from inside the task thread. notify the task of the failure
 				this.owner.markFailed(t);
 			}
 		}
@@ -429,7 +432,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	 */
 	private void waitForOutputChannelsToBeClosed() throws InterruptedException {
 		// Make sure, we leave this method with an InterruptedException when the task has been canceled
-		if (this.owner.isCanceled()) {
+		if (this.owner.isCanceledOrFailed()) {
 			return;
 		}
 
@@ -449,7 +452,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		while (!canceled.get()) {
 
 			// Make sure, we leave this method with an InterruptedException when the task has been canceled
-			if (this.owner.isCanceled()) {
+			if (this.owner.isCanceledOrFailed()) {
 				throw new InterruptedException();
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
index c3a36bb..b1d7adf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
@@ -138,4 +138,8 @@ public final class GlobalBufferPool {
 			this.buffers.clear();
 		}
 	}
+	
+	public boolean isDestroyed() {
+		return isDestroyed;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f106614..d393e2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -147,9 +147,10 @@ public final class Task {
 		return environment;
 	}
 	
-	public boolean isCanceled() {
+	public boolean isCanceledOrFailed() {
 		return executionState == ExecutionState.CANCELING ||
-				executionState == ExecutionState.CANCELED;
+				executionState == ExecutionState.CANCELED ||
+				executionState == ExecutionState.FAILED;
 	}
 	
 	public String getTaskName() {
@@ -242,11 +243,60 @@ public final class Task {
 		}
 	}
 	
+	/**
+	 * Marks the task as failed, cancels its execution if it is running, and reports the failure back to the master.
+	 * This method is important if a failure needs to be reported to the master, because
+	 * a simple cancel would mark the task as canceled rather than failed.
+	 * 
+	 * @param cause The exception to report in the error message
+	 */
+	public void failExternally(Throwable cause) {
+		while (true) {
+			ExecutionState current = this.executionState;
+			
+			// if the task is already canceled (or canceling) or finished or failed,
+			// then we need not do anything
+			if (current == ExecutionState.FINISHED || current == ExecutionState.CANCELED ||
+					current == ExecutionState.CANCELING || current == ExecutionState.FAILED)
+			{
+				return;
+			}
+			
+			if (current == ExecutionState.DEPLOYING) {
+				// not yet running: directly set to failed
+				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+					
+					notifyObservers(ExecutionState.FAILED, null);
+					taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, cause);
+					return;
+				}
+			}
+			else if (current == ExecutionState.RUNNING) {
+				// go to failed and perform the actual cancellation of the running execution
+				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+					try {
+						this.environment.cancelExecution();
+					} catch (Throwable e) {
+						LOG.error("Error while cancelling the task.", e);
+					}
+					
+					notifyObservers(ExecutionState.FAILED, null);
+					taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, cause);
+					
+					return;
+				}
+			}
+			else {
+				throw new RuntimeException("unexpected state for cancelling: " + current);
+			}
+		}
+	}
+	
 	public void cancelingDone() {
 		while (true) {
 			ExecutionState current = this.executionState;
 			
-			if (current == ExecutionState.CANCELED) {
+			if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
 				return;
 			}
 			if (!(current == ExecutionState.RUNNING || current == ExecutionState.CANCELING)) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 1819b79..305d39f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -398,7 +398,7 @@ public class TaskManager implements TaskOperationProtocol {
 
 		LOG.info("Shutting down TaskManager");
 		
-		cancelAndClearEverything();
+		cancelAndClearEverything(new Exception("Task Manager is shutting down"));
 		
 		// first, stop the heartbeat thread and wait for it to terminate
 		this.heartbeatThread.interrupt();
@@ -699,11 +699,14 @@ public class TaskManager implements TaskOperationProtocol {
 	/**
 	 * Removes all tasks from this TaskManager.
 	 */
-	public void cancelAndClearEverything() {
-		LOG.info("Cancelling all computations and discarding all cached data.");
-		for (Task t : runningTasks.values()) {
-			t.cancelExecution();
-			runningTasks.remove(t.getExecutionId());
+	public void cancelAndClearEverything(Throwable cause) {
+		if (runningTasks.size() > 0) {
+			LOG.info("Cancelling all computations and discarding all cached data.");
+			
+			for (Task t : runningTasks.values()) {
+				t.failExternally(cause);
+				runningTasks.remove(t.getExecutionId());
+			}
 		}
 	}
 	
@@ -841,7 +844,7 @@ public class TaskManager implements TaskOperationProtocol {
 					
 					// mark us as disconnected and abort all computation
 					this.registeredId = null;
-					cancelAndClearEverything();
+					cancelAndClearEverything(new Exception("TaskManager lost heartbeat connection to JobManager"));
 					
 					// wait for a while, then attempt to register again
 					try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index eebe9b0..8ed7a6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -32,11 +32,16 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 public class JobManagerTestUtils {
 
 	public static final JobManager startJobManager(int numSlots) throws Exception {
+		return startJobManager(1, numSlots);
+	}
+	
+	public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager) throws Exception {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 		cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
-		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+		cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
 		
 		GlobalConfiguration.includeConfiguration(cfg);
 		
@@ -46,11 +51,13 @@ public class JobManagerTestUtils {
 		// max time is 5 seconds
 		long deadline = System.currentTimeMillis() + 5000;
 		
-		while (jm.getNumberOfSlotsAvailableToScheduler() < numSlots && System.currentTimeMillis() < deadline) {
+		while (jm.getNumberOfSlotsAvailableToScheduler() < numTaskManagers * numSlotsPerTaskManager &&
+				System.currentTimeMillis() < deadline)
+		{
 			Thread.sleep(10);
 		}
 		
-		assertEquals(numSlots, jm.getNumberOfSlotsAvailableToScheduler());
+		assertEquals(numTaskManagers * numSlotsPerTaskManager, jm.getNumberOfSlotsAvailableToScheduler());
 		
 		return jm;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
new file mode 100644
index 0000000..def20a0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.*;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.tasks.BlockingReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.junit.Test;
+
+/**
+ * Integration test: runs a pointwise Sender -> BlockingReceiver job on two local TaskManagers,
+ * shuts one TaskManager down while the job is running, waits for the job to end, and checks
+ * that the network buffer pools of both TaskManagers were either destroyed or got all their
+ * buffers back.
+ */
+public class TaskManagerFailsITCase {
+
+	@Test
+	public void testExecutionWithFailingTaskManager() {
+		final int NUM_TASKS = 31;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(BlockingReceiver.class);
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			// two TaskManagers with NUM_TASKS slots each: as many slots as sender plus receiver subtasks
+			final JobManager jm = startJobManager(2, NUM_TASKS);
+			
+			// everything after starting the JobManager happens inside try/finally, so that the
+			// JobManager is shut down even if looking up the TaskManagers or buffer pools fails
+			try {
+				final TaskManager[] taskManagers = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers();
+				final TaskManager tm1 = taskManagers[0];
+				final TaskManager tm2 = taskManagers[1];
+				
+				final GlobalBufferPool bp1 = tm1.getChannelManager().getGlobalBufferPool();
+				final GlobalBufferPool bp2 = tm2.getChannelManager().getGlobalBufferPool();
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				// wait until everyone has settled in: poll (for at most 2 seconds) until all receivers
+				// are RUNNING; once the deadline has passed, the test continues regardless
+				long deadline = System.currentTimeMillis() + 2000;
+				while (System.currentTimeMillis() < deadline) {
+					boolean allRunning = true;
+					for (ExecutionVertex v : eg.getJobVertex(receiver.getID()).getTaskVertices()) {
+						if (v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+							allRunning = false;
+							break;
+						}
+					}
+					
+					if (allRunning) {
+						break;
+					}
+					Thread.sleep(200);
+				}
+				
+				// kill one task manager (the first one, whose buffer pool is bp1)
+				tm1.shutdown();
+				
+				eg.waitForJobEnd();
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertTrue("buffers of the killed TaskManager were not all returned",
+						bp1.isDestroyed() || bp1.numBuffers() == bp1.numAvailableBuffers());
+				assertTrue("buffers of the surviving TaskManager were not all returned",
+						bp2.isDestroyed() || bp2.numBuffers() == bp2.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
new file mode 100644
index 0000000..d6fec59
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.*;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.BlockingReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.junit.Test;
+
+/**
+ * Integration test: like {@link TaskManagerFailsITCase}, but sender and receiver are placed in a
+ * common {@link SlotSharingGroup}. Runs a pointwise Sender -> BlockingReceiver job on two local
+ * TaskManagers, shuts one TaskManager down while the job is running, waits for the job to end,
+ * and checks that the network buffer pools of both TaskManagers were either destroyed or got all
+ * their buffers back.
+ */
+public class TaskManagerFailsWithSlotSharingITCase {
+
+	@Test
+	public void testExecutionWithFailingTaskManager() {
+		final int NUM_TASKS = 20;
+		
+		try {
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			sender.setInvokableClass(Sender.class);
+			receiver.setInvokableClass(BlockingReceiver.class);
+			sender.setParallelism(NUM_TASKS);
+			receiver.setParallelism(NUM_TASKS);
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+			
+			SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+			
+			// two TaskManagers with NUM_TASKS / 2 slots each give only NUM_TASKS slots in total, one per
+			// subtask index; presumably this suffices only because sender and receiver share slots
+			final JobManager jm = startJobManager(2, NUM_TASKS / 2);
+			
+			// everything after starting the JobManager happens inside try/finally, so that the
+			// JobManager is shut down even if looking up the TaskManagers or buffer pools fails
+			try {
+				final TaskManager[] taskManagers = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers();
+				final TaskManager tm1 = taskManagers[0];
+				final TaskManager tm2 = taskManagers[1];
+				
+				final GlobalBufferPool bp1 = tm1.getChannelManager().getGlobalBufferPool();
+				final GlobalBufferPool bp2 = tm2.getChannelManager().getGlobalBufferPool();
+				
+				// we need to register the job at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+				
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(result.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+				
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+				
+				// wait until everyone has settled in: poll (for at most 2 seconds) until all receivers
+				// are RUNNING; once the deadline has passed, the test continues regardless
+				long deadline = System.currentTimeMillis() + 2000;
+				while (System.currentTimeMillis() < deadline) {
+					boolean allRunning = true;
+					for (ExecutionVertex v : eg.getJobVertex(receiver.getID()).getTaskVertices()) {
+						if (v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+							allRunning = false;
+							break;
+						}
+					}
+					
+					if (allRunning) {
+						break;
+					}
+					Thread.sleep(200);
+				}
+				
+				// kill one task manager (the first one, whose buffer pool is bp1)
+				tm1.shutdown();
+				
+				eg.waitForJobEnd();
+				
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertTrue("buffers of the killed TaskManager were not all returned",
+						bp1.isDestroyed() || bp1.numBuffers() == bp1.numAvailableBuffers());
+				assertTrue("buffers of the surviving TaskManager were not all returned",
+						bp2.isDestroyed() || bp2.numBuffers() == bp2.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java
new file mode 100644
index 0000000..f7d4ee5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+/**
+ * Test task with one {@link IntegerRecord} input that it never reads from:
+ * {@link #invoke()} blocks until the executing thread is interrupted.
+ */
+public final class BlockingReceiver extends AbstractInvokable {
+	
+	@Override
+	public void registerInputOutput() {
+		// the reader is never read from; creating it presumably registers the input with this task
+		// (NOTE(review): confirm against RecordReader's constructor)
+		new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	/**
+	 * Blocks until the executing thread is interrupted (presumably how the runtime cancels the
+	 * task), which ends this method with an {@link InterruptedException}. It never returns normally.
+	 */
+	@Override
+	public void invoke() throws Exception {
+		final Object lock = new Object();
+		synchronized (lock) {
+			// Object.wait() may wake up spuriously, so wait in a loop: a single wait() could let the
+			// task finish normally and silently change what the failure tests exercise
+			while (true) {
+				lock.wait();
+			}
+		}
+	}
+}


[2/6] git commit: [FLINK-1115] Local file streams retry file creation on FileNotFoundException to increase resilience against spurious failures in tests

Posted by se...@apache.org.
[FLINK-1115] Local file streams retry file creation on FileNotFoundException to increase resilience against spurious failures in tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1ddec930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1ddec930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1ddec930

Branch: refs/heads/master
Commit: 1ddec930a29c1d870d5b5bbde0098d10ff9b45ce
Parents: cf80d86
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 19:36:27 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 19:36:27 2014 +0200

----------------------------------------------------------------------
 .../core/fs/local/LocalDataOutputStream.java    | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1ddec930/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
index 14ffefb..afef7c1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.fs.local;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
@@ -28,14 +28,15 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 /**
  * The <code>LocalDataOutputStream</code> class is a wrapper class for a data
  * output stream to the local file system.
- * 
  */
 public class LocalDataOutputStream extends FSDataOutputStream {
 
+	/** Maximum number of attempts the constructor makes to open the {@link FileOutputStream} before giving up. */
+	private static final int MAX_OPEN_TRIES = 3;
+	
 	/**
 	 * The file output stream used to write data.
 	 */
-	private FileOutputStream fos = null;
+	private FileOutputStream fos;
 
 	/**
 	 * Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object.
@@ -46,26 +47,34 @@ public class LocalDataOutputStream extends FSDataOutputStream {
 	 *         thrown if the data output stream cannot be created
 	 */
 	public LocalDataOutputStream(final File file) throws IOException {
-
-		this.fos = new FileOutputStream(file);
+		// Opening the file is attempted up to MAX_OPEN_TRIES times, to increase resilience
+		// against spurious I/O failures. Only a FileNotFoundException triggers another attempt;
+		// if the last attempt fails as well, its exception is propagated to the caller.
+		for (int attempt = 1; ; attempt++) {
+			try {
+				this.fos = new FileOutputStream(file);
+				return;
+			}
+			catch (FileNotFoundException e) {
+				if (attempt >= MAX_OPEN_TRIES) {
+					throw e;
+				}
+			}
+		}
 	}
 
-
 	@Override
 	public void write(final int b) throws IOException {
+		// delegates to the underlying FileOutputStream (per OutputStream.write(int), only the low-order 8 bits of b are written)
 		fos.write(b);
 	}
 
-
 	@Override
 	public void write(final byte[] b, final int off, final int len) throws IOException {
+		// delegates writing len bytes of b, starting at index off, to the underlying FileOutputStream
 		fos.write(b, off, len);
 	}
 
-
 	@Override
 	public void close() throws IOException {
-
+		// closes the underlying FileOutputStream
 		fos.close();
 	}
 }


[5/6] git commit: Remove error message in execution graph for concurrent state changes that are fully acceptable.

Posted by se...@apache.org.
Remove error message in execution graph for concurrent state changes that are fully acceptable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ab0b3a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ab0b3a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ab0b3a31

Branch: refs/heads/master
Commit: ab0b3a314c6b306300748826289944c028dbedc1
Parents: 6bd4d2c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 20:49:39 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 20:49:39 2014 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/executiongraph/ExecutionGraph.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ab0b3a31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9576a64..07e2455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -372,7 +372,10 @@ public class ExecutionGraph {
 			int nextPos = nextVertexToFinish;
 			if (nextPos >= verticesInCreationOrder.size()) {
 				// already done, and we still get a report?
-				LOG.error("Job entered finished state a repeated time.");
+				// this can happen when:
+				// - two job vertices finish almost simultaneously
+				// - The first one advances the position for the second as well (second is in final state)
+				// - the second (after it could grab the lock) tries to advance the position again
 				return;
 			}