You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:58 UTC
[39/52] [abbrv] flink git commit: [FLINK-5093] Fix bug about throwing
ConcurrentModificationException when stopping TimerService.
[FLINK-5093] Fix bug about throwing ConcurrentModificationException when stopping TimerService.
[FLINK-5093] Remove useless import.
This closes #2828.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eefcbbde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eefcbbde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eefcbbde
Branch: refs/heads/master
Commit: eefcbbded159ab5819fe2b09606f8a33b9150254
Parents: c424900
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Fri Nov 18 18:15:37 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:26 2016 +0100
----------------------------------------------------------------------
.../runtime/taskexecutor/slot/TimerService.java | 16 ++++-
.../taskexecutor/slot/TimerServiceTest.java | 68 ++++++++++++++++++++
2 files changed, 81 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eefcbbde/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index e28e801..14c9ab1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -60,9 +60,7 @@ public class TimerService<K> {
}
public void stop() {
- for (K key: timeouts.keySet()) {
- unregisterTimeout(key);
- }
+ unregisterAllTimeouts();
timeoutListener = null;
@@ -101,6 +99,18 @@ public class TimerService<K> {
}
/**
+ * Unregister all timeouts.
+ */
+ protected void unregisterAllTimeouts() {
+ for (Timeout<K> timeout : timeouts.values()) {
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ }
+ timeouts.clear();
+ }
+
+ /**
* Check whether the timeout for the given key and ticket is still valid (not yet unregistered
* and not yet overwritten).
*
http://git-wip-us.apache.org/repos/asf/flink/blob/eefcbbde/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
new file mode 100644
index 0000000..9dd5f39
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TimerServiceTest {
+ /**
+ * Test all timeouts registered can be unregistered
+ * @throws Exception
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testUnregisterAllTimeouts() throws Exception {
+ // Prepare all instances.
+ ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+ ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
+ when(scheduledExecutorService.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
+ .thenReturn(scheduledFuture);
+ TimerService<AllocationID> timerService = new TimerService<>(scheduledExecutorService);
+ TimeoutListener<AllocationID> listener = mock(TimeoutListener.class);
+
+ timerService.start(listener);
+
+ // Invoke register and unregister.
+ timerService.registerTimeout(new AllocationID(), 10, TimeUnit.SECONDS);
+ timerService.registerTimeout(new AllocationID(), 10, TimeUnit.SECONDS);
+
+ timerService.unregisterAllTimeouts();
+
+ // Verify.
+ Map<?, ?> timeouts = (Map<?, ?>) Whitebox.getInternalState(timerService, "timeouts");
+ assertTrue(timeouts.isEmpty());
+ verify(scheduledFuture, times(2)).cancel(true);
+ }
+
+}