You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/28 16:22:13 UTC

[1/6] flink git commit: [FLINK-5171] [runtime] fix wrong use of Preconditions.checkState in TaskManagerRunner

Repository: flink
Updated Branches:
  refs/heads/flip-6 8f45e314a -> 4afcc4abd


[FLINK-5171] [runtime] fix wrong use of Preconditions.checkState in TaskManagerRunner

This closes #2880.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09ec78a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09ec78a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09ec78a5

Branch: refs/heads/flip-6
Commit: 09ec78a5bd5db1b74c267c8a69e182543135a161
Parents: 8f45e31
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Mon Nov 28 17:21:26 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 28 17:11:28 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09ec78a5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 99a7c5d..a18ff40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -185,7 +185,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 		final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
 
-		Preconditions.checkState(rpcPort < 0 || rpcPort >65535, "Invalid value for " +
+		Preconditions.checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " +
 				"'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " +
 				"use 0 to let the system choose port automatically.",
 			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);


[2/6] flink git commit: [FLINK-5170] [akka] Fix hostname usage in AkkaUtils.getConfig

Posted by tr...@apache.org.
[FLINK-5170] [akka] Fix hostname usage in AkkaUtils.getConfig

This closes #2879.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8265b545
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8265b545
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8265b545

Branch: refs/heads/flip-6
Commit: 8265b545cff2fb5e6ca3771effd46a2fda488576
Parents: 09ec78a
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Mon Nov 28 17:14:35 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 28 17:11:29 2016 +0100

----------------------------------------------------------------------
 .../src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8265b545/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 9ac9773..ddb9fc3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -115,7 +115,7 @@ object AkkaUtils {
   }
 
   def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
