You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/10 10:52:32 UTC

flink git commit: [FLINK-2339] Prevent asynchronous checkpoint calls from overtaking each other

Repository: flink
Updated Branches:
  refs/heads/master c7ec74e45 -> cbde2c2a3


[FLINK-2339] Prevent asynchronous checkpoint calls from overtaking each other


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cbde2c2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cbde2c2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cbde2c2a

Branch: refs/heads/master
Commit: cbde2c2a3d71e17990d76d603e1bb6d275c888be
Parents: c7ec74e
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 9 16:34:58 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 9 16:34:58 2015 +0200

----------------------------------------------------------------------
 .../io/network/api/TaskEventHandler.java        |   4 +-
 .../taskmanager/DispatherThreadFactory.java     |  50 ++++
 .../flink/runtime/taskmanager/MemoryLogger.java |  14 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  69 +++++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  | 247 +++++++++++++++++++
 5 files changed, 370 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cbde2c2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
index 95fce96..ccd0feb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.util.event.EventListener;
  */
 public class TaskEventHandler {
 
-	// Listeners for each event type
+	/** Listeners for each event type */
 	private final Multimap<Class<? extends TaskEvent>, EventListener<TaskEvent>> listeners = HashMultimap.create();
 
 	public void subscribe(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType) {
@@ -45,7 +45,7 @@ public class TaskEventHandler {
 	}
 
 	/**
-	 * Publishes the task event to all subscribed event listeners..
+	 * Publishes the task event to all subscribed event listeners.
 	 *
 	 * @param event The event to publish.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/cbde2c2a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java
new file mode 100644
index 0000000..f5f1565
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatherThreadFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Thread factory that creates threads with a given name, associates them with a given
+ * thread group, and sets them to daemon mode. All created threads share the same name.
+ */
+public class DispatherThreadFactory implements ThreadFactory {
+	// NOTE(review): the class name misspells "Dispatcher"; renaming it also requires updating Task.
+	private final ThreadGroup group;
+	
+	private final String threadName;
+	
+	/**
+	 * Creates a new thread factory.
+	 * 
+	 * @param group The group that the threads will be associated with.
+	 * @param threadName The name for the threads.
+	 */
+	public DispatherThreadFactory(ThreadGroup group, String threadName) {
+		this.group = group;
+		this.threadName = threadName;
+	}
+
+	@Override
+	public Thread newThread(Runnable r) {
+		Thread t = new Thread(group, r, threadName);
+		t.setDaemon(true);
+		return t;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cbde2c2a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
index 5c821e9..9258482 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -53,12 +53,16 @@ public class MemoryLogger extends Thread {
 	private final ActorSystem monitored;
 	
 	private volatile boolean running = true;
-
 	
-	public MemoryLogger(Logger logger, long interval) {
-		this(logger, interval, null);
-	}
-		
+	/**
+	 * Creates a new memory logger that logs in the given interval and lives as long as the
+	 * given actor system.
+	 * 
+	 * @param logger The logger to use for outputting the memory statistics.
+	 * @param interval The interval in which the thread logs.
+	 * @param monitored The actor system to whose life the thread is bound. The thread terminates
+	 *                  once the actor system terminates.   
+	 */
 	public MemoryLogger(Logger logger, long interval, ActorSystem monitored) {
 		super("Memory Logger");
 		setDaemon(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/cbde2c2a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 25ad28d..d9168e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
 import akka.util.Timeout;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
@@ -64,7 +65,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -188,6 +192,7 @@ public class Task implements Runnable {
 	//  proper happens-before semantics on parallel modification
 	// ------------------------------------------------------------------------
 
+	/** atomic flag that makes sure the invokable is canceled exactly once upon error */
 	private final AtomicBoolean invokableHasBeenCanceled;
 	
 	/** The invokable of this task, if initialized */
@@ -199,6 +204,9 @@ public class Task implements Runnable {
 	/** The observed exception, in case the task execution failed */
 	private volatile Throwable failureCause;
 
+	/** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized */
+	private volatile ExecutorService asyncCallDispatcher;
+	
 	/** The handle to the state that the operator was initialized with. Will be set to null after the
 	 * initialization, to be memory friendly */
 	private volatile SerializedValue<StateHandle<?>> operatorState;
@@ -290,11 +298,11 @@ public class Task implements Runnable {
 			this.inputGates[i] = gate;
 			inputGatesById.put(gate.getConsumedResultId(), gate);
 		}
+
+		invokableHasBeenCanceled = new AtomicBoolean(false);
 		
 		// finally, create the executing thread, but do not start it
 		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
-		
-		invokableHasBeenCanceled = new AtomicBoolean(false);
 	}
 
 	// ------------------------------------------------------------------------
@@ -646,9 +654,17 @@ public class Task implements Runnable {
 			try {
 				LOG.info("Freeing task resources for " + taskNameWithSubtask);
 				
+				// stop the async dispatcher.
+				// copy dispatcher reference to stack, against concurrent release
+				ExecutorService dispatcher = this.asyncCallDispatcher;
+				if (dispatcher != null && !dispatcher.isShutdown()) {
+					dispatcher.shutdownNow();
+				}
+				
 				// free the network resources
 				network.unregisterTask(this);
 
+				// free memory resources
 				if (invokable != null) {
 					memoryManager.releaseAll(invokable);
 				}
@@ -797,6 +813,7 @@ public class Task implements Runnable {
 						Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask);
 						Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
 								"Canceler for " + taskNameWithSubtask);
+						cancelThread.setDaemon(true);
 						cancelThread.start();
 					}
 					return;
@@ -955,11 +972,49 @@ public class Task implements Runnable {
 			LOG.debug("Ignoring partition state notification for not running task.");
 		}
 	}
-	
+
+	/**
+	 * Utility method to dispatch an asynchronous call on the invokable.
+	 * Calls run one at a time, in submission order, on a lazily created single-thread executor; calls made once the task is canceled or failed are dropped.
+	 * @param runnable The async call runnable.
+	 * @param callName The name of the call, for logging purposes.
+	 */
 	private void executeAsyncCallRunnable(Runnable runnable, String callName) {
-		Thread thread = new Thread(runnable, callName);
-		thread.setDaemon(true);
-		thread.start();
+		// make sure the executor is initialized. lock against concurrent calls to this function
+		synchronized (this) {
+			if (isCanceledOrFailed()) {
+				return;
+			}
+			
+			// get ourselves a reference on the stack that cannot be concurrently modified
+			ExecutorService executor = this.asyncCallDispatcher;
+			if (executor == null) {
+				// first time use, initialize
+				executor = Executors.newSingleThreadExecutor(
+						new DispatherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask));
+				this.asyncCallDispatcher = executor;
+				
+				// double-check for execution state, and make sure we clean up after ourselves
+				// if we created the dispatcher while the task was concurrently canceled
+				if (isCanceledOrFailed()) {
+					executor.shutdown();
+					asyncCallDispatcher = null;
+					return;
+				}
+			}
+
+			LOG.debug("Invoking async call {} on task {}", callName, taskNameWithSubtask);
+			// NOTE(review): submit() keeps any exception thrown by the runnable in the discarded Future, so unless the runnable handles its own errors they go unreported; execute() would surface them.
+			try {
+				executor.submit(runnable);
+			}
+			catch (RejectedExecutionException e) {
+				// may be that we are concurrently canceled. if not, report that something is fishy
+				if (!isCanceledOrFailed()) {
+					throw new RuntimeException("Async call was rejected, even though the task was not canceled.", e);
+				}
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -1051,7 +1106,7 @@ public class Task implements Runnable {
 
 					executer.interrupt();
 					try {
-						executer.join(5000);
+						executer.join(10000);
 					}
 					catch (InterruptedException e) {
 						// we can ignore this

http://git-wip-us.apache.org/repos/asf/flink/blob/cbde2c2a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
new file mode 100644
index 0000000..618c01f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+/** Tests that asynchronous calls on a {@link Task} reach its invokable in the order they were issued. */
+public class TaskAsyncCallTest {
+
+	private static final int NUM_CALLS = 1000;
+	
+	private static ActorSystem actorSystem;
+	// awaitLatch fires once invoke() has started; triggerLatch when the NUM_CALLS-th checkpoint arrives or invoke()'s wait loop ends
+	private static OneShotLatch awaitLatch;
+	private static OneShotLatch triggerLatch;
+
+	// ------------------------------------------------------------------------
+	//  Init & Shutdown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void startActorSystem() {
+		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void shutdown() {
+		actorSystem.shutdown();
+		actorSystem.awaitTermination();
+	}
+
+	@Before
+	public void createQueuesAndActors() {
+		awaitLatch = new OneShotLatch();
+		triggerLatch = new OneShotLatch();
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Tests 
+	// ------------------------------------------------------------------------
+	// triggers NUM_CALLS checkpoints back to back; the invokable records an error if any arrives out of order
+	@Test
+	public void testCheckpointCallsInOrder() {
+		try {
+			Task task = createTask();
+			task.startTaskThread();
+			
+			awaitLatch.await();
+			
+			for (int i = 1; i <= NUM_CALLS; i++) {
+				task.triggerCheckpointBarrier(i, 156865867234L);
+			}
+			
+			triggerLatch.await();
+			
+			assertFalse(task.isCanceledOrFailed());
+			assertEquals(ExecutionState.RUNNING, task.getExecutionState());
+			
+			task.cancelExecution();
+			task.getExecutingThread().join();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	// interleaves checkpoint triggers and confirmations; each confirmation must match the latest triggered checkpoint
+	@Test
+	public void testMixedAsyncCallsInOrder() {
+		try {
+			Task task = createTask();
+			task.startTaskThread();
+
+			awaitLatch.await();
+
+			for (int i = 1; i <= NUM_CALLS; i++) {
+				task.triggerCheckpointBarrier(i, 156865867234L);
+				task.confirmCheckpoint(i, null);
+			}
+
+			triggerLatch.await();
+
+			assertFalse(task.isCanceledOrFailed());
+			assertEquals(ExecutionState.RUNNING, task.getExecutionState());
+
+			task.cancelExecution();
+			task.getExecutingThread().join();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	// creates a Task running CheckpointsInOrderInvokable, with mocked services and message-discarding actors
+	private static Task createTask() {
+		
+		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
+		when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
+		
+		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+		when(networkEnvironment.getPartitionManager()).thenReturn(partitionManager);
+		when(networkEnvironment.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+		when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+
+		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+				new JobID(), new JobVertexID(), new ExecutionAttemptID(),
+				"Test Task", 0, 1,
+				new Configuration(), new Configuration(),
+				CheckpointsInOrderInvokable.class.getName(),
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				Collections.<BlobKey>emptyList(),
+				0);
+
+		return new Task(tdd,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				networkEnvironment,
+				mock(BroadcastVariableManager.class),
+				actorSystem.actorOf(Props.create(BlackHoleActor.class)),
+				actorSystem.actorOf(Props.create(BlackHoleActor.class)),
+				new FiniteDuration(60, TimeUnit.SECONDS),
+				libCache,
+				mock(FileCache.class));
+	}
+	/** Invokable that records an error when checkpoint calls arrive out of order. */
+	public static class CheckpointsInOrderInvokable extends AbstractInvokable
+			implements CheckpointedOperator, CheckpointCommittingOperator {
+
+		private volatile long lastCheckpointId = 0;
+		
+		private volatile Exception error;
+		
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() throws Exception {
+			awaitLatch.trigger();
+			
+			// wait until an ordering error is reported or the task is canceled; successful checkpoints do not notify
+			synchronized (this) {
+				while (error == null && lastCheckpointId < NUM_CALLS) {
+					wait();
+				}
+			}
+			
+			triggerLatch.trigger();
+			if (error != null) {
+				throw error;
+			}
+		}
+		// expects ids 1, 2, 3, ...; the non-atomic increment of the volatile counter assumes calls are serialized
+		@Override
+		public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+			lastCheckpointId++;
+			if (checkpointId == lastCheckpointId) {
+				if (lastCheckpointId == NUM_CALLS) {
+					triggerLatch.trigger();
+				}
+			}
+			else if (this.error == null) {
+				this.error = new Exception("calls out of order");
+				synchronized (this) {
+					notifyAll();
+				}
+			}
+		}
+		// a confirmation must refer to the most recently triggered checkpoint
+		@Override
+		public void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception {
+			if (checkpointId != lastCheckpointId && this.error == null) {
+				this.error = new Exception("calls out of order");
+				synchronized (this) {
+					notifyAll();
+				}
+			}
+		}
+	}
+	/** Actor that ignores every message it receives. */
+	public static class BlackHoleActor extends UntypedActor {
+
+		@Override
+		public void onReceive(Object message) {}
+	}
+}