You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:58 UTC

[39/52] [abbrv] flink git commit: [FLINK-5093] Fix bug about throwing ConcurrentModificationException when stopping TimerService.

[FLINK-5093] Fix bug about throwing ConcurrentModificationException when stopping TimerService.

[FLINK-5093] Remove useless import.

This closes #2828.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eefcbbde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eefcbbde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eefcbbde

Branch: refs/heads/master
Commit: eefcbbded159ab5819fe2b09606f8a33b9150254
Parents: c424900
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Fri Nov 18 18:15:37 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:26 2016 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/slot/TimerService.java | 16 ++++-
 .../taskexecutor/slot/TimerServiceTest.java     | 68 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eefcbbde/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index e28e801..14c9ab1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -60,9 +60,7 @@ public class TimerService<K> {
 	}
 
 	public void stop() {
-		for (K key: timeouts.keySet()) {
-			unregisterTimeout(key);
-		}
+		unregisterAllTimeouts();
 
 		timeoutListener = null;
 
@@ -101,6 +99,18 @@ public class TimerService<K> {
 	}
 
 	/**
+	 * Cancels every non-null timeout held in {@code timeouts}, then empties the map.
+	 */
+	protected void unregisterAllTimeouts() {
+		for (Timeout<K> timeout : timeouts.values()) {
+			if (timeout != null) {
+				timeout.cancel();
+			}
+		}
+		timeouts.clear();
+	}
+
+	/**
 	 * Check whether the timeout for the given key and ticket is still valid (not yet unregistered
 	 * and not yet overwritten).
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/eefcbbde/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
new file mode 100644
index 0000000..9dd5f39
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TimerServiceTest {
+	/**
+	 * Verifies that unregisterAllTimeouts() empties the internal "timeouts" map and calls cancel(true) once per registered timeout.
+	 * @throws Exception if the test fails unexpectedly
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testUnregisterAllTimeouts() throws Exception {
+		// Mock the executor so that every schedule(...) call hands back the same mocked future.
+		ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+		ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
+		when(scheduledExecutorService.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
+			.thenReturn(scheduledFuture);
+		TimerService<AllocationID> timerService = new TimerService<>(scheduledExecutorService);
+		TimeoutListener<AllocationID> listener = mock(TimeoutListener.class);
+
+		timerService.start(listener);
+
+		// Register two timeouts under freshly created AllocationIDs, then unregister them all in one call.
+		timerService.registerTimeout(new AllocationID(), 10, TimeUnit.SECONDS);
+		timerService.registerTimeout(new AllocationID(), 10, TimeUnit.SECONDS);
+
+		timerService.unregisterAllTimeouts();
+
+		// The internal map (read reflectively) must be empty, and the shared mock future cancelled twice.
+		Map<?, ?> timeouts = (Map<?, ?>) Whitebox.getInternalState(timerService, "timeouts");
+		assertTrue(timeouts.isEmpty());
+		verify(scheduledFuture, times(2)).cancel(true);
+	}
+
+}