You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/10 15:33:40 UTC
[1/4] flink git commit: [FLINK-5766] [distributed coordination] Unify
the handling of NoResourceAvailableException
Repository: flink
Updated Branches:
refs/heads/master 66305135b -> e29dfb840
[FLINK-5766] [distributed coordination] Unify the handling of NoResourceAvailableException
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bde6ffb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bde6ffb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bde6ffb
Branch: refs/heads/master
Commit: 3bde6ffb6f55ec7ff807633ab1e79d9238e5a942
Parents: ef77c25
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 9 19:12:32 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:30 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 4 +-
.../executiongraph/ExecutionJobVertex.java | 4 +-
.../runtime/executiongraph/ExecutionVertex.java | 3 +-
.../flink/runtime/instance/SlotProvider.java | 5 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 28 ++++++-----
.../ScheduleWithCoLocationHintTest.java | 16 ++++---
.../scheduler/SchedulerIsolatedTasksTest.java | 11 +++--
.../scheduler/SchedulerSlotSharingTest.java | 49 ++++++++++----------
8 files changed, 63 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e29e5b6..60e5575 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -248,9 +247,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* immediately deploy it.
*
* @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
- * @throws NoResourceAvailableException Thrown is no queued scheduling is allowed and no resources are currently available.
*/
- public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+ public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
if (slotProvider == null) {
throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e8664f7..3828fc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -39,11 +39,11 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
import java.io.IOException;
@@ -386,7 +386,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
// Actions
//---------------------------------------------------------------------------------------------
- public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+ public void scheduleAll(SlotProvider slotProvider, boolean queued) {
ExecutionVertex[] vertices = this.taskVertices;
http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index cb2e177..92327fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -528,7 +527,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
}
- public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+ public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
return this.currentExecution.scheduleForExecution(slotProvider, queued);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index 49e6d9f..919f6a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
/**
@@ -41,8 +40,6 @@ public interface SlotProvider {
* @param task The task to allocate the slot for
* @param allowQueued Whether allow the task be queued if we do not have enough resource
* @return The future of the allocation
- *
- * @throws NoResourceAvailableException
*/
- Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException;
+ Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index dc82440..72f9789 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -145,19 +145,25 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
@Override
- public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
- throws NoResourceAvailableException {
+ public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+ try {
+ final Object ret = scheduleTask(task, allowQueued);
- final Object ret = scheduleTask(task, allowQueued);
- if (ret instanceof SimpleSlot) {
- return FlinkCompletableFuture.completed((SimpleSlot) ret);
- }
- else if (ret instanceof Future) {
- return (Future<SimpleSlot>) ret;
+ if (ret instanceof SimpleSlot) {
+ return FlinkCompletableFuture.completed((SimpleSlot) ret);
+ }
+ else if (ret instanceof Future) {
+ @SuppressWarnings("unchecked")
+ Future<SimpleSlot> typed = (Future<SimpleSlot>) ret;
+ return typed;
+ }
+ else {
+ // this should never happen, simply guard this case with an exception
+ throw new RuntimeException();
+ }
}
- else {
- // this should never happen, simply guard this case with an exception
- throw new RuntimeException();
+ catch (NoResourceAvailableException e) {
+ return FlinkCompletableFuture.completedExceptionally(e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index b803702..afb9dac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -36,6 +36,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
+import java.util.concurrent.ExecutionException;
+
public class ScheduleWithCoLocationHintTest {
@Test
@@ -241,8 +243,8 @@ public class ScheduleWithCoLocationHintTest {
try {
scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
fail("Scheduled even though no resource was available.");
- } catch (NoResourceAvailableException e) {
- // expected
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
@@ -509,11 +511,13 @@ public class ScheduleWithCoLocationHintTest {
try {
scheduler.allocateSlot(
- new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false);
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
fail("should not be able to find a resource");
- } catch (NoResourceAvailableException e) {
- // good
- } catch (Exception e) {
+ }
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
+ }
+ catch (Exception e) {
fail("wrong exception");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 9c21533..643efae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.areAllDistinct;
@@ -133,11 +134,11 @@ public class SchedulerIsolatedTasksTest {
assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
try {
- scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false);
+ scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
fail("Scheduler accepted scheduling request without available resource.");
}
- catch (NoResourceAvailableException e) {
- // pass!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
// release some slots again
@@ -313,8 +314,8 @@ public class SchedulerIsolatedTasksTest {
scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
fail("Scheduler served a slot from a dead instance");
}
- catch (NoResourceAvailableException e) {
- // fine
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception type.");
http://git-wip-us.apache.org/repos/asf/flink/blob/3bde6ffb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index c4121f6..5238e95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
import java.util.Random;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,8 +80,8 @@ public class SchedulerSlotSharingTest {
scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get();
fail("Scheduler accepted too many tasks at the same time");
}
- catch (NoResourceAvailableException e) {
- // good!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception.");
@@ -164,8 +165,8 @@ public class SchedulerSlotSharingTest {
scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
fail("Scheduler accepted too many tasks at the same time");
}
- catch (NoResourceAvailableException e) {
- // good!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception.");
@@ -187,8 +188,8 @@ public class SchedulerSlotSharingTest {
scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
fail("Scheduler accepted too many tasks at the same time");
}
- catch (NoResourceAvailableException e) {
- // good!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception.");
@@ -208,8 +209,8 @@ public class SchedulerSlotSharingTest {
scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get();
fail("Scheduler accepted too many tasks at the same time");
}
- catch (NoResourceAvailableException e) {
- // good!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception.");
@@ -376,8 +377,8 @@ public class SchedulerSlotSharingTest {
scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get();
fail("Scheduler accepted too many tasks at the same time");
}
- catch (NoResourceAvailableException e) {
- // good!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception.");
@@ -519,8 +520,8 @@ public class SchedulerSlotSharingTest {
scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
fail("Scheduler accepted too many tasks at the same time");
}
- catch (NoResourceAvailableException e) {
- // good!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception.");
@@ -530,30 +531,30 @@ public class SchedulerSlotSharingTest {
scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
fail("Scheduler accepted too many tasks at the same time");
}
- catch (NoResourceAvailableException e) {
- // good!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception.");
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
fail("Scheduler accepted too many tasks at the same time");
}
- catch (NoResourceAvailableException e) {
- // good!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception.");
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false).get();
fail("Scheduler accepted too many tasks at the same time");
}
- catch (NoResourceAvailableException e) {
- // good!
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
catch (Exception e) {
fail("Wrong exception.");
@@ -1059,13 +1060,13 @@ public class SchedulerSlotSharingTest {
SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false).get();
fail("should throw an exception");
}
- catch (NoResourceAvailableException e) {
- // expected
+ catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof NoResourceAvailableException);
}
-
+
assertEquals(0, scheduler.getNumberOfAvailableSlots());
s3_0.releaseSlot();
[3/4] flink git commit: [FLINK-5718] [core] TaskManagers exit the JVM
on fatal exceptions.
Posted by se...@apache.org.
[FLINK-5718] [core] TaskManagers exit the JVM on fatal exceptions.
This closes #3276
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dfc6fba5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dfc6fba5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dfc6fba5
Branch: refs/heads/master
Commit: dfc6fba5b9830e6a7804a6a0c9f69b36bf772730
Parents: 3bde6ff
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 6 15:52:39 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:31 2017 +0100
----------------------------------------------------------------------
docs/setup/config.md | 4 +-
.../flink/configuration/TaskManagerOptions.java | 5 +
.../org/apache/flink/util/ExceptionUtils.java | 37 +++
.../taskexecutor/TaskManagerConfiguration.java | 19 +-
.../apache/flink/runtime/taskmanager/Task.java | 14 +
.../taskmanager/TaskManagerRuntimeInfo.java | 8 +
...askManagerComponentsStartupShutdownTest.java | 3 +-
.../flink/runtime/testutils/TestJvmProcess.java | 9 +
.../runtime/util/JvmExitOnFatalErrorTest.java | 259 +++++++++++++++++++
.../util/TestingTaskManagerRuntimeInfo.java | 6 +
.../flink/core/testutils/CommonTestUtils.java | 25 ++
11 files changed, 385 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 2accdc2..b86c534 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m
- `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
-- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`. If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC.
+- `taskmanager.memory.preallocate`: Can be either `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, it is advised that this configuration is also set to `true`. If this configuration is set to `false`, the allocated off-heap memory is cleaned up only when the configured JVM parameter MaxDirectMemorySize is reached, which triggers a full GC. **Note:** For streaming setups, we highly recommend setting this value to `false`, as the core state backends currently do not use the managed memory.
### Memory and Performance Debugging
@@ -265,6 +265,8 @@ The following parameters configure Flink's JobManager and TaskManagers.
- `taskmanager.refused-registration-pause`: The pause after a registration has been refused by the job manager before retrying to connect. The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **10 s**)
+- `taskmanager.jvm-exit-on-oom`: Whether the TaskManager should immediately terminate its JVM when a task thread throws an `OutOfMemoryError` (DEFAULT: **false**).
+
- `blob.fetch.retries`: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: **50**).
- `blob.fetch.num-concurrent`: The number concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: **50**).
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 6f6238b..b7ee20a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -33,6 +33,11 @@ public class TaskManagerOptions {
// ------------------------------------------------------------------------
// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
+
+ /** Whether to kill the TaskManager when the task thread throws an OutOfMemoryError */
+ public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
+ key("taskmanager.jvm-exit-on-oom")
+ .defaultValue(false);
// ------------------------------------------------------------------------
// Network Options
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 1069f2d..6ba9ef6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -33,8 +33,13 @@ import java.io.StringWriter;
import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * A collection of utility functions for dealing with exceptions and exception workflows.
+ */
@Internal
public final class ExceptionUtils {
+
+ /** The stringified representation of a null exception reference */
public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
/**
@@ -64,6 +69,38 @@ public final class ExceptionUtils {
}
/**
+ * Checks whether the given exception indicates a situation that may leave the
+ * JVM in a corrupted state, meaning a state where continued normal operation can only be
+ * guaranteed via clean process restart.
+ *
+ * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
+ * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
+ * and {@link java.util.zip.ZipError} (a special case of InternalError).
+ *
+ * @param t The exception to check.
+ * @return True, if the exception is considered fatal to the JVM, false otherwise.
+ */
+ public static boolean isJvmFatalError(Throwable t) {
+ return (t instanceof InternalError) || (t instanceof UnknownError);
+ }
+
+ /**
+ * Checks whether the given exception indicates a situation that may leave the
+ * JVM in a corrupted state, or an out-of-memory error.
+ *
+ * <p>See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a list of fatal JVM errors.
+ * This method additionally classifies the {@link OutOfMemoryError} as fatal, because it
+ * may occur in any thread (not necessarily the one that allocated most of the memory) and thus
+ * is often not recoverable by destroying the particular thread that threw the exception.
+ *
+ * @param t The exception to check.
+ * @return True, if the exception is fatal to the JVM or an OutOfMemoryError, false otherwise.
+ */
+ public static boolean isJvmFatalOrOutOfMemoryError(Throwable t) {
+ return isJvmFatalError(t) || t instanceof OutOfMemoryError;
+ }
+
+ /**
* Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
* to a prior exception, or returns the new exception, if no prior exception exists.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 1d1e732..a6e4748 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -21,12 +21,15 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.duration.Duration;
import java.io.File;
@@ -53,6 +56,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
private final UnmodifiableConfiguration configuration;
+ private final boolean exitJvmOnOutOfMemory;
+
public TaskManagerConfiguration(
int numberSlots,
String[] tmpDirectories,
@@ -62,7 +67,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
Time maxRegistrationPause,
Time refusedRegistrationPause,
long cleanupInterval,
- Configuration configuration) {
+ Configuration configuration,
+ boolean exitJvmOnOutOfMemory) {
this.numberSlots = numberSlots;
this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
@@ -73,6 +79,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval);
this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
+ this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
}
public int getNumberSlots() {
@@ -113,6 +120,11 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
return tmpDirectories;
}
+ @Override
+ public boolean shouldExitJvmOnOutOfMemoryError() {
+ return exitJvmOnOutOfMemory;
+ }
+
// --------------------------------------------------------------------------------------------
// Static factory methods
// --------------------------------------------------------------------------------------------
@@ -205,6 +217,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
}
+ final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);
+
return new TaskManagerConfiguration(
numberSlots,
tmpDirPaths,
@@ -214,6 +228,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
maxRegistrationPause,
refusedRegistrationPause,
cleanupInterval,
- configuration);
+ configuration,
+ exitOnOom);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c2e6d09..64a83c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
@@ -700,6 +701,19 @@ public class Task implements Runnable, TaskActions {
// ----------------------------------------------------------------
try {
+ // check if the exception is unrecoverable
+ if (ExceptionUtils.isJvmFatalError(t) ||
+ (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()))
+ {
+ // terminate the JVM immediately
+ // don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
+ try {
+ LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
+ } finally {
+ Runtime.getRuntime().halt(-1);
+ }
+ }
+
// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
// loop for multiple retries during concurrent state changes via calls to cancel() or
// to failExternally()
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index d1efe34..dbf8c73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -38,4 +38,12 @@ public interface TaskManagerRuntimeInfo {
* @return The list of temporary file directories.
*/
String[] getTmpDirectories();
+
+ /**
+ * Checks whether the TaskManager should terminate the JVM when a task thread throws
+ * an {@link OutOfMemoryError}.
+ *
+ * <p>NOTE(review): this flag only governs OutOfMemoryErrors; callers may still terminate
+ * the JVM for errors they classify as JVM-fatal regardless of this setting (e.g. via
+ * {@code ExceptionUtils#isJvmFatalError}) - confirm per caller.
+ *
+ * @return True to terminate the JVM on an OutOfMemoryError, false otherwise.
+ */
+ boolean shouldExitJvmOnOutOfMemoryError();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index a6e1a2b..e26e176 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -109,7 +109,8 @@ public class TaskManagerComponentsStartupShutdownTest {
Time.seconds(30),
Time.seconds(10),
1000000, // cleanup interval
- config);
+ config,
+ false); // exit-jvm-on-fatal-error
final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 5954ee5..4578edf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -295,6 +295,15 @@ public abstract class TestJvmProcess {
}
}
+ /**
+ * Blocks until the spawned JVM process has terminated.
+ *
+ * @throws InterruptedException if the calling thread is interrupted while waiting
+ * @throws IllegalStateException if the process has not been started
+ */
+ public void waitFor() throws InterruptedException {
+ final Process started = this.process;
+ if (started == null) {
+ throw new IllegalStateException("process not started");
+ }
+ started.waitFor();
+ }
+
// ---------------------------------------------------------------------------------------------
// File based synchronization utilities
// ---------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
new file mode 100644
index 0000000..10f4303
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.*;
+
+/**
+ * Test that verifies that a {@link Task} terminates the JVM when its task thread fails
+ * with an {@link OutOfMemoryError} and {@link TaskManagerOptions#KILL_ON_OUT_OF_MEMORY}
+ * is enabled.
+ *
+ * <p>The test spawns a separate JVM ({@link KillOnFatalErrorProcess}) that runs a task
+ * throwing an {@code OutOfMemoryError}, and passes once that process has exited.
+ */
+public class JvmExitOnFatalErrorTest {
+
+ @Test
+ public void testExitJvmOnOutOfMemory() throws Exception {
+ // this test works only on linux
+ assumeTrue(OperatingSystem.isLinux());
+
+ // this test leaves remaining processes if not executed with Java 8
+ CommonTestUtils.assumeJava8();
+
+ // to check what went wrong (when the test hangs) uncomment this line
+// ProcessEntryPoint.main(new String[0]);
+
+ final KillOnFatalErrorProcess testProcess = new KillOnFatalErrorProcess();
+
+ try {
+ testProcess.startProcess();
+ // the spawned process parks its main thread forever, so it is expected to end only
+ // through the JVM termination triggered by the task; otherwise this call blocks
+ // and the test hangs
+ testProcess.waitFor();
+ }
+ finally {
+ testProcess.destroy();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Spawned process that is expected to be killed by the fatal error
+ // ------------------------------------------------------------------------
+
+ private static final class KillOnFatalErrorProcess extends TestJvmProcess {
+
+ public KillOnFatalErrorProcess() throws Exception {}
+
+ @Override
+ public String getName() {
+ return "KillOnFatalErrorProcess";
+ }
+
+ @Override
+ public String[] getJvmArgs() {
+ return new String[0];
+ }
+
+ @Override
+ public String getEntryPointClassName() {
+ return ProcessEntryPoint.class.getName();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Entry point of the spawned JVM. Starts a task whose invokable throws an
+ * {@link OutOfMemoryError} and then parks the main thread forever, so that the process
+ * is expected to end only through the JVM termination triggered by the task.
+ */
+ public static final class ProcessEntryPoint {
+
+ public static void main(String[] args) throws Exception {
+
+ System.err.println("creating task");
+
+ // errors during setup are caught and printed rather than propagated, so that a
+ // programming error here cannot make the process exit (and the test pass) by
+ // accident; in that case the main thread parks below and the test hangs instead
+ try {
+ final Configuration taskManagerConfig = new Configuration();
+ taskManagerConfig.setBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY, true);
+
+ final JobID jid = new JobID();
+ final JobVertexID jobVertexId = new JobVertexID();
+ final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+ final AllocationID slotAllocationId = new AllocationID();
+
+ final SerializedValue<ExecutionConfig> execConfig = new SerializedValue<>(new ExecutionConfig());
+
+ final JobInformation jobInformation = new JobInformation(
+ jid, "Test Job", execConfig, new Configuration(),
+ Collections.<BlobKey>emptyList(), Collections.<URL>emptyList());
+
+ final TaskInformation taskInformation = new TaskInformation(
+ jobVertexId, "Test Task", 1, 1, OomInvokable.class.getName(), new Configuration());
+
+ final MemoryManager memoryManager = new MemoryManager(1024 * 1024, 1);
+ final IOManager ioManager = new IOManagerAsync();
+
+ final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+ when(networkEnvironment.createKvStateTaskRegistry(jid, jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class));
+
+ // built from the config above, so it reports KILL_ON_OUT_OF_MEMORY = true
+ final TaskManagerRuntimeInfo tmInfo = TaskManagerConfiguration.fromConfiguration(taskManagerConfig);
+
+ final Executor executor = Executors.newCachedThreadPool();
+
+ Task task = new Task(
+ jobInformation,
+ taskInformation,
+ executionAttemptID,
+ slotAllocationId,
+ 0, // subtaskIndex
+ 0, // attemptNumber
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ 0, // targetSlotNumber
+ null, // taskStateHandles,
+ memoryManager,
+ ioManager,
+ networkEnvironment,
+ new BroadcastVariableManager(),
+ new NoOpTaskManagerActions(),
+ new NoOpInputSplitProvider(),
+ new NoOpCheckpointResponder(),
+ new FallbackLibraryCacheManager(),
+ new FileCache(tmInfo.getTmpDirectories()),
+ tmInfo,
+ new UnregisteredTaskMetricsGroup(),
+ new NoOpResultPartitionConsumableNotifier(),
+ new NoOpPartitionProducerStateChecker(),
+ executor);
+
+ System.err.println("starting task thread");
+
+ task.startTaskThread();
+ }
+ catch (Throwable t) {
+ System.err.println("ERROR STARTING TASK");
+ t.printStackTrace();
+ }
+
+ System.err.println("parking the main thread");
+ CommonTestUtils.blockForeverNonInterruptibly();
+ }
+
+ /** Invokable that immediately fails with an {@link OutOfMemoryError}. */
+ public static final class OomInvokable extends AbstractInvokable {
+
+ @Override
+ public void invoke() throws Exception {
+ throw new OutOfMemoryError();
+ }
+ }
+
+ // no-op stubs for the services required by the Task constructor
+
+ private static final class NoOpTaskManagerActions implements TaskManagerActions {
+
+ @Override
+ public void notifyFinalState(ExecutionAttemptID executionAttemptID) {}
+
+ @Override
+ public void notifyFatalError(String message, Throwable cause) {}
+
+ @Override
+ public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {}
+
+ @Override
+ public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {}
+ }
+
+ private static final class NoOpInputSplitProvider implements InputSplitProvider {
+
+ @Override
+ public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
+ return null;
+ }
+ }
+
+ private static final class NoOpCheckpointResponder implements CheckpointResponder {
+
+ @Override
+ public void acknowledgeCheckpoint(JobID j, ExecutionAttemptID e, CheckpointMetaData c, SubtaskState s) {}
+
+ @Override
+ public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}
+ }
+
+ private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+ @Override
+ public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
+ }
+
+ private static final class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {
+
+ @Override
+ public Future<ExecutionState> requestPartitionProducerState(
+ JobID jobId, IntermediateDataSetID intermediateDataSetId, ResultPartitionID r) {
+ return null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
index 0d0363f..4183152 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
@@ -59,4 +59,10 @@ public class TestingTaskManagerRuntimeInfo implements TaskManagerRuntimeInfo {
public String[] getTmpDirectories() {
return tmpDirectories;
}
+
+ /**
+ * Always returns {@code false}: test runtimes must never terminate the JVM on an
+ * OutOfMemoryError.
+ */
+ @Override
+ public boolean shouldExitJvmOnOutOfMemoryError() {
+ return false; // never kill the JVM in tests
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dfc6fba5/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 2eb18c1..639b065 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -97,6 +97,27 @@ public class CommonTestUtils {
}
/**
+ * Parks the calling thread forever; this method never returns. Interrupts are caught
+ * and discarded, so {@link Thread#interrupt()} cannot wake the thread up.
+ */
+ public static void blockForeverNonInterruptibly() {
+ final Object monitor = new Object();
+ //noinspection InfiniteLoopStatement
+ for (;;) {
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (monitor) {
+ try {
+ monitor.wait();
+ } catch (InterruptedException ignored) {
+ // deliberately ignored: the thread must stay parked, just wait again
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Preconditions on the test environment
+ // ------------------------------------------------------------------------
+
+ /**
* Checks whether this code runs in a Java 8 (Java 1.8) JVM. If not, this throws a
* {@link AssumptionViolatedException}, which causes JUnit to skip the test that
* called this method.
@@ -117,6 +138,10 @@ public class CommonTestUtils {
}
}
+ // ------------------------------------------------------------------------
+ // Manipulation of environment
+ // ------------------------------------------------------------------------
+
public static void setEnv(Map<String, String> newenv) {
setEnv(newenv, true);
}
[4/4] flink git commit: [hotfix] [dist] Add notice about memory
pre-allocation to default 'flink-conf.yaml'
Posted by se...@apache.org.
[hotfix] [dist] Add notice about memory pre-allocation to default 'flink-conf.yaml'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e29dfb84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e29dfb84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e29dfb84
Branch: refs/heads/master
Commit: e29dfb840495e0c1fd01e87a4af1abbf98103fa4
Parents: dfc6fba
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 10 14:53:58 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:31 2017 +0100
----------------------------------------------------------------------
flink-dist/src/main/resources/flink-conf.yaml | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e29dfb84/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 0f30595..72acbeb 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -53,6 +53,8 @@ taskmanager.numberOfTaskSlots: 1
# Specify whether TaskManager memory should be allocated when starting up (true) or when
# memory is required in the memory manager (false)
+# Important note: For pure streaming setups, we highly recommend setting this value to `false`
+# as the default state backends currently do not use the managed memory.
taskmanager.memory.preallocate: false
[2/4] flink git commit: [FLINK-5759] [jobmanager] Set
UncaughtExceptionHandlers for JobManager's Future and I/O thread pools
Posted by se...@apache.org.
[FLINK-5759] [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools
This closes #3290
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef77c254
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef77c254
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef77c254
Branch: refs/heads/master
Commit: ef77c254dadbe4c04810681fe765f5ec7d2a7400
Parents: 6630513
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 9 14:04:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:30 2017 +0100
----------------------------------------------------------------------
.../MesosApplicationMasterRunner.java | 10 +-
.../flink/runtime/filecache/FileCache.java | 3 +-
.../runtime/jobmaster/JobManagerServices.java | 6 +-
.../runtime/util/ExecutorThreadFactory.java | 123 ++++++++++++++-----
.../flink/runtime/util/NamedThreadFactory.java | 58 ---------
.../flink/runtime/jobmanager/JobManager.scala | 4 +-
.../runtime/minicluster/FlinkMiniCluster.scala | 10 +-
.../flink/yarn/YarnApplicationMasterRunner.java | 8 +-
8 files changed, 119 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 5033692..a23c9f6 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -22,10 +22,12 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -52,15 +54,17 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.mesos.Protos;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
@@ -216,11 +220,11 @@ public class MesosApplicationMasterRunner {
futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
- new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+ new ExecutorThreadFactory("mesos-jobmanager-future"));
ioExecutor = Executors.newFixedThreadPool(
numberProcessors,
- new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
+ new ExecutorThreadFactory("mesos-jobmanager-io"));
mesosServices = MesosServicesUtils.createMesosServices(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 21456de..4f2166f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -99,7 +99,8 @@ public class FileCache {
this.shutdownHook = createShutdownHook(this, LOG);
this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
- this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
+ this.executorService = Executors.newScheduledThreadPool(10,
+ new ExecutorThreadFactory("flink-file-cache"));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index 95500e5..8cda0f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -120,8 +120,12 @@ public class JobManagerServices {
throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage());
}
+ final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
+ Hardware.getNumberCPUCores(),
+ new ExecutorThreadFactory("jobmanager-future"));
+
return new JobManagerServices(
- Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE),
+ futureExecutor,
libraryCacheManager,
RestartStrategyFactory.createRestartStrategyFactory(config),
Time.of(timeout.length(), timeout.unit()));
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
index 2fb5972..4a79db3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
@@ -18,49 +18,114 @@
package org.apache.flink.runtime.util;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread factory intended for use by critical thread pools. Critical thread pools here
+ * mean thread pools that support Flink's core coordination and processing work, and which
+ * must not simply cause unnoticed errors.
+ *
+ * <p>The thread factory can be given an {@link UncaughtExceptionHandler} for the threads.
+ * If no handler is explicitly given, the default handler for uncaught exceptions will log
+ * the exceptions and kill the process afterwards. That guarantees that critical exceptions are
+ * not accidentally lost and leave the system running in an inconsistent state.
+ *
+ * <p>NOTE(review): only exceptions that escape a thread's run loop reach the handler. JDK
+ * executors presumably capture exceptions of tasks passed to {@code submit()} or
+ * {@code schedule()} in the returned {@code Future}, so those never reach it - confirm
+ * whether such failures need to be surfaced separately.
+ *
+ * <p>Threads created by this factory are all called '(pool-name)-thread-n', where
+ * <i>(pool-name)</i> is configurable, and <i>n</i> is an incrementing number.
+ *
+ * <p>All threads created by this factory are daemon threads and have the default (normal)
+ * priority.
+ */
public class ExecutorThreadFactory implements ThreadFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadFactory.class);
-
-
- private static final String THREAD_NAME_PREFIX = "Flink Executor Thread - ";
-
- private static final AtomicInteger COUNTER = new AtomicInteger(1);
-
- private static final ThreadGroup THREAD_GROUP = new ThreadGroup("Flink Executor Threads");
-
- private static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = new LoggingExceptionHander();
-
-
- public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
-
- // --------------------------------------------------------------------------------------------
-
- private ExecutorThreadFactory() {}
-
-
- public Thread newThread(Runnable target) {
- Thread t = new Thread(THREAD_GROUP, target, THREAD_NAME_PREFIX + COUNTER.getAndIncrement());
+
+ /** The thread pool name used when no explicit pool name has been specified. */
+ private static final String DEFAULT_POOL_NAME = "flink-executor-pool";
+
+ /** Per-factory counter supplying the 'n' in the thread names; starts at 1. */
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ /** Thread group of all created threads (the security manager's group, if one is installed). */
+ private final ThreadGroup group;
+
+ /** Prefix of all thread names: the pool name followed by "-thread-". */
+ private final String namePrefix;
+
+ /** Handler installed on every created thread; may be null, in which case none is set. */
+ private final UncaughtExceptionHandler exceptionHandler;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new thread factory using the default thread pool name ('flink-executor-pool')
+ * and the default uncaught exception handler (log exception and kill process).
+ */
+ public ExecutorThreadFactory() {
+ this(DEFAULT_POOL_NAME);
+ }
+
+ /**
+ * Creates a new thread factory using the given thread pool name and the default
+ * uncaught exception handler (log exception and kill process).
+ *
+ * @param poolName The pool name, used as the threads' name prefix
+ */
+ public ExecutorThreadFactory(String poolName) {
+ this(poolName, FatalExitExceptionHandler.INSTANCE);
+ }
+
+ /**
+ * Creates a new thread factory using the given thread pool name and the given
+ * uncaught exception handler.
+ *
+ * @param poolName The pool name, used as the threads' name prefix; must not be null
+ * @param exceptionHandler The uncaught exception handler for the threads, or null to
+ * leave the created threads' handler unset
+ */
+ public ExecutorThreadFactory(String poolName, UncaughtExceptionHandler exceptionHandler) {
+ checkNotNull(poolName, "poolName");
+
+ // use the security manager's thread group if one is installed, else the creator's group
+ SecurityManager securityManager = System.getSecurityManager();
+ this.group = (securityManager != null) ? securityManager.getThreadGroup() :
+ Thread.currentThread().getThreadGroup();
+
+ this.namePrefix = poolName + "-thread-";
+ this.exceptionHandler = exceptionHandler;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a daemon thread with normal priority, named '(pool-name)-thread-n', and with
+ * this factory's uncaught exception handler installed (if one was given).
+ */
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(true);
- t.setUncaughtExceptionHandler(EXCEPTION_HANDLER);
+
+ // normalize the priority
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+
+ // optional handler for uncaught exceptions
+ if (exceptionHandler != null) {
+ t.setUncaughtExceptionHandler(exceptionHandler);
+ }
+
return t;
}
-
+
// --------------------------------------------------------------------------------------------
-
- private static final class LoggingExceptionHander implements Thread.UncaughtExceptionHandler {
+
+ /**
+ * Default handler: logs the uncaught exception as a fatal error and then terminates the
+ * JVM via {@code System.exit(-17)}.
+ */
+ private static final class FatalExitExceptionHandler implements UncaughtExceptionHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FatalExitExceptionHandler.class);
+
+ static final FatalExitExceptionHandler INSTANCE = new FatalExitExceptionHandler();
@Override
public void uncaughtException(Thread t, Throwable e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e);
- }
+ LOG.error("FATAL: Thread '" + t.getName() + "' produced an uncaught exception. Stopping the process...", e);
+ // NOTE(review): System.exit runs the shutdown hooks and, per the Runtime#exit docs,
+ // blocks indefinitely if a shutdown is already in progress - confirm whether
+ // Runtime#halt is more appropriate here.
+ System.exit(-17);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
deleted file mode 100644
index bd97963..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Thread factory which allows to specify a thread pool name and a thread name.
- *
- * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}.
- */
-public class NamedThreadFactory implements ThreadFactory {
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
-
- public NamedThreadFactory(final String poolName, final String threadName) {
- SecurityManager securityManager = System.getSecurityManager();
- group = (securityManager != null) ? securityManager.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
-
- namePrefix = poolName +
- poolNumber.getAndIncrement() +
- threadName;
- }
-
- @Override
- public Thread newThread(Runnable runnable) {
- Thread t = new Thread(group, runnable,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon()) {
- t.setDaemon(false);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d575f68..a335916 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2023,11 +2023,11 @@ object JobManager {
val futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
- new NamedThreadFactory("jobmanager-future-", "-thread-"))
+ new ExecutorThreadFactory("jobmanager-future"))
val ioExecutor = Executors.newFixedThreadPool(
numberProcessors,
- new NamedThreadFactory("jobmanager-io-", "-thread-"))
+ new ExecutorThreadFactory("jobmanager-io"))
val timeout = AkkaUtils.getTimeout(configuration)
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 64cc97d..07fb996 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.minicluster
-import java.net.InetAddress
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}
@@ -26,7 +25,7 @@ import akka.pattern.Patterns.gracefulStop
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config
-import org.apache.flink.api.common.time.Time
+
import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
@@ -37,8 +36,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.{Hardware, NamedThreadFactory, ZooKeeperUtils}
+import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware, ZooKeeperUtils}
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+
import org.slf4j.LoggerFactory
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,11 +109,11 @@ abstract class FlinkMiniCluster(
val futureExecutor = Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
- new NamedThreadFactory("mini-cluster-future-", "-thread"))
+ new ExecutorThreadFactory("mini-cluster-future"))
val ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
- new NamedThreadFactory("mini-cluster-io-", "-thread"))
+ new ExecutorThreadFactory("mini-cluster-io"))
def configuration: Configuration = {
if (originalConfiguration.getInteger(
http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 29f1827..5cc51e4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -38,14 +38,14 @@ import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
-
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -230,11 +230,11 @@ public class YarnApplicationMasterRunner {
final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
- new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
+ new ExecutorThreadFactory("yarn-jobmanager-future"));
final ExecutorService ioExecutor = Executors.newFixedThreadPool(
numberProcessors,
- new NamedThreadFactory("yarn-jobmanager-io-", "-thread-"));
+ new ExecutorThreadFactory("yarn-jobmanager-io"));
try {
// ------- (1) load and parse / validate all configurations -------