You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/09 19:24:47 UTC

[1/2] flink git commit: [FLINK-5038] [streaming runtime] Make sure Cancelables are canceled even when "cancelTask" throws an exception

Repository: flink
Updated Branches:
  refs/heads/release-1.1 2041ba02b -> 290f8a25f


[FLINK-5038] [streaming runtime] Make sure Cancelables are canceled even when "cancelTask" throws an exception


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/290f8a25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/290f8a25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/290f8a25

Branch: refs/heads/release-1.1
Commit: 290f8a25fc4127b9734f45e782391506207748bc
Parents: 32f7efc
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 9 13:09:37 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 9 20:24:10 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |  17 ++-
 .../streaming/runtime/tasks/StreamTaskTest.java | 136 +++++++++++++++----
 2 files changed, 125 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/290f8a25/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4f0839f..aaaead0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -357,8 +357,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	public final void cancel() throws Exception {
 		isRunning = false;
 		canceled = true;
-		cancelTask();
-		closeAllClosables();
+
+		// the "cancel task" call must come first, but the cancelables must be
+		// closed no matter what
+		try {
+			cancelTask();
+		}
+		finally {
+			closeAllClosables();
+		}
 	}
 
 	public final boolean isRunning() {
@@ -519,6 +526,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	public RecordWriterOutput<?>[] getStreamOutputs() {
 		return operatorChain.getStreamOutputs();
 	}
+	
+	// visible for testing!
+	Set<Closeable> getCancelables() {
+		return cancelables;
+	}
+
 
 	// ------------------------------------------------------------------------
 	//  Checkpoint and Restore

http://git-wip-us.apache.org/repos/asf/flink/blob/290f8a25/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index f6b350b..4bae710 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -55,18 +55,20 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
-
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.net.URL;
 import java.util.Collections;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -167,6 +169,27 @@ public class StreamTaskTest {
 		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
 	}
 
+	@Test
+	public void testCancellationFailsWithBlockingLock() throws Exception {
+		SYNC_LATCH = new OneShotLatch();
+
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		Task task = createTask(CancelFailingTask.class, cfg, new Configuration());
+
+		// start the task and wait until it runs
+		// execution state RUNNING is not enough, we need to wait until the stream task's run() method
+		// is entered
+		task.startTaskThread();
+		SYNC_LATCH.await();
+
+		// cancel the execution - this should lead to smooth shutdown
+		task.cancelExecution();
+		task.getExecutingThread().join();
+
+		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Utilities
 	// ------------------------------------------------------------------------
@@ -339,7 +362,7 @@ public class StreamTaskTest {
 			// we are at the point where cancelling can happen
 			SYNC_LATCH.trigger();
 
-			// just freeze this task until it is interrupted
+			// just put this to sleep until it is interrupted
 			try {
 				Thread.sleep(100000000);
 			} catch (InterruptedException ignored) {
@@ -350,8 +373,7 @@ public class StreamTaskTest {
 
 		@Override
 		protected void cleanup() {
-			holder.cancel();
-			holder.interrupt();
+			holder.close();
 		}
 
 		@Override
@@ -360,38 +382,100 @@ public class StreamTaskTest {
 			// do not interrupt the lock holder here, to simulate spawned threads that
 			// we cannot properly interrupt on cancellation
 		}
+	}
 
+	/**
+	 * A task that locks if cancellation attempts to cleanly shut down 
+	 */
+	public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> {
 
-		private static final class LockHolder extends Thread {
+		@Override
+		protected void init() {}
 
-			private final OneShotLatch trigger;
-			private final Object lock;
-			private volatile boolean canceled;
+		@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+		@Override
+		protected void run() throws Exception {
+			final OneShotLatch latch = new OneShotLatch();
+			final Object lock = new Object();
 
-			private LockHolder(Object lock, OneShotLatch trigger) {
-				this.lock = lock;
-				this.trigger = trigger;
-			}
+			LockHolder holder = new LockHolder(lock, latch);
+			holder.start();
+			try {
+				// cancellation should try and cancel this
+				Set<Closeable> canceleables = getCancelables();
+				synchronized (canceleables) {
+					canceleables.add(holder);
+				}
+
+				// wait till the lock holder has the lock
+				latch.await();
 
-			@Override
-			public void run() {
+				// we are at the point where cancelling can happen
+				SYNC_LATCH.trigger();
+	
+				// try to acquire the lock - this is not possible as long as the lock holder
+				// thread lives
 				synchronized (lock) {
-					while (!canceled) {
-						// signal that we grabbed the lock
-						trigger.trigger();
-
-						// basically freeze this thread
-						try {
-							//noinspection SleepWhileHoldingLock
-							Thread.sleep(1000000000);
-						} catch (InterruptedException ignored) {}
-					}
+					// nothing
 				}
 			}
+			finally {
+				holder.close();
+			}
+
+		}
+
+		@Override
+		protected void cleanup() {}
+
+		@Override
+		protected void cancelTask() throws Exception {
+			throw new Exception("test exception");
+		}
+
+	}
+
+	// ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
 
-			public void cancel() {
-				canceled = true;
+	/**
+	 * A thread that holds a lock as long as it lives
+	 */
+	private static final class LockHolder extends Thread implements Closeable {
+
+		private final OneShotLatch trigger;
+		private final Object lock;
+		private volatile boolean canceled;
+
+		private LockHolder(Object lock, OneShotLatch trigger) {
+			this.lock = lock;
+			this.trigger = trigger;
+		}
+
+		@Override
+		public void run() {
+			synchronized (lock) {
+				while (!canceled) {
+					// signal that we grabbed the lock
+					trigger.trigger();
+
+					// basically freeze this thread
+					try {
+						//noinspection SleepWhileHoldingLock
+						Thread.sleep(1000000000);
+					} catch (InterruptedException ignored) {}
+				}
 			}
 		}
+
+		public void cancel() {
+			canceled = true;
+		}
+
+		@Override
+		public void close() {
+			canceled = true;
+			interrupt();
+		}
 	}
 }


[2/2] flink git commit: [hotfix] [web frontend] Reduce log level to DEBUG for requests against unavailable jobs

Posted by se...@apache.org.
[hotfix] [web frontend] Reduce log level to DEBUG for requests against unavailable jobs

This reduces log noise by a lot in cases where browsers are kept open and re-request
the status of old jobs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32f7efc8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32f7efc8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32f7efc8

Branch: refs/heads/release-1.1
Commit: 32f7efc86c903b33e139721d65adfff43d138f62
Parents: 2041ba0
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 9 16:52:27 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 9 20:24:10 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/32f7efc8/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index be7f952..c248f5d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -100,7 +100,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 					: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
 			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
 			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
-			LOG.warn("Error while handling request", e);
+			LOG.debug("Error while handling request", e);
 		}
 		catch (Exception e) {
 			byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);