-    getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) else None)
+    getAkkaConfig(configuration, if (hostname != null) Some((hostname, port)) else None)
   }
 
   /**


[5/6] flink git commit: [FLINK-5093] Fix ConcurrentModificationException thrown when stopping TimerService.

Posted by tr...@apache.org.
[FLINK-5093] Fix ConcurrentModificationException thrown when stopping TimerService.

[FLINK-5093] Remove useless import.

This closes #2828.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/029db00c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/029db00c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/029db00c

Branch: refs/heads/flip-6
Commit: 029db00c1d74222fd9f67b08213668fd0eea1e4d
Parents: dc8254d
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Fri Nov 18 18:15:37 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 28 17:12:56 2016 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/slot/TimerService.java | 16 ++++-
 .../taskexecutor/slot/TimerServiceTest.java     | 68 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/029db00c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index e28e801..14c9ab1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -60,9 +60,7 @@ public class TimerService<K> {
 	}
 
 	public void stop() {
-		for (K key: timeouts.keySet()) {
-			unregisterTimeout(key);
-		}
+		unregisterAllTimeouts();
 
 		timeoutListener = null;
 
@@ -101,6 +99,18 @@ public class TimerService<K> {
 	}
 
 	/**
+	 * Unregister all timeouts.
+	 */
+	protected void unregisterAllTimeouts() {
+		for (Timeout<K> timeout : timeouts.values()) {
+			if (timeout != null) {
+				timeout.cancel();
+			}
+		}
+		timeouts.clear();
+	}
+
+	/**
 	 * Check whether the timeout for the given key and ticket is still valid (not yet unregistered
 	 * and not yet overwritten).
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/029db00c/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
new file mode 100644
index 0000000..9dd5f39
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TimerServiceTest {
+	/**
+	 * Tests that all registered timeouts can be unregistered.
+	 * @throws Exception
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testUnregisterAllTimeouts() throws Exception {
+		// Prepare all instances.
+		ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+		ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
+		when(scheduledExecutorService.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
+			.thenReturn(scheduledFuture);
+		TimerService<AllocationID> timerService = new TimerService<>(scheduledExecutorService);
+		TimeoutListener<AllocationID> listener = mock(TimeoutListener.class);
+
+		timerService.start(listener);
+
+		// Invoke register and unregister.
+		timerService.registerTimeout(new AllocationID(), 10, TimeUnit.SECONDS);
+		timerService.registerTimeout(new AllocationID(), 10, TimeUnit.SECONDS);
+
+		timerService.unregisterAllTimeouts();
+
+		// Verify.
+		Map<?, ?> timeouts = (Map<?, ?>) Whitebox.getInternalState(timerService, "timeouts");
+		assertTrue(timeouts.isEmpty());
+		verify(scheduledFuture, times(2)).cancel(true);
+	}
+
+}


[6/6] flink git commit: [FLINK-5093] Add proper shutdown of scheduled executor service in TimerService

Posted by tr...@apache.org.
[FLINK-5093] Add proper shutdown of scheduled executor service in TimerService


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4afcc4ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4afcc4ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4afcc4ab

Branch: refs/heads/flip-6
Commit: 4afcc4abda1848f6beaeaccc190d078f1775fd12
Parents: 029db00
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 28 15:25:57 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 28 17:12:57 2016 +0100

----------------------------------------------------------------------
 .../taskexecutor/TaskManagerServices.java       |  4 ++-
 .../TaskManagerServicesConfiguration.java       | 20 ++++++++++++--
 .../runtime/taskexecutor/slot/TimerService.java | 29 +++++++++++++++++---
 .../taskexecutor/TaskExecutorITCase.java        |  2 +-
 .../taskexecutor/slot/TimerServiceTest.java     |  2 +-
 5 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 7966078..d53c328 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -175,7 +175,9 @@ public class TaskManagerServices {
 			resourceProfiles.add(new ResourceProfile(1.0, 42L));
 		}
 
-		final TimerService<AllocationID> timerService = new TimerService<>(new ScheduledThreadPoolExecutor(1));
+		final TimerService<AllocationID> timerService = new TimerService<>(
+			new ScheduledThreadPoolExecutor(1),
+			taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());
 
 		final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 702142f..ff80bca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.HybridMemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -35,6 +36,7 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -57,6 +59,8 @@ public class TaskManagerServicesConfiguration {
 
 	private final float memoryFraction;
 
+	private final long timerServiceShutdownTimeout;
+
 	public TaskManagerServicesConfiguration(
 		InetAddress taskManagerAddress,
 		String[] tmpDirPaths,
@@ -64,7 +68,8 @@ public class TaskManagerServicesConfiguration {
 		int numberOfSlots,
 		long configuredMemory,
 		boolean preAllocateMemory,
-		float memoryFraction) {
+		float memoryFraction,
+		long timerServiceShutdownTimeout) {
 
 		this.taskManagerAddress = checkNotNull(taskManagerAddress);
 		this.tmpDirPaths = checkNotNull(tmpDirPaths);
@@ -74,6 +79,10 @@ public class TaskManagerServicesConfiguration {
 		this.configuredMemory = configuredMemory;
 		this.preAllocateMemory = preAllocateMemory;
 		this.memoryFraction = memoryFraction;
+
+		checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
+			"service shutdown timeout must be greater or equal to 0.");
+		this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -107,6 +116,10 @@ public class TaskManagerServicesConfiguration {
 		return preAllocateMemory;
 	}
 
+	public long getTimerServiceShutdownTimeout() {
+		return timerServiceShutdownTimeout;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Parsing of Flink configuration
 	// --------------------------------------------------------------------------------------------
@@ -161,6 +174,8 @@ public class TaskManagerServicesConfiguration {
 			ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
 			"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
 
+		long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();
+
 		return new TaskManagerServicesConfiguration(
 			remoteAddress,
 			tmpDirs,
@@ -168,7 +183,8 @@ public class TaskManagerServicesConfiguration {
 			slots,
 			configuredMemory,
 			preAllocateMemory,
-			memoryFraction);
+			memoryFraction,
+			timerServiceShutdownTimeout);
 	}
 
 	// --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index 14c9ab1..8ec9a2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.taskexecutor.slot;
 
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -35,18 +37,28 @@ import java.util.concurrent.TimeUnit;
  */
 public class TimerService<K> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(TimerService.class);
+
 	/** Executor service for the scheduled timeouts */
 	private final ScheduledExecutorService scheduledExecutorService;
 
+	/** Timeout for the shutdown of the service. */
+	private final long shutdownTimeout;
+
 	/** Map of currently active timeouts */
 	private final Map<K, Timeout<K>> timeouts;
 
 	/** Listener which is notified about occurring timeouts */
 	private TimeoutListener<K> timeoutListener;
 
