You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/23 23:49:38 UTC
[1/6] git commit: [FLINK-1114] Move scala-style checks to scala
projects, change paths to style config,
to allow isolated building of individual projects.
Repository: incubator-flink
Updated Branches:
refs/heads/master b904b0041 -> 4a91be2e4
[FLINK-1114] Move scala-style checks to scala projects, change paths to style config, to allow isolated building of individual projects.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cf80d862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cf80d862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cf80d862
Branch: refs/heads/master
Commit: cf80d8627728a719c3ef2be8f31ef77077b9d200
Parents: b904b00
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 19:24:21 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 19:24:21 2014 +0200
----------------------------------------------------------------------
.../java/graph/TransitiveClosureNaive.java | 2 +-
flink-examples/flink-scala-examples/pom.xml | 36 +++++++++++---
flink-scala/pom.xml | 24 ++++++++++
pom.xml | 49 ++++++--------------
4 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf80d862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
index c3e3e66..c7f2185 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -80,7 +80,7 @@ public class TransitiveClosureNaive implements ProgramDescription {
.coGroup(nextPaths)
.where(0).equalTo(0)
.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
- Set prevSet = new HashSet<Tuple2<Long,Long>>();
+ Set<Tuple2<Long,Long>> prevSet = new HashSet<Tuple2<Long,Long>>();
@Override
public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
for (Tuple2<Long,Long> prev : prevPaths) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf80d862/flink-examples/flink-scala-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml
index 79053d6..a3eba40 100644
--- a/flink-examples/flink-scala-examples/pom.xml
+++ b/flink-examples/flink-scala-examples/pom.xml
@@ -146,6 +146,30 @@ under the License.
</plugin>
<plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+ <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+ <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ </plugin>
+
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
@@ -237,7 +261,6 @@ under the License.
</includes>
</configuration>
</execution>
- -->
<execution>
<id>WordCount</id>
@@ -257,10 +280,11 @@ under the License.
<includes>
<include>**/wordcount/WordCount*.class</include>
+ <include>**/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
</execution>
- <!--
+
<execution>
<id>ConnectedComponents</id>
<phase>package</phase>
@@ -275,13 +299,11 @@ under the License.
</manifestEntries>
</archive>
<includes>
- <include>**/graph/ConnectedComponents*.class</include>
+ DOES NOT WORK <include>**/graph/ConnectedComponents*.class</include>
</includes>
</configuration>
</execution>
- -->
-
<execution>
<id>TransitiveClosureNaive</id>
<phase>package</phase>
@@ -300,10 +322,12 @@ under the License.
<includes>
<include>**/wordcount/TransitiveClosureNaive*.class</include>
+ DOES NOT WORK <include>**/java/graph/util/ConnectedComponentsData.class</include>
</includes>
</configuration>
</execution>
-
+ -->
+
</executions>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf80d862/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index dfd9419..c0a156d 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -199,6 +199,30 @@ under the License.
</executions>
</plugin>
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+ <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+ <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ </plugin>
+
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf80d862/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a339702..f18b4c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,20 +84,20 @@ under the License.
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.13</version>
+ <scope>runtime</scope>
</dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
-
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.0.13</version>
- <scope>runtime</scope>
- </dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -559,29 +559,6 @@ under the License.
</configuration>
</plugin>
<plugin>
- <groupId>org.scalastyle</groupId>
- <artifactId>scalastyle-maven-plugin</artifactId>
- <version>0.5.0</version>
- <configuration>
- <verbose>false</verbose>
- <failOnViolation>true</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <failOnWarning>false</failOnWarning>
- <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
- <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
- <configLocation>tools/maven/scalastyle-config.xml</configLocation>
- <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
- <outputEncoding>UTF-8</outputEncoding>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
<!-- just define the Java version to be used for compiling and plugins -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
[4/6] git commit: Improve robustness of task manager test
Posted by se...@apache.org.
Improve robustness of task manager test
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6bd4d2c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6bd4d2c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6bd4d2c4
Branch: refs/heads/master
Commit: 6bd4d2c41d8830b21003916eb3a4c35957b0593b
Parents: b87f2fa
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 20:02:49 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 20:02:49 2014 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/taskmanager/TaskManagerTest.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bd4d2c4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 822f809..f5d401d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -255,17 +255,17 @@ public class TaskManagerTest {
Task t1 = tasks.get(eid1);
Task t2 = tasks.get(eid2);
- // wait until the tasks are done
+ // wait until the tasks are done. rare thread races may cause the tasks to be done before
+ // we get to the check, so we need to guard the check
if (t1 != null) {
t1.getEnvironment().getExecutingThread().join();
+ assertEquals(ExecutionState.FINISHED, t1.getExecutionState());
}
if (t2 != null) {
t2.getEnvironment().getExecutingThread().join();
+ assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
}
- assertEquals(ExecutionState.FINISHED, t1.getExecutionState());
- assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
-
tasks = tm.getAllRunningTasks();
assertEquals(0, tasks.size());
[3/6] git commit: Improve error message when scheduler cannot find a
slot for immediate scheduling.
Posted by se...@apache.org.
Improve error message when scheduler cannot find a slot for immediate scheduling.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b87f2fa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b87f2fa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b87f2fa2
Branch: refs/heads/master
Commit: b87f2fa2a06841973c5aaf424e8984bda24a9276
Parents: 1ddec93
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 19:54:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 19:54:01 2014 +0200
----------------------------------------------------------------------
.../scheduler/NoResourceAvailableException.java | 11 +++++++++--
.../runtime/jobmanager/scheduler/Scheduler.java | 20 +++++++++++++++++---
2 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b87f2fa2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index c1c3f94..11fec72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -23,16 +23,23 @@ import org.apache.flink.runtime.JobException;
public class NoResourceAvailableException extends JobException {
private static final long serialVersionUID = -2249953165298717803L;
+
+ private static final String BASE_MESSAGE = "Not enough free slots available to run the job. "
+ + "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.";
public NoResourceAvailableException() {
- super("Not enough free slots available to run the job. "
- + "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
+ super(BASE_MESSAGE);
}
public NoResourceAvailableException(ScheduledUnit unit) {
super("No resource available to schedule unit " + unit
+ ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
}
+
+ NoResourceAvailableException(int numInstances, int numSlotsTotal) {
+ super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d",
+ BASE_MESSAGE, numInstances, numSlotsTotal));
+ }
public NoResourceAvailableException(String message) {
super(message);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b87f2fa2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index a3b8471..9ef30b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -108,6 +108,20 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
return count;
}
+ public int getTotalNumberOfSlots() {
+ int count = 0;
+
+ synchronized (globalLock) {
+ for (Instance instance : allInstances) {
+ if (instance.isAlive()) {
+ count += instance.getTotalNumberOfSlots();
+ }
+ }
+ }
+
+ return count;
+ }
+
// --------------------------------------------------------------------------------------------
// Scheduling
// --------------------------------------------------------------------------------------------
@@ -198,7 +212,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
if (slotFromGroup == null) {
// both null
if (constraint == null || constraint.isUnassigned()) {
- throw new NoResourceAvailableException();
+ throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
} else {
throw new NoResourceAvailableException("Could not allocate a slot on instance " +
constraint.getLocation() + ", as required by the co-location constraint.");
@@ -271,7 +285,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
return future;
}
else {
- throw new NoResourceAvailableException(task);
+ throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
}
}
}
@@ -439,7 +453,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
throw new RuntimeException(locality.name());
}
}
-
+
// --------------------------------------------------------------------------------------------
// Instance Availability
// --------------------------------------------------------------------------------------------
[6/6] git commit: Tasks are marked correctly as failed (not canceled),
when the taskmanager kills them during shutdown or reset.
Posted by se...@apache.org.
Tasks are marked correctly as failed (not canceled), when the taskmanager kills them during shutdown or reset.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4a91be2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4a91be2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4a91be2e
Branch: refs/heads/master
Commit: 4a91be2e431a029601a45d4d049e47418c4ab5f7
Parents: ab0b3a3
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 21:45:50 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 22:26:25 2014 +0200
----------------------------------------------------------------------
.../runtime/execution/RuntimeEnvironment.java | 17 +--
.../bufferprovider/GlobalBufferPool.java | 4 +
.../apache/flink/runtime/taskmanager/Task.java | 56 ++++++++-
.../flink/runtime/taskmanager/TaskManager.java | 17 +--
.../runtime/jobgraph/JobManagerTestUtils.java | 13 +-
.../jobmanager/TaskManagerFailsITCase.java | 117 ++++++++++++++++++
.../TaskManagerFailsWithSlotSharingITCase.java | 122 +++++++++++++++++++
.../jobmanager/tasks/BlockingReceiver.java | 39 ++++++
8 files changed, 365 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 6b60174..e052ccf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -225,7 +225,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
@Override
public void run() {
// quick fail in case the task was cancelled while the tread was started
- if (owner.isCanceled()) {
+ if (owner.isCanceledOrFailed()) {
owner.cancelingDone();
return;
}
@@ -235,7 +235,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
this.invokable.invoke();
// Make sure, we enter the catch block when the task has been canceled
- if (this.owner.isCanceled()) {
+ if (this.owner.isCanceledOrFailed()) {
throw new CancelTaskException();
}
@@ -251,7 +251,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
// Now we wait until all output channels have written out their data and are closed
waitForOutputChannelsToBeClosed();
- if (this.owner.isCanceled()) {
+ if (this.owner.isCanceledOrFailed()) {
throw new CancelTaskException();
}
@@ -262,7 +262,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
}
catch (Throwable t) {
- if (!this.owner.isCanceled()) {
+ if (!this.owner.isCanceledOrFailed()) {
// Perform clean up when the task failed and has been not canceled by the user
try {
@@ -275,10 +275,13 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();
- if (this.owner.isCanceled() || t instanceof CancelTaskException) {
+ // if we are already set as cancelled or failed (when failure is triggered externally),
+ // mark that the thread is done.
+ if (this.owner.isCanceledOrFailed() || t instanceof CancelTaskException) {
this.owner.cancelingDone();
}
else {
+ // failure from inside the task thread. notify the task of teh failure
this.owner.markFailed(t);
}
}
@@ -429,7 +432,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
*/
private void waitForOutputChannelsToBeClosed() throws InterruptedException {
// Make sure, we leave this method with an InterruptedException when the task has been canceled
- if (this.owner.isCanceled()) {
+ if (this.owner.isCanceledOrFailed()) {
return;
}
@@ -449,7 +452,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
while (!canceled.get()) {
// Make sure, we leave this method with an InterruptedException when the task has been canceled
- if (this.owner.isCanceled()) {
+ if (this.owner.isCanceledOrFailed()) {
throw new InterruptedException();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
index c3a36bb..b1d7adf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
@@ -138,4 +138,8 @@ public final class GlobalBufferPool {
this.buffers.clear();
}
}
+
+ public boolean isDestroyed() {
+ return isDestroyed;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f106614..d393e2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -147,9 +147,10 @@ public final class Task {
return environment;
}
- public boolean isCanceled() {
+ public boolean isCanceledOrFailed() {
return executionState == ExecutionState.CANCELING ||
- executionState == ExecutionState.CANCELED;
+ executionState == ExecutionState.CANCELED ||
+ executionState == ExecutionState.FAILED;
}
public String getTaskName() {
@@ -242,11 +243,60 @@ public final class Task {
}
}
+ /**
+ * Sets the tasks to be cancelled and reports a failure back to the master.
+ * This method is important if a failure needs to be reported to the master, because
+ * a simple canceled m
+ *
+ * @param cause The exception to report in the error message
+ */
+ public void failExternally(Throwable cause) {
+ while (true) {
+ ExecutionState current = this.executionState;
+
+ // if the task is already canceled (or canceling) or finished or failed,
+ // then we need not do anything
+ if (current == ExecutionState.FINISHED || current == ExecutionState.CANCELED ||
+ current == ExecutionState.CANCELING || current == ExecutionState.FAILED)
+ {
+ return;
+ }
+
+ if (current == ExecutionState.DEPLOYING) {
+ // directly set to canceled
+ if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+
+ notifyObservers(ExecutionState.FAILED, null);
+ taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, cause);
+ return;
+ }
+ }
+ else if (current == ExecutionState.RUNNING) {
+ // go to canceling and perform the actual task canceling
+ if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+ try {
+ this.environment.cancelExecution();
+ } catch (Throwable e) {
+ LOG.error("Error while cancelling the task.", e);
+ }
+
+ notifyObservers(ExecutionState.FAILED, null);
+ taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, cause);
+
+ return;
+ }
+ }
+ else {
+ throw new RuntimeException("unexpected state for cancelling: " + current);
+ }
+ }
+ }
+
public void cancelingDone() {
while (true) {
ExecutionState current = this.executionState;
- if (current == ExecutionState.CANCELED) {
+ if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
return;
}
if (!(current == ExecutionState.RUNNING || current == ExecutionState.CANCELING)) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 1819b79..305d39f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -398,7 +398,7 @@ public class TaskManager implements TaskOperationProtocol {
LOG.info("Shutting down TaskManager");
- cancelAndClearEverything();
+ cancelAndClearEverything(new Exception("Task Manager is shutting down"));
// first, stop the heartbeat thread and wait for it to terminate
this.heartbeatThread.interrupt();
@@ -699,11 +699,14 @@ public class TaskManager implements TaskOperationProtocol {
/**
* Removes all tasks from this TaskManager.
*/
- public void cancelAndClearEverything() {
- LOG.info("Cancelling all computations and discarding all cached data.");
- for (Task t : runningTasks.values()) {
- t.cancelExecution();
- runningTasks.remove(t.getExecutionId());
+ public void cancelAndClearEverything(Throwable cause) {
+ if (runningTasks.size() > 0) {
+ LOG.info("Cancelling all computations and discarding all cached data.");
+
+ for (Task t : runningTasks.values()) {
+ t.failExternally(cause);
+ runningTasks.remove(t.getExecutionId());
+ }
}
}
@@ -841,7 +844,7 @@ public class TaskManager implements TaskOperationProtocol {
// mark us as disconnected and abort all computation
this.registeredId = null;
- cancelAndClearEverything();
+ cancelAndClearEverything(new Exception("TaskManager lost heartbeat connection to JobManager"));
// wait for a while, then attempt to register again
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index eebe9b0..8ed7a6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -32,11 +32,16 @@ import org.apache.flink.runtime.jobmanager.JobManager;
public class JobManagerTestUtils {
public static final JobManager startJobManager(int numSlots) throws Exception {
+ return startJobManager(1, numSlots);
+ }
+
+ public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager) throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+ cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
GlobalConfiguration.includeConfiguration(cfg);
@@ -46,11 +51,13 @@ public class JobManagerTestUtils {
// max time is 5 seconds
long deadline = System.currentTimeMillis() + 5000;
- while (jm.getNumberOfSlotsAvailableToScheduler() < numSlots && System.currentTimeMillis() < deadline) {
+ while (jm.getNumberOfSlotsAvailableToScheduler() < numTaskManagers * numSlotsPerTaskManager &&
+ System.currentTimeMillis() < deadline)
+ {
Thread.sleep(10);
}
- assertEquals(numSlots, jm.getNumberOfSlotsAvailableToScheduler());
+ assertEquals(numTaskManagers * numSlotsPerTaskManager, jm.getNumberOfSlotsAvailableToScheduler());
return jm;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
new file mode 100644
index 0000000..def20a0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.*;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.tasks.BlockingReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.junit.Test;
+
+public class TaskManagerFailsITCase {
+
+ @Test
+ public void testExecutionWithFailingTaskManager() {
+ final int NUM_TASKS = 31;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+ sender.setInvokableClass(Sender.class);
+ receiver.setInvokableClass(BlockingReceiver.class);
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+ final JobManager jm = startJobManager(2, NUM_TASKS);
+
+ final TaskManager tm1 = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0];
+ final TaskManager tm2 = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[1];
+
+ final GlobalBufferPool bp1 = tm1.getChannelManager().getGlobalBufferPool();
+ final GlobalBufferPool bp2 = tm2.getChannelManager().getGlobalBufferPool();
+
+ try {
+ // we need to register the job at the library cache manager (with no libraries)
+ LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ // wait until everyone has settled in
+ long deadline = System.currentTimeMillis() + 2000;
+ while (System.currentTimeMillis() < deadline) {
+
+ boolean allrunning = true;
+ for (ExecutionVertex v : eg.getJobVertex(receiver.getID()).getTaskVertices()) {
+ if (v.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+ allrunning = false;
+ break;
+ }
+ }
+
+ if (allrunning) {
+ break;
+ }
+ Thread.sleep(200);
+ }
+
+ // kill one task manager
+ TaskManager tm = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0];
+ tm.shutdown();
+
+ eg.waitForJobEnd();
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertTrue(bp1.isDestroyed() || bp1.numBuffers() == bp1.numAvailableBuffers());
+ assertTrue(bp2.isDestroyed() || bp2.numBuffers() == bp2.numAvailableBuffers());
+ }
+ finally {
+ jm.shutdown();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
new file mode 100644
index 0000000..d6fec59
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.*;
+
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.BlockingReceiver;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.junit.Test;
+
+/**
+ * Integration test that runs a pointwise sender/receiver job whose vertices share one
+ * slot sharing group, shuts a task manager down while the job is active, and checks that
+ * the network buffers of the task managers are returned afterwards.
+ */
+public class TaskManagerFailsWithSlotSharingITCase {
+
+	/**
+	 * Submits the job, waits (bounded) for the receivers to run, shuts down the first
+	 * task manager, and verifies after job end that each inspected global buffer pool is
+	 * either destroyed or has all of its buffers available again.
+	 */
+	@Test
+	public void testExecutionWithFailingTaskManager() {
+		final int NUM_TASKS = 20;
+
+		try {
+			// sender side of the job
+			final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+			sender.setInvokableClass(Sender.class);
+			sender.setParallelism(NUM_TASKS);
+
+			// receiver side, connected pointwise to the sender's output
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			receiver.setInvokableClass(BlockingReceiver.class);
+			receiver.setParallelism(NUM_TASKS);
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+			// both vertices are placed into the same slot sharing group
+			final SlotSharingGroup slotSharing = new SlotSharingGroup();
+			sender.setSlotSharingGroup(slotSharing);
+			receiver.setSlotSharingGroup(slotSharing);
+
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+			// NOTE(review): presumably 2 task managers with NUM_TASKS / 2 slots each - confirm in JobManagerTestUtils
+			final JobManager jm = startJobManager(2, NUM_TASKS / 2);
+
+			// grab the buffer pools up front; they are inspected after the task manager was shut down
+			final TaskManager tm1 = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0];
+			final TaskManager tm2 = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[1];
+			final GlobalBufferPool bp1 = tm1.getChannelManager().getGlobalBufferPool();
+			final GlobalBufferPool bp2 = tm2.getChannelManager().getGlobalBufferPool();
+
+			try {
+				// the job must be registered at the library cache manager (with no libraries)
+				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
+
+				final JobSubmissionResult submission = jm.submitJob(jobGraph);
+				if (submission.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+					System.out.println(submission.getDescription());
+				}
+				assertEquals(AbstractJobResult.ReturnCode.SUCCESS, submission.getReturnCode());
+
+				final ExecutionGraph graph = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+				// give the receivers up to two seconds to reach RUNNING, polling every 200 ms;
+				// once the deadline has passed the test continues regardless
+				final long settleDeadline = System.currentTimeMillis() + 2000;
+				while (System.currentTimeMillis() < settleDeadline && !allTasksRunning(graph, receiver)) {
+					Thread.sleep(200);
+				}
+
+				// kill one task manager
+				final TaskManager victim = ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0];
+				victim.shutdown();
+
+				graph.waitForJobEnd();
+
+				// make sure that in any case, the network buffers are all returned
+				waitForTaskThreadsToBeTerminated();
+				assertTrue(bp1.isDestroyed() || bp1.numBuffers() == bp1.numAvailableBuffers());
+				assertTrue(bp2.isDestroyed() || bp2.numBuffers() == bp2.numAvailableBuffers());
+			}
+			finally {
+				jm.shutdown();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tells whether the current execution attempt of every task vertex of the given job
+	 * vertex is in state {@link ExecutionState#RUNNING}.
+	 */
+	private static boolean allTasksRunning(ExecutionGraph graph, AbstractJobVertex vertex) {
+		for (ExecutionVertex task : graph.getJobVertex(vertex.getID()).getTaskVertices()) {
+			if (task.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
+				return false;
+			}
+		}
+		return true;
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a91be2e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java
new file mode 100644
index 0000000..f7d4ee5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/BlockingReceiver.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+/**
+ * Test task that creates a single {@link IntegerRecord} reader and then blocks in
+ * {@link #invoke()} without ever reading from it.
+ *
+ * <p>The task never finishes on its own: {@code invoke()} only ends by throwing, for
+ * example an {@link InterruptedException} when the waiting thread is interrupted.
+ */
+public final class BlockingReceiver extends AbstractInvokable {
+
+	/**
+	 * Creates one {@link RecordReader} for {@link IntegerRecord}s, bound to this task.
+	 * The reader reference is not kept, because this task never consumes its input.
+	 */
+	@Override
+	public void registerInputOutput() {
+		new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	/**
+	 * Blocks the calling thread indefinitely on a private monitor.
+	 *
+	 * @throws Exception an {@link InterruptedException} if the thread is interrupted while waiting
+	 */
+	@Override
+	public void invoke() throws Exception {
+		// private monitor: the reference never leaves this method, so nothing can notify it
+		final Object lock = new Object();
+		synchronized (lock) {
+			// Object.wait() may return spuriously; loop so that only an exception ends the wait
+			while (true) {
+				lock.wait();
+			}
+		}
+	}
+}
[2/6] git commit: [FLINK-1115] Local file streams retry file creation
on FileNotFoundException to increase resilience against spurious failures in
tests
Posted by se...@apache.org.
[FLINK-1115] Local file streams retry file creation on FileNotFoundException to increase resilience against spurious failures in tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1ddec930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1ddec930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1ddec930
Branch: refs/heads/master
Commit: 1ddec930a29c1d870d5b5bbde0098d10ff9b45ce
Parents: cf80d86
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 19:36:27 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 19:36:27 2014 +0200
----------------------------------------------------------------------
.../core/fs/local/LocalDataOutputStream.java | 27 +++++++++++++-------
1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1ddec930/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
index 14ffefb..afef7c1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
@@ -16,10 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.core.fs.local;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -28,14 +28,15 @@ import org.apache.flink.core.fs.FSDataOutputStream;
/**
* The <code>LocalDataOutputStream</code> class is a wrapper class for a data
* output stream to the local file system.
- *
*/
public class LocalDataOutputStream extends FSDataOutputStream {
+ private static final int MAX_OPEN_TRIES = 3;
+
/**
* The file output stream used to write data.
*/
- private FileOutputStream fos = null;
+ private FileOutputStream fos;
/**
* Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object.
@@ -46,26 +47,34 @@ public class LocalDataOutputStream extends FSDataOutputStream {
* thrown if the data output stream cannot be created
*/
public LocalDataOutputStream(final File file) throws IOException {
-
- this.fos = new FileOutputStream(file);
+ // we allow multiple tries to create the file, to increase resilience against spurious I/O failures
+
+ FileNotFoundException lastException = null;
+
+ for (int attempt = 0; attempt < MAX_OPEN_TRIES; attempt++) {
+ try {
+ this.fos = new FileOutputStream(file);
+ return;
+ }
+ catch (FileNotFoundException e) {
+ lastException = e;
+ }
+ }
+ throw lastException;
}
-
@Override
public void write(final int b) throws IOException {
fos.write(b);
}
-
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
fos.write(b, off, len);
}
-
@Override
public void close() throws IOException {
-
fos.close();
}
}
[5/6] git commit: Remove error message in execution graph for
concurrent state changes that are fully acceptable.
Posted by se...@apache.org.
Remove error message in execution graph for concurrent state changes that are fully acceptable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ab0b3a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ab0b3a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ab0b3a31
Branch: refs/heads/master
Commit: ab0b3a314c6b306300748826289944c028dbedc1
Parents: 6bd4d2c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 23 20:49:39 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 23 20:49:39 2014 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/executiongraph/ExecutionGraph.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ab0b3a31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9576a64..07e2455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -372,7 +372,10 @@ public class ExecutionGraph {
int nextPos = nextVertexToFinish;
if (nextPos >= verticesInCreationOrder.size()) {
// already done, and we still get a report?
- LOG.error("Job entered finished state a repeated time.");
+ // this can happen when:
+ // - two job vertices finish almost simultaneously
+ // - the first one advances the position for the second as well (the second is already in a final state)
+ // - the second (after it could grab the lock) tries to advance the position again
return;
}