You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:21 UTC

[06/11] flink git commit: [FLINK-3674] Add an interface for Time aware User Functions

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
new file mode 100644
index 0000000..e9c5eeb
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators.co;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link CoStreamTimelyFlatMap}.
+ */
+public class TimelyCoFlatMapTest extends TestLogger {
+
+	@Test
+	public void testCurrentEventTime() throws Exception {
+
+		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
+				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark1(new Watermark(17));
+		testHarness.processWatermark2(new Watermark(17));
+		testHarness.processElement1(new StreamRecord<>(5, 12L));
+
+		testHarness.processWatermark1(new Watermark(42));
+		testHarness.processWatermark2(new Watermark(42));
+		testHarness.processElement2(new StreamRecord<>("6", 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(17L));
+		expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+		expectedOutput.add(new Watermark(42L));
+		expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testCurrentProcessingTime() throws Exception {
+
+		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
+				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		testHarness.processElement1(new StreamRecord<>(5));
+
+		testHarness.setProcessingTime(42);
+		testHarness.processElement2(new StreamRecord<>("6"));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("5PT:17"));
+		expectedOutput.add(new StreamRecord<>("6PT:42"));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testEventTimeTimers() throws Exception {
+
+		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
+				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<>(17, 42L));
+		testHarness.processElement2(new StreamRecord<>("18", 42L));
+
+		testHarness.processWatermark1(new Watermark(5));
+		testHarness.processWatermark2(new Watermark(5));
+
+		testHarness.processWatermark1(new Watermark(6));
+		testHarness.processWatermark2(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
+		expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
+		expectedOutput.add(new StreamRecord<>("1777", 5L));
+		expectedOutput.add(new Watermark(5L));
+		expectedOutput.add(new StreamRecord<>("1777", 6L));
+		expectedOutput.add(new Watermark(6L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeTimers() throws Exception {
+
+		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
+				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<>(17));
+		testHarness.processElement2(new StreamRecord<>("18"));
+
+		testHarness.setProcessingTime(5);
+		testHarness.setProcessingTime(6);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT2:18"));
+		expectedOutput.add(new StreamRecord<>("1777", 5L));
+		expectedOutput.add(new StreamRecord<>("1777", 6L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testEventTimeTimerWithState() throws Exception {
+
+		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
+				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark1(new Watermark(1));
+		testHarness.processWatermark2(new Watermark(1));
+		testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+		testHarness.processWatermark1(new Watermark(2));
+		testHarness.processWatermark2(new Watermark(2));
+		testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7
+
+		testHarness.processWatermark1(new Watermark(6));
+		testHarness.processWatermark2(new Watermark(6));
+
+		testHarness.processWatermark1(new Watermark(7));
+		testHarness.processWatermark2(new Watermark(7));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(1L));
+		expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
+		expectedOutput.add(new Watermark(2L));
+		expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new Watermark(6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+		expectedOutput.add(new Watermark(7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testProcessingTimeTimerWithState() throws Exception {
+
+		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
+				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(1);
+		testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
+
+		testHarness.setProcessingTime(2);
+		testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7
+
+		testHarness.setProcessingTime(6);
+		testHarness.setProcessingTime(7);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT2:42"));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testSnapshotAndRestore() throws Exception {
+
+		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
+				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<>(5, 12L));
+		testHarness.processElement2(new StreamRecord<>("5", 12L));
+
+		// snapshot and restore from scratch
+		StreamStateHandle snapshot = testHarness.snapshotLegacy(0, 0);
+
+		testHarness.close();
+
+		operator = new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+
+		testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
+				operator,
+				new IntToStringKeySelector<>(),
+				new IdentityKeySelector<String>(),
+				BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.restore(snapshot);
+		testHarness.open();
+
+		testHarness.setProcessingTime(5);
+		testHarness.processWatermark1(new Watermark(6));
+		testHarness.processWatermark2(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+		expectedOutput.add(new Watermark(6));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+
+	private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Integer value) throws Exception {
+			return "" + value;
+		}
+	}
+
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class WatermarkQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect(value + "WM:" + timerService.currentWatermark());
+		}
+
+		@Override
+		public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect(value + "WM:" + timerService.currentWatermark());
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+		}
+	}
+
+	private static class EventTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			timerService.registerEventTimeTimer(5);
+		}
+
+		@Override
+		public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			timerService.registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+
+			assertEquals(TimeDomain.EVENT_TIME, timeDomain);
+			out.collect("" + 1777);
+		}
+	}
+
+	private static class EventTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<String> state =
+				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
+
+		@Override
+		public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			getRuntimeContext().getState(state).update("" + value);
+			timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+		}
+
+		@Override
+		public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			getRuntimeContext().getState(state).update(value);
+			timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+		}
+
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+			assertEquals(TimeDomain.EVENT_TIME, timeDomain);
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class ProcessingTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			timerService.registerProcessingTimeTimer(5);
+		}
+
+		@Override
+		public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			timerService.registerProcessingTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+
+			assertEquals(TimeDomain.PROCESSING_TIME, timeDomain);
+			out.collect("" + 1777);
+		}
+	}
+
+	private static class ProcessingTimeQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect(value + "PT:" + timerService.currentProcessingTime());
+		}
+
+		@Override
+		public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect(value + "PT:" + timerService.currentProcessingTime());
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+		}
+	}
+
+	private static class ProcessingTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<String> state =
+				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
+
+		@Override
+		public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			getRuntimeContext().getState(state).update("" + value);
+			timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5);
+		}
+
+		@Override
+		public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			getRuntimeContext().getState(state).update(value);
+			timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5);
+		}
+
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+			assertEquals(TimeDomain.PROCESSING_TIME, timeDomain);
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class BothTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			timerService.registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception {
+			timerService.registerProcessingTimeTimer(5);
+		}
+
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+			if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+				out.collect("EVENT:1777");
+			} else {
+				out.collect("PROC:1777");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 3dd2ed7..52311f3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -576,12 +576,9 @@ public class OneInputStreamTaskTest extends TestLogger {
 		}
 
 		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-
-		}
-
-		@Override
 		public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+			super.snapshotState(out, checkpointId, timestamp);
