You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/16 16:07:59 UTC

[1/6] flink git commit: [FLINK-5800] [checkpointing] Create CheckpointSteamFactory only once per operator

Repository: flink
Updated Branches:
  refs/heads/release-1.2 e9ada34f2 -> d3f2fe262


[FLINK-5800] [checkpointing] Create CheckpointSteamFactory only once per operator

Previously, the factory was created once per checkpoint, and its repeated initialization logic
(like ensuring existence of base paths) caused heavy load on some filesystems at very large scale.

This closes #3312


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c93a4c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c93a4c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c93a4c9

Branch: refs/heads/release-1.2
Commit: 2c93a4c93b68d5ea39d5ea71154a4212409da445
Parents: 8837c0c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 14 18:35:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 16 14:51:01 2017 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   | 44 ++++++++++--
 .../operators/StreamCheckpointedOperator.java   |  4 ++
 .../streaming/api/operators/StreamOperator.java | 26 ++++---
 .../streaming/runtime/tasks/StreamTask.java     | 48 ++-----------
 .../operators/AbstractStreamOperatorTest.java   | 18 +++--
 .../AbstractUdfStreamOperatorLifecycleTest.java | 15 +++-
 .../streaming/runtime/tasks/StreamTaskTest.java | 73 ++++++++------------
 .../util/AbstractStreamOperatorTestHarness.java | 16 ++---
 .../streaming/runtime/StateBackendITCase.java   |  5 +-
 9 files changed, 122 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c93a4c9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 9d56626..71d5501 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -114,6 +114,10 @@ public abstract class AbstractStreamOperator<OUT>
 	/** The runtime context for UDFs */
 	private transient StreamingRuntimeContext runtimeContext;
 
+	// ----------------- general state -------------------
+
+	/** The factory that give this operator access to checkpoint storage */
+	private transient CheckpointStreamFactory checkpointStreamFactory;
 
 	// ---------------- key/value state ------------------
 
@@ -127,10 +131,11 @@ public abstract class AbstractStreamOperator<OUT>
 	/** Keyed state store view on the keyed backend */
 	private transient DefaultKeyedStateStore keyedStateStore;
 
+	// ---------------- operator state ------------------
+
 	/** Operator state backend / store */
 	private transient OperatorStateBackend operatorStateBackend;
 
-
 	// --------------- Metrics ---------------------------
 
 	/** Metric group for the operator */
@@ -212,6 +217,8 @@ public abstract class AbstractStreamOperator<OUT>
 			}
 		}
 
+		checkpointStreamFactory = container.createCheckpointStreamFactory(this);
+
 		initOperatorState(operatorStateHandlesBackend);
 
 		StateInitializationContext initializationContext = new StateInitializationContextImpl(
@@ -333,8 +340,7 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	@Override
-	public final OperatorSnapshotResult snapshotState(
-			long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+	public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception {
 
 		KeyGroupRange keyGroupRange = null != keyedStateBackend ?
 				keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
@@ -344,7 +350,7 @@ public abstract class AbstractStreamOperator<OUT>
 		try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
 				checkpointId,
 				timestamp,
-				streamFactory,
+				checkpointStreamFactory,
 				keyGroupRange,
 				getContainingTask().getCancelables())) {
 
@@ -355,14 +361,14 @@ public abstract class AbstractStreamOperator<OUT>
 
 			if (null != operatorStateBackend) {
 				snapshotInProgress.setOperatorStateManagedFuture(
-					operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory));
+					operatorStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
 			}
 
 			if (null != keyedStateBackend) {
 				snapshotInProgress.setKeyedStateManagedFuture(
-					keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory));
+					keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
 			}
-		}  catch (Exception snapshotException) {
+		} catch (Exception snapshotException) {
 			try {
 				snapshotInProgress.cancel();
 			} catch (Exception e) {
@@ -422,6 +428,30 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 	}
 
+	@SuppressWarnings("deprecation")
+	@Deprecated
+	@Override
+	public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+		if (this instanceof StreamCheckpointedOperator) {
+
+			final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
+					checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+			getContainingTask().getCancelables().registerClosable(outStream);
+
+			try {
+				((StreamCheckpointedOperator) this).snapshotState(outStream, checkpointId, timestamp);
+				return outStream.closeAndGetHandle();
+			}
+			finally {
+				getContainingTask().getCancelables().unregisterClosable(outStream);
+				outStream.close();
+			}
+		} else {
+			return null;
+		}
+	}
+
 	/**
 	 * Stream operators with state which can be restored need to override this hook method.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/2c93a4c9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
index f93e7ea..a28cdc4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
 
+/**
+ * This interface is deprecated without replacement.
+ * All operators are now checkpointed.
+ */
 @Deprecated
 public interface StreamCheckpointedOperator extends CheckpointedRestoringOperator {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2c93a4c9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f6e5472..d8e4d08 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -93,18 +93,24 @@ public interface StreamOperator<OUT> extends Serializable {
 	/**
 	 * Called to draw a state snapshot from the operator.
 	 *
-	 * @throws Exception Forwards exceptions that occur while preparing for the snapshot
-	 */
-
-	/**
-	 * Called to draw a state snapshot from the operator.
-	 *
 	 * @return a runnable future to the state handle that points to the snapshotted state. For synchronous implementations,
 	 * the runnable might already be finished.
+	 * 
 	 * @throws Exception exception that happened during snapshotting.
 	 */
-	OperatorSnapshotResult snapshotState(
-			long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception;
+	OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception;
+
+	/**
+	 * Takes a snapshot of the legacy operator state defined via {@link StreamCheckpointedOperator}.
+	 * 
+	 * @return The handle to the legacy operator state, or null, if no state was snapshotted.
+	 * @throws Exception This method should forward any type of exception that happens during snapshotting.
+	 * 
+	 * @deprecated This method will be removed as soon as no more operators use the legacy state code paths
+	 */
+	@SuppressWarnings("deprecation")
+	@Deprecated
+	StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception;
 
 	/**
 	 * Provides state handles to restore the operator state.
@@ -134,6 +140,6 @@ public interface StreamOperator<OUT> extends Serializable {
 	ChainingStrategy getChainingStrategy();
 
 	void setChainingStrategy(ChainingStrategy strategy);
-	
+
 	MetricGroup getMetricGroup();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2c93a4c9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index cdcd6b4..1e05f19 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -52,7 +52,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -1042,8 +1041,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		// ------------------------
 
-		private CheckpointStreamFactory streamFactory;
-
 		private final List<StreamStateHandle> nonPartitionedStates;
 		private final List<OperatorSnapshotResult> snapshotInProgressList;
 
@@ -1122,15 +1119,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			}
 		}
 
+		@SuppressWarnings("deprecation")
 		private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 			if (null != op) {
-				createStreamFactory(op);
-				snapshotNonPartitionableState(op);
+				// first call the legacy checkpoint code paths 
+				nonPartitionedStates.add(op.snapshotLegacyOperatorState(
+						checkpointMetaData.getCheckpointId(),
+						checkpointMetaData.getTimestamp()));
 
 				OperatorSnapshotResult snapshotInProgress = op.snapshotState(
 						checkpointMetaData.getCheckpointId(),
-						checkpointMetaData.getTimestamp(),
-						streamFactory);
+						checkpointMetaData.getTimestamp());
 
 				snapshotInProgressList.add(snapshotInProgress);
 			} else {
@@ -1140,41 +1139,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			}
 		}
 
-		private void createStreamFactory(StreamOperator<?> operator) throws IOException {
-			String operatorId = owner.createOperatorIdentifier(operator, owner.configuration.getVertexID());
-			this.streamFactory = owner.stateBackend.createStreamFactory(owner.getEnvironment().getJobID(), operatorId);
-		}
-
-		//TODO deprecated code path
-		private void snapshotNonPartitionableState(StreamOperator<?> operator) throws Exception {
-
-			StreamStateHandle stateHandle = null;
-
-			if (operator instanceof StreamCheckpointedOperator) {
-
-				CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-						streamFactory.createCheckpointStateOutputStream(
-								checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
-
-				owner.cancelables.registerClosable(outStream);
-
-				try {
-					((StreamCheckpointedOperator) operator).
-							snapshotState(
-									outStream,
-									checkpointMetaData.getCheckpointId(),
-									checkpointMetaData.getTimestamp());
-
-					stateHandle = outStream.closeAndGetHandle();
-				} finally {
-					owner.cancelables.unregisterClosable(outStream);
-					outStream.close();
-				}
-			}
-
-			nonPartitionedStates.add(stateHandle);
-		}
-
 		public void runAsyncCheckpointingAndAcknowledge() throws IOException {
 
 			AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(

http://git-wip-us.apache.org/repos/asf/flink/blob/2c93a4c9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 409a732..274611a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -55,7 +55,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -491,16 +490,15 @@ public class AbstractStreamOperatorTest {
 		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
 
 		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
-
-		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+		
 		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
-		operator.snapshotState(checkpointId, timestamp, streamFactory);
+		operator.snapshotState(checkpointId, timestamp);
 
 		verify(context).close();
 	}
@@ -522,19 +520,18 @@ public class AbstractStreamOperatorTest {
 
 		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
 
-		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
 		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
 		// lets fail when calling the actual snapshotState method
 		doThrow(failingException).when(operator).snapshotState(eq(context));
 
 		try {
-			operator.snapshotState(checkpointId, timestamp, streamFactory);
+			operator.snapshotState(checkpointId, timestamp);
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());
@@ -574,7 +571,7 @@ public class AbstractStreamOperatorTest {
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
 		RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
@@ -587,9 +584,10 @@ public class AbstractStreamOperatorTest {
 
 		Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
 		Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
+		Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory);
 
 		try {
-			operator.snapshotState(checkpointId, timestamp, streamFactory);
+			operator.snapshotState(checkpointId, timestamp);
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());

http://git-wip-us.apache.org/repos/asf/flink/blob/2c93a4c9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 965aec6..357163c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -60,6 +61,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 			"UDF::open",
 			"OPERATOR::run",
 			"UDF::run",
+			"OPERATOR::snapshotLegacyOperatorState",
 			"OPERATOR::snapshotState",
 			"OPERATOR::close",
 			"UDF::close",
@@ -86,8 +88,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 			"setKeyContextElement2[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
 			"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
 			"org.apache.flink.streaming.api.graph.StreamConfig, interface " +
-			"org.apache.flink.streaming.api.operators.Output], snapshotState[long, long, " +
-			"interface org.apache.flink.runtime.state.CheckpointStreamFactory]]";
+			"org.apache.flink.streaming.api.operators.Output], " +
+			"snapshotLegacyOperatorState[long, long], " +
+			"snapshotState[long, long]]";
 
 	private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
 			", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
@@ -198,7 +201,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 	}
 
 	private static class LifecycleTrackingStreamSource<OUT, SRC extends SourceFunction<OUT>>
-			extends StreamSource<OUT, SRC> implements Serializable {
+			extends StreamSource<OUT, SRC> implements Serializable, StreamCheckpointedOperator {
 
 		private static final long serialVersionUID = 2431488948886850562L;
 		private transient Thread testCheckpointer;
@@ -254,6 +257,12 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		}
 
 		@Override
+		public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState");
+			return super.snapshotLegacyOperatorState(checkpointId, timestamp);
+		}
+
+		@Override
 		public void initializeState(StateInitializationContext context) throws Exception {
 			ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState");
 			super.initializeState(context);

http://git-wip-us.apache.org/repos/asf/flink/blob/2c93a4c9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e24defa..6075dc5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -94,6 +94,7 @@ import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import scala.concurrent.Await;
@@ -134,6 +135,8 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StreamTask.class)
+@PowerMockIgnore("org.apache.log4j.*")
+@SuppressWarnings("deprecation")
 public class StreamTaskTest extends TestLogger {
 
 	private static OneShotLatch SYNC_LATCH;
@@ -288,49 +291,42 @@ public class StreamTaskTest extends TestLogger {
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 		streamTask.setEnvironment(mockEnvironment);
 
+		// mock the operators
 		StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
+		// mock the returned snapshots
 		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
 		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
 
 		final Exception testException = new Exception("Test exception");
 
-		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
-		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
-		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenThrow(testException);
-
-		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
-
-		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
-		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+		when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong())).thenThrow(testException);
 
+		// mock the returned legacy snapshots
 		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
 
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
+		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
+		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
 
-		when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
-		when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
-		when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+		// set up the task
 
-		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
-		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
-			outStream1, outStream2, outStream3);
+		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
 
-		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
-		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
 
 		Whitebox.setInternalState(streamTask, "isRunning", true);
 		Whitebox.setInternalState(streamTask, "lock", new Object());
 		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
 		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
-		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
 		try {
 			streamTask.triggerCheckpoint(checkpointMetaData);
@@ -370,6 +366,8 @@ public class StreamTaskTest extends TestLogger {
 		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
+		// mock the new state handles / futures
+
 		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
 		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
 		OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class);
@@ -379,33 +377,23 @@ public class StreamTaskTest extends TestLogger {
 
 		when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
 
-		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
-		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
-		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult3);
-
-		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
-
-		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
-		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+		when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult3);
 
+		// mock the legacy state snapshot
 		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
 
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-
-		when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
-		when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
-		when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
+		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
+		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
 
-		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
-		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
-			outStream1, outStream2, outStream3);
+		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
 
-		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
-		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
 
 		Whitebox.setInternalState(streamTask, "isRunning", true);
 		Whitebox.setInternalState(streamTask, "lock", new Object());
@@ -413,7 +401,6 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
 		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", MoreExecutors.newDirectExecutorService());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
-		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
 		streamTask.triggerCheckpoint(checkpointMetaData);
 
@@ -478,7 +465,7 @@ public class StreamTaskTest extends TestLogger {
 			new DoneFuture<>(managedOperatorStateHandle),
 			new DoneFuture<>(rawOperatorStateHandle));
 
-		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+		when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
 
 		StreamOperator<?>[] streamOperators = {streamOperator};
 
@@ -594,7 +581,7 @@ public class StreamTaskTest extends TestLogger {
 			new DoneFuture<>(managedOperatorStateHandle),
 			new DoneFuture<>(rawOperatorStateHandle));
 
-		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+		when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
 
 		StreamOperator<?>[] streamOperators = {streamOperator};
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2c93a4c9/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 568410a..2df4efd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -459,15 +459,11 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	}
 
 	/**
-	 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
+	 * Calls {@link StreamOperator#snapshotState(long, long)}.
 	 */
 	public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
 
-		CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(
-				new JobID(),
-				"test_op");
-
-		OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp, streamFactory);
+		OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp);
 
 		KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
 		KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
@@ -475,14 +471,13 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
 		OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
-		OperatorStateHandles handles = new OperatorStateHandles(
+		return new OperatorStateHandles(
 			0,
 			null,
 			keyedManaged != null ? Collections.singletonList(keyedManaged) : null,
 			keyedRaw != null ? Collections.singletonList(keyedRaw) : null,
 			opManaged != null ? Collections.singletonList(opManaged) : null,
 			opRaw != null ? Collections.singletonList(opRaw) : null);
-		return handles;
 	}
 
 	/**
@@ -490,6 +485,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	 * the operator implements this interface.
 	 */
 	@Deprecated
+	@SuppressWarnings("deprecation")
 	public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
 
 		CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
@@ -513,7 +509,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	/**
 	 * Calls {@link StreamCheckpointedOperator#restoreState(FSDataInputStream)} if
 	 * the operator implements this interface.
-	 */	@Deprecated
+	 */
+	@Deprecated
+	@SuppressWarnings("deprecation")
 	public void restore(StreamStateHandle snapshot) throws Exception {
 		if(operator instanceof StreamCheckpointedOperator) {
 			try (FSDataInputStream in = snapshot.openInputStream()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2c93a4c9/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 0e62fbb..ec6a8f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
@@ -81,9 +82,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		}
 		catch (JobExecutionException e) {
 			Throwable t = e.getCause();
-			if (!(t != null && t.getCause() instanceof SuccessException)) {
-				throw e;
-			}
+			assertTrue("wrong exception", t instanceof SuccessException);
 		}
 	}
 


[3/6] flink git commit: [hotfix] [streaming] Properly mark non-transported fields as transient in AbstractStreamOperator

Posted by se...@apache.org.
[hotfix] [streaming] Properly mark non-transported fields as transient in AbstractStreamOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8837c0c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8837c0c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8837c0c5

Branch: refs/heads/release-1.2
Commit: 8837c0c5f357c52c15e139ed0697e3710c610db9
Parents: 8fd374c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 14 18:16:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 16 14:51:01 2017 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/operators/AbstractStreamOperator.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8837c0c5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 16fe2c1..9d56626 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -134,9 +134,9 @@ public abstract class AbstractStreamOperator<OUT>
 	// --------------- Metrics ---------------------------
 
 	/** Metric group for the operator */
-	protected MetricGroup metrics;
+	protected transient MetricGroup metrics;
 
-	protected LatencyGauge latencyGauge;
+	protected transient LatencyGauge latencyGauge;
 
 	// ---------------- timers ------------------
 


[4/6] flink git commit: [hotfix] [tests] Stabilize FastFailuresITCase

Posted by se...@apache.org.
[hotfix] [tests] Stabilize FastFailuresITCase

The test triggers 200 immediate failures and recoveries. The restart strategy allowed 200 restarts.

It may happen that another failure occurs as during the execution, in which case the restart attempts are not
sufficient.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3429ea0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3429ea0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3429ea0a

Branch: refs/heads/release-1.2
Commit: 3429ea0aa7478fa3161742592a26c87660ff4617
Parents: 2c93a4c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 15 18:49:29 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 16 14:51:20 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/test/recovery/FastFailuresITCase.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3429ea0a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
index 8a43ee4..d80c826 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -58,7 +58,7 @@ public class FastFailuresITCase extends TestLogger {
 		env.getConfig().disableSysoutLogging();
 		env.setParallelism(4);
 		env.enableCheckpointing(1000);
-		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(200, 0));
+		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(210, 0));
 		
 		DataStream<Tuple2<Integer, Integer>> input = env.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
 


[5/6] flink git commit: [FLINK-5814] [build] Fix packaging flink-dist in unclean source directory

Posted by se...@apache.org.
[FLINK-5814] [build] Fix packaging flink-dist in unclean source directory

If "<flink-dir>/build-target" already existed, running 'mvn package' for
flink-dist would create a symbolic link inside "<flink-dir>/build-target"
instead of replacing that symlink. This commit fixes this behaviour of 'ln -sf'
by adding the --no-dereference parameter.

This closes #3331


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6114c5b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6114c5b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6114c5b0

Branch: refs/heads/release-1.2
Commit: 6114c5b01d60d37efdd7db47bf9378f8dea4385c
Parents: 3429ea0
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Feb 15 15:50:45 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 16 14:51:27 2017 +0100

----------------------------------------------------------------------
 flink-dist/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6114c5b0/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 1af0775..b00b7eb 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -275,7 +275,7 @@ under the License.
 								<configuration>
 									<executable>ln</executable>
 									<arguments>
-										<argument>-sf</argument>
+										<argument>-sfn</argument>
 										<argument>${project.basedir}/target/flink-${project.version}-bin/flink-${project.version}</argument>
 										<argument>${project.basedir}/../build-target</argument>
 									</arguments>


[6/6] flink git commit: [FLINK-5705] [WebMonitor] WebMonitor request/response use UTF-8 explicitly

Posted by se...@apache.org.
[FLINK-5705] [WebMonitor] WebMonitor request/response use UTF-8 explicitly

This closes #3257


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3f2fe26
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3f2fe26
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3f2fe26

Branch: refs/heads/release-1.2
Commit: d3f2fe2625171f89404e1b90fa8c9493f5403b3a
Parents: 6114c5b
Author: shijinkui <sh...@huawei.com>
Authored: Fri Feb 3 17:26:18 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 16 15:09:56 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/webmonitor/HttpRequestHandler.java | 4 ++--
 .../apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java  | 5 ++++-
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3f2fe26/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
index 703b621..585a2f3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
@@ -107,8 +107,8 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject>
 				else if (currentRequest.getMethod() == HttpMethod.POST) {
 					// POST comes in multiple objects. First the request, then the contents
 					// keep the request and path for the remaining objects of the POST request
-					currentRequestPath = new QueryStringDecoder(currentRequest.getUri()).path();
-					currentDecoder = new HttpPostRequestDecoder(DATA_FACTORY, currentRequest);
+					currentRequestPath = new QueryStringDecoder(currentRequest.getUri(), ENCODING).path();
+					currentDecoder = new HttpPostRequestDecoder(DATA_FACTORY, currentRequest, ENCODING);
 				}
 				else {
 					throw new IOException("Unsupported HTTP method: " + currentRequest.getMethod().name());

http://git-wip-us.apache.org/repos/asf/flink/blob/d3f2fe26/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 68e1735..8dbd135 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -30,17 +30,18 @@ import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.router.KeepAliveWrite;
 import io.netty.handler.codec.http.router.Routed;
 
-import java.net.URLDecoder;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetSocketAddress;
+import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
@@ -116,6 +117,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 		}
 
 		response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+		// Content-Encoding:utf-8
+		response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, ENCODING.name());
 
 		KeepAliveWrite.flush(ctx, routed.request(), response);
 	}


[2/6] flink git commit: [FLINK-5805] [docs] Improvements to docs for ProcessFunction

Posted by se...@apache.org.
[FLINK-5805] [docs] Improvements to docs for ProcessFunction

This closes #3317


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fd374c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fd374c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fd374c9

Branch: refs/heads/release-1.2
Commit: 8fd374c9ed5913e42298f71c43922f11e353987e
Parents: e9ada34
Author: David Anderson <da...@alpinegizmo.com>
Authored: Wed Feb 15 10:58:55 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 16 14:51:01 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/process_function.md | 23 ++++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8fd374c9/docs/dev/stream/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md
index a8da4a2..ce3c670 100644
--- a/docs/dev/stream/process_function.md
+++ b/docs/dev/stream/process_function.md
@@ -47,7 +47,7 @@ stream.keyBy("id").process(new MyProcessFunction())
 
 The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
 Every call to the function `processElement(...)` gets a `Context` object with gives access to the element's
-event time timestamp, and the *TimerService*. The `TimerService` can be used to register callbacks for future
+event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future
 event-/processing- time instants. When a timer's particular time is reached, the `onTimer(...)` method is
 called. During that call, all states are again scoped to the key with which the timer was created, allowing
 timers to perform keyed state manipulation as well.
@@ -55,30 +55,35 @@ timers to perform keyed state manipulation as well.
 
 ## Low-level Joins
 
