You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:51 UTC
[02/10] flink git commit: [hotfix] Improve logging and thread
characteristics for 'EmbeddedNonHaServices'
[hotfix] Improve logging and thread characteristics for 'EmbeddedNonHaServices'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fc46db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fc46db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fc46db
Branch: refs/heads/flip-6
Commit: 44fc46dba0dcf91ee0f430f1e37f9f28e49ebbc2
Parents: 62e8e33
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 17:43:10 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100
----------------------------------------------------------------------
.../runtime/highavailability/EmbeddedNonHaServices.java | 7 +++++--
.../highavailability/nonha/AbstractNonHaServices.java | 9 +++++++--
.../highavailability/nonha/EmbeddedLeaderService.java | 5 ++++-
.../src/test/resources/log4j-test.properties | 2 +-
4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index 523218e..b91cec1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -56,7 +56,10 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
@Override
public void shutdown() throws Exception {
- super.shutdown();
- resourceManagerLeaderService.shutdown();
+ try {
+ super.shutdown();
+ } finally {
+ resourceManagerLeaderService.shutdown();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 237727f..474faa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -55,7 +55,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
- private final RunningJobsRegistry runningJobsRegistry;
+ private final NonHaRegistry runningJobsRegistry;
private boolean shutdown;
@@ -167,8 +167,13 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
@Override
public Thread newThread(@Nonnull Runnable r) {
- Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet());
+ Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
+
+ // HA threads should have a very high priority, but not
+ // keep the JVM running by themselves
+ thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(true);
+
return thread;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
index 84ac551..9fad9be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -225,7 +225,7 @@ public class EmbeddedLeaderService {
// check if the confirmation is for the same grant, or whether it is a stale grant
if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
final String address = service.contender.getAddress();
- LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId);
+ LOG.info("Received confirmation of leadership for leader {} , session={}", address, leaderSessionId);
// mark leadership
currentLeaderConfirmed = service;
@@ -271,6 +271,9 @@ public class EmbeddedLeaderService {
currentLeaderSessionId = leaderSessionId;
currentLeaderProposed = leaderService;
+ LOG.info("Proposing leadership to contender {} @ {}",
+ leaderService.contender, leaderService.contender.getAddress());
+
notificationExecutor.execute(
new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-streaming-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/log4j-test.properties b/flink-streaming-java/src/test/resources/log4j-test.properties
index 881dc06..e7cd3e0 100644
--- a/flink-streaming-java/src/test/resources/log4j-test.properties
+++ b/flink-streaming-java/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.A1.layout.ConversionPattern=%-5r [%-38t] %-5p %-60c %x - %m%n