Posted to commits@flink.apache.org by ch...@apache.org on 2016/12/08 12:54:54 UTC

[1/3] flink git commit: [FLINK-4563] [metrics] scope caching not adjusted for multiple reporters

Repository: flink
Updated Branches:
  refs/heads/master 441400855 -> 4eb71927b


[FLINK-4563] [metrics] scope caching not adjusted for multiple reporters

This closes #2650.
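Editor's note: for readers skimming the diff below, here is a minimal standalone sketch (plain Java, not the actual AbstractMetricGroup; the class and method names are illustrative only) of the caching strategy this patch introduces: one lazily computed scope string per reporter index, plus a non-cached fallback when no valid reporter index is supplied, so reporters with different delimiters or character filters no longer overwrite each other's cached value.

import java.util.function.UnaryOperator;

/** Illustrative sketch, not Flink code: per-reporter scope-string caching. */
public class PerReporterScopeCache {

	private final String[] scopeComponents;   // e.g. ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]
	private final char[] delimiters;          // one delimiter per configured reporter
	private final String[] scopeStrings;      // one cache slot per reporter, filled on first use

	public PerReporterScopeCache(String[] scopeComponents, char[] delimiters) {
		this.scopeComponents = scopeComponents;
		this.delimiters = delimiters;
		this.scopeStrings = new String[delimiters.length];
	}

	/** Builds "scope + delimiter + metricName" for the given reporter, caching only the scope part. */
	public String getMetricIdentifier(String metricName, UnaryOperator<String> filter, int reporterIndex) {
		if (reporterIndex < 0 || reporterIndex >= scopeStrings.length) {
			// unknown reporter: use a default delimiter and never cache
			return concat('.', filter) + "." + applyFilter(metricName, filter);
		}
		char delimiter = delimiters[reporterIndex];
		if (scopeStrings[reporterIndex] == null) {
			// the first call for this reporter fixes the cached scope string (including its filter)
			scopeStrings[reporterIndex] = concat(delimiter, filter);
		}
		// the metric name itself is never cached, so the filter always applies to it
		return scopeStrings[reporterIndex] + delimiter + applyFilter(metricName, filter);
	}

	private String concat(char delimiter, UnaryOperator<String> filter) {
		StringBuilder sb = new StringBuilder();
		for (int i = 0; i < scopeComponents.length; i++) {
			if (i > 0) {
				sb.append(delimiter);
			}
			sb.append(applyFilter(scopeComponents[i], filter));
		}
		return sb.toString();
	}

	private static String applyFilter(String input, UnaryOperator<String> filter) {
		return filter == null ? input : filter.apply(input);
	}
}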


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86f784a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86f784a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86f784a3

Branch: refs/heads/master
Commit: 86f784a3613ccd5d78197d94198a64b6f1333578
Parents: fe843e1
Author: Anton Mushin <an...@epam.com>
Authored: Mon Oct 17 17:23:01 2016 +0400
Committer: zentol <ch...@apache.org>
Committed: Thu Dec 8 12:04:48 2016 +0100

----------------------------------------------------------------------
 .../metrics/groups/AbstractMetricGroup.java     |  53 ++++----
 .../metrics/groups/AbstractMetricGroupTest.java | 135 +++++++++++++++++++
 2 files changed, 165 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86f784a3/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 04b8158..6ff9776 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -85,9 +86,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	 *  For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */
 	private final String[] scopeComponents;
 
-	/** The metrics scope represented by this group, as a concatenated string, lazily computed.
+	/** Array containing the metrics scope represented by this group for each reporter, as a concatenated string, lazily computed.
 	 * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
-	private String scopeString;
+	private final String[] scopeStrings;
 
 	/** The logical metrics scope represented by this group, as a concatenated string, lazily computed.
 	 * For example: "taskmanager.job.task" */
@@ -105,6 +106,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 		this.registry = checkNotNull(registry);
 		this.scopeComponents = checkNotNull(scope);
 		this.parent = parent;
+		this.scopeStrings = new String[registry.getReporters() == null ? 0 : registry.getReporters().size()];
 	}
 
 	public Map<String, String> getAllVariables() {
@@ -210,19 +212,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	 * @return fully qualified metric name
 	 */
 	public String getMetricIdentifier(String metricName, CharacterFilter filter) {
-		if (scopeString == null) {
-			if (filter != null) {
-				scopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents);
-			} else {
-				scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents);
-			}
-		}
-
-		if (filter != null) {
-			return scopeString + registry.getDelimiter() + filter.filterCharacters(metricName);
-		} else {
-			return scopeString + registry.getDelimiter() + metricName;
-		}
+		return getMetricIdentifier(metricName, filter, -1);
 	}
 
 	/**
@@ -235,12 +225,29 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	 * @return fully qualified metric name
 	 */
 	public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex) {
-		if (filter != null) {
-			scopeString = ScopeFormat.concat(filter, registry.getDelimiter(reporterIndex), scopeComponents);
-			return scopeString + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName);
+		if (scopeStrings.length == 0 || (reporterIndex < 0 || reporterIndex >= scopeStrings.length)) {
+			char delimiter = registry.getDelimiter();
+			String newScopeString;
+			if (filter != null) {
+				newScopeString = ScopeFormat.concat(filter, delimiter, scopeComponents);
+				metricName = filter.filterCharacters(metricName);
+			} else {
+				newScopeString = ScopeFormat.concat(delimiter, scopeComponents);
+			}
+			return newScopeString + delimiter + metricName;
 		} else {
-			scopeString = ScopeFormat.concat(registry.getDelimiter(reporterIndex), scopeComponents);
-			return scopeString + registry.getDelimiter(reporterIndex) + metricName;
+			char delimiter = registry.getDelimiter(reporterIndex);
+			if (scopeStrings[reporterIndex] == null) {
+				if (filter != null) {
+					scopeStrings[reporterIndex] = ScopeFormat.concat(filter, delimiter, scopeComponents);
+				} else {
+					scopeStrings[reporterIndex] = ScopeFormat.concat(delimiter, scopeComponents);
+				}
+			}
+			if (filter != null) {
+				metricName = filter.filterCharacters(metricName);
+			}
+			return scopeStrings[reporterIndex] + delimiter + metricName;
 		}
 	}
 	
@@ -353,7 +360,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 						// we warn here, rather than failing, because metrics are tools that should not fail the
 						// program when used incorrectly
 						LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
-								name + "'. Metric might not get properly reported. (" + scopeString + ')');
+								name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
 					}
 
 					registry.register(metric, name, this);
@@ -365,7 +372,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 					// we warn here, rather than failing, because metrics are tools that should not fail the
 					// program when used incorrectly
 					LOG.warn("Name collision: Group already contains a Metric with the name '" +
-							name + "'. Metric will not be reported. (" + scopeString + ')');
+							name + "'. Metric will not be reported." + Arrays.toString(scopeComponents));
 				}
 			}
 		}
@@ -389,7 +396,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 				// program when used incorrectly
 				if (metrics.containsKey(name)) {
 					LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
-							name + "'. Metric might not get properly reported. (" + scopeString + ')');
+							name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
 				}
 
 				AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name);

http://git-wip-us.apache.org/repos/asf/flink/blob/86f784a3/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index c7b392f..da14bfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -17,12 +17,19 @@
  */
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class AbstractMetricGroupTest {
@@ -49,4 +56,132 @@ public class AbstractMetricGroupTest {
 		
 		registry.shutdown();
 	}
+
+	// ========================================================================
+	// Scope Caching
+	// ========================================================================
+
+	private static final CharacterFilter FILTER_C = new CharacterFilter() {
+		@Override
+		public String filterCharacters(String input) {
+			return input.replace("C", "X");
+		}
+	};
+	private static final CharacterFilter FILTER_B = new CharacterFilter() {
+		@Override
+		public String filterCharacters(String input) {
+			return input.replace("B", "X");
+		}
+	};
+
+	@Test
+	public void testScopeCachingForMultipleReporters() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
+		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
+
+		MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+		try {
+			MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
+			tmGroup.counter("1");
+			assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size());
+			for (MetricReporter reporter : testRegistry.getReporters()) {
+				ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter;
+				if (typedReporter.failureCause != null) {
+					throw typedReporter.failureCause;
+				}
+			}
+		} finally {
+			testRegistry.shutdown();
+		}
+	}
+
+	private abstract static class ScopeCheckingTestReporter extends TestReporter {
+		protected Exception failureCause;
+
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+			try {
+				checkScopes(metric, metricName, group);
+			} catch (Exception e) {
+				if (failureCause == null) {
+					failureCause = e;
+				}
+			}
+		}
+
+		public abstract void checkScopes(Metric metric, String metricName, MetricGroup group);
+
+	}
+
+	public static class TestReporter1 extends ScopeCheckingTestReporter {
+		@Override
+		public String filterCharacters(String input) {
+			return FILTER_B.filterCharacters(input);
+		}
+		@Override
+		public void checkScopes(Metric metric, String metricName, MetricGroup group) {
+			// the first call determines which filter is applied to all future calls; in this case no filter is used at all
+			assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName));
+			// from now on the scope string is cached and should not be reliant on the given filter
+			assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName, FILTER_C));
+			assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName, this));
+			// the metric name however is still affected by the filter as it is not cached
+			assertEquals("A-B-C-D-4", group.getMetricIdentifier(metricName, new CharacterFilter() {
+				@Override
+				public String filterCharacters(String input) {
+					return input.replace("B", "X").replace("1", "4");
+				}
+			}));
+		}
+	}
+
+	public static class TestReporter2 extends ScopeCheckingTestReporter {
+		@Override
+		public String filterCharacters(String input) {
+			return FILTER_C.filterCharacters(input);
+		}
+
+		@Override
+		public void checkScopes(Metric metric, String metricName, MetricGroup group) {
+			// the first call determines which filter is applied to all future calls
+			assertEquals("A!B!X!D!1", group.getMetricIdentifier(metricName, this));
+			// from now on the scope string is cached and should not be reliant on the given filter
+			assertEquals("A!B!X!D!1", group.getMetricIdentifier(metricName));
+			assertEquals("A!B!X!D!1", group.getMetricIdentifier(metricName, FILTER_C));
+			// the metric name however is still affected by the filter as it is not cached
+			assertEquals("A!B!X!D!3", group.getMetricIdentifier(metricName, new CharacterFilter() {
+				@Override
+				public String filterCharacters(String input) {
+					return input.replace("A", "X").replace("1", "3");
+				}
+			}));
+		}
+	}
+
+	@Test
+	public void testScopeGenerationWithoutReporters() {
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
+		MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+
+		try {
+			TaskManagerMetricGroup group = new TaskManagerMetricGroup(testRegistry, "host", "id");
+			assertEquals("MetricReporters list should be empty", 0, testRegistry.getReporters().size());
+			
+			// default delimiter should be used
+			assertEquals("A.B.X.D.1", group.getMetricIdentifier("1", FILTER_C));
+			// no caching should occur
+			assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B));
+			// invalid reporter indices do not throw errors
+			assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, -1));
+			assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, 2));
+		} finally {
+			testRegistry.shutdown();
+		}
+	}
 }


[3/3] flink git commit: [FLINK-5020] Make the GenericWriteAheadSink rescalable.

Posted by ch...@apache.org.
[FLINK-5020] Make the GenericWriteAheadSink rescalable.

Integrates the new state abstractions with the GenericWriteAheadSink
so that the latter can change its parallelism when resuming execution
from a savepoint, without jeopardizing the provided guarantees.

This closes #2759
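Editor's note: the key design choice in the diff below is that the sink writes one operator list-state entry per pending checkpoint ("create a new partition for each entry"), so the entries can be redistributed across subtasks when the parallelism changes. The following self-contained sketch (plain Java, not Flink code; class and method names are illustrative) shows the redistribution idea under that assumption:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch of why per-entry list state makes the sink rescalable. */
public class ListStateRepartitioning {

	/** Splits the merged entries of all old subtasks across newParallelism new subtasks. */
	static <T> List<List<T>> redistribute(List<T> mergedEntries, int newParallelism) {
		List<List<T>> perSubtask = new ArrayList<>();
		for (int i = 0; i < newParallelism; i++) {
			perSubtask.add(new ArrayList<T>());
		}
		for (int i = 0; i < mergedEntries.size(); i++) {
			// round-robin assignment; each list entry is an independently movable unit of state
			perSubtask.get(i % newParallelism).add(mergedEntries.get(i));
		}
		return perSubtask;
	}

	public static void main(String[] args) {
		// two old subtasks each had two pending checkpoints; scale down to a single subtask
		List<String> merged = Arrays.asList("cp0@subtask0", "cp1@subtask0", "cp0@subtask1", "cp1@subtask1");
		System.out.println(redistribute(merged, 1)); // the single new subtask receives all four entries
	}
}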


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb71927
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb71927
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb71927

Branch: refs/heads/master
Commit: 4eb71927bc4f0832eb08a79394ad6864a3c2e142
Parents: 86f784a
Author: kl0u <kk...@gmail.com>
Authored: Wed Oct 26 17:19:12 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Dec 8 12:27:14 2016 +0100

----------------------------------------------------------------------
 .../cassandra/CassandraConnectorITCase.java     |  38 ++--
 .../runtime/operators/CheckpointCommitter.java  |   1 +
 .../operators/GenericWriteAheadSink.java        | 105 ++++++-----
 .../operators/GenericWriteAheadSinkTest.java    |  50 +++---
 .../operators/WriteAheadSinkTestBase.java       | 172 +++++++++++++++++--
 5 files changed, 276 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 2bb6fd1..f2e8f8b 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -47,7 +47,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.TestEnvironment;
 
@@ -71,6 +70,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Scanner;
 import java.util.UUID;
 
@@ -262,9 +262,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	}
 
 	@Override
-	protected void verifyResultsIdealCircumstances(
-		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+	protected void verifyResultsIdealCircumstances(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
 		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
@@ -279,9 +277,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	}
 
 	@Override
-	protected void verifyResultsDataPersistenceUponMissedNotify(
-		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+	protected void verifyResultsDataPersistenceUponMissedNotify(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
 		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
@@ -296,9 +292,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	}
 
 	@Override
-	protected void verifyResultsDataDiscardingUponRestore(
-		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+	protected void verifyResultsDataDiscardingUponRestore(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
 		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
@@ -315,6 +309,30 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}
 
+	@Override
+	protected void verifyResultsWhenReScaling(
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink, int startElementCounter, int endElementCounter) {
+
+		// IMPORTANT NOTE:
+		//
+		// for cassandra we always have to start from 1 because
+		// all operators will share the same final db
+
+		ArrayList<Integer> expected = new ArrayList<>();
+		for (int i = 1; i <= endElementCounter; i++) {
+			expected.add(i);
+		}
+
+		ArrayList<Integer> actual = new ArrayList<>();
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
+		for (Row s : result) {
+			actual.add(s.getInt("counter"));
+		}
+
+		Collections.sort(actual);
+		Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+	}
+
 	@Test
 	public void testCassandraCommitter() throws Exception {
 		CassandraCommitter cc1 = new CassandraCommitter(builder);

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
index 90e3a57..6e50dde 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/CheckpointCommitter.java
@@ -40,6 +40,7 @@ import java.io.Serializable;
  * and as such should kept as small as possible.
  */
 public abstract class CheckpointCommitter implements Serializable {
+
 	protected static final Logger LOG = LoggerFactory.getLogger(CheckpointCommitter.class);
 
 	protected String jobId;

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index b08b2e9..564fa22 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -17,20 +17,20 @@
  */
 package org.apache.flink.streaming.runtime.operators;
 
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ import java.util.UUID;
  * @param <IN> Type of the elements emitted by this sink
  */
 public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<IN>
-		implements OneInputStreamOperator<IN, IN>, StreamCheckpointedOperator {
+		implements OneInputStreamOperator<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -65,9 +65,15 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	private transient CheckpointStreamFactory.CheckpointStateOutputStream out;
 	private transient CheckpointStreamFactory checkpointStreamFactory;
 
+	private transient ListState<PendingCheckpoint> checkpointedState;
+
 	private final Set<PendingCheckpoint> pendingCheckpoints = new TreeSet<>();
 
-	public GenericWriteAheadSink(CheckpointCommitter committer,	TypeSerializer<IN> serializer, String jobID) throws Exception {
+	public GenericWriteAheadSink(
+			CheckpointCommitter committer,
+			TypeSerializer<IN> serializer,
+			String jobID) throws Exception {
+
 		this.committer = Preconditions.checkNotNull(committer);
 		this.serializer = Preconditions.checkNotNull(serializer);
 		this.id = UUID.randomUUID().toString();
@@ -77,12 +83,39 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	}
 
 	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+
+		Preconditions.checkState(this.checkpointedState == null,
+			"The reader state has already been initialized.");
+
+		checkpointedState = context.getOperatorStateStore()
+			.getSerializableListState("pending-checkpoints");
+
+		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+		if (context.isRestored()) {
+			LOG.info("Restoring state for the GenericWriteAheadSink (taskIdx={}).", subtaskIdx);
+
+			for (PendingCheckpoint pendingCheckpoint : checkpointedState.get()) {
+				this.pendingCheckpoints.add(pendingCheckpoint);
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("GenericWriteAheadSink idx {} restored {}.", subtaskIdx, this.pendingCheckpoints);
+			}
+		} else {
+			LOG.info("No state to restore for the GenericWriteAheadSink (taskIdx={}).", subtaskIdx);
+		}
+	}
+
+	@Override
 	public void open() throws Exception {
 		super.open();
 		committer.setOperatorId(id);
 		committer.open();
 
-		checkpointStreamFactory = getContainingTask().createCheckpointStreamFactory(this);
+		checkpointStreamFactory = getContainingTask()
+			.createCheckpointStreamFactory(this);
 
 		cleanRestoredHandles();
 	}
@@ -99,12 +132,14 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	 * @throws IOException in case something went wrong when handling the stream to the backend.
 	 */
 	private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
+
 		//only add handle if a new OperatorState was created since the last snapshot
 		if (out != null) {
 			int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 			StreamStateHandle handle = out.closeAndGetHandle();
 
-			PendingCheckpoint pendingCheckpoint = new PendingCheckpoint(checkpointId, subtaskIdx, timestamp, handle);
+			PendingCheckpoint pendingCheckpoint = new PendingCheckpoint(
+				checkpointId, subtaskIdx, timestamp, handle);
 
 			if (pendingCheckpoints.contains(pendingCheckpoint)) {
 				//we already have a checkpoint stored for that ID that may have been partially written,
@@ -118,22 +153,23 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	}
 
 	@Override
-	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
-		saveHandleInState(checkpointId, timestamp);
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+
+		Preconditions.checkState(this.checkpointedState != null,
+			"The operator state has not been properly initialized.");
 
-		DataOutputViewStreamWrapper outStream = new DataOutputViewStreamWrapper(out);
-		outStream.writeInt(pendingCheckpoints.size());
+		saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());
+
+		this.checkpointedState.clear();
 		for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
-			pendingCheckpoint.serialize(outStream);
+			// create a new partition for each entry.
+			this.checkpointedState.add(pendingCheckpoint);
 		}
-	}
 
-	@Override
-	public void restoreState(FSDataInputStream in) throws Exception {
-		final DataInputViewStreamWrapper inStream = new DataInputViewStreamWrapper(in);
-		int numPendingHandles = inStream.readInt();
-		for (int i = 0; i < numPendingHandles; i++) {
-			pendingCheckpoints.add(PendingCheckpoint.restore(inStream, getUserCodeClassloader()));
+		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} (taskIdx= {}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, this.pendingCheckpoints);
 		}
 	}
 
@@ -162,9 +198,12 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 		super.notifyOfCompletedCheckpoint(checkpointId);
 
 		synchronized (pendingCheckpoints) {
+
 			Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator();
 			while (pendingCheckpointIt.hasNext()) {
+
 				PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();
+
 				long pastCheckpointId = pendingCheckpoint.checkpointId;
 				int subtaskId = pendingCheckpoint.subtaskId;
 				long timestamp = pendingCheckpoint.timestamp;
@@ -241,34 +280,15 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 			this.stateHandle = handle;
 		}
 
-		void serialize(DataOutputViewStreamWrapper outputStream) throws IOException {
-			outputStream.writeLong(checkpointId);
-			outputStream.writeInt(subtaskId);
-			outputStream.writeLong(timestamp);
-			InstantiationUtil.serializeObject(outputStream, stateHandle);
-		}
-
-		static PendingCheckpoint restore(
-				DataInputViewStreamWrapper inputStream,
-				ClassLoader classLoader) throws IOException, ClassNotFoundException {
-
-			long checkpointId = inputStream.readLong();
-			int subtaskId = inputStream.readInt();
-			long timestamp = inputStream.readLong();
-			StreamStateHandle handle = InstantiationUtil.deserializeObject(inputStream, classLoader);
-
-			return new PendingCheckpoint(checkpointId, subtaskId, timestamp, handle);
-		}
-
 		@Override
 		public int compareTo(PendingCheckpoint o) {
 			int res = Long.compare(this.checkpointId, o.checkpointId);
-			return res != 0 ? res : Integer.compare(this.subtaskId, o.subtaskId);
+			return res != 0 ? res : this.subtaskId - o.subtaskId;
 		}
 
 		@Override
 		public boolean equals(Object o) {
-			if (!(o instanceof GenericWriteAheadSink.PendingCheckpoint)) {
+			if (o == null || !(o instanceof GenericWriteAheadSink.PendingCheckpoint)) {
 				return false;
 			}
 			PendingCheckpoint other = (PendingCheckpoint) o;
@@ -285,5 +305,10 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 			hash = 31 * hash + (int) (timestamp ^ (timestamp >>> 32));
 			return hash;
 		}
+
+		@Override
+		public String toString() {
+			return "Pending Checkpoint: id=" + checkpointId + "/" + subtaskId + "@" + timestamp;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
index 8d092ed..9bcd2e6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -29,6 +30,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Integer>, GenericWriteAheadSinkTest.ListSink> {
@@ -50,9 +52,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 
 
 	@Override
-	protected void verifyResultsIdealCircumstances(
-		OneInputStreamOperatorTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness,
-		ListSink sink) {
+	protected void verifyResultsIdealCircumstances(ListSink sink) {
 
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
@@ -67,9 +67,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 	}
 
 	@Override
-	protected void verifyResultsDataPersistenceUponMissedNotify(
-		OneInputStreamOperatorTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness,
-		ListSink sink) {
+	protected void verifyResultsDataPersistenceUponMissedNotify(ListSink sink) {
 
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
@@ -84,9 +82,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 	}
 
 	@Override
-	protected void verifyResultsDataDiscardingUponRestore(
-		OneInputStreamOperatorTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness,
-		ListSink sink) {
+	protected void verifyResultsDataDiscardingUponRestore(ListSink sink) {
 
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 20; x++) {
@@ -103,6 +99,18 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 		Assert.assertTrue("The sink emitted to many values: " + (sink.values.size() - 40), sink.values.size() == 40);
 	}
 
+	@Override
+	protected void verifyResultsWhenReScaling(ListSink sink, int startElementCounter, int endElementCounter) throws Exception {
+
+		ArrayList<Integer> list = new ArrayList<>();
+		for (int i = startElementCounter; i <= endElementCounter; i++) {
+			list.add(i);
+		}
+
+		Collections.sort(sink.values);
+		Assert.assertArrayEquals(list.toArray(), sink.values.toArray());
+	}
+
 	@Test
 	/**
 	 * Verifies that exceptions thrown by a committer do not fail a job and lead to an abort of notify()
@@ -124,33 +132,33 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(0, 0);
+		testHarness.snapshot(0, 0);
 		testHarness.notifyOfCompletedCheckpoint(0);
 
 		//isCommitted should have failed, thus sendValues() should never have been called
 		Assert.assertEquals(0, sink.values.size());
 
-		for (int x = 0; x < 10; x++) {
+		for (int x = 0; x < 11; x++) {
 			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 1)));
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(1, 0);
+		testHarness.snapshot(1, 0);
 		testHarness.notifyOfCompletedCheckpoint(1);
 
 		//previous CP should be retried, but will fail the CP commit. Second CP should be skipped.
 		Assert.assertEquals(10, sink.values.size());
 
-		for (int x = 0; x < 10; x++) {
+		for (int x = 0; x < 12; x++) {
 			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(2, 0);
+		testHarness.snapshot(2, 0);
 		testHarness.notifyOfCompletedCheckpoint(2);
 
-		//all CP's should be retried and succeed; since one CP was written twice we have 2 * 10 + 10 + 10 = 40 values
-		Assert.assertEquals(40, sink.values.size());
+		//all CP's should be retried and succeed; since one CP was written twice we have 2 * 10 + 11 + 12 = 43 values
+		Assert.assertEquals(43, sink.values.size());
 	}
 
 	/**
@@ -177,7 +185,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 	public static class SimpleCommitter extends CheckpointCommitter {
 		private static final long serialVersionUID = 1L;
 
-		private List<Long> checkpoints;
+		private List<Tuple2<Long, Integer>> checkpoints;
 
 		@Override
 		public void open() throws Exception {
@@ -194,12 +202,12 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 
 		@Override
 		public void commitCheckpoint(int subtaskIdx, long checkpointID) {
-			checkpoints.add(checkpointID);
+			checkpoints.add(new Tuple2<>(checkpointID, subtaskIdx));
 		}
 
 		@Override
 		public boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) {
-			return checkpoints.contains(checkpointID);
+			return checkpoints.contains(new Tuple2<>(checkpointID, subtaskIdx));
 		}
 	}
 
@@ -227,7 +235,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 	public static class FailingCommitter extends CheckpointCommitter {
 		private static final long serialVersionUID = 1L;
 
-		private List<Long> checkpoints;
+		private List<Tuple2<Long, Integer>> checkpoints;
 		private boolean failIsCommitted = true;
 		private boolean failCommit = true;
 
@@ -250,7 +258,7 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int
 				failCommit = false;
 				throw new RuntimeException("Expected exception");
 			} else {
-				checkpoints.add(checkpointID);
+				checkpoints.add(new Tuple2<>(checkpointID, subtaskIdx));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index a9c5792..46d92af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -19,8 +19,9 @@
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 
@@ -34,14 +35,13 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 
 	protected abstract IN generateValue(int counter, int checkpointID);
 
-	protected abstract void verifyResultsIdealCircumstances(
-		OneInputStreamOperatorTestHarness<IN, IN> harness, S sink) throws Exception;
+	protected abstract void verifyResultsIdealCircumstances(S sink) throws Exception;
 
-	protected abstract void verifyResultsDataPersistenceUponMissedNotify(
-			OneInputStreamOperatorTestHarness<IN, IN> harness, S sink) throws Exception;
+	protected abstract void verifyResultsDataPersistenceUponMissedNotify(S sink) throws Exception;
 
-	protected abstract void verifyResultsDataDiscardingUponRestore(
-		OneInputStreamOperatorTestHarness<IN, IN> harness, S sink) throws Exception;
+	protected abstract void verifyResultsDataDiscardingUponRestore(S sink) throws Exception;
+
+	protected abstract void verifyResultsWhenReScaling(S sink, int startElementCounter, int endElementCounter) throws Exception;
 
 	@Test
 	public void testIdealCircumstances() throws Exception {
@@ -60,7 +60,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
 		for (int x = 0; x < 20; x++) {
@@ -68,7 +68,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
 		for (int x = 0; x < 20; x++) {
@@ -76,10 +76,10 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
-		verifyResultsIdealCircumstances(testHarness, sink);
+		verifyResultsIdealCircumstances(sink);
 	}
 
 	@Test
@@ -99,7 +99,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
 		for (int x = 0; x < 20; x++) {
@@ -107,17 +107,17 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 
 		for (int x = 0; x < 20; x++) {
 			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
-		verifyResultsDataPersistenceUponMissedNotify(testHarness, sink);
+		verifyResultsDataPersistenceUponMissedNotify(sink);
 	}
 
 	@Test
@@ -137,7 +137,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 			elementCounter++;
 		}
 
-		StreamStateHandle latestSnapshot = testHarness.snapshotLegacy(snapshotCount++, 0);
+		OperatorStateHandles latestSnapshot = testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
 		for (int x = 0; x < 20; x++) {
@@ -152,7 +152,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 		testHarness = new OneInputStreamOperatorTestHarness<>(sink);
 
 		testHarness.setup();
-		testHarness.restore(latestSnapshot);
+		testHarness.initializeState(latestSnapshot);
 		testHarness.open();
 
 		for (int x = 0; x < 20; x++) {
@@ -160,9 +160,143 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 			elementCounter++;
 		}
 
-		testHarness.snapshotLegacy(snapshotCount++, 0);
+		testHarness.snapshot(snapshotCount++, 0);
 		testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
-		verifyResultsDataDiscardingUponRestore(testHarness, sink);
+		verifyResultsDataDiscardingUponRestore(sink);
+	}
+
+	@Test
+	public void testScalingDown() throws Exception {
+		S sink1 = createSink();
+		OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
+			new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
+		testHarness1.open();
+
+		S sink2 = createSink();
+		OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
+			new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
+		testHarness2.open();
+
+		int elementCounter = 1;
+		int snapshotCount = 0;
+
+		for (int x = 0; x < 10; x++) {
+			testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+
+		for (int x = 0; x < 11; x++) {
+			testHarness2.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+
+		// snapshot at checkpoint 0 for testHarness1 and testHarness 2
+		OperatorStateHandles snapshot1 = testHarness1.snapshot(snapshotCount, 0);
+		OperatorStateHandles snapshot2 = testHarness2.snapshot(snapshotCount, 0);
+
+		// merge the two partial states
+		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness
+			.repackageState(snapshot1, snapshot2);
+
+		testHarness1.close();
+		testHarness2.close();
+
+		// and create a third instance that operates alone but
+		// has the merged state of the previous 2 instances
+
+		S sink3 = createSink();
+		OneInputStreamOperatorTestHarness<IN, IN> mergedTestHarness =
+			new OneInputStreamOperatorTestHarness<>(sink3, 10, 1, 0);
+
+		mergedTestHarness.setup();
+		mergedTestHarness.initializeState(mergedSnapshot);
+		mergedTestHarness.open();
+
+		for (int x = 0; x < 12; x++) {
+			mergedTestHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+
+		snapshotCount++;
+		mergedTestHarness.snapshot(snapshotCount, 1);
+		mergedTestHarness.notifyOfCompletedCheckpoint(snapshotCount);
+
+		verifyResultsWhenReScaling(sink3, 1, 33);
+		mergedTestHarness.close();
+	}
+
+	@Test
+	public void testScalingUp() throws Exception {
+
+		S sink1 = createSink();
+		OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
+			new OneInputStreamOperatorTestHarness<>(sink1, 10, 1, 0);
+
+		int elementCounter = 1;
+		int snapshotCount = 0;
+
+		testHarness1.open();
+
+		// put two more checkpoints as pending
+
+		for (int x = 0; x < 10; x++) {
+			testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+		testHarness1.snapshot(++snapshotCount, 0);
+
+		for (int x = 0; x < 11; x++) {
+			testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+
+		// this will be the state that will be split between the two new operators
+		OperatorStateHandles snapshot = testHarness1.snapshot(++snapshotCount, 0);
+
+		testHarness1.close();
+
+		// verify no elements are in the sink
+		verifyResultsWhenReScaling(sink1, 0, -1);
+
+		// we will create two operator instances, testHarness2 and testHarness3,
+		// that will share the state of testHarness1
+
+		++snapshotCount;
+
+		S sink2 = createSink();
+		OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
+			new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 0);
+
+		testHarness2.setup();
+		testHarness2.initializeState(snapshot);
+		testHarness2.open();
+
+		testHarness2.notifyOfCompletedCheckpoint(snapshotCount);
+
+		verifyResultsWhenReScaling(sink2, 1, 10);
+
+		S sink3 = createSink();
+		OneInputStreamOperatorTestHarness<IN, IN> testHarness3 =
+			new OneInputStreamOperatorTestHarness<>(sink3, 10, 2, 1);
+
+		testHarness3.setup();
+		testHarness3.initializeState(snapshot);
+		testHarness3.open();
+
+		// add some more elements to verify that everything functions normally from now on...
+
+		for (int x = 0; x < 10; x++) {
+			testHarness3.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+			elementCounter++;
+		}
+
+		testHarness3.snapshot(snapshotCount, 1);
+		testHarness3.notifyOfCompletedCheckpoint(snapshotCount);
+
+		verifyResultsWhenReScaling(sink3, 11, 31);
+
+		testHarness2.close();
+		testHarness3.close();
 	}
 }


[2/3] flink git commit: [FLINK-5164] Disable some Hadoop-compat tests on Windows

Posted by ch...@apache.org.
[FLINK-5164] Disable some Hadoop-compat tests on Windows

This closes #2889.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe843e13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe843e13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe843e13

Branch: refs/heads/master
Commit: fe843e1377aa08a10394bbfa67dc9d3b2a23b805
Parents: 4414008
Author: zentol <ch...@apache.org>
Authored: Fri Nov 25 14:58:48 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Dec 8 12:04:48 2016 +0100

----------------------------------------------------------------------
 .../test/hadoopcompatibility/mapred/HadoopMapredITCase.java | 9 +++++++++
 .../mapreduce/HadoopInputOutputITCase.java                  | 8 ++++++++
 .../flink/test/hadoop/mapred/HadoopIOFormatsITCase.java     | 9 +++++++++
 .../flink/test/hadoop/mapred/WordCountMapredITCase.java     | 9 +++++++++
 .../test/hadoop/mapreduce/WordCountMapreduceITCase.java     | 9 +++++++++
 .../api/scala/hadoop/mapred/WordCountMapredITCase.scala     | 8 ++++++++
 .../scala/hadoop/mapreduce/WordCountMapreduceITCase.scala   | 8 ++++++++
 7 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index ccc0d82..0b5a366 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -21,12 +21,21 @@ package org.apache.flink.test.hadoopcompatibility.mapred;
 import org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.OperatingSystem;
+import org.junit.Assume;
+import org.junit.Before;
 
 public class HadoopMapredITCase extends JavaProgramTestBase {
 	
 	protected String textPath;
 	protected String resultPath;
 
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
 	@Override
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 698e356..48aa258 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -21,12 +21,20 @@ package org.apache.flink.test.hadoopcompatibility.mapreduce;
 import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.OperatingSystem;
+import org.junit.Assume;
+import org.junit.Before;
 
 public class HadoopInputOutputITCase extends JavaProgramTestBase {
 	
 	protected String textPath;
 	protected String resultPath;
 	
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
 	
 	@Override
 	protected void preSubmit() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
index 0cb1ac5..468b780 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.OperatingSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
@@ -35,6 +36,8 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.junit.Assume;
+import org.junit.Before;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -61,6 +64,12 @@ public class HadoopIOFormatsITCase extends JavaProgramTestBase {
 		super(config);	
 	}
 	
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
 	@Override
 	protected void preSubmit() throws Exception {
 		resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };

http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index 80f311a..9528d94 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -28,18 +28,27 @@ import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OperatingSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.junit.Assume;
+import org.junit.Before;
 
 public class WordCountMapredITCase extends JavaProgramTestBase {
 
 	protected String textPath;
 	protected String resultPath;
 
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
 	@Override
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index 3293770..64062d2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -28,18 +28,27 @@ import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OperatingSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Assume;
+import org.junit.Before;
 
 public class WordCountMapreduceITCase extends JavaProgramTestBase {
 
 	protected String textPath;
 	protected String resultPath;
 
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
 	@Override
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
index 6b414d6..9d04ca59 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -21,14 +21,22 @@ import org.apache.flink.api.scala._
 import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
+import org.apache.flink.util.OperatingSystem
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextInputFormat, TextOutputFormat}
+import org.junit.{Assume, Before}
 
 class WordCountMapredITCase extends JavaProgramTestBase {
   protected var textPath: String = null
   protected var resultPath: String = null
 
+  @Before
+  def checkOperatingSystem() {
+    // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+    Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows)
+  }
+
   protected override def preSubmit() {
     textPath = createTempFile("text.txt", WordCountData.TEXT)
     resultPath = getTempDirPath("result")

http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
index e393d23..3b23a13 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -22,16 +22,24 @@ import org.apache.flink.api.scala._
 import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
+import org.apache.flink.util.OperatingSystem
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{Text, LongWritable}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+import org.junit.{Assume, Before}
 
 class WordCountMapreduceITCase extends JavaProgramTestBase {
   protected var textPath: String = null
   protected var resultPath: String = null
 
+  @Before
+  def checkOperatingSystem() {
+    // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+    Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows)
+  }
+
   protected override def preSubmit() {
     textPath = createTempFile("text.txt", WordCountData.TEXT)
     resultPath = getTempDirPath("result")