-To realize low-level operations on two inputs, applications can use the `CoProcessFunction`. It relates to the `ProcessFunction`
-in the same way as a `CoFlatMapFunction` relates to the `FlatMapFunction`: The function is typed to two different inputs and
+To realize low-level operations on two inputs, applications can use `CoProcessFunction`. It relates to `ProcessFunction`
+in the same way that `CoFlatMapFunction` relates to `FlatMapFunction`: the function is bound to two different inputs and
 gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
 
-Implementing a low level join follows typically the pattern:
+Implementing a low level join typically follows this pattern:
 
   - Create a state object for one input (or both)
   - Update the state upon receiving elements from its input
   - Upon receiving elements from the other input, probe the state and produce the joined result
 
+For example, you might be joining customer data to financial trades,
+while keeping state for the customer data. If you care about having
+complete and deterministic joins in the face of out-of-order events,
+you can use a timer to evaluate and emit the join for a trade when the
+watermark for the customer data stream has passed the time of that
+trade.
 
 ## Example
 
-The following example maintains counts per key, and emits the key/count pair if no update happened to the key for one minute
-(in event time):
+The following example maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key:
 
   - The count, key, and last-modification-timestamp are stored in a `ValueState`, which is implicitly scoped by key.
   - For each record, the `ProcessFunction` increments the counter and sets the last-modification timestamp
   - The function also schedules a callback one minute into the future (in event time)
   - Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
-    and emits the key/count if the match (no further update happened in that minute)
+    and emits the key/count if they match (i.e., no further update occurred during that minute)
 
-*Note:* This simple example could also have been implemented on top of session windows, we simple use it to illustrate
-the basic pattern of how to use the `ProcessFunction`.
+*Note:* This simple example could have been implemented with session windows. We use `ProcessFunction` here to illustrate
+the basic pattern it provides.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">