+
 			if (random == null) {
 				random = new Random(seed);
 			}
@@ -595,6 +592,8 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		@Override
 		public void restoreState(FSDataInputStream in) throws Exception {
+			super.restoreState(in);
+
 			numberRestoreCalls++;
 
 			if (random == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 95115d6..bc40a89 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -53,6 +53,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import scala.Option;
@@ -76,7 +77,11 @@ import static org.junit.Assert.fail;
 
 /**
  * TODO : parameterize to test all different state backends!
+ *
+ * Ignored for now since the timers in {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
+ * are not repartitionable/key-group-aware.
  */
+@Ignore
 public class RescalingITCase extends TestLogger {
 
 	private static final int numTaskManagers = 2;

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index fc48719..d794953 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -160,6 +160,7 @@ public class SavepointITCase extends TestLogger {
 			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
 			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 					checkpointDir.toURI().toString());
+			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
 			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
 					savepointDir.toURI().toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 5855214..530d6cc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -726,6 +726,8 @@ public class TimestampITCase extends TestLogger {
 
 		@Override
 		public void processWatermark(Watermark mark) throws Exception {
+			super.processWatermark(mark);
+
 			for (Watermark previousMark: watermarks) {
 				assertTrue(previousMark.getTimestamp() < mark.getTimestamp());
 			}
@@ -760,9 +762,6 @@ public class TimestampITCase extends TestLogger {
 			}
 			output.collect(element);
 		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {}
 	}
 
 	public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
@@ -774,10 +773,6 @@ public class TimestampITCase extends TestLogger {
 			}
 			output.collect(element);
 		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {}
-
 	}
 
 	public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {