You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:51 UTC

[02/10] flink git commit: [hotfix] Improve logging and thread characteristics for 'EmbeddedNonHaServices'

[hotfix] Improve logging and thread characteristics for 'EmbeddedNonHaServices'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fc46db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fc46db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fc46db

Branch: refs/heads/flip-6
Commit: 44fc46dba0dcf91ee0f430f1e37f9f28e49ebbc2
Parents: 62e8e33
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 2 17:43:10 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../runtime/highavailability/EmbeddedNonHaServices.java     | 7 +++++--
 .../highavailability/nonha/AbstractNonHaServices.java       | 9 +++++++--
 .../highavailability/nonha/EmbeddedLeaderService.java       | 5 ++++-
 .../src/test/resources/log4j-test.properties                | 2 +-
 4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index 523218e..b91cec1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -56,7 +56,10 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
 
 	@Override
 	public void shutdown() throws Exception {
-		super.shutdown();
-		resourceManagerLeaderService.shutdown();
+		try {
+			super.shutdown();
+		} finally {
+			resourceManagerLeaderService.shutdown();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 237727f..474faa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -55,7 +55,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 
 	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
 
-	private final RunningJobsRegistry runningJobsRegistry;
+	private final NonHaRegistry runningJobsRegistry;
 
 	private boolean shutdown;
 
@@ -167,8 +167,13 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 
 		@Override
 		public Thread newThread(@Nonnull Runnable r) {
-			Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet());
+			Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
+
+			// HA threads should have a very high priority, but not
+			// keep the JVM running by themselves
+			thread.setPriority(Thread.MAX_PRIORITY);
 			thread.setDaemon(true);
+
 			return thread;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
index 84ac551..9fad9be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -225,7 +225,7 @@ public class EmbeddedLeaderService {
 				// check if the confirmation is for the same grant, or whether it is a stale grant 
 				if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
 					final String address = service.contender.getAddress();
-					LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId);
+					LOG.info("Received confirmation of leadership for leader {} , session={}", address, leaderSessionId);
 
 					// mark leadership
 					currentLeaderConfirmed = service;
@@ -271,6 +271,9 @@ public class EmbeddedLeaderService {
 				currentLeaderSessionId = leaderSessionId;
 				currentLeaderProposed = leaderService;
 
+				LOG.info("Proposing leadership to contender {} @ {}",
+						leaderService.contender, leaderService.contender.getAddress());
+
 				notificationExecutor.execute(
 						new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-streaming-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/log4j-test.properties b/flink-streaming-java/src/test/resources/log4j-test.properties
index 881dc06..e7cd3e0 100644
--- a/flink-streaming-java/src/test/resources/log4j-test.properties
+++ b/flink-streaming-java/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
 
 # A1 uses PatternLayout.
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.A1.layout.ConversionPattern=%-5r [%-38t] %-5p %-60c %x - %m%n