Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:16 UTC

[01/15] flink git commit: [FLINK-5292] Add "restoreFromLegacySnapshot" in AbstractStreamOperatorTestHarness.

Repository: flink
Updated Branches:
  refs/heads/master bfdaa3821 -> 1220230c6


[FLINK-5292] Add "restoreFromLegacySnapshot" in AbstractStreamOperatorTestHarness.

For unit testing the code in operators that restores state from Flink 1.1 snapshots.
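
As a rough usage sketch (not part of the commit): a test built on the existing
OneInputStreamOperatorTestHarness could restore an operator from a Flink 1.1 snapshot
file along the following lines. MyLegacyOperator and the snapshot path are placeholders;
initializeStateFromLegacyCheckpoint() is the method added by this commit, while open()
and processElement() are existing harness methods.

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;

public class LegacyRestoreSketchTest {

    @Test
    public void testRestoreFromFlink11Snapshot() throws Exception {
        // MyLegacyOperator is a placeholder for whatever operator wrote the snapshot under Flink 1.1
        OneInputStreamOperatorTestHarness<String, String> harness =
                new OneInputStreamOperatorTestHarness<>(new MyLegacyOperator());

        // deserializes the legacy StreamTaskState and passes it through initializeState()
        harness.initializeStateFromLegacyCheckpoint("src/test/resources/my-flink-1.1-snapshot");
        harness.open();

        // from here on the operator runs with the state it wrote under Flink 1.1
        harness.processElement(new StreamRecord<>("element", 0L));
    }
}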


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9b4f91a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9b4f91a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9b4f91a

Branch: refs/heads/master
Commit: f9b4f91a2b1b5045ef03cdb23617eed5074d080b
Parents: 896fbae
Author: kl0u <kk...@gmail.com>
Authored: Fri Dec 16 10:04:26 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:53 2016 +0100

----------------------------------------------------------------------
 .../util/AbstractStreamOperatorTestHarness.java | 36 +++++++++++++++++++-
 .../KeyedOneInputStreamOperatorTestHarness.java | 31 ++++++++++++++---
 2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9b4f91a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index b623fa1..346d5c3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -25,6 +25,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.migration.util.MigrationInstantiationUtil;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
@@ -58,6 +61,7 @@ import org.apache.flink.util.Preconditions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.FileInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -263,6 +267,36 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		setupCalled = true;
 	}
 
+	public void initializeStateFromLegacyCheckpoint(String checkpointFilename) throws Exception {
+
+		FileInputStream fin = new FileInputStream(checkpointFilename);
+		StreamTaskState state = MigrationInstantiationUtil.deserializeObject(fin, ClassLoader.getSystemClassLoader());
+		fin.close();
+
+		if (!setupCalled) {
+			setup();
+		}
+
+		StreamStateHandle stateHandle = SavepointV0Serializer.convertOperatorAndFunctionState(state);
+
+		List<KeyGroupsStateHandle> keyGroupStatesList = new ArrayList<>();
+		if (state.getKvStates() != null) {
+			KeyGroupsStateHandle keyedStateHandle = SavepointV0Serializer.convertKeyedBackendState(
+					state.getKvStates(),
+					environment.getTaskInfo().getIndexOfThisSubtask(),
+					0);
+			keyGroupStatesList.add(keyedStateHandle);
+		}
+
+		// finally calling the initializeState() with the legacy operatorStateHandles
+		initializeState(new OperatorStateHandles(0,
+				stateHandle,
+				keyGroupStatesList,
+				Collections.<KeyGroupsStateHandle>emptyList(),
+				Collections.<OperatorStateHandle>emptyList(),
+				Collections.<OperatorStateHandle>emptyList()));
+	}
+
 	/**
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
@@ -323,7 +357,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 			OperatorStateHandles massagedOperatorStateHandles = new OperatorStateHandles(
 					0,
-					null,
+					operatorStateHandles.getLegacyOperatorState(),
 					localManagedKeyGroupState,
 					localRawKeyGroupState,
 					localManagedOperatorState,

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b4f91a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 0bdf5da..3a47a1d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -34,11 +34,14 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.util.Migration;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.RunnableFuture;
@@ -184,6 +187,16 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		}
 	}
 
+
+	private static boolean hasMigrationHandles(Collection<KeyGroupsStateHandle> allKeyGroupsHandles) {
+		for (KeyGroupsStateHandle handle : allKeyGroupsHandles) {
+			if (handle instanceof Migration) {
+				return true;
+			}
+		}
+		return false;
+	}
+
 	@Override
 	public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
 		if (operatorStateHandles != null) {
@@ -201,10 +214,20 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 					keyGroupPartitions.get(subtaskIndex);
 
 			restoredKeyedState = null;
-			if (operatorStateHandles.getManagedKeyedState() != null) {
-				restoredKeyedState = StateAssignmentOperation.getKeyGroupsStateHandles(
-						operatorStateHandles.getManagedKeyedState(),
-						localKeyGroupRange);
+			Collection<KeyGroupsStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState();
+			if (managedKeyedState != null) {
+
+				// if we have migration handles, don't reshuffle state and preserve
+				// the migration tag
+				if (hasMigrationHandles(managedKeyedState)) {
+					List<KeyGroupsStateHandle> result = new ArrayList<>(managedKeyedState.size());
+					result.addAll(managedKeyedState);
+					restoredKeyedState = result;
+				} else {
+					restoredKeyedState = StateAssignmentOperation.getKeyGroupsStateHandles(
+							managedKeyedState,
+							localKeyGroupRange);
+				}
 			}
 		}
 


[14/15] flink git commit: [FLINK-5366] Add Initial version of SavepointUtil

Posted by al...@apache.org.
[FLINK-5366] Add Initial version of SavepointUtil

This will serve as the basis for end-to-end tests of savepoint restore
from Flink 1.1.
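
As a rough sketch of the calling protocol (not part of this commit): a user function in a
SavepointTestJob mutates its state only while allowStateChange() holds and reports into
triggerOrTestSavepoint(), which either counts the invocation towards the job's
SavepointCondition (savepoint phase) or compares the restored state against the expected
value (restore phase). FooMapper and the expected value 10L are placeholders, and the
checkpointing of "sum" is omitted; UserFunctionStateJob in this commit is the real example.

import org.apache.flink.api.common.functions.RichMapFunction;

// hypothetical function, assumed to live in the same package as SavepointUtil
public class FooMapper extends RichMapFunction<Long, Long> {

    private long sum;

    @Override
    public Long map(Long value) throws Exception {
        if (SavepointUtil.allowStateChange()) {
            // only mutate state before a savepoint has been requested
            sum += value;
        }
        // savepoint phase: counts this invocation towards the SavepointCondition;
        // restore phase: fails the test if the restored sum does not equal the expected value
        SavepointUtil.triggerOrTestSavepoint(this, 10L, sum);
        return sum;
    }
}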


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/434013af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/434013af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/434013af

Branch: refs/heads/master
Commit: 434013afc91325697a659f736eb816a631659d36
Parents: b43067b
Author: twalthr <tw...@apache.org>
Authored: Wed Dec 14 09:17:54 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../test/checkpointing/utils/SavepointUtil.java | 341 +++++++++++++++++++
 .../utils/UserFunctionStateJob.java             | 113 ++++++
 .../src/test/resources/log4j-test.properties    |   2 +-
 flink-tests/src/test/resources/log4j.properties |  27 ++
 4 files changed, 482 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/434013af/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
new file mode 100644
index 0000000..85e21c5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+public class SavepointUtil {
+
+	// list of JobGraphs to create savepoints for
+	private static final ArrayList<Class<? extends SavepointTestJob>> savepointJobs = new ArrayList<>();
+	static {
+		savepointJobs.add(UserFunctionStateJob.class);
+	}
+
+	private static final Logger LOG = LoggerFactory.getLogger(SavepointUtil.class);
+	private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+	private static final String SAVEPOINT_BASE_DIR = "./flink-tests/src/test/resources/savepoints/";
+
+	private static final int STATE_WAIT_FOR_JOB = 0;
+	private static final int STATE_REQUEST_SAVEPOINT = 1;
+	private static final int STATE_SAVEPOINT_DONE = 2;
+	private static final int STATE_WAIT_FOR_TEST_JOB = 3;
+	private static final int STATE_TEST_RESULT = 4;
+	private static final int STATE_END = 5;
+
+	private static volatile int state = STATE_WAIT_FOR_JOB;
+
+	private static TestingCluster flink = null;
+	private static ActorGateway jobManager = null;
+	private static JobID jobId = null;
+	private static File savepointDir = null;
+	private static Exception testResult = null;
+
+	public static void main(String[] args) throws Exception {
+
+		// clean up
+//		FileUtils.deleteDirectory(new File(SAVEPOINT_BASE_DIR));
+
+		for (Class<? extends SavepointTestJob> testJob : savepointJobs) {
+			SavepointTestJob job = testJob.newInstance();
+
+//			runJobAndCreateSavepoint(job);
+
+			runJobAndCompareState(job);
+
+			triggerEndOfTest();
+		}
+	}
+
+	public static synchronized void triggerSavepoint() {
+		SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
+	}
+
+	public static synchronized boolean allowStateChange() {
+		return SavepointUtil.state < SavepointUtil.STATE_REQUEST_SAVEPOINT;
+	}
+
+	public static synchronized void triggerOrTestSavepoint(RichFunction function, Object expected, Object actual) throws Exception {
+		if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
+			if (expected.equals(actual)) {
+				LOG.info("Test was successful.");
+				SavepointUtil.testResult = null;
+				SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
+			} else {
+				LOG.info("Test failed.");
+				SavepointUtil.testResult = new Exception("Comparison of state failed. Expected: " + expected + " but was: " + actual);
+				SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
+			}
+		} else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
+			final StateCondition condition = new StateCondition(function.getClass(), function.getRuntimeContext().getIndexOfThisSubtask());
+			if (!testCounters.containsKey(condition)) {
+				testCounters.put(condition, 0);
+			}
+			final Integer counter = testCounters.get(condition);
+			testCounters.put(condition, counter + 1);
+			// check if all counters are ready
+			if (checkIfReadyForSavepoint()) {
+				SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
+			}
+		}
+	}
+
+	public static void triggerEndOfTest() throws Exception {
+		LOG.info("Cancelling Flink.");
+		if (flink != null) {
+			flink.stop();
+		}
+		SavepointUtil.state = SavepointUtil.STATE_END;
+	}
+
+	public static void runJobAndCreateSavepoint(SavepointTestJob job) throws Exception {
+		LOG.info("Waiting for job.");
+		SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_JOB;
+
+		final Thread t = new Thread(new SavepointPerformer());
+		t.start();
+
+		runJob(job);
+
+		while(SavepointUtil.state != SavepointUtil.STATE_SAVEPOINT_DONE && DEADLINE.hasTimeLeft()) {
+			Thread.sleep(100);
+		}
+	}
+
+	public static void runJobAndCompareState(SavepointTestJob job) throws Exception {
+		LOG.info("Waiting for test job.");
+		SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_TEST_JOB;
+
+		runJob(job);
+
+		while(SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT && DEADLINE.hasTimeLeft()) {
+			Thread.sleep(100);
+		}
+
+		if (SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT) {
+			throw new Exception("No test result available.");
+		}
+		if (testResult != null) {
+			throw testResult;
+		}
+	}
+
+	public static void setTestResult(Exception e) {
+		SavepointUtil.testResult = e;
+		SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static void runJob(SavepointTestJob job) throws Exception {
+		// Config
+		int numTaskManagers = 2;
+		int numSlotsPerTaskManager = 2;
+		int parallelism = numTaskManagers * numSlotsPerTaskManager;
+		String savepointPath = SAVEPOINT_BASE_DIR + job.getClass().getSimpleName();
+
+		// Flink configuration
+		final Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+
+		final File checkpointDir = File.createTempFile("checkpoints", Long.toString(System.nanoTime()));
+		savepointDir = new File(savepointPath);
+		savepointDir.mkdirs();
+
+		if (!checkpointDir.exists() || !savepointDir.exists()) {
+			throw new Exception("Test setup failed: failed to create (temporary) directories.");
+		}
+
+		LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
+		LOG.info("Created savepoint directory: " + savepointDir + ".");
+
+		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
+		config.setString("state.savepoints.dir", savepointDir.toURI().toString());
+
+		LOG.info("Flink configuration: " + config + ".");
+
+		// Start Flink
+		flink = new TestingCluster(config);
+		flink.start();
+
+		// Retrieve the job manager
+		jobManager = Await.result(flink.leaderGateway().future(), DEADLINE.timeLeft());
+
+		// Submit the job
+		final JobGraph jobGraph = job.createJobGraph();
+		if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
+			savepointCondition = job.getSavepointCondition();
+			testCounters.clear();
+		} else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
+			final File[] dir = savepointDir.listFiles();
+			if (dir.length == 0) {
+				throw new RuntimeException("Savepoint of " + job.getClass().getSimpleName() + " does not exist.");
+			}
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(dir[0].getAbsolutePath()));
+		}
+		jobId = jobGraph.getJobID();
+
+		LOG.info("Submitting job " + jobGraph.getJobID() + " and waiting...");
+
+		flink.submitJobDetached(jobGraph);
+	}
+
+	private static final HashMap<StateCondition, Integer> testCounters = new HashMap<>();
+	private static SavepointCondition[] savepointCondition = null;
+
+	private static boolean checkIfReadyForSavepoint() {
+		for (SavepointCondition condition : savepointCondition) {
+			final StateCondition stateCondition = new StateCondition(condition.clazz, condition.subtask);
+			if (!testCounters.containsKey(stateCondition) || testCounters.get(stateCondition) != condition.invocation) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static void performSavepointAndShutdown() throws Exception {
+		LOG.info("Triggering a savepoint.");
+
+		// Flink 1.2
+		final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.<String>empty()), DEADLINE.timeLeft());
+		// Flink 1.1
+//        final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), DEADLINE.timeLeft());
+
+		final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(savepointPathFuture, DEADLINE.timeLeft())).savepointPath();
+		LOG.info("Saved savepoint: " + savepointPath);
+
+		// Retrieve the savepoint from the testing job manager
+		LOG.info("Requesting the savepoint.");
+		Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), DEADLINE.timeLeft());
+
+		Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
+		LOG.info("Retrieved savepoint: " + savepointPath + ".");
+
+		LOG.info("Storing savepoint to file.");
+
+		// Flink 1.2
+		// it might be that the savepoint has already been written to file in Flink 1.2
+		// this is just the command how to do it in 1.2
+//        org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepoint(savepointDir.getAbsolutePath(), savepoint);
+		// Flink 1.1
+		// this writes it for FLink 1.1
+//        Configuration config = new Configuration();
+//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file://" + savepointDir.getAbsolutePath());
+//        org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
+
+		LOG.info("Cancelling Flink.");
+		flink.stop();
+
+		SavepointUtil.state = SavepointUtil.STATE_SAVEPOINT_DONE;
+	}
+
+	private static class StateCondition {
+		private Class<?> clazz;
+		private Integer subtask;
+
+		StateCondition(Class<?> clazz, Integer subtask) {
+			this.clazz = clazz;
+			this.subtask = subtask;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) return true;
+			if (o == null || getClass() != o.getClass()) return false;
+
+			StateCondition that = (StateCondition) o;
+
+			return clazz.equals(that.clazz) && subtask.equals(that.subtask);
+		}
+
+		@Override
+		public int hashCode() {
+			int result = clazz.hashCode();
+			result = 31 * result + subtask.hashCode();
+			return result;
+		}
+	}
+
+	public static class SavepointCondition {
+		Class<? extends RichFunction> clazz;
+		int subtask;
+		int invocation;
+
+		SavepointCondition(Class<? extends RichFunction> clazz, int subtask, int invocation) {
+			this.clazz = clazz;
+			this.subtask = subtask;
+			this.invocation = invocation;
+		}
+	}
+
+	public interface SavepointTestJob {
+		JobGraph createJobGraph();
+
+		SavepointCondition[] getSavepointCondition();
+	}
+
+	private static class SavepointPerformer implements Runnable {
+
+		@Override
+		public void run() {
+			try {
+				while (SavepointUtil.state != SavepointUtil.STATE_END) {
+					Thread.sleep(100);
+					if (SavepointUtil.state == SavepointUtil.STATE_REQUEST_SAVEPOINT) {
+						try {
+							performSavepointAndShutdown();
+						} catch (Exception e) {
+							throw new RuntimeException("Performing savepoint failed.", e);
+						}
+					}
+				}
+			} catch (InterruptedException e) {
+				// stop execution
+			}
+			LOG.info("SavepointPerformer Thread finished.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/434013af/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
new file mode 100644
index 0000000..1df7938
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointCondition;
+import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointTestJob;
+import org.apache.flink.util.Collector;
+
+public class UserFunctionStateJob implements SavepointTestJob {
+
+	@Override
+	public JobGraph createJobGraph() {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// we only test memory state backend yet
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(1);
+		env.setMaxParallelism(1);
+
+		// create source
+		final DataStream<Tuple2<Long, Long>> source = env
+			.addSource(new SourceFunction<Tuple2<Long, Long>>() {
+
+				private volatile boolean isRunning = true;
+
+				@Override
+				public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+					while (isRunning) {
+						synchronized (ctx.getCheckpointLock()) {
+							ctx.collect(new Tuple2<>(1L, 1L));
+						}
+					}
+				}
+
+				@Override
+				public void cancel() {
+					isRunning = false;
+				}
+			}).uid("CustomSourceFunction");
+
+		// non-keyed operator state
+		source.flatMap(new SumFlatMapperNonKeyedCheckpointed()).uid("SumFlatMapperNonKeyedCheckpointed").startNewChain().print();
+
+		return env.getStreamGraph().getJobGraph();
+	}
+
+	@Override
+	public SavepointCondition[] getSavepointCondition() {
+		return new SavepointCondition[] {
+			new SavepointCondition(SumFlatMapperNonKeyedCheckpointed.class, 0, 4)
+		};
+	}
+
+	public static class SumFlatMapperNonKeyedCheckpointed extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<Long, Long>> {
+
+		private transient Tuple2<Long, Long> sum;
+
+		@Override
+		public void restoreState(Tuple2<Long, Long> state) throws Exception {
+			sum = state;
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			if (SavepointUtil.allowStateChange()) {
+				if (sum == null) {
+					sum = value;
+					out.collect(sum);
+				} else {
+					sum.f1 += value.f1;
+					out.collect(sum);
+				}
+			}
+
+			SavepointUtil.triggerOrTestSavepoint(
+				this,
+				new Tuple2<>(value.f1, value.f1 * 4),
+				sum);
+		}
+
+		@Override
+		public Tuple2<Long, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return sum;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/434013af/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index f55d9e2..7bfcee5 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/434013af/flink-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j.properties b/flink-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..9fbcecd
--- /dev/null
+++ b/flink-tests/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=INFO, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n


[08/15] flink git commit: [FLINK-5295] Migrate the AlignedWindowOperators to the WindowOperator.

Posted by al...@apache.org.
[FLINK-5295] Migrate the AlignedWindowOperators to the WindowOperator.

This adds code that lets the WindowOperator restore from the Flink 1.1
fast aligned processing-time window operators.
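
For context, a rough sketch (not in the commit) of the kind of Flink 1.1 pipeline this
targets. Under 1.1, a processing-time window with a plain ReduceFunction and no evictor
was translated into the fast AggregatingProcessingTimeWindowOperator; after this change
the same API call builds a regular WindowOperator tagged with
LegacyWindowOperatorType.FAST_AGGREGATING so that it can read the aligned operator's
state on restore. FooSource is a placeholder.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AlignedWindowMigrationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FooSource is a placeholder for whatever source fed the original 1.1 job
        DataStream<Tuple2<String, Long>> input = env.addSource(new FooSource());

        input
            .keyBy(0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            // processing-time windows + ReduceFunction + no evictor was the 1.1 "fast" aligned path;
            // the same call now builds a WindowOperator with LegacyWindowOperatorType.FAST_AGGREGATING
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                    return new Tuple2<>(a.f0, a.f1 + b.f1);
                }
            })
            .print();

        env.execute("aligned window migration sketch");
    }
}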


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0819dc7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0819dc7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0819dc7

Branch: refs/heads/master
Commit: e0819dc7b72487670dd3ba06628980e27fdbedb0
Parents: b0e2a2c
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 17:59:13 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../datastream/LegacyWindowOperatorType.java    |  63 +++++
 .../api/datastream/WindowedStream.java          | 155 +++++------
 ...ractAlignedProcessingTimeWindowOperator.java |   1 +
 ...ccumulatingProcessingTimeWindowOperator.java | 109 +-------
 ...AggregatingProcessingTimeWindowOperator.java |   1 +
 .../operators/windowing/WindowOperator.java     | 259 +++++++++++++++++--
 .../windowing/TimeWindowTranslationTest.java    |   1 +
 .../windowing/WindowingTestHarnessTest.java     |   3 +-
 .../streaming/util/WindowingTestHarness.java    |  14 +-
 .../api/scala/WindowTranslationTest.scala       |  10 +-
 10 files changed, 380 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
new file mode 100644
index 0000000..bb6e4bc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+/**
+ * For specifying what type of window operator was used to create the state
+ * that a {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}
+ * is restoring from. This is used to signal that state written using an aligned processing-time
+ * window operator should be restored.
+ */
+public enum LegacyWindowOperatorType {
+
+	FAST_ACCUMULATING(true, false),
+
+	FAST_AGGREGATING(false, true),
+
+	NONE(false, false);
+
+	// ------------------------------------------------------------------------
+
+	private final boolean fastAccumulating;
+	private final boolean fastAggregating;
+
+	LegacyWindowOperatorType(boolean fastAccumulating, boolean fastAggregating) {
+		this.fastAccumulating = fastAccumulating;
+		this.fastAggregating = fastAggregating;
+	}
+
+	public boolean isFastAccumulating() {
+		return fastAccumulating;
+	}
+
+	public boolean isFastAggregating() {
+		return fastAggregating;
+	}
+
+	@Override
+	public String toString() {
+		if (fastAccumulating) {
+			return "AccumulatingProcessingTimeWindowOperator";
+		} else if (fastAggregating) {
+			return "AggregatingProcessingTimeWindowOperator";
+		} else {
+			return "WindowOperator";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index ad7f371..98bf89a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -49,10 +49,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
@@ -190,16 +187,44 @@ public class WindowedStream<T, K, W extends Window> {
 
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
+		LegacyWindowOperatorType legacyOpType = getLegacyWindowType(function);
+		return reduce(function, new PassThroughWindowFunction<K, W, T>(), legacyOpType);
+	}
 
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "WindowedStream." + callLocation;
-
-		SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-		if (result != null) {
-			return result;
-		}
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
+		return reduce(reduceFunction, function, LegacyWindowOperatorType.NONE);
+	}
 
-		return reduce(function, new PassThroughWindowFunction<K, W, T>());
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is incrementally aggregated using the given reducer.
+	 *
+	 * @param reduceFunction The reduce function that is used for incremental aggregation.
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R> reduce(
+		ReduceFunction<T> reduceFunction,
+		WindowFunction<T, R, K, W> function,
+		TypeInformation<R> resultType) {
+		return reduce(reduceFunction, function, resultType, LegacyWindowOperatorType.NONE);
 	}
 
 	/**
@@ -212,14 +237,20 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
+	 * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
+	 *                           the type of the previous operator whose state we inherit.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
+	private <R> SingleOutputStreamOperator<R> reduce(
+			ReduceFunction<T> reduceFunction,
+			WindowFunction<T, R, K, W> function,
+			LegacyWindowOperatorType legacyWindowOpType) {
+
 		TypeInformation<T> inType = input.getType();
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
 			function, WindowFunction.class, true, true, inType, null, false);
 
-		return reduce(reduceFunction, function, resultType);
+		return reduce(reduceFunction, function, resultType, legacyWindowOpType);
 	}
 
 	/**
@@ -232,10 +263,17 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
-	 * @param resultType Type information for the result type of the window function
+	 * @param resultType Type information for the result type of the window function.
+	 * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
+	 *                           the type of the previous operator whose state we inherit.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+	private <R> SingleOutputStreamOperator<R> reduce(
+			ReduceFunction<T> reduceFunction,
+			WindowFunction<T, R, K, W> function,
+			TypeInformation<R> resultType,
+			LegacyWindowOperatorType legacyWindowOpType) {
+
 		if (reduceFunction instanceof RichFunction) {
 			throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
 		}
@@ -288,7 +326,8 @@ public class WindowedStream<T, K, W extends Window> {
 					stateDesc,
 					new InternalSingleValueWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					legacyWindowOpType);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -458,7 +497,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * interpreted as a regular non-windowed stream.
 	 *
 	 * <p>
-	 * Not that this function requires that all data in the windows is buffered until the window
+	 * Note that this function requires that all data in the windows is buffered until the window
 	 * is evaluated, as the function provides no means of incremental aggregation.
 	 *
 	 * @param function The window function.
@@ -473,12 +512,7 @@ public class WindowedStream<T, K, W extends Window> {
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "WindowedStream." + callLocation;
 
-		SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-		if (result != null) {
-			return result;
-		}
-
-
+		LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function);
 		String opName;
 		KeySelector<T, K> keySel = input.getKeySelector();
 
@@ -519,7 +553,8 @@ public class WindowedStream<T, K, W extends Window> {
 					stateDesc,
 					new InternalIterableWindowFunction<>(function),
 					trigger,
-					allowedLateness);
+					allowedLateness,
+					legacyWindowOpType);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -925,77 +960,21 @@ public class WindowedStream<T, K, W extends Window> {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-			Function function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
+	private LegacyWindowOperatorType getLegacyWindowType(Function function) {
 		if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSlide();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
 			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(), 
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
+				return LegacyWindowOperatorType.FAST_AGGREGATING;
+			} else if (function instanceof WindowFunction) {
+				return LegacyWindowOperatorType.FAST_ACCUMULATING;
 			}
 		} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSize();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
 			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer,
-								input.getKeySelector(),
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
+				return LegacyWindowOperatorType.FAST_AGGREGATING;
+			} else if (function instanceof WindowFunction) {
+				return LegacyWindowOperatorType.FAST_ACCUMULATING;
 			}
 		}
-
-		return null;
+		return LegacyWindowOperatorType.NONE;
 	}
 
 	public StreamExecutionEnvironment getExecutionEnvironment() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 24fd0de..14500ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import static java.util.Objects.requireNonNull;
 
 @Internal
+@Deprecated
 public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function> 
 		extends AbstractUdfStreamOperator<OUT, F> 
 		implements OneInputStreamOperator<IN, OUT>, ProcessingTimeCallback {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 9ea2949..90e4b52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -22,16 +22,15 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
-import java.io.IOException;
 import java.util.ArrayList;
 
 @Internal
+@Deprecated
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
 		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
 
@@ -57,108 +56,4 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 		
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}
-	
-	// ------------------------------------------------------------------------
-	//  Utility Serializer for Lists of Elements
-	// ------------------------------------------------------------------------
-	
-	@SuppressWarnings("ForLoopReplaceableByForEach")
-	private static final class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
-
-		private static final long serialVersionUID = 1119562170939152304L;
-		
-		private final TypeSerializer<T> elementSerializer;
-
-		ArrayListSerializer(TypeSerializer<T> elementSerializer) {
-			this.elementSerializer = elementSerializer;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public TypeSerializer<ArrayList<T>> duplicate() {
-			TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
-			return duplicateElement == elementSerializer ? this : new ArrayListSerializer<T>(duplicateElement);
-		}
-
-		@Override
-		public ArrayList<T> createInstance() {
-			return new ArrayList<>();
-		}
-
-		@Override
-		public ArrayList<T> copy(ArrayList<T> from) {
-			ArrayList<T> newList = new ArrayList<>(from.size());
-			for (int i = 0; i < from.size(); i++) {
-				newList.add(elementSerializer.copy(from.get(i)));
-			}
-			return newList;
-		}
-
-		@Override
-		public ArrayList<T> copy(ArrayList<T> from, ArrayList<T> reuse) {
-			return copy(from);
-		}
-
-		@Override
-		public int getLength() {
-			return -1; // var length
-		}
-
-		@Override
-		public void serialize(ArrayList<T> list, DataOutputView target) throws IOException {
-			final int size = list.size();
-			target.writeInt(size);
-			for (int i = 0; i < size; i++) {
-				elementSerializer.serialize(list.get(i), target);
-			}
-		}
-
-		@Override
-		public ArrayList<T> deserialize(DataInputView source) throws IOException {
-			final int size = source.readInt();
-			final ArrayList<T> list = new ArrayList<>(size);
-			for (int i = 0; i < size; i++) {
-				list.add(elementSerializer.deserialize(source));
-			}
-			return list;
-		}
-
-		@Override
-		public ArrayList<T> deserialize(ArrayList<T> reuse, DataInputView source) throws IOException {
-			return deserialize(source);
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			// copy number of elements
-			final int num = source.readInt();
-			target.writeInt(num);
-			for (int i = 0; i < num; i++) {
-				elementSerializer.copy(source, target);
-			}
-		}
-
-		// --------------------------------------------------------------------
-		
-		@Override
-		public boolean equals(Object obj) {
-			return obj == this || 
-					(obj != null && obj.getClass() == getClass() && 
-						elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return true;
-		}
-
-		@Override
-		public int hashCode() {
-			return elementSerializer.hashCode();
-		}
-	} 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index 9d85cf0..2175fb5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 
 @Internal
+@Deprecated
 public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
 		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, IN, ReduceFunction<IN>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 1cfeba8..990162e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.commons.math3.util.ArithmeticUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.AppendingState;
@@ -35,10 +36,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -47,16 +51,21 @@ import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
 import java.util.PriorityQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -148,11 +157,29 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	// State restored in case of migration from an older version (backwards compatibility)
 	// ------------------------------------------------------------------------
 
-	/** The restored processing time timers. */
-	protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
+	/**
+	 * A flag indicating if we are migrating from a regular {@link WindowOperator}
+	 * or one of the deprecated {@link AccumulatingProcessingTimeWindowOperator} and
+	 * {@link AggregatingProcessingTimeWindowOperator}.
+	 */
+	private final LegacyWindowOperatorType legacyWindowOperatorType;
+
+	/**
+	 * The elements restored when migrating from an older, deprecated
+	 * {@link AccumulatingProcessingTimeWindowOperator} or
+	 * {@link AggregatingProcessingTimeWindowOperator}. */
+	private transient PriorityQueue<StreamRecord<IN>> restoredFromLegacyAlignedOpRecords;
+
+	/**
+	 * The restored processing time timers when migrating from an
+	 * older version of the {@link WindowOperator}.
+	 */
+	private transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
 
-	/** The restored event time timers. */
-	protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
+	/** The restored event time timers when migrating from an
+	 * older version of the {@link WindowOperator}.
+	 */
+	private transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
 
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
@@ -167,6 +194,24 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			Trigger<? super IN, ? super W> trigger,
 			long allowedLateness) {
 
+		this(windowAssigner, windowSerializer, keySelector, keySerializer,
+			windowStateDescriptor, windowFunction, trigger, allowedLateness, LegacyWindowOperatorType.NONE);
+	}
+
+	/**
+	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
+	 */
+	public WindowOperator(
+			WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
+			KeySelector<IN, K> keySelector,
+			TypeSerializer<K> keySerializer,
+			StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
+			InternalWindowFunction<ACC, OUT, K, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger,
+			long allowedLateness,
+			LegacyWindowOperatorType legacyWindowOperatorType) {
+
 		super(windowFunction);
 
 		checkArgument(allowedLateness >= 0);
@@ -181,6 +226,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		this.windowStateDescriptor = windowStateDescriptor;
 		this.trigger = checkNotNull(trigger);
 		this.allowedLateness = allowedLateness;
+		this.legacyWindowOperatorType = legacyWindowOperatorType;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			@SuppressWarnings({"unchecked", "rawtypes"})
@@ -211,26 +257,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			}
 		};
 
-		// if we restore from an older version,
-		// we have to re-register the timers.
-
-		if (restoredFromLegacyEventTimeTimers != null) {
-			for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
-				setCurrentKey(timer.key);
-				internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
-			}
-		}
-
-		if (restoredFromLegacyProcessingTimeTimers != null) {
-			for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
-				setCurrentKey(timer.key);
-				internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
-			}
-		}
-
-		// gc friendliness
-		this.restoredFromLegacyEventTimeTimers = null;
-		this.restoredFromLegacyProcessingTimeTimers = null;
+		registerRestoredLegacyStateState();
 	}
 
 	@Override
@@ -745,17 +772,157 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	//  Restoring / Migrating from an older Flink version.
 	// ------------------------------------------------------------------------
 
+	private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
+
+	private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
+
 	@Override
 	public void restoreState(FSDataInputStream in) throws Exception {
 		super.restoreState(in);
 
-		LOG.info("{} (taskIdx={}) restoring timers from an older Flink version.",
-			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+		LOG.info("{} (taskIdx={}) restoring {} state from an older Flink version.",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), legacyWindowOperatorType);
+
+		DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
 
-		restoreTimers(new DataInputViewStreamWrapper(in));
+		switch (legacyWindowOperatorType) {
+			case NONE:
+				restoreFromLegacyWindowOperator(streamWrapper);
+				break;
+			case FAST_ACCUMULATING:
+			case FAST_AGGREGATING:
+				restoreFromLegacyAlignedWindowOperator(streamWrapper);
+				break;
+		}
 	}
 
-	private void restoreTimers(DataInputViewStreamWrapper in) throws IOException {
+	public void registerRestoredLegacyStateState() throws Exception {
+
+		LOG.info("{} (taskIdx={}) re-registering {} state from an older Flink version.",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), legacyWindowOperatorType);
+
+		switch (legacyWindowOperatorType) {
+			case NONE:
+				reregisterStateFromLegacyWindowOperator();
+				break;
+			case FAST_ACCUMULATING:
+			case FAST_AGGREGATING:
+				reregisterStateFromLegacyAlignedWindowOperator();
+				break;
+		}
+	}
+
+	private void restoreFromLegacyAlignedWindowOperator(DataInputViewStreamWrapper in) throws IOException {
+		Preconditions.checkArgument(legacyWindowOperatorType != LegacyWindowOperatorType.NONE);
+
+		final long nextEvaluationTime = in.readLong();
+		final long nextSlideTime = in.readLong();
+
+		validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, in.readInt());
+
+		restoredFromLegacyAlignedOpRecords = new PriorityQueue<>(42,
+			new Comparator<StreamRecord<IN>>() {
+				@Override
+				public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+					return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+				}
+			}
+		);
+
+		switch (legacyWindowOperatorType) {
+			case FAST_ACCUMULATING:
+				restoreElementsFromLegacyAccumulatingAlignedWindowOperator(in, nextSlideTime);
+				break;
+			case FAST_AGGREGATING:
+				restoreElementsFromLegacyAggregatingAlignedWindowOperator(in, nextSlideTime);
+				break;
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} (taskIdx={}) restored {} events from legacy {}.",
+				getClass().getSimpleName(),
+				getRuntimeContext().getIndexOfThisSubtask(),
+				restoredFromLegacyAlignedOpRecords.size(),
+				legacyWindowOperatorType);
+		}
+	}
+
+	private void restoreElementsFromLegacyAccumulatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException {
+		int numPanes = in.readInt();
+		final long paneSize = getPaneSize();
+		long nextElementTimestamp = nextSlideTime - (numPanes * paneSize);
+
+		@SuppressWarnings("unchecked")
+		ArrayListSerializer<IN> ser = new ArrayListSerializer<>((TypeSerializer<IN>) getStateDescriptor().getSerializer());
+
+		while (numPanes > 0) {
+			validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt());
+
+			nextElementTimestamp += paneSize - 1; // the -1 is so that the elements fall into the correct time-frame
+
+			final int numElementsInPane = in.readInt();
+			for (int i = numElementsInPane - 1; i >= 0; i--) {
+				K key = keySerializer.deserialize(in);
+
+				@SuppressWarnings("unchecked")
+				List<IN> valueList = ser.deserialize(in);
+				for (IN record: valueList) {
+					restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(record, nextElementTimestamp));
+				}
+			}
+			numPanes--;
+		}
+	}
+
+	private void restoreElementsFromLegacyAggregatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException {
+		int numPanes = in.readInt();
+		final long paneSize = getPaneSize();
+		long nextElementTimestamp = nextSlideTime - (numPanes * paneSize);
+
+		while (numPanes > 0) {
+			validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt());
+
+			nextElementTimestamp += paneSize - 1; // the -1 is so that the elements fall into the correct time-frame
+
+			final int numElementsInPane = in.readInt();
+			for (int i = numElementsInPane - 1; i >= 0; i--) {
+				K key = keySerializer.deserialize(in);
+
+				@SuppressWarnings("unchecked")
+				IN value = (IN) getStateDescriptor().getSerializer().deserialize(in);
+				restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(value, nextElementTimestamp));
+			}
+			numPanes--;
+		}
+	}
+
+	private long getPaneSize() {
+		Preconditions.checkArgument(
+			legacyWindowOperatorType == LegacyWindowOperatorType.FAST_ACCUMULATING ||
+				legacyWindowOperatorType == LegacyWindowOperatorType.FAST_AGGREGATING);
+
+		final long paneSlide;
+		if (windowAssigner instanceof SlidingProcessingTimeWindows) {
+			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+			paneSlide = ArithmeticUtils.gcd(timeWindows.getSize(), timeWindows.getSlide());
+		} else {
+			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+			paneSlide = timeWindows.getSize(); // valid because for tumbling windows windowLength == windowSlide == timeWindows.getSize()
+		}
+		return paneSlide;
+	}
+
+	private static void validateMagicNumber(int expected, int found) throws IOException {
+		if (expected != found) {
+			throw new IOException("Corrupt state stream - wrong magic number. " +
+				"Expected '" + Integer.toHexString(expected) +
+				"', found '" + Integer.toHexString(found) + '\'');
+		}
+	}
+
+	private void restoreFromLegacyWindowOperator(DataInputViewStreamWrapper in) throws IOException {
+		Preconditions.checkArgument(legacyWindowOperatorType == LegacyWindowOperatorType.NONE);
+
 		int numWatermarkTimers = in.readInt();
 		this.restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
 
@@ -806,6 +973,42 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 	}
 
+	public void reregisterStateFromLegacyWindowOperator() {
+		// if we restore from an older version,
+		// we have to re-register the recovered state.
+
+		if (restoredFromLegacyEventTimeTimers != null) {
+			for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
+				setCurrentKey(timer.key);
+				internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
+			}
+		}
+
+		if (restoredFromLegacyProcessingTimeTimers != null) {
+			for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
+				setCurrentKey(timer.key);
+				internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
+			}
+		}
+
+		// gc friendliness
+		restoredFromLegacyEventTimeTimers = null;
+		restoredFromLegacyProcessingTimeTimers = null;
+	}
+
+	public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception {
+		if (restoredFromLegacyAlignedOpRecords != null) {
+			while (!restoredFromLegacyAlignedOpRecords.isEmpty()) {
+				StreamRecord<IN> record = restoredFromLegacyAlignedOpRecords.poll();
+				setCurrentKey(keySelector.getKey(record.getValue()));
+				processElement(record);
+			}
+
+			// gc friendliness
+			restoredFromLegacyAlignedOpRecords = null;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------

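(To make the pane handling in getPaneSize() above easier to follow: for the legacy aligned
operators the pane length is the greatest common divisor of window size and slide for sliding
processing-time windows, and simply the window size for tumbling windows. The following
standalone sketch, with made-up values and a hand-rolled gcd instead of commons-math3, is only
meant to illustrate that computation and is not part of the commit.)

    public final class PaneSizeExample {

        // Pane length: gcd(size, slide) for sliding windows; for tumbling windows
        // size == slide, so the gcd is the window size itself.
        static long paneSize(long windowSizeMillis, long windowSlideMillis) {
            long a = windowSizeMillis;
            long b = windowSlideMillis;
            while (b != 0) { // Euclid's algorithm
                long t = a % b;
                a = b;
                b = t;
            }
            return a;
        }

        public static void main(String[] args) {
            System.out.println(paneSize(10_000L, 4_000L)); // sliding 10s/4s -> 2000 ms panes
            System.out.println(paneSize(5_000L, 5_000L));  // tumbling 5s   -> 5000 ms panes
        }
    }
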
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index c1ad0fc..5aa8151 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -55,6 +55,7 @@ public class TimeWindowTranslationTest {
 	 * conditions are right.
 	 */
 	@Test
+	@Ignore
 	public void testFastTimeWindows() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
index 8e33c92..82c3d71 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.WindowingTestHarness;
 import org.junit.Test;
 
@@ -171,7 +172,7 @@ public class WindowingTestHarnessTest {
 		testHarness.compareActualToExpectedOutput("Output was not correct.");
 
 		// do a snapshot, close and restore again
-		StreamStateHandle snapshot = testHarness.snapshot(0L, 0L);
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
 		testHarness.restore(snapshot);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index 25deb54..efb0d7e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -31,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
@@ -159,20 +159,20 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 	/**
 	 * Takes a snapshot of the current state of the operator. This can be used to test fault-tolerance.
 	 */
-	public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
-		return testHarness.snapshotLegacy(checkpointId, timestamp);
+	public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
+		return testHarness.snapshot(checkpointId, timestamp);
 	}
 
 	/**
-	 * Resumes execution from a provided {@link StreamStateHandle}. This is used to test recovery after a failure.
+	 * Resumes execution from the provided {@link OperatorStateHandles}. This is used to test recovery after a failure.
 	 */
-	public void restore(StreamStateHandle stateHandle) throws Exception {
+	public void restore(OperatorStateHandles stateHandles) throws Exception {
 		Preconditions.checkArgument(!isOpen,
 			"You are trying to restore() while the operator is still open. " +
 				"Please call close() first.");
 
 		testHarness.setup();
-		testHarness.restore(stateHandle);
+		testHarness.initializeState(stateHandles);
 		openOperator();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index c67c215..299932f 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -24,17 +24,16 @@ import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDesc
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, SlidingProcessingTimeWindows, TumblingEventTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
 import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, ProcessingTimeTrigger}
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.{EvictingWindowOperator, WindowOperator, AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator}
+import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator, EvictingWindowOperator, WindowOperator}
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
-
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
@@ -43,6 +42,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
    * conditions are right.
    */
   @Test
+  @Ignore
   def testFastTimeWindows(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 


[05/15] flink git commit: [FLINK-5366] SavepointUtil into SavepointMigrationTestBase/Add Test

Posted by al...@apache.org.
[FLINK-5366] SavepointUtil into SavepointMigrationTestBase/Add Test

This also changes how the savepoint is performed: we now wait on
accumulators to signal that a job is ready for savepointing.

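As a rough illustration of that signaling pattern (a hedged sketch, not code from this commit;
the class and accumulator names below are made up), a user function can register a named
IntCounter in open() and bump it as elements are processed, so that the test base can poll the
accumulator through the cluster client and trigger the savepoint once the expected count is
reached:

    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class ReadinessSignalingFlatMap extends RichFlatMapFunction<Long, Long> {

        // Hypothetical accumulator name, used only in this sketch.
        public static final String READY_ACCUMULATOR = "NUM_PROCESSED_ELEMENTS";

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Register the counter so the driver / test base can read it via the cluster client.
            getRuntimeContext().addAccumulator(READY_ACCUMULATOR, new IntCounter());
        }

        @Override
        public void flatMap(Long value, Collector<Long> out) throws Exception {
            out.collect(value);
            // Each processed element bumps the counter; the test base polls it and
            // triggers the savepoint once the expected number of elements is reached.
            getRuntimeContext().getAccumulator(READY_ACCUMULATOR).add(1);
        }
    }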

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cbd9f5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cbd9f5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cbd9f5d

Branch: refs/heads/master
Commit: 2cbd9f5d1ba43059b8bf748f97d2392b1e8f0ab3
Parents: 74df763
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Dec 19 12:18:49 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../utils/SavepointMigrationTestBase.java       | 241 ++++++++
 .../test/checkpointing/utils/SavepointUtil.java | 341 -----------
 .../StatefulUDFSavepointMigrationITCase.java    | 562 +++++++++++++++++++
 .../utils/UserFunctionStateJob.java             | 113 ----
 ...eful-udf-migration-itcase-flink1.1-savepoint | Bin 0 -> 27146 bytes
 ...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 0 -> 22283 bytes
 6 files changed, 803 insertions(+), 454 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
new file mode 100644
index 0000000..80a66ac
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static junit.framework.Assert.fail;
+
+public class SavepointMigrationTestBase extends TestBaseUtils {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
+	private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+	protected static final int DEFAULT_PARALLELISM = 4;
+	protected LocalFlinkMiniCluster cluster = null;
+
+	protected static String getResourceFilename(String filename) {
+		ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		if (resource == null) {
+			throw new NullPointerException("Missing snapshot resource.");
+		}
+		return resource.getFile();
+	}
+
+	@Before
+	public void setup() throws Exception {
+
+		// Flink configuration
+		final Configuration config = new Configuration();
+
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+
+		final File checkpointDir = tempFolder.newFolder("checkpoints").getAbsoluteFile();
+		final File savepointDir = tempFolder.newFolder("savepoints").getAbsoluteFile();
+
+		if (!checkpointDir.exists() || !savepointDir.exists()) {
+			throw new Exception("Test setup failed: failed to create (temporary) directories.");
+		}
+
+		LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
+		LOG.info("Created savepoint directory: " + savepointDir + ".");
+
+		config.setString(ConfigConstants.STATE_BACKEND, "memory");
+		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
+		config.setString("state.savepoints.dir", savepointDir.toURI().toString());
+
+		cluster = TestBaseUtils.startCluster(config, false);
+	}
+
+	@After
+	public void teardown() throws Exception {
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+
+	protected void executeAndSavepoint(
+			StreamExecutionEnvironment env,
+			String savepointPath,
+			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
+
+		// Retrieve the job manager
+		ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+
+		// Submit the job
+		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+
+		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
+
+		LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());
+
+		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+
+		boolean done = false;
+		while (DEADLINE.hasTimeLeft()) {
+			Thread.sleep(100);
+			Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+
+			boolean allDone = true;
+			for (Tuple2<String, Integer> acc : expectedAccumulators) {
+				Integer numFinished = (Integer) accumulators.get(acc.f0);
+				if (numFinished == null) {
+					allDone = false;
+					break;
+				}
+				if (!numFinished.equals(acc.f1)) {
+					allDone = false;
+					break;
+				}
+			}
+			if (allDone) {
+				done = true;
+				break;
+			}
+		}
+
+		if (!done) {
+			fail("Did not see the expected accumulator results within time limit.");
+		}
+
+		LOG.info("Triggering savepoint.");
+		// Flink 1.2
+		final Future<Object> savepointResultFuture =
+				jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID(), Option.<String>empty()), DEADLINE.timeLeft());
+
+		// Flink 1.1
+//		final Future<Object> savepointResultFuture =
+//				jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID()), DEADLINE.timeLeft());
+
+
+		Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());
+
+		if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
+			fail("Error drawing savepoint: " + ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause());
+		}
+
+		// jobmanager will store savepoint in heap, we have to retrieve it
+		final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
+		LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
+
+		// Flink 1.2
+		FileUtils.moveFile(new File(new URI(jobmanagerSavepointPath).getPath()), new File(savepointPath));
+
+		// Flink 1.1
+		// Retrieve the savepoint from the testing job manager
+//		LOG.info("Requesting the savepoint.");
+//		Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(jobmanagerSavepointPath), DEADLINE.timeLeft());
+//
+//		Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
+//		LOG.info("Retrieved savepoint: " + jobmanagerSavepointPath + ".");
+//
+//		LOG.info("Storing savepoint to file.");
+//		Configuration config = new Configuration();
+//		config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+//		config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file:///Users/aljoscha/Downloads");
+//		String path = org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
+//
+//		FileUtils.moveFile(new File(new URI(path).getPath()), new File(savepointPath));
+	}
+
+	protected void restoreAndExecute(
+			StreamExecutionEnvironment env,
+			String savepointPath,
+			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
+
+		int parallelism = env.getParallelism();
+
+		// Retrieve the job manager
+
+		ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+
+		// Submit the job
+		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+
+		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
+
+		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
+
+		boolean done = false;
+		while (DEADLINE.hasTimeLeft()) {
+			Thread.sleep(100);
+			Map<String, Object> accumulators = clusterClient.getAccumulators(jobSubmissionResult.getJobID());
+
+			boolean allDone = true;
+			for (Tuple2<String, Integer> acc : expectedAccumulators) {
+				Integer numFinished = (Integer) accumulators.get(acc.f0);
+				if (numFinished == null) {
+					System.out.println("NO ACC FOR " + acc);
+					allDone = false;
+					break;
+				}
+				if (!numFinished.equals(acc.f1)) {
+					System.out.println("TOO LOW FOR ACC " + acc);
+					allDone = false;
+					break;
+				}
+			}
+			System.out.println("ACC: " + accumulators);
+			if (allDone) {
+				done = true;
+				break;
+			}
+		}
+
+		if (!done) {
+			fail("Did not see the expected accumulator results within time limit.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
deleted file mode 100644
index 85e21c5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing.utils;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-public class SavepointUtil {
-
-	// list of JobGraphs to create savepoints for
-	private static final ArrayList<Class<? extends SavepointTestJob>> savepointJobs = new ArrayList<>();
-	static {
-		savepointJobs.add(UserFunctionStateJob.class);
-	}
-
-	private static final Logger LOG = LoggerFactory.getLogger(SavepointUtil.class);
-	private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-	private static final String SAVEPOINT_BASE_DIR = "./flink-tests/src/test/resources/savepoints/";
-
-	private static final int STATE_WAIT_FOR_JOB = 0;
-	private static final int STATE_REQUEST_SAVEPOINT = 1;
-	private static final int STATE_SAVEPOINT_DONE = 2;
-	private static final int STATE_WAIT_FOR_TEST_JOB = 3;
-	private static final int STATE_TEST_RESULT = 4;
-	private static final int STATE_END = 5;
-
-	private static volatile int state = STATE_WAIT_FOR_JOB;
-
-	private static TestingCluster flink = null;
-	private static ActorGateway jobManager = null;
-	private static JobID jobId = null;
-	private static File savepointDir = null;
-	private static Exception testResult = null;
-
-	public static void main(String[] args) throws Exception {
-
-		// clean up
-//		FileUtils.deleteDirectory(new File(SAVEPOINT_BASE_DIR));
-
-		for (Class<? extends SavepointTestJob> testJob : savepointJobs) {
-			SavepointTestJob job = testJob.newInstance();
-
-//			runJobAndCreateSavepoint(job);
-
-			runJobAndCompareState(job);
-
-			triggerEndOfTest();
-		}
-	}
-
-	public static synchronized void triggerSavepoint() {
-		SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
-	}
-
-	public static synchronized boolean allowStateChange() {
-		return SavepointUtil.state < SavepointUtil.STATE_REQUEST_SAVEPOINT;
-	}
-
-	public static synchronized void triggerOrTestSavepoint(RichFunction function, Object expected, Object actual) throws Exception {
-		if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
-			if (expected.equals(actual)) {
-				LOG.info("Test was successful.");
-				SavepointUtil.testResult = null;
-				SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
-			} else {
-				LOG.info("Test failed.");
-				SavepointUtil.testResult = new Exception("Comparison of state failed. Expected: " + expected + " but was: " + actual);
-				SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
-			}
-		} else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
-			final StateCondition condition = new StateCondition(function.getClass(), function.getRuntimeContext().getIndexOfThisSubtask());
-			if (!testCounters.containsKey(condition)) {
-				testCounters.put(condition, 0);
-			}
-			final Integer counter = testCounters.get(condition);
-			testCounters.put(condition, counter + 1);
-			// check if all counters are ready
-			if (checkIfReadyForSavepoint()) {
-				SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
-			}
-		}
-	}
-
-	public static void triggerEndOfTest() throws Exception {
-		LOG.info("Cancelling Flink.");
-		if (flink != null) {
-			flink.stop();
-		}
-		SavepointUtil.state = SavepointUtil.STATE_END;
-	}
-
-	public static void runJobAndCreateSavepoint(SavepointTestJob job) throws Exception {
-		LOG.info("Waiting for job.");
-		SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_JOB;
-
-		final Thread t = new Thread(new SavepointPerformer());
-		t.start();
-
-		runJob(job);
-
-		while(SavepointUtil.state != SavepointUtil.STATE_SAVEPOINT_DONE && DEADLINE.hasTimeLeft()) {
-			Thread.sleep(100);
-		}
-	}
-
-	public static void runJobAndCompareState(SavepointTestJob job) throws Exception {
-		LOG.info("Waiting for test job.");
-		SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_TEST_JOB;
-
-		runJob(job);
-
-		while(SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT && DEADLINE.hasTimeLeft()) {
-			Thread.sleep(100);
-		}
-
-		if (SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT) {
-			throw new Exception("No test result available.");
-		}
-		if (testResult != null) {
-			throw testResult;
-		}
-	}
-
-	public static void setTestResult(Exception e) {
-		SavepointUtil.testResult = e;
-		SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	private static void runJob(SavepointTestJob job) throws Exception {
-		// Config
-		int numTaskManagers = 2;
-		int numSlotsPerTaskManager = 2;
-		int parallelism = numTaskManagers * numSlotsPerTaskManager;
-		String savepointPath = SAVEPOINT_BASE_DIR + job.getClass().getSimpleName();
-
-		// Flink configuration
-		final Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-
-		final File checkpointDir = File.createTempFile("checkpoints", Long.toString(System.nanoTime()));
-		savepointDir = new File(savepointPath);
-		savepointDir.mkdirs();
-
-		if (!checkpointDir.exists() || !savepointDir.exists()) {
-			throw new Exception("Test setup failed: failed to create (temporary) directories.");
-		}
-
-		LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
-		LOG.info("Created savepoint directory: " + savepointDir + ".");
-
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
-		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
-		config.setString("state.savepoints.dir", savepointDir.toURI().toString());
-
-		LOG.info("Flink configuration: " + config + ".");
-
-		// Start Flink
-		flink = new TestingCluster(config);
-		flink.start();
-
-		// Retrieve the job manager
-		jobManager = Await.result(flink.leaderGateway().future(), DEADLINE.timeLeft());
-
-		// Submit the job
-		final JobGraph jobGraph = job.createJobGraph();
-		if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
-			savepointCondition = job.getSavepointCondition();
-			testCounters.clear();
-		} else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
-			final File[] dir = savepointDir.listFiles();
-			if (dir.length == 0) {
-				throw new RuntimeException("Savepoint of " + job.getClass().getSimpleName() + " does not exist.");
-			}
-			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(dir[0].getAbsolutePath()));
-		}
-		jobId = jobGraph.getJobID();
-
-		LOG.info("Submitting job " + jobGraph.getJobID() + " and waiting...");
-
-		flink.submitJobDetached(jobGraph);
-	}
-
-	private static final HashMap<StateCondition, Integer> testCounters = new HashMap<>();
-	private static SavepointCondition[] savepointCondition = null;
-
-	private static boolean checkIfReadyForSavepoint() {
-		for (SavepointCondition condition : savepointCondition) {
-			final StateCondition stateCondition = new StateCondition(condition.clazz, condition.subtask);
-			if (!testCounters.containsKey(stateCondition) || testCounters.get(stateCondition) != condition.invocation) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-	private static void performSavepointAndShutdown() throws Exception {
-		LOG.info("Triggering a savepoint.");
-
-		// Flink 1.2
-		final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.<String>empty()), DEADLINE.timeLeft());
-		// Flink 1.1
-//        final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), DEADLINE.timeLeft());
-
-		final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(savepointPathFuture, DEADLINE.timeLeft())).savepointPath();
-		LOG.info("Saved savepoint: " + savepointPath);
-
-		// Retrieve the savepoint from the testing job manager
-		LOG.info("Requesting the savepoint.");
-		Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), DEADLINE.timeLeft());
-
-		Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
-		LOG.info("Retrieved savepoint: " + savepointPath + ".");
-
-		LOG.info("Storing savepoint to file.");
-
-		// Flink 1.2
-		// it might be that the savepoint has already been written to file in Flink 1.2
-		// this is just the command how to do it in 1.2
-//        org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepoint(savepointDir.getAbsolutePath(), savepoint);
-		// Flink 1.1
-		// this writes it for FLink 1.1
-//        Configuration config = new Configuration();
-//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file://" + savepointDir.getAbsolutePath());
-//        org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
-
-		LOG.info("Cancelling Flink.");
-		flink.stop();
-
-		SavepointUtil.state = SavepointUtil.STATE_SAVEPOINT_DONE;
-	}
-
-	private static class StateCondition {
-		private Class<?> clazz;
-		private Integer subtask;
-
-		StateCondition(Class<?> clazz, Integer subtask) {
-			this.clazz = clazz;
-			this.subtask = subtask;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) return true;
-			if (o == null || getClass() != o.getClass()) return false;
-
-			StateCondition that = (StateCondition) o;
-
-			return clazz.equals(that.clazz) && subtask.equals(that.subtask);
-		}
-
-		@Override
-		public int hashCode() {
-			int result = clazz.hashCode();
-			result = 31 * result + subtask.hashCode();
-			return result;
-		}
-	}
-
-	public static class SavepointCondition {
-		Class<? extends RichFunction> clazz;
-		int subtask;
-		int invocation;
-
-		SavepointCondition(Class<? extends RichFunction> clazz, int subtask, int invocation) {
-			this.clazz = clazz;
-			this.subtask = subtask;
-			this.invocation = invocation;
-		}
-	}
-
-	public interface SavepointTestJob {
-		JobGraph createJobGraph();
-
-		SavepointCondition[] getSavepointCondition();
-	}
-
-	private static class SavepointPerformer implements Runnable {
-
-		@Override
-		public void run() {
-			try {
-				while (SavepointUtil.state != SavepointUtil.STATE_END) {
-					Thread.sleep(100);
-					if (SavepointUtil.state == SavepointUtil.STATE_REQUEST_SAVEPOINT) {
-						try {
-							performSavepointAndShutdown();
-						} catch (Exception e) {
-							throw new RuntimeException("Performing savepoint failed.", e);
-						}
-					}
-				}
-			} catch (InterruptedException e) {
-				// stop execution
-			}
-			LOG.info("SavepointPerformer Thread finished.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
new file mode 100644
index 0000000..cc21683
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.1 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ */
+public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestBase {
+	private static final int NUM_SOURCE_ELEMENTS = 4;
+	private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS";
+	private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS";
+
+	/**
+	 * This has to be manually executed to create the savepoint on Flink 1.1.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink11() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// for now we only test the memory state backend
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// create source
+		env
+				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		executeAndSavepoint(
+				env,
+				"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint",
+				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+	/**
+	 * This has to be manually executed to create the savepoint on Flink 1.1.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink11WithRocksDB() throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		RocksDBStateBackend rocksBackend =
+				new RocksDBStateBackend(new MemoryStateBackend());
+//		rocksBackend.enableFullyAsyncSnapshots();
+		env.setStateBackend(rocksBackend);
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// create source
+		env
+				.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		executeAndSavepoint(
+				env,
+				"src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb",
+				new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+	}
+
+
+	@Test
+	public void testSavepointRestoreFromFlink11() throws Exception {
+
+		final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// for now we only test the memory state backend
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// create source
+		env
+				.addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		restoreAndExecute(
+				env,
+				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
+				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+	}
+
+	@Test
+	public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
+
+		final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// use the RocksDB state backend, matching the savepoint created with RocksDB
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+		env.enableCheckpointing(500);
+		env.setParallelism(4);
+		env.setMaxParallelism(4);
+
+		// create source
+		env
+				.addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+				.flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+				.keyBy(0)
+				.flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+				.keyBy(0)
+				.flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+				.keyBy(0)
+				.transform(
+						"custom_operator",
+						new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+						new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+				.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+		restoreAndExecute(
+				env,
+				getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"),
+				new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+	}
+
+	private static class LegacyCheckpointedSource
+			implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
+
+		public static String CHECKPOINTED_STRING = "Here be dragons!";
+
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		public LegacyCheckpointedSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			assertEquals(CHECKPOINTED_STRING, state);
+		}
+
+		@Override
+		public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_STRING;
+		}
+	}
+
+	private static class RestoringCheckingSource
+			extends RichSourceFunction<Tuple2<Long, Long>>
+			implements CheckpointedRestoring<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		private String restoredState;
+
+		public RestoringCheckingSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+			assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public void restoreState(String state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+				new Tuple2<>("hello", 42L);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+		}
+
+		@Override
+		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_TUPLE;
+		}
+	}
+
+	public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient Tuple2<String, Long> restoredState;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class LegacyCheckpointedFlatMapWithKeyedState
+			extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+				new Tuple2<>("hello", 42L);
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			getRuntimeContext().getState(stateDescriptor).update(value.f1);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+		}
+
+		@Override
+		public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return CHECKPOINTED_TUPLE;
+		}
+	}
+
+	public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient Tuple2<String, Long> restoredState;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+			if (state == null) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			assertEquals(value.f1, state.value());
+			assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void restoreState(Tuple2<String, Long> state) throws Exception {
+			restoredState = state;
+		}
+	}
+
+	public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			getRuntimeContext().getState(stateDescriptor).update(value.f1);
+		}
+	}
+
+	public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+			if (state == null) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			assertEquals(value.f1, state.value());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+	}
+
+	public static class CheckpointedUdfOperator
+			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
+
+		public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			output.collect(element);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			output.emitWatermark(mark);
+		}
+
+		// Flink 1.1 snapshotting code, kept commented out for regenerating the legacy binary savepoint:
+//		@Override
+//		public StreamTaskState snapshotOperatorState(
+//				long checkpointId, long timestamp) throws Exception {
+//			StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp);
+//
+//			AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(
+//					checkpointId,
+//					timestamp);
+//
+//			out.writeUTF(CHECKPOINTED_STRING);
+//
+//			result.setOperatorState(out.closeAndGetHandle());
+//
+//			return result;
+//		}
+	}
+
+	public static class RestoringCheckingUdfOperator
+			extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		private static final long serialVersionUID = 1L;
+
+		private String restoredState;
+
+		public RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+
+			assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
+			getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public void restoreState(FSDataInputStream in) throws Exception {
+			super.restoreState(in);
+
+			DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
+
+			restoredState = streamWrapper.readUTF();
+		}
+	}
+
+	public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		private final String accumulatorName;
+
+		int count = 0;
+
+		public AccumulatorCountingSink(String accumulatorName) {
+			this.accumulatorName = accumulatorName;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(accumulatorName, new IntCounter());
+		}
+
+		@Override
+		public void invoke(T value) throws Exception {
+			count++;
+			getRuntimeContext().getAccumulator(accumulatorName).add(1);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
deleted file mode 100644
index 1df7938..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing.utils;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointCondition;
-import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointTestJob;
-import org.apache.flink.util.Collector;
-
-public class UserFunctionStateJob implements SavepointTestJob {
-
-	@Override
-	public JobGraph createJobGraph() {
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		// we only test memory state backend yet
-		env.setStateBackend(new MemoryStateBackend());
-		env.enableCheckpointing(500);
-		env.setParallelism(1);
-		env.setMaxParallelism(1);
-
-		// create source
-		final DataStream<Tuple2<Long, Long>> source = env
-			.addSource(new SourceFunction<Tuple2<Long, Long>>() {
-
-				private volatile boolean isRunning = true;
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-					while (isRunning) {
-						synchronized (ctx.getCheckpointLock()) {
-							ctx.collect(new Tuple2<>(1L, 1L));
-						}
-					}
-				}
-
-				@Override
-				public void cancel() {
-					isRunning = false;
-				}
-			}).uid("CustomSourceFunction");
-
-		// non-keyed operator state
-		source.flatMap(new SumFlatMapperNonKeyedCheckpointed()).uid("SumFlatMapperNonKeyedCheckpointed").startNewChain().print();
-
-		return env.getStreamGraph().getJobGraph();
-	}
-
-	@Override
-	public SavepointCondition[] getSavepointCondition() {
-		return new SavepointCondition[] {
-			new SavepointCondition(SumFlatMapperNonKeyedCheckpointed.class, 0, 4)
-		};
-	}
-
-	public static class SumFlatMapperNonKeyedCheckpointed extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-			implements Checkpointed<Tuple2<Long, Long>> {
-
-		private transient Tuple2<Long, Long> sum;
-
-		@Override
-		public void restoreState(Tuple2<Long, Long> state) throws Exception {
-			sum = state;
-		}
-
-		@Override
-		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-			if (SavepointUtil.allowStateChange()) {
-				if (sum == null) {
-					sum = value;
-					out.collect(sum);
-				} else {
-					sum.f1 += value.f1;
-					out.collect(sum);
-				}
-			}
-
-			SavepointUtil.triggerOrTestSavepoint(
-				this,
-				new Tuple2<>(value.f1, value.f1 * 4),
-				sum);
-		}
-
-		@Override
-		public Tuple2<Long, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return sum;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint
new file mode 100644
index 0000000..f2f6dcd
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbd9f5d/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb
new file mode 100644
index 0000000..e63038b
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb differ


[15/15] flink git commit: [FLINK-5317] Make the continuous file processing backwards compatible w/ unit tests.

Posted by al...@apache.org.
[FLINK-5317] Make the continuous file processing backwards compatible w/ unit tests.

This includes both the ContinuousFileMonitoringFunction and the
ContinuousFileReaderOperator.
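
The migration tests added below all follow the same restore pattern: set up the
operator in a test harness, point the harness at a binary snapshot written by
Flink 1.1, open the operator, and assert on the restored state. The following is
a minimal sketch of that pattern; MyMigratedSource, its getRestoredValue()
accessor, expectedLegacyValue, and the resource name are illustrative
placeholders and not part of this commit:

    MyMigratedSource sourceFunction = new MyMigratedSource();
    StreamSource<Long, MyMigratedSource> src = new StreamSource<>(sourceFunction);

    AbstractStreamOperatorTestHarness<Long> harness =
        new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);

    harness.setup();
    // deserializes the Flink 1.1 StreamTaskState from the given file and
    // converts it to the new checkpoint format before the operator is opened
    harness.initializeStateFromLegacyCheckpoint(
        getResourceFilename("my-source-flink1.1-snapshot"));
    harness.open();

    // after open() the function should have picked up its legacy state
    Assert.assertEquals(expectedLegacyValue, sourceFunction.getRestoredValue());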


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1220230c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1220230c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1220230c

Branch: refs/heads/master
Commit: 1220230c624234a5fd5e2ef5855aebb294184462
Parents: 2cbd9f5
Author: kl0u <kk...@gmail.com>
Authored: Fri Dec 16 17:52:06 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 16:02:34 2016 +0100

----------------------------------------------------------------------
 .../ContinuousFileProcessingMigrationTest.java  | 402 +++++++++++++++++++
 ...gration-test-1482144479339-flink1.1-snapshot | Bin 0 -> 468 bytes
 .../reader-migration-test-flink1.1-snapshot     | Bin 0 -> 979 bytes
 .../ContinuousFileMonitoringFunction.java       |  35 +-
 .../source/ContinuousFileReaderOperator.java    | 125 +++++-
 5 files changed, 536 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
new file mode 100644
index 0000000..0915005
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+public class ContinuousFileProcessingMigrationTest {
+
+	private static final int NO_OF_FILES = 5;
+	private static final int LINES_PER_FILE = 10;
+
+	private static final long INTERVAL = 100;
+
+	private static File baseDir;
+
+	private static FileSystem hdfs;
+	private static String hdfsURI;
+	private static MiniDFSCluster hdfsCluster;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	//						PREPARING FOR THE TESTS
+
+	@BeforeClass
+	public static void createHDFS() {
+		try {
+			baseDir = tempFolder.newFolder().getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			Configuration hdConf = new Configuration();
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+			hdfsCluster.shutdown();
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = WindowOperatorTest.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		return resource.getFile();
+	}
+
+	//						END OF PREPARATIONS
+
+	//						TESTS
+
+	@Test
+	public void testReaderSnapshotRestore() throws Exception {
+
+		/*
+
+		FileInputSplit split1 =
+			new FileInputSplit(3, new Path("test/test1"), 0, 100, null);
+		FileInputSplit split2 =
+			new FileInputSplit(2, new Path("test/test2"), 101, 200, null);
+		FileInputSplit split3 =
+			new FileInputSplit(1, new Path("test/test2"), 0, 100, null);
+		FileInputSplit split4 =
+			new FileInputSplit(0, new Path("test/test3"), 0, 100, null);
+
+		final OneShotLatch latch = new OneShotLatch();
+		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
+		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+		ContinuousFileReaderOperator<FileInputSplit, ?> initReader = new ContinuousFileReaderOperator<>(format);
+		initReader.setOutputType(typeInfo, new ExecutionConfig());
+		OneInputStreamOperatorTestHarness<FileInputSplit, FileInputSplit> initTestInstance =
+			new OneInputStreamOperatorTestHarness<>(initReader);
+		initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		initTestInstance.open();
+		// create some state in the reader
+		initTestInstance.processElement(new StreamRecord<>(split1));
+		initTestInstance.processElement(new StreamRecord<>(split2));
+		initTestInstance.processElement(new StreamRecord<>(split3));
+		initTestInstance.processElement(new StreamRecord<>(split4));
+		// take a snapshot of the operator's state. This will be used
+		// to initialize another reader and compare the results of the
+		// two operators.
+		final StreamTaskState snapshot;
+		synchronized (initTestInstance.getCheckpointLock()) {
+			snapshot = initTestInstance.snapshot(0L, 0L);
+		}
+
+		initTestInstance.snaphotToFile(snapshot, "src/test/resources/reader-migration-test-flink1.1-snapshot");
+
+		*/
+		TimestampedFileInputSplit split1 =
+			new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+		TimestampedFileInputSplit split2 =
+			new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+		TimestampedFileInputSplit split3 =
+			new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+		TimestampedFileInputSplit split4 =
+			new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+
+		final OneShotLatch latch = new OneShotLatch();
+
+		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
+		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
+		initReader.setOutputType(typeInfo, new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance =
+			new OneInputStreamOperatorTestHarness<>(initReader);
+		initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		initTestInstance.setup();
+		initTestInstance.initializeStateFromLegacyCheckpoint(getResourceFilename("reader-migration-test-flink1.1-snapshot"));
+		initTestInstance.open();
+
+		latch.trigger();
+
+		// ... and wait for the operators to close gracefully
+
+		synchronized (initTestInstance.getCheckpointLock()) {
+			initTestInstance.close();
+		}
+
+		FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1);
+		FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2);
+		FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3);
+		FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4);
+
+		// verify that the output of the restored reader contains the splits
+		// that were stored in the Flink 1.1 snapshot.
+
+		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1)));
+		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2)));
+		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3)));
+		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4)));
+	}
+
+	private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
+		Preconditions.checkNotNull(split);
+
+		return new FileInputSplit(
+			split.getSplitNumber(),
+			split.getPath(),
+			split.getStart(),
+			split.getLength(),
+			split.getHostnames()
+		);
+	}
+
+	private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
+
+		private static final long serialVersionUID = -6727603565381560267L;
+
+		private final OneShotLatch latch;
+
+		private FileInputSplit split;
+
+		private boolean reachedEnd;
+
+		BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
+			super(filePath);
+			this.latch = latch;
+			this.reachedEnd = false;
+		}
+
+		@Override
+		public void open(FileInputSplit fileSplit) throws IOException {
+			this.split = fileSplit;
+			this.reachedEnd = false;
+		}
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			if (!latch.isTriggered()) {
+				try {
+					latch.await();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+			return reachedEnd;
+		}
+
+		@Override
+		public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
+			this.reachedEnd = true;
+			return split;
+		}
+
+		@Override
+		public void close() {
+
+		}
+	}
+
+	////				Monitoring Function Tests				//////
+
+	@Test
+	public void testFunctionRestore() throws Exception {
+
+		/*
+		org.apache.hadoop.fs.Path path = null;
+		long fileModTime = Long.MIN_VALUE;
+		for (int i = 0; i < 1; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			path = file.f0;
+			fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, format.getFilePath().toString(), new PathFilter(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		StreamSource<FileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+			new StreamSource<>(monitoringFunction);
+
+		final OneInputStreamOperatorTestHarness<Void, FileInputSplit> testHarness =
+			new OneInputStreamOperatorTestHarness<>(src);
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+
+		final OneShotLatch latch = new OneShotLatch();
+
+		// run the source asynchronously
+		Thread runner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					monitoringFunction.run(new DummySourceContext() {
+						@Override
+						public void collect(FileInputSplit element) {
+							latch.trigger();
+						}
+					});
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+					error[0] = t;
+				}
+			}
+		};
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		StreamTaskState snapshot = testHarness.snapshot(0, 0);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.1-snapshot");
+		monitoringFunction.cancel();
+		runner.join();
+
+		testHarness.close();
+		*/
+
+		long expectedModTime = 1482144479339L;
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+			new StreamSource<>(monitoringFunction);
+
+		final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("monitoring-function-migration-test-1482144479339-flink1.1-snapshot"));
+		testHarness.open();
+
+		Assert.assertEquals(expectedModTime, monitoringFunction.getGlobalModificationTime());
+
+	}
+
+	///////////				Source Contexts Used by the tests				/////////////////
+
+	private static abstract class DummySourceContext
+		implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
+
+		private final Object lock = new Object();
+
+		@Override
+		public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+
+	/////////				Auxiliary Methods				/////////////
+
+	/**
+	 * Creates a file in which every line has the form
+	 * {@code fileIdx + ": " + sampleLine + " " + lineNo}.
+	 */
+	private Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithData(
+		String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+
+		assert (hdfs != null);
+
+		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+		Assert.assertFalse(hdfs.exists(file));
+
+		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+		FSDataOutputStream stream = hdfs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for (int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+
+		hdfs.rename(tmp, file);
+
+		Assert.assertTrue("No result file present", hdfs.exists(file));
+		return new Tuple2<>(file, str.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot
new file mode 100644
index 0000000..17eba99
Binary files /dev/null and b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot
new file mode 100644
index 0000000..e47ebbd
Binary files /dev/null and b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 8723853..1ec9a70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +65,7 @@ import java.util.TreeMap;
  */
 @Internal
 public class ContinuousFileMonitoringFunction<OUT>
-	extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction {
+	extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction, CheckpointedRestoring<Long> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -92,7 +93,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 	private final FileProcessingMode watchType;
 
 	/** The maximum file modification time seen so far. */
-	private volatile long globalModificationTime;
+	private volatile long globalModificationTime = Long.MIN_VALUE;
 
 	private transient Object checkpointLock;
 
@@ -147,15 +148,25 @@ public class ContinuousFileMonitoringFunction<OUT>
 				retrievedStates.add(entry);
 			}
 
-			// given that the parallelism of the function is 1, we can only have 1 state
-			Preconditions.checkArgument(retrievedStates.size() == 1,
+			// given that the parallelism of the function is 1, we can have at most one retrieved item;
+			// zero items means that we are migrating from a previous Flink version.
+
+			Preconditions.checkArgument(retrievedStates.size() <= 1,
 				getClass().getSimpleName() + " retrieved invalid state.");
 
-			this.globalModificationTime = retrievedStates.get(0);
+			if (retrievedStates.size() == 1 && globalModificationTime != Long.MIN_VALUE) {
+				// this is the case where we have both legacy and new state.
+				// The two should be mutually exclusive for the operator, thus we throw the exception.
+
+				throw new IllegalArgumentException(
+					"The " + getClass().getSimpleName() + " has already restored its state from a previous Flink version.");
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} retrieved a global mod time of {}.",
-					getClass().getSimpleName(), globalModificationTime);
+			} else if (retrievedStates.size() == 1) {
+				this.globalModificationTime = retrievedStates.get(0);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("{} retrieved a global mod time of {}.",
+						getClass().getSimpleName(), globalModificationTime);
+				}
 			}
 
 		} else {
@@ -357,4 +368,12 @@ public class ContinuousFileMonitoringFunction<OUT>
 			LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
 		}
 	}
+
+	@Override
+	public void restoreState(Long state) throws Exception {
+		this.globalModificationTime = state;
+
+		LOG.info("{} (taskIdx={}) restored global modification time from an older Flink version: {}",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), globalModificationTime);
+	}
 }
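
To make the dual restore path above concrete: the function now receives Flink 1.1
state through the CheckpointedRestoring hook, which runs before initializeState(),
and initializeState() only reads the new managed state if the legacy hook did not
fire. Below is a self-contained sketch of the same pattern for a hypothetical user
source; the class and state names are illustrative and not part of this patch:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    public class MyLegacyCounterSource extends RichSourceFunction<Long>
            implements CheckpointedFunction, CheckpointedRestoring<Long> {

        private static final long serialVersionUID = 1L;

        private volatile boolean running = true;
        private volatile long counter = Long.MIN_VALUE;
        private transient ListState<Long> checkpointedCounter;

        @Override
        public void restoreState(Long legacyState) {
            // called only when resuming from a Flink 1.1 savepoint, before initializeState()
            this.counter = legacyState;
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedCounter = context.getOperatorStateStore()
                .getSerializableListState("counter");
            if (context.isRestored() && counter == Long.MIN_VALUE) {
                // only use the new-style state if the legacy hook did not run
                for (Long value : checkpointedCounter.get()) {
                    counter = value;
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedCounter.clear();
            checkpointedCounter.add(counter);
        }

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(++counter);
                }
                Thread.sleep(50);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }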

http://git-wip-us.apache.org/repos/asf/flink/blob/1220230c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index bbe1ea5..6419aa6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -24,22 +24,29 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -59,7 +66,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, CheckpointedRestoringOperator {
 
 	private static final long serialVersionUID = 1L;
 
@@ -73,8 +80,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 	private transient SplitReader<OUT> reader;
 	private transient SourceFunction.SourceContext<OUT> readerContext;
 
-	private ListState<TimestampedFileInputSplit> checkpointedState;
-	private List<TimestampedFileInputSplit> restoredReaderState;
+	private transient ListState<TimestampedFileInputSplit> checkpointedState;
+	private transient List<TimestampedFileInputSplit> restoredReaderState;
 
 	public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
 		this.format = checkNotNull(format);
@@ -89,25 +96,27 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
 
-		checkState(this.checkpointedState == null && this.restoredReaderState == null,
-			"The reader state has already been initialized.");
+		checkState(checkpointedState == null,	"The reader state has already been initialized.");
 
 		checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 		if (context.isRestored()) {
-			LOG.info("Restoring state for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx);
+			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
 
-			this.restoredReaderState = new ArrayList<>();
-			for (TimestampedFileInputSplit split : this.checkpointedState.get()) {
-				this.restoredReaderState.add(split);
-			}
+			// this may already be non-null if we are migrating from a previous Flink version.
+			if (restoredReaderState == null) {
+				restoredReaderState = new ArrayList<>();
+				for (TimestampedFileInputSplit split : checkpointedState.get()) {
+					restoredReaderState.add(split);
+				}
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("ContinuousFileReaderOperator idx {} restored {}.", subtaskIdx, this.restoredReaderState);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("{} (taskIdx={}) restored {}.", getClass().getSimpleName(), subtaskIdx, restoredReaderState);
+				}
 			}
 		} else {
-			LOG.info("No state to restore for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx);
+			LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
 		}
 	}
 
@@ -379,20 +388,100 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 	public void snapshotState(StateSnapshotContext context) throws Exception {
 		super.snapshotState(context);
 
-		checkState(this.checkpointedState != null,
+		checkState(checkpointedState != null,
 			"The operator state has not been properly initialized.");
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 
-		this.checkpointedState.clear();
-		List<TimestampedFileInputSplit> readerState = this.reader.getReaderState();
+		checkpointedState.clear();
+		List<TimestampedFileInputSplit> readerState = reader.getReaderState();
 		for (TimestampedFileInputSplit split : readerState) {
 			// create a new partition for each entry.
-			this.checkpointedState.add(split);
+			checkpointedState.add(split);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",
+				getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Restoring / Migrating from an older Flink version.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void restoreState(FSDataInputStream in) throws Exception {
+
+		LOG.info("{} (taskIdx={}) restoring state from an older Flink version.",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
+		// this is just to read the byte indicating if we have udf state or not
+		int hasUdfState = in.read();
+
+		Preconditions.checkArgument(hasUdfState == 0);
+
+		final ObjectInputStream ois = new ObjectInputStream(in);
+		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
+
+		// read the split that was being read
+		FileInputSplit currSplit = (FileInputSplit) ois.readObject();
+
+		// read the pending splits list
+		List<FileInputSplit> pendingSplits = new LinkedList<>();
+		int noOfSplits = div.readInt();
+		for (int i = 0; i < noOfSplits; i++) {
+			FileInputSplit split = (FileInputSplit) ois.readObject();
+			pendingSplits.add(split);
+		}
+
+		// read the state of the format
+		Serializable formatState = (Serializable) ois.readObject();
+
+		div.close();
+
+		if (restoredReaderState == null) {
+			restoredReaderState = new ArrayList<>();
+		}
+
+		// we do not know the modification time of the retrieved splits, so we assign them
+		// artificial ones, with the only constraint that they respect the relative order of the
+		// retrieved splits, because modification time is going to be used to sort the splits within
+		// the "pending splits" priority queue.
+
+		long now = getProcessingTimeService().getCurrentProcessingTime();
+		long runningModTime = Math.max(now, noOfSplits + 1);
+
+		TimestampedFileInputSplit currentSplit = createTimestampedFileSplit(currSplit, --runningModTime, formatState);
+		restoredReaderState.add(currentSplit);
+		for (FileInputSplit split : pendingSplits) {
+			TimestampedFileInputSplit timestampedSplit = createTimestampedFileSplit(split, --runningModTime);
+			restoredReaderState.add(timestampedSplit);
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("ContinuousFileReaderOperator idx {} checkpointed {}.", subtaskIdx, readerState);
+			LOG.debug("{} (taskIdx={}) restored {} splits from legacy: {}.",
+				getClass().getSimpleName(),
+				getRuntimeContext().getIndexOfThisSubtask(),
+				restoredReaderState.size(),
+				restoredReaderState);
+		}
+	}
+
+	private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime) {
+		return createTimestampedFileSplit(split, modificationTime, null);
+	}
+
+	private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime, Serializable state) {
+		TimestampedFileInputSplit timestampedSplit = new TimestampedFileInputSplit(
+			modificationTime, split.getSplitNumber(), split.getPath(),
+			split.getStart(), split.getLength(), split.getHostnames());
+
+		if (state != null) {
+			timestampedSplit.setSplitState(state);
 		}
+		return timestampedSplit;
 	}
 }


[06/15] flink git commit: [FLINK-5293] Add test for Kafka backwards compatibility

Posted by al...@apache.org.
[FLINK-5293] Add test for Kafka backwards compatibility


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b43067b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b43067b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b43067b6

Branch: refs/heads/master
Commit: b43067b68ad01882d061be49de8bdfa54a07bfd6
Parents: 216653a
Author: kl0u <kk...@gmail.com>
Authored: Mon Dec 19 13:50:31 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           |  18 +-
 .../kafka/internals/AbstractFetcher.java        |   2 +-
 .../FlinkKafkaConsumerBaseMigrationTest.java    | 531 +++++++++++++++++++
 ...ka-consumer-migration-test-flink1.1-snapshot | Bin 0 -> 738 bytes
 ...migration-test-flink1.1-snapshot-empty-state | Bin 0 -> 468 bytes
 5 files changed, 543 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b43067b6/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 13bfe93..2918080 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -321,14 +321,18 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
 		if (context.isRestored()) {
-			restoreToOffset = new HashMap<>();
-			for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
-				restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
-			}
+			if (restoreToOffset == null) {
+				restoreToOffset = new HashMap<>();
+				for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
+					restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+				}
 
-			LOG.info("Setting restore state in the FlinkKafkaConsumer.");
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Using the following offsets: {}", restoreToOffset);
+				LOG.info("Setting restore state in the FlinkKafkaConsumer.");
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Using the following offsets: {}", restoreToOffset);
+				}
+			} else if (restoreToOffset.isEmpty()) {
+				restoreToOffset = null;
 			}
 		} else {
 			LOG.info("No restore state for FlinkKafkaConsumer.");

http://git-wip-us.apache.org/repos/asf/flink/blob/b43067b6/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index cf39606..821eb03 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -197,7 +197,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * 
 	 * @param snapshotState The offsets for the partitions 
 	 */
-	public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
+	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
 		for (KafkaTopicPartitionState<?> partition : allPartitions) {
 			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
 			if (offset != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b43067b6/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
new file mode 100644
index 0000000..c315d31
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyMapOf;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were
+ * done using the Flink 1.1 {@link FlinkKafkaConsumerBase}.
+ *
+ * <p>To regenerate the binary snapshot file, run the commented-out portion
+ * of each test on a checkout of the Flink 1.1 branch.
+ */
+public class FlinkKafkaConsumerBaseMigrationTest {
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = WindowOperatorTest.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		if (resource == null) {
+			throw new NullPointerException("Missing snapshot resource.");
+		}
+		return resource.getFile();
+	}
+
+	@Test
+	public void testRestoreFromFlink11WithEmptyStateNoPartitions() throws Exception {
+		// --------------------------------------------------------------------
+		//   prepare fake states
+		// --------------------------------------------------------------------
+
+		final OneShotLatch latch = new OneShotLatch();
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+		doAnswer(new Answer() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				latch.trigger();
+				Assert.fail("This should never be called");
+				return null;
+			}
+		}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class));
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				latch.trigger();
+				Assert.fail("This should never be called");
+				return null;
+			}
+		}).when(fetcher).runFetchLoop();
+
+		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(
+				new FetcherFactory<String>() {
+					private static final long serialVersionUID = -2803131905656983619L;
+
+					@Override
+					public AbstractFetcher<String, ?> createFetcher() {
+						return fetcher;
+					}
+				},
+				Collections.<KafkaTopicPartition>emptyList());
+
+		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
+			new StreamSource<>(consumerFunction);
+
+		final AbstractStreamOperatorTestHarness<String> testHarness =
+			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
+
+		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(
+			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+
+		// run the source asynchronously
+		Thread runner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					consumerFunction.run(new DummySourceContext() {
+						@Override
+						public void collect(String element) {
+							latch.trigger();
+							Assert.fail("This should never be called.");
+						}
+
+						@Override
+						public void emitWatermark(Watermark mark) {
+							latch.trigger();
+							assertEquals(Long.MAX_VALUE, mark.getTimestamp());
+						}
+					});
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+					error[0] = t;
+				}
+			}
+		};
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		consumerOperator.close();
+
+		consumerOperator.cancel();
+		runner.interrupt();
+		runner.join();
+
+		Assert.assertNull(error[0]);
+	}
+
+	@Test
+	public void testRestoreFromFlink11WithEmptyStateWithPartitions() throws Exception {
+		final OneShotLatch latch = new OneShotLatch();
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+		doAnswer(new Answer() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				latch.trigger();
+				Assert.fail("This should never be called");
+				return null;
+			}
+		}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class));
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				latch.trigger();
+				return null;
+			}
+		}).when(fetcher).runFetchLoop();
+
+		final List<KafkaTopicPartition> partitions = new ArrayList<>();
+		partitions.add(new KafkaTopicPartition("abc", 13));
+		partitions.add(new KafkaTopicPartition("def", 7));
+
+		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(
+				new FetcherFactory<String>() {
+					private static final long serialVersionUID = -2803131905656983619L;
+
+					@Override
+					public AbstractFetcher<String, ?> createFetcher() {
+						return fetcher;
+					}
+				},
+				partitions);
+
+		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
+			new StreamSource<>(consumerFunction);
+
+		final AbstractStreamOperatorTestHarness<String> testHarness =
+			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
+
+		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(
+			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+
+		// run the source asynchronously
+		Thread runner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					consumerFunction.run(new DummySourceContext() {
+						@Override
+						public void collect(String element) {
+							latch.trigger();
+							Assert.fail("This should never be called.");
+						}
+
+						@Override
+						public void emitWatermark(Watermark mark) {
+							latch.trigger();
+							Assert.fail("This should never be called.");
+						}
+					});
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+					error[0] = t;
+				}
+			}
+		};
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		consumerOperator.close();
+
+		runner.join();
+
+		Assert.assertNull(error[0]);
+	}
+
+	@Test
+	public void testRestoreFromFlink11() throws Exception {
+		// --------------------------------------------------------------------
+		//   prepare fake states
+		// --------------------------------------------------------------------
+
+		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+		final OneShotLatch latch = new OneShotLatch();
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+		doAnswer(new Answer() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				Map<KafkaTopicPartition, Long> map = (HashMap<KafkaTopicPartition, Long>) invocationOnMock.getArguments()[0];
+
+				latch.trigger();
+				assertEquals(state1, map);
+				return null;
+			}
+		}).when(fetcher).restoreOffsets(anyMapOf(KafkaTopicPartition.class, Long.class));
+
+
+		final List<KafkaTopicPartition> partitions = new ArrayList<>();
+		partitions.add(new KafkaTopicPartition("abc", 13));
+		partitions.add(new KafkaTopicPartition("def", 7));
+
+		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(
+				new FetcherFactory<String>() {
+					private static final long serialVersionUID = -2803131905656983619L;
+
+					@Override
+					public AbstractFetcher<String, ?> createFetcher() {
+						return fetcher;
+					}
+				},
+				partitions);
+
+		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
+			new StreamSource<>(consumerFunction);
+
+		final AbstractStreamOperatorTestHarness<String> testHarness =
+			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
+
+		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(
+			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+
+		// run the source asynchronously
+		Thread runner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					consumerFunction.run(new DummySourceContext() {
+						@Override
+						public void collect(String element) {
+							//latch.trigger();
+						}
+					});
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+					error[0] = t;
+				}
+			}
+		};
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		consumerOperator.close();
+
+		runner.join();
+
+		Assert.assertNull(error[0]);
+	}
+
+	private abstract static class DummySourceContext
+		implements SourceFunction.SourceContext<String> {
+
+		private final Object lock = new Object();
+
+		@Override
+		public void collectWithTimestamp(String element, long timestamp) {
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private interface FetcherFactory<T> extends Serializable {
+		AbstractFetcher<T, ?> createFetcher();
+	}
+
+	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+		private static final long serialVersionUID = 1L;
+
+		private final FetcherFactory<T> fetcherFactory;
+
+		private final List<KafkaTopicPartition> partitions;
+
+		@SuppressWarnings("unchecked")
+		DummyFlinkKafkaConsumer(
+				FetcherFactory<T> fetcherFactory,
+				List<KafkaTopicPartition> partitions) {
+			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+			this.fetcherFactory = fetcherFactory;
+			this.partitions = partitions;
+		}
+
+		@Override
+		protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
+			return fetcherFactory.createFetcher();
+		}
+
+		@Override
+		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
+			return partitions;
+		}
+	}
+}
+
+/*
+	THE CODE FOR FLINK 1.1
+
+	@Test
+	public void testRestoreFromFlink11() throws Exception {
+		// --------------------------------------------------------------------
+		//   prepare fake states
+		// --------------------------------------------------------------------
+
+		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+		final OneShotLatch latch = new OneShotLatch();
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				latch.trigger();
+				return null;
+			}
+		}).when(fetcher).runFetchLoop();
+
+		when(fetcher.snapshotCurrentState()).thenReturn(state1);
+
+		final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<>(
+			new FetcherFactory<String>() {
+				private static final long serialVersionUID = -2803131905656983619L;
+
+				@Override
+				public AbstractFetcher<String, ?> createFetcher() {
+					return fetcher;
+				}
+			});
+
+		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
+			new StreamSource<>(consumerFunction);
+
+		final OneInputStreamOperatorTestHarness<Void, String> testHarness =
+			new OneInputStreamOperatorTestHarness<>(consumerOperator);
+
+		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		testHarness.setup();
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+
+		// run the source asynchronously
+		Thread runner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					consumerFunction.run(new DummySourceContext() {
+						@Override
+						public void collect(String element) {
+							latch.trigger();
+						}
+					});
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+					error[0] = t;
+				}
+			}
+		};
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-2");
+		consumerOperator.run(new Object());
+
+		consumerOperator.close();
+		runner.join();
+
+		System.out.println("Killed");
+	}
+
+	private static abstract class DummySourceContext
+		implements SourceFunction.SourceContext<String> {
+
+		private final Object lock = new Object();
+
+		@Override
+		public void collectWithTimestamp(String element, long timestamp) {
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
+
+	private interface FetcherFactory<T> extends Serializable {
+		AbstractFetcher<T, ?> createFetcher();
+	}
+
+	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+		private static final long serialVersionUID = 1L;
+
+		private final FetcherFactory<T> fetcherFactory;
+
+		@SuppressWarnings("unchecked")
+		public DummyFlinkKafkaConsumer(FetcherFactory<T> fetcherFactory) {
+			super((KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class));
+
+			final List<KafkaTopicPartition> partitions = new ArrayList<>();
+			partitions.add(new KafkaTopicPartition("dummy-topic", 0));
+			setSubscribedPartitions(partitions);
+
+			this.fetcherFactory = fetcherFactory;
+		}
+
+		@Override
+		protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
+			return fetcherFactory.createFetcher();
+		}
+	}
+* */

http://git-wip-us.apache.org/repos/asf/flink/blob/b43067b6/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot
new file mode 100644
index 0000000..01c9e03
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/b43067b6/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state
new file mode 100644
index 0000000..f4dd96d
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state differ


[10/15] flink git commit: [FLINK-5294] Test aggregating aligned window op restore from 1.1

Posted by al...@apache.org.
[FLINK-5294] Test aggregating aligned window op restore from 1.1


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/074d80c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/074d80c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/074d80c8

Branch: refs/heads/master
Commit: 074d80c834765f2aa659ea27ff647b99f760a06a
Parents: ac134d6
Author: kl0u <kk...@gmail.com>
Authored: Fri Dec 16 16:14:26 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../windowing/WindowOperatorMigrationTest.java  |  98 +++++++++++++++++++
 ...igration-test-aggr-aligned-flink1.1-snapshot | Bin 0 -> 586 bytes
 2 files changed, 98 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/074d80c8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index c02100d..fd38881 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
@@ -566,6 +567,103 @@ public class WindowOperatorMigrationTest {
 		testHarness.close();
 	}
 
+	@Test
+	public void testRestoreAlignedProcessingTimeWindowsFromFlink11() throws Exception {
+		/*
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		AggregatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>> operator =
+			new AggregatingProcessingTimeWindowOperator<>(
+				new ReduceFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = -8913160567151867987L;
+
+					@Override
+					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+						return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
+					}
+				},
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				inputType.createSerializer(new ExecutionConfig()),
+				3000,
+				3000);
+
+		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testTimeProvider.setCurrentTime(3);
+
+		// timestamp is ignored in processing time
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot");
+		testHarness.close();
+
+		*/
+
+		final int WINDOW_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+				new WindowOperatorTest.SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+				ProcessingTimeTrigger.create(),
+				0,
+				LegacyWindowOperatorType.FAST_AGGREGATING);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint("src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot");
+		testHarness.open();
+
+		testHarness.setProcessingTime(5000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+
+		testHarness.setProcessingTime(7000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.close();
+	}
+
 	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/074d80c8/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot
new file mode 100644
index 0000000..c086a91
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot differ


[13/15] flink git commit: [FLINK-5294] Test accumulating aligned window op restore from 1.1

Posted by al...@apache.org.
[FLINK-5294] Test accumulating aligned window op restore from 1.1


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49f1a038
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49f1a038
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49f1a038

Branch: refs/heads/master
Commit: 49f1a0383923452676ee380cebe8a889fd40c80c
Parents: 074d80c
Author: kl0u <kk...@gmail.com>
Authored: Fri Dec 16 17:22:53 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../windowing/WindowOperatorMigrationTest.java  | 105 ++++++++++++++++++-
 ...gration-test-accum-aligned-flink1.1-snapshot | Bin 0 -> 621 bytes
 2 files changed, 104 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49f1a038/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index fd38881..b7d5928 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -568,7 +568,7 @@ public class WindowOperatorMigrationTest {
 	}
 
 	@Test
-	public void testRestoreAlignedProcessingTimeWindowsFromFlink11() throws Exception {
+	public void testRestoreAggregatingAlignedProcessingTimeWindowsFromFlink11() throws Exception {
 		/*
 		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
 
@@ -664,6 +664,109 @@ public class WindowOperatorMigrationTest {
 		testHarness.close();
 	}
 
+	@Test
+	public void testRestoreAccumulatingAlignedProcessingTimeWindowsFromFlink11() throws Exception {
+		/*
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		AccumulatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>> operator =
+			new AccumulatingProcessingTimeWindowOperator<>(
+				new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+
+					private static final long serialVersionUID = 6551516443265733803L;
+
+					@Override
+					public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
+						int sum = 0;
+						for (Tuple2<String, Integer> anInput : input) {
+							sum += anInput.f1;
+						}
+						out.collect(new Tuple2<>(s, sum));
+					}
+				},
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				inputType.createSerializer(new ExecutionConfig()),
+				3000,
+				3000);
+
+		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testTimeProvider.setCurrentTime(3);
+
+		// timestamp is ignored in processing time
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot");
+		testHarness.close();
+
+		*/
+		final int WINDOW_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+				new WindowOperatorTest.SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+				ProcessingTimeTrigger.create(),
+				0,
+				LegacyWindowOperatorType.FAST_ACCUMULATING);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint("src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot");
+		testHarness.open();
+
+		testHarness.setProcessingTime(5000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+
+		testHarness.setProcessingTime(7000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.close();
+	}
+
+
 	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/49f1a038/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot
new file mode 100644
index 0000000..08ee86c
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot differ


[12/15] flink git commit: [hotfix] Fix compatibility check in RegisteredBackendStateMetaInfo

Posted by al...@apache.org.
[hotfix] Fix compatibility check in RegisteredBackendStateMetaInfo

This was too strict: RocksDB initializes state with a null namespace serializer,
and the check then fails once the actual namespace serializer becomes available.
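
A minimal sketch of the relaxed compatibility predicate described above (the helper
class and method below are hypothetical and only illustrative; the authoritative
change is the diff to RegisteredBackendStateMetaInfo further down):

	import org.apache.flink.api.common.typeutils.TypeSerializer;

	final class CompatibilitySketch {

		// A null namespace serializer on either side is treated as compatible,
		// because RocksDB first registers state with a null namespace serializer
		// and only later supplies the real one. Otherwise defer to the
		// serializers' own compatibility checks.
		static <N, S> boolean isCompatible(
				TypeSerializer<N> namespaceSerializer, TypeSerializer<S> stateSerializer,
				TypeSerializer<N> otherNamespaceSerializer, TypeSerializer<S> otherStateSerializer) {

			boolean namespacesCompatible =
					namespaceSerializer == null
							|| otherNamespaceSerializer == null
							|| namespaceSerializer.isCompatibleWith(otherNamespaceSerializer);

			return namespacesCompatible && stateSerializer.isCompatibleWith(otherStateSerializer);
		}
	}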


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1eaa1ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1eaa1ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1eaa1ee

Branch: refs/heads/master
Commit: d1eaa1ee41728e6d788f1e914cb0568a874a6f32
Parents: 434013a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Dec 19 19:48:18 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../state/RegisteredBackendStateMetaInfo.java   | 21 ++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1eaa1ee/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
index 62418c3..0c50486 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
@@ -20,7 +20,8 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
@@ -49,8 +50,8 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<S> stateSerializer) {
 
-		this.stateType = Preconditions.checkNotNull(stateType);
-		this.name = Preconditions.checkNotNull(name);
+		this.stateType = checkNotNull(stateType);
+		this.name = checkNotNull(name);
 		this.namespaceSerializer = namespaceSerializer;
 		this.stateSerializer = stateSerializer;
 	}
@@ -91,7 +92,9 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 			return false;
 		}
 
-		return namespaceSerializer.isCompatibleWith(other.namespaceSerializer)
+		return ((namespaceSerializer == null && other.namespaceSerializer == null)
+					|| namespaceSerializer == null || other.namespaceSerializer == null
+					|| namespaceSerializer.isCompatibleWith(other.namespaceSerializer))
 				&& stateSerializer.isCompatibleWith(other.stateSerializer);
 	}
 
@@ -122,6 +125,16 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 	}
 
 	@Override
+	public String toString() {
+		return "RegisteredBackendStateMetaInfo{" +
+				"stateType=" + stateType +
+				", name='" + name + '\'' +
+				", namespaceSerializer=" + namespaceSerializer +
+				", stateSerializer=" + stateSerializer +
+				'}';
+	}
+
+	@Override
 	public int hashCode() {
 		int result = getName().hashCode();
 		result = 31 * result + getStateType().hashCode();


[04/15] flink git commit: [FLINK-5293] Make Kafka consumer backwards compatible with 1.1 snapshots

Posted by al...@apache.org.
[FLINK-5293] Make Kafka consumer backwards compatible with 1.1 snapshots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/216653ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/216653ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/216653ad

Branch: refs/heads/master
Commit: 216653ad5864302dbcda4be5b88a83c4c039a05c
Parents: 49f1a03
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 16:46:33 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumerBase.java  | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/216653ad/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index aef7116..13bfe93 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -65,7 +66,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
 		CheckpointListener,
 		ResultTypeQueryable<T>,
-		CheckpointedFunction {
+		CheckpointedFunction,
+		CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {
+
 	private static final long serialVersionUID = -6272159445203409112L;
 
 	protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
@@ -382,6 +385,19 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
+	public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
+		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
+		restoreToOffset = restoredOffsets;
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
+				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
+		}
+	}
+
+	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		if (!running) {
 			LOG.debug("notifyCheckpointComplete() called on closed source");


[03/15] flink git commit: [FLINK-5292] Add CheckpointedRestoringOperator interface.

Posted by al...@apache.org.
[FLINK-5292] Add CheckpointedRestoringOperator interface.

This splits the StreamCheckpointedOperator interface into a checkpointing part and a
restoring part. The restoring part is meant for operators that need to restore from
legacy snapshots taken with Flink 1.1.
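
As a hedged sketch of what the restoring half looks like from an operator author's
point of view (the class below is hypothetical and deliberately minimal, not a
complete operator; the real interfaces are in the diff that follows):

	import java.io.DataInputStream;

	import org.apache.flink.core.fs.FSDataInputStream;
	import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
	import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;

	// Hypothetical operator that kept a single long counter in its Flink 1.1 snapshot.
	// It implements only CheckpointedRestoringOperator, so it can still read the legacy
	// snapshot once, but it no longer writes snapshots in the legacy format.
	public class LegacyAwareCounterOperator extends AbstractStreamOperator<Long>
			implements CheckpointedRestoringOperator {

		private static final long serialVersionUID = 1L;

		private long restoredCount;

		@Override
		public void restoreState(FSDataInputStream in) throws Exception {
			// Assumed legacy layout: the Flink 1.1 snapshotState(...) wrote exactly one long.
			DataInputStream dis = new DataInputStream(in);
			restoredCount = dis.readLong();
			// The runtime closes the input stream after restoreState returns
			// (see the AbstractStreamOperator change below), so it is not closed here.
		}
	}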


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32f300fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32f300fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32f300fd

Branch: refs/heads/master
Commit: 32f300fd2c95e3cd94c7edb52665f90c0fa281ed
Parents: bfdaa38
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 18:39:24 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:53 2016 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |  5 +--
 .../CheckpointedRestoringOperator.java          | 46 ++++++++++++++++++++
 .../operators/StreamCheckpointedOperator.java   | 22 +---------
 3 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/32f300fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 1c27293..a21660c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -141,7 +141,6 @@ public abstract class AbstractStreamOperator<OUT>
 	// ---------------- timers ------------------
 
 	private transient Map<String, HeapInternalTimerService<?, ?>> timerServices;
-//	private transient Map<String, HeapInternalTimerService<?, ?>> restoredServices;
 
 
 	// ---------------- two-input operator watermarks ------------------
@@ -230,7 +229,7 @@ public abstract class AbstractStreamOperator<OUT>
 	private void restoreStreamCheckpointed(OperatorStateHandles stateHandles) throws Exception {
 		StreamStateHandle state = stateHandles.getLegacyOperatorState();
 		if (null != state) {
-			if (this instanceof StreamCheckpointedOperator) {
+			if (this instanceof CheckpointedRestoringOperator) {
 
 				LOG.debug("Restore state of task {} in chain ({}).",
 						stateHandles.getOperatorChainIndex(), getContainingTask().getName());
@@ -238,7 +237,7 @@ public abstract class AbstractStreamOperator<OUT>
 				FSDataInputStream is = state.openInputStream();
 				try {
 					getContainingTask().getCancelables().registerClosable(is);
-					((StreamCheckpointedOperator) this).restoreState(is);
+					((CheckpointedRestoringOperator) this).restoreState(is);
 				} finally {
 					getContainingTask().getCancelables().unregisterClosable(is);
 					is.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/32f300fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java
new file mode 100644
index 0000000..20eb1cf
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Interface for {@link StreamOperator StreamOperators} that can restore from a Flink 1.1
+ * legacy snapshot that was done using the {@link StreamCheckpointedOperator} interface.
+ */
+@Deprecated
+public interface CheckpointedRestoringOperator {
+
+	/**
+	 * Restores the operator state, if this operator's execution is recovering from a checkpoint.
+	 * This method restores the operator state (if the operator is stateful) and the key/value state
+	 * (if it had been used and was initialized when the snapshot occurred).
+	 *
+	 * <p>This method is called after {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
+	 * and before {@link StreamOperator#open()}.
+	 *
+	 * @param in The stream from which we have to restore our state.
+	 *
+	 * @throws Exception Exceptions during state restore should be forwarded, so that the system can
+	 *                   properly react to failed state restore and fail the execution attempt.
+	 */
+	void restoreState(FSDataInputStream in) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/32f300fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
index d2f7e0d..f93e7ea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 @Deprecated
-public interface StreamCheckpointedOperator {
+public interface StreamCheckpointedOperator extends CheckpointedRestoringOperator {
 
 	/**
 	 * Called to draw a state snapshot from the operator. This method snapshots the operator state
@@ -39,19 +36,4 @@ public interface StreamCheckpointedOperator {
 	 */
 	void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception;
 
-	/**
-	 * Restores the operator state, if this operator's execution is recovering from a checkpoint.
-	 * This method restores the operator state (if the operator is stateful) and the key/value state
-	 * (if it had been used and was initialized when the snapshot occurred).
-	 *
-	 * <p>This method is called after {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
-	 * and before {@link StreamOperator#open()}.
-	 *
-	 * @param in The stream from which we have to restore our state.
-	 *
-	 * @throws Exception Exceptions during state restore should be forwarded, so that the system can
-	 *                   properly react to failed state restore and fail the execution attempt.
-	 */
-	void restoreState(FSDataInputStream in) throws Exception;
-
-}
\ No newline at end of file
+}


[02/15] flink git commit: [FLINK-5292] Expose some SavepointV0Serializer methods for use in tests

Posted by al...@apache.org.
[FLINK-5292] Expose some SavepointV0Serializer methods for use in tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/896fbaef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/896fbaef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/896fbaef

Branch: refs/heads/master
Commit: 896fbaefb3801302fe7b0e60215ad69b2457ddee
Parents: 32f300f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Dec 16 10:56:20 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:53 2016 +0100

----------------------------------------------------------------------
 .../savepoint/SavepointV0Serializer.java          | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/896fbaef/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index 6c6a8f6..9e37dbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -252,7 +252,11 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
 				null);
 	}
 
-	private StreamStateHandle convertOperatorAndFunctionState(StreamTaskState streamTaskState) throws Exception {
+	/**
+	 * This is public so that we can use it when restoring a legacy snapshot
+	 * in {@code AbstractStreamOperatorTestHarness}.
+	 */
+	public static StreamStateHandle convertOperatorAndFunctionState(StreamTaskState streamTaskState) throws Exception {
 
 		List<StreamStateHandle> mergeStateHandles = new ArrayList<>(4);
 
@@ -273,7 +277,11 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
 		return new MigrationStreamStateHandle(new MultiStreamStateHandle(mergeStateHandles));
 	}
 
-	private KeyGroupsStateHandle convertKeyedBackendState(
+	/**
+	 * This is public so that we can use it when restoring a legacy snapshot
+	 * in {@code AbstractStreamOperatorTestHarness}.
+	 */
+	public static KeyGroupsStateHandle convertKeyedBackendState(
 			HashMap<String, KvStateSnapshot<?, ?, ?, ?>> oldKeyedState,
 			int parallelInstanceIdx,
 			long checkpointID) throws Exception {
@@ -327,7 +335,11 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
 		return 0;
 	}
 
-	private static StreamStateHandle convertStateHandle(StateHandle<?> oldStateHandle) throws Exception {
+	/**
+	 * This is public so that we can use it when restoring a legacy snapshot
+	 * in {@code AbstractStreamOperatorTestHarness}.
+	 */
+	public static StreamStateHandle convertStateHandle(StateHandle<?> oldStateHandle) throws Exception {
 		if (oldStateHandle instanceof AbstractFileStateHandle) {
 			Path path = ((AbstractFileStateHandle) oldStateHandle).getFilePath();
 			return new FileStateHandle(path, oldStateHandle.getStateSize());


[09/15] flink git commit: [FLINK-5294] Add tests for WindowOperator restore from 1.1 snapshot

Posted by al...@apache.org.
[FLINK-5294] Add tests for WindowOperator restore from 1.1 snapshot


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac134d61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac134d61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac134d61

Branch: refs/heads/master
Commit: ac134d615ddace4c700cc5b0010b49c9a91611fe
Parents: e0819dc
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Dec 16 14:05:18 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../windowing/WindowOperatorMigrationTest.java  | 687 +++++++++++++++++++
 ...tion-test-apply-event-time-flink1.1-snapshot | Bin 0 -> 2427 bytes
 ...test-apply-processing-time-flink1.1-snapshot | Bin 0 -> 2327 bytes
 ...ion-test-reduce-event-time-flink1.1-snapshot | Bin 0 -> 2457 bytes
 ...est-reduce-processing-time-flink1.1-snapshot | Bin 0 -> 2406 bytes
 ...sion-with-stateful-trigger-flink1.1-snapshot | Bin 0 -> 2988 bytes
 ...with-stateful-trigger-mint-flink1.1-snapshot | Bin 0 -> 538 bytes
 7 files changed, 687 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac134d61/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
new file mode 100644
index 0000000..c02100d
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for checking whether {@link WindowOperator} can restore from snapshots that were done
+ * using the Flink 1.1 {@link WindowOperator}.
+ *
+ * <p>
+ * This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink 1.1
+ * aligned processing-time windows operator.
+ *
+ * <p>To regenerate the binary snapshot file, run the commented-out portion of each
+ * test on a checkout of the Flink 1.1 branch.
+ */
+public class WindowOperatorMigrationTest {
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = WindowOperatorTest.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		if (resource == null) {
+			throw new NullPointerException("Missing snapshot resource.");
+		}
+		return resource.getFile();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testRestoreSessionWindowsWithCountTriggerFromFlink11() throws Exception {
+
+		final int SESSION_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
+				PurgingTrigger.of(CountTrigger.of(4)),
+				0);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		/*
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+
+		// do snapshot and save to file
+		StreamTaskState snapshot = testHarness.snapshot(0, 0);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot");
+		testHarness.close();
+        */
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
+				"win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot"));
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
+
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		// add an element that merges the two "key1" sessions, they should now have count 6, and therefore fire
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	/**
+	 * This checks that we can restore from a virgin {@code WindowOperator} that has never seen
+	 * any elements.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testRestoreSessionWindowsWithCountTriggerInMintConditionFromFlink11() throws Exception {
+
+		final int SESSION_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
+				PurgingTrigger.of(CountTrigger.of(4)),
+				0);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		/*
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		// do snapshot and save to file
+		StreamTaskState snapshot = testHarness.snapshot(0, 0);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot");
+		testHarness.close();
+		*/
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
+				"win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot"));
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		// add an element that merges the two "key1" sessions; they should now have count 6 and therefore fire
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testRestoreReducingEventTimeWindowsFromFlink11() throws Exception {
+		final int WINDOW_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+				new SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+				EventTimeTrigger.create(),
+				0);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		/*
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+
+		testHarness.processWatermark(new Watermark(999));
+		expectedOutput.add(new Watermark(999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(1999));
+		expectedOutput.add(new Watermark(1999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		// do snapshot and save to file
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot");
+		testHarness.close();
+
+		*/
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
+				"win-op-migration-test-reduce-event-time-flink1.1-snapshot"));
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+		expectedOutput.add(new Watermark(2999));
+
+		testHarness.processWatermark(new Watermark(3999));
+		expectedOutput.add(new Watermark(3999));
+
+		testHarness.processWatermark(new Watermark(4999));
+		expectedOutput.add(new Watermark(4999));
+
+		testHarness.processWatermark(new Watermark(5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
+		expectedOutput.add(new Watermark(5999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testRestoreApplyEventTimeWindowsFromFlink11() throws Exception {
+		final int WINDOW_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
+				EventTimeTrigger.create(),
+				0);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		/*
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
+
+		testHarness.processWatermark(new Watermark(999));
+		expectedOutput.add(new Watermark(999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(1999));
+		expectedOutput.add(new Watermark(1999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		// do snapshot and save to file
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot");
+		testHarness.close();
+
+		*/
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
+				"win-op-migration-test-apply-event-time-flink1.1-snapshot"));
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+		expectedOutput.add(new Watermark(2999));
+
+		testHarness.processWatermark(new Watermark(3999));
+		expectedOutput.add(new Watermark(3999));
+
+		testHarness.processWatermark(new Watermark(4999));
+		expectedOutput.add(new Watermark(4999));
+
+		testHarness.processWatermark(new Watermark(5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
+		expectedOutput.add(new Watermark(5999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testRestoreReducingProcessingTimeWindowsFromFlink11() throws Exception {
+		final int WINDOW_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+				new WindowOperatorTest.SumReducer(),
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+				ProcessingTimeTrigger.create(),
+				0);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		/*
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), timeServiceProvider);
+
+		testHarness.configureForKeyedStream(new WindowOperatorTest.TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		timeServiceProvider.setCurrentTime(10);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+
+		timeServiceProvider.setCurrentTime(3010);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1)));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new WindowOperatorTest.Tuple2ResultSortComparator());
+
+		// do snapshot and save to file
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot");
+		testHarness.close();
+		*/
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
+				"win-op-migration-test-reduce-processing-time-flink1.1-snapshot"));
+		testHarness.open();
+
+		testHarness.setProcessingTime(3020);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3)));
+
+		testHarness.setProcessingTime(6000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 5999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testRestoreApplyProcessingTimeWindowsFromFlink11() throws Exception {
+		final int WINDOW_SIZE = 3;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+				inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				new TimeWindow.Serializer(),
+				new TupleKeySelector(),
+				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+				stateDesc,
+				new InternalIterableWindowFunction<>(new WindowOperatorTest.RichSumReducer<TimeWindow>()),
+				ProcessingTimeTrigger.create(),
+				0);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		/*
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), timeServiceProvider);
+
+		testHarness.configureForKeyedStream(new WindowOperatorTest.TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		timeServiceProvider.setCurrentTime(10);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
+
+		timeServiceProvider.setCurrentTime(3010);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1)));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new WindowOperatorTest.Tuple2ResultSortComparator());
+
+		// do snapshot and save to file
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot");
+		testHarness.close();
+		*/
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
+				"win-op-migration-test-apply-processing-time-flink1.1-snapshot"));
+		testHarness.open();
+
+		testHarness.setProcessingTime(3020);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3)));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3)));
+
+		testHarness.setProcessingTime(6000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 5999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+		testHarness.close();
+	}
+
+	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static class Tuple2ResultSortComparator implements Comparator<Object> {
+		@Override
+		public int compare(Object o1, Object o2) {
+			if (o1 instanceof Watermark || o2 instanceof Watermark) {
+				return 0;
+			} else {
+				StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
+				StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
+				if (sr0.getTimestamp() != sr1.getTimestamp()) {
+					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
+				}
+				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
+				if (comparison != 0) {
+					return comparison;
+				} else {
+					return sr0.getValue().f1 - sr1.getValue().f1;
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static class Tuple3ResultSortComparator implements Comparator<Object> {
+		@Override
+		public int compare(Object o1, Object o2) {
+			if (o1 instanceof Watermark || o2 instanceof Watermark) {
+				return 0;
+			} else {
+				StreamRecord<Tuple3<String, Long, Long>> sr0 = (StreamRecord<Tuple3<String, Long, Long>>) o1;
+				StreamRecord<Tuple3<String, Long, Long>> sr1 = (StreamRecord<Tuple3<String, Long, Long>>) o2;
+				if (sr0.getTimestamp() != sr1.getTimestamp()) {
+					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
+				}
+				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
+				if (comparison != 0) {
+					return comparison;
+				} else {
+					comparison = (int) (sr0.getValue().f1 - sr1.getValue().f1);
+					if (comparison != 0) {
+						return comparison;
+					}
+					return (int) (sr0.getValue().f2 - sr1.getValue().f2);
+				}
+			}
+		}
+	}
+
+	public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+				Tuple2<String, Integer> value2) throws Exception {
+			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+		}
+	}
+
+	public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
+		private static final long serialVersionUID = 1L;
+
+		private boolean openCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+		}
+
+		@Override
+		public void apply(String key,
+				W window,
+				Iterable<Tuple2<String, Integer>> input,
+				Collector<Tuple2<String, Integer>> out) throws Exception {
+
+			if (!openCalled) {
+				fail("Open was not called");
+			}
+			int sum = 0;
+
+			for (Tuple2<String, Integer> t: input) {
+				sum += t.f1;
+			}
+			out.collect(new Tuple2<>(key, sum));
+
+		}
+
+	}
+
+	public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void apply(String key,
+				TimeWindow window,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, Long, Long>> out) throws Exception {
+			int sum = 0;
+			for (Tuple2<String, Integer> i: values) {
+				sum += i.f1;
+			}
+			String resultString = key + "-" + sum;
+			out.collect(new Tuple3<>(resultString, window.getStart(), window.getEnd()));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac134d61/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot
new file mode 100644
index 0000000..02f0df2
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ac134d61/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot
new file mode 100644
index 0000000..78963d6
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ac134d61/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot
new file mode 100644
index 0000000..ed428be
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ac134d61/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot
new file mode 100644
index 0000000..9945be6
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ac134d61/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot
new file mode 100644
index 0000000..5f42359
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ac134d61/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot
new file mode 100644
index 0000000..72e8d90
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot differ


[07/15] flink git commit: [FLINK-5294] Make WindowOperator backwards compatible with 1.1 snapshots

Posted by al...@apache.org.
[FLINK-5294] Make WindowOperator backwards compatible with 1.1 snapshots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0e2a2c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0e2a2c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0e2a2c1

Branch: refs/heads/master
Commit: b0e2a2c175d54346010ee4817d206bbeb4033ac9
Parents: f9b4f91
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 16:40:50 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../operators/windowing/WindowOperator.java     | 100 +++++++++++++++++++
 1 file changed, 100 insertions(+)
----------------------------------------------------------------------
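
The diff below follows a simple pattern: while restoring from a Flink 1.1 snapshot the
operator only buffers the legacy timers in priority queues, and once open() has created
the new InternalTimerService it re-registers them and drops the buffers. A minimal,
self-contained sketch of that pattern follows; Timer and TimerService are simplified
stand-ins for the real Flink types used in the actual change, and the real operator
additionally calls setCurrentKey(timer.key) before each registration.

import java.util.List;
import java.util.PriorityQueue;

public class LegacyTimerRestoreSketch {

	/** Simplified timer: key and window are plain Strings in this sketch. */
	static final class Timer implements Comparable<Timer> {
		final long timestamp;
		final String key;
		final String window;

		Timer(long timestamp, String key, String window) {
			this.timestamp = timestamp;
			this.key = key;
			this.window = window;
		}

		@Override
		public int compareTo(Timer other) {
			return Long.compare(timestamp, other.timestamp);
		}
	}

	/** Reduced stand-in for the two timer-service calls used during re-registration. */
	interface TimerService {
		void registerEventTimeTimer(String window, long timestamp);
		void registerProcessingTimeTimer(String window, long timestamp);
	}

	private PriorityQueue<Timer> restoredEventTimeTimers;
	private PriorityQueue<Timer> restoredProcessingTimeTimers;

	/** Restore phase: the timer service does not exist yet, so only buffer the timers. */
	void restoreLegacyTimers(List<Timer> eventTimeTimers, List<Timer> processingTimeTimers) {
		restoredEventTimeTimers = new PriorityQueue<>(Math.max(eventTimeTimers.size(), 1));
		restoredEventTimeTimers.addAll(eventTimeTimers);
		restoredProcessingTimeTimers = new PriorityQueue<>(Math.max(processingTimeTimers.size(), 1));
		restoredProcessingTimeTimers.addAll(processingTimeTimers);
	}

	/** Open phase: re-register everything with the new timer service, then drop the buffers. */
	void open(TimerService timerService) {
		if (restoredEventTimeTimers != null) {
			for (Timer timer : restoredEventTimeTimers) {
				timerService.registerEventTimeTimer(timer.window, timer.timestamp);
			}
		}
		if (restoredProcessingTimeTimers != null) {
			for (Timer timer : restoredProcessingTimeTimers) {
				timerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
			}
		}
		// gc friendliness, as in the actual operator
		restoredEventTimeTimers = null;
		restoredProcessingTimeTimers = null;
	}
}

Buffering rather than registering directly is needed because, as the diff suggests, the
InternalTimerService only becomes available in open(), after restoreState() has already run.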


http://git-wip-us.apache.org/repos/asf/flink/blob/b0e2a2c1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index edcd833..1cfeba8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -34,6 +34,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -52,8 +54,10 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.PriorityQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -140,6 +144,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	protected transient InternalTimerService<W> internalTimerService;
 
+	// ------------------------------------------------------------------------
+	// State restored in case of migration from an older version (backwards compatibility)
+	// ------------------------------------------------------------------------
+
+	/** The restored processing time timers. */
+	protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
+
+	/** The restored event time timers. */
+	protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
+
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 	 */
@@ -196,6 +210,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				return internalTimerService.currentProcessingTime();
 			}
 		};
+
+		// if we restore from an older version,
+		// we have to re-register the timers.
+
+		if (restoredFromLegacyEventTimeTimers != null) {
+			for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
+				setCurrentKey(timer.key);
+				internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
+			}
+		}
+
+		if (restoredFromLegacyProcessingTimeTimers != null) {
+			for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
+				setCurrentKey(timer.key);
+				internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
+			}
+		}
+
+		// gc friendliness
+		this.restoredFromLegacyEventTimeTimers = null;
+		this.restoredFromLegacyProcessingTimeTimers = null;
 	}
 
 	@Override
@@ -707,6 +742,71 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	// ------------------------------------------------------------------------
+	//  Restoring / Migrating from an older Flink version.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void restoreState(FSDataInputStream in) throws Exception {
+		super.restoreState(in);
+
+		LOG.info("{} (taskIdx={}) restoring timers from an older Flink version.",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
+		restoreTimers(new DataInputViewStreamWrapper(in));
+	}
+
+	private void restoreTimers(DataInputViewStreamWrapper in) throws IOException {
+		int numWatermarkTimers = in.readInt();
+		this.restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
+
+		for (int i = 0; i < numWatermarkTimers; i++) {
+			K key = keySerializer.deserialize(in);
+			W window = windowSerializer.deserialize(in);
+			long timestamp = in.readLong();
+
+			Timer<K, W> timer = new Timer<>(timestamp, key, window);
+			restoredFromLegacyEventTimeTimers.add(timer);
+		}
+
+		int numProcessingTimeTimers = in.readInt();
+		this.restoredFromLegacyProcessingTimeTimers = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
+
+		for (int i = 0; i < numProcessingTimeTimers; i++) {
+			K key = keySerializer.deserialize(in);
+			W window = windowSerializer.deserialize(in);
+			long timestamp = in.readLong();
+
+			Timer<K, W> timer = new Timer<>(timestamp, key, window);
+			restoredFromLegacyProcessingTimeTimers.add(timer);
+		}
+
+		// read the remaining data from the legacy snapshot; this information is not used anymore.
+		int numProcessingTimeTimerTimestamp = in.readInt();
+		for (int i = 0; i < numProcessingTimeTimerTimestamp; i++) {
+			in.readLong();
+			in.readInt();
+		}
+
+		if (LOG.isDebugEnabled()) {
+			int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+			if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) {
+				LOG.debug("{} (taskIdx={}) restored {} event time timers from an older Flink version: {}",
+					getClass().getSimpleName(), subtaskIdx,
+					restoredFromLegacyEventTimeTimers.size(),
+					restoredFromLegacyEventTimeTimers);
+			}
+
+			if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) {
+				LOG.debug("{} (taskIdx={}) restored {} processing time timers from an older Flink version: {}",
+					getClass().getSimpleName(), subtaskIdx,
+					restoredFromLegacyProcessingTimeTimers.size(),
+					restoredFromLegacyProcessingTimeTimers);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------
 


[11/15] flink git commit: [hotfix] Add null checks in StateAssignmentOperation

Posted by al...@apache.org.
[hotfix] Add null checks in StateAssignmentOperation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74df7631
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74df7631
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74df7631

Branch: refs/heads/master
Commit: 74df7631316e78af39a5416e12c1adc8a46d87fe
Parents: d1eaa1e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Dec 19 19:50:51 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/StateAssignmentOperation.java     | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74df7631/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 61a71e5..2e05a85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -185,7 +185,9 @@ public class StateAssignmentOperation {
 				ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
 
 				if (!parallelismChanged) {
-					nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
+					if (taskState.getState(subTaskIdx) != null) {
+						nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
+					}
 				}
 
 				// partitionable state
@@ -224,10 +226,17 @@ public class StateAssignmentOperation {
 					newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
 				} else {
 					SubtaskState subtaskState = taskState.getState(subTaskIdx);
-					KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
-					KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
-					newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(oldKeyedStatesBackend) : null;
-					newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(oldKeyedStatesStream) : null;
+					if (subtaskState != null) {
+						KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
+						KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
+						newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(
+								oldKeyedStatesBackend) : null;
+						newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(
+								oldKeyedStatesStream) : null;
+					} else {
+						newKeyedStatesBackend = null;
+						newKeyedStateStream = null;
+					}
 				}
 
 				TaskStateHandles taskStateHandles = new TaskStateHandles(