-	public TimerService(final ScheduledExecutorService scheduledExecutorService) {
+	public TimerService(
+			final ScheduledExecutorService scheduledExecutorService,
+			final long shutdownTimeout) {
 		this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
 
+		Preconditions.checkArgument(shutdownTimeout >= 0L, "The shut down timeout must be larger than or equal than 0.");
+		this.shutdownTimeout = shutdownTimeout;
+
 		this.timeouts = new HashMap<>(16);
 		this.timeoutListener = null;
 	}
@@ -65,6 +77,17 @@ public class TimerService<K> {
 		timeoutListener = null;
 
 		scheduledExecutorService.shutdown();
+
+		try {
+			if(!scheduledExecutorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
+				LOG.debug("The scheduled executor service did not properly terminate. Shutting " +
+					"it down now.");
+				scheduledExecutorService.shutdownNow();
+			}
+		} catch (InterruptedException e) {
+			LOG.debug("Could not properly await the termination of the scheduled executor service.", e);
+			scheduledExecutorService.shutdownNow();
+		}
 	}
 
 	/**
@@ -103,9 +126,7 @@ public class TimerService<K> {
 	 */
 	protected void unregisterAllTimeouts() {
 		for (Timeout<K> timeout : timeouts.values()) {
-			if (timeout != null) {
-				timeout.cancel();
-			}
+			timeout.cancel();
 		}
 		timeouts.clear();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 050db44..36fd65b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -106,7 +106,7 @@ public class TaskExecutorITCase {
 		final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
 		final BroadcastVariableManager broadcastVariableManager = mock(BroadcastVariableManager.class);
 		final FileCache fileCache = mock(FileCache.class);
-		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService));
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService, 100L));
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
index 9dd5f39..cad3624 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
@@ -48,7 +48,7 @@ public class TimerServiceTest {
 		ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
 		when(scheduledExecutorService.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
 			.thenReturn(scheduledFuture);
-		TimerService<AllocationID> timerService = new TimerService<>(scheduledExecutorService);
+		TimerService<AllocationID> timerService = new TimerService<>(scheduledExecutorService, 100L);
 		TimeoutListener<AllocationID> listener = mock(TimeoutListener.class);
 
 		timerService.start(listener);


[3/6] flink git commit: [FLINK-5076] Shutting down TM when shutting down mini cluster.

Posted by tr...@apache.org.
[FLINK-5076] Shutting down TM when shutting down mini cluster.

This closes #2817.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc8254d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc8254d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc8254d4

Branch: refs/heads/flip-6
Commit: dc8254d4b3eb7333d1e3a2717e01bab051da33a1
Parents: 53f4ace
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Wed Nov 16 17:54:48 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 28 17:12:56 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/minicluster/MiniCluster.java  | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc8254d4/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 611d4c4..3ede5b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -312,6 +312,19 @@ public class MiniCluster {
 			resourceManagerRunners = null;
 		}
 
+		if (taskManagerRunners != null) {
+			for (TaskManagerRunner tm : taskManagerRunners) {
+				if (tm != null) {
+					try {
+						tm.shutDown(null);
+					} catch (Throwable t) {
+						exception = firstOrSuppressed(t, exception);
+					}
+				}
+			}
+			taskManagerRunners = null;
+		}
+
 		// shut down the RpcServices
 		exception = shutDownRpc(commonRpcService, exception);
 		exception = shutDownRpcs(jobManagerRpcServices, exception);


[4/6] flink git commit: [FLINK-5170] [akka] Extend AkkaUtils.getAkkaConfig methods to properly work with Java

Posted by tr...@apache.org.
[FLINK-5170] [akka] Extend AkkaUtils.getAkkaConfig methods to properly work with Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53f4acec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53f4acec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53f4acec

Branch: refs/heads/flip-6
Commit: 53f4acec84a8a7a9ab7c623dfec9ca7b48d7251e
Parents: 8265b54
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 28 12:10:43 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 28 17:12:56 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/rpc/RpcServiceUtils.java      | 10 +++++++++-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 20 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53f4acec/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index d40e336..1ac54ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -50,7 +50,15 @@ public class RpcServiceUtils {
 		final ActorSystem actorSystem;
 
 		try {
-			Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+			Config akkaConfig;
+
+			if (hostname != null && !hostname.isEmpty()) {
+				// remote akka config
+				akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+			} else {
+				// local akka config
+				akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+			}
 
 			LOG.debug("Using akka configuration \n {}.", akkaConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53f4acec/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index ddb9fc3..5a620f3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -114,8 +114,26 @@ object AkkaUtils {
     createActorSystem(getDefaultAkkaConfig)
   }
 
+  /**
+    * Return a remote Akka config for the given configuration values.
+    *
+    * @param configuration containing the user provided configuration values
+    * @param hostname to bind against. If null, then the loopback interface is used
+    * @param port to bind against
+    * @return A remote Akka config
+    */
   def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
-    getAkkaConfig(configuration, if (hostname != null) Some((hostname, port)) else None)
+    getAkkaConfig(configuration, Some((hostname, port)))
+  }
+
+  /**
+    * Return a local Akka config for the given configuration values.
+    *
+    * @param configuration containing the user provided configuration values
+    * @return A local Akka config
+    */
+  def getAkkaConfig(configuration: Configuration): Config = {
+    getAkkaConfig(configuration, None)
   }
 
   /**