You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:26:31 UTC

flink git commit: [FLINK-2550] Rename extractTimestamp() to assignTimestamps()

Repository: flink
Updated Branches:
  refs/heads/master 0c1141abc -> 9a21ab117


[FLINK-2550] Rename extractTimestamp() to assignTimestamps()

This also renames TimestampExtractor.emitWatermark() to
TimestampExtractor.extractWatermark(), and the Scala
DataStream.extractAscendingTimestamp() to assignAscendingTimestamps().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a21ab11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a21ab11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a21ab11

Branch: refs/heads/master
Commit: 9a21ab1175c2f04446a44684bfd8b5e74404f4fe
Parents: 0c1141a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 9 12:25:10 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 9 12:25:10 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/datastream/DataStream.java    |  2 +-
 .../api/functions/AscendingTimestampExtractor.java    |  2 +-
 .../streaming/api/functions/TimestampExtractor.java   |  2 +-
 .../runtime/operators/ExtractTimestampsOperator.java  |  2 +-
 .../streaming/api/complex/ComplexIntegrationTest.java |  8 ++++----
 .../operators/windowing/CoGroupJoinITCase.java        | 14 +++++++-------
 .../runtime/operators/windowing/WindowFoldITCase.java |  6 +++---
 .../flink/streaming/timestamp/TimestampITCase.java    |  7 +++----
 .../flink/streaming/examples/join/WindowJoin.java     |  6 +++---
 .../examples/ml/IncrementalLearningSkeleton.java      |  4 ++--
 .../examples/windowing/TopSpeedWindowing.java         |  4 ++--
 .../scala/examples/windowing/TopSpeedWindowing.scala  |  2 +-
 .../apache/flink/streaming/api/scala/DataStream.scala |  8 ++++----
 .../flink/streaming/api/scala/CoGroupJoinITCase.scala | 14 +++++++-------
 .../flink/streaming/api/scala/WindowFoldITCase.scala  |  6 +++---
 15 files changed, 43 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 80e0e47..f56e53b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -689,7 +689,7 @@ public class DataStream<T> {
 	 *
 	 * @param extractor The TimestampExtractor that is called for each element of the DataStream.
 	 */
-	public SingleOutputStreamOperator<T, ?> extractTimestamp(TimestampExtractor<T> extractor) {
+	public SingleOutputStreamOperator<T, ?> assignTimestamps(TimestampExtractor<T> extractor) {
 		// match parallelism to input, otherwise dop=1 sources could lead to some strange
 		// behaviour: the watermark will creep along very slowly because the elements
 		// from the source go to each extraction operator round robin.

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
index bdead7c..85433f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
@@ -48,7 +48,7 @@ public abstract class AscendingTimestampExtractor<T> implements TimestampExtract
 	}
 
 	@Override
-	public final long emitWatermark(T element, long currentTimestamp) {
+	public final long extractWatermark(T element, long currentTimestamp) {
 		return Long.MIN_VALUE;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
index 29603d9..7fd7b63 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
@@ -57,7 +57,7 @@ public interface TimestampExtractor<T> extends Function {
 	 * @return {@code Long.MIN_VALUE} if no watermark should be emitted, positive value for
 	 *          emitting this value as a watermark.
 	 */
-	long emitWatermark(T element, long currentTimestamp);
+	long extractWatermark(T element, long currentTimestamp);
 
 	/**
 	 * Returns the current watermark. This is periodically called by the system to determine

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 864ca76..ddfc6a1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -68,7 +68,7 @@ public class ExtractTimestampsOperator<T>
 	public void processElement(StreamRecord<T> element) throws Exception {
 		long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
 		output.collect(element.replace(element.getValue(), newTimestamp));
-		long watermark = userFunction.emitWatermark(element.getValue(), newTimestamp);
+		long watermark = userFunction.extractWatermark(element.getValue(), newTimestamp);
 		if (watermark > currentWatermark) {
 			currentWatermark = watermark;
 			output.emitWatermark(new Watermark(currentWatermark));

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 2775299..020dda3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -199,7 +199,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
 
 		sourceStream21
-				.extractTimestamp(new MyTimestampExtractor())
+				.assignTimestamps(new MyTimestampExtractor())
 				.keyBy(2, 2)
 				.timeWindow(Time.of(10, TimeUnit.MILLISECONDS), Time.of(4, TimeUnit.MILLISECONDS))
 				.maxBy(3)
@@ -467,7 +467,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
 		sourceStream6
-				.extractTimestamp(new Timestamp6())
+				.assignTimestamps(new Timestamp6())
 				.timeWindowAll(Time.of(1, TimeUnit.MILLISECONDS))
 				.reduce(new SalesReduceFunction())
 				.flatMap(new FlatMapFunction6())
@@ -551,7 +551,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public long emitWatermark(Tuple5<Integer, String, Character, Double, Boolean> value,
+		public long extractWatermark(Tuple5<Integer, String, Character, Double, Boolean> value,
 				long currentTimestamp) {
 			return (long) value.f0 - 1;
 		}
@@ -705,7 +705,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public long emitWatermark(Tuple2<Date, HashMap<Character, Integer>> value,
+		public long extractWatermark(Tuple2<Date, HashMap<Character, Integer>> value,
 				long currentTimestamp) {
 			Calendar cal = Calendar.getInstance();
 			cal.setTime(value.f0);

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
index bb79e5e..cfae026 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
@@ -74,7 +74,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 			@Override
 			public void cancel() {
 			}
-		}).extractTimestamp(new Tuple2TimestampExtractor());
+		}).assignTimestamps(new Tuple2TimestampExtractor());
 
 		DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
 			private static final long serialVersionUID = 1L;
@@ -94,7 +94,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 			@Override
 			public void cancel() {
 			}
-		}).extractTimestamp(new Tuple2TimestampExtractor());
+		}).assignTimestamps(new Tuple2TimestampExtractor());
 
 
 		source1.coGroup(source2)
@@ -168,7 +168,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 			@Override
 			public void cancel() {
 			}
-		}).extractTimestamp(new Tuple3TimestampExtractor());
+		}).assignTimestamps(new Tuple3TimestampExtractor());
 
 		DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
 			private static final long serialVersionUID = 1L;
@@ -188,7 +188,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 			@Override
 			public void cancel() {
 			}
-		}).extractTimestamp(new Tuple3TimestampExtractor());
+		}).assignTimestamps(new Tuple3TimestampExtractor());
 
 
 		source1.join(source2)
@@ -263,7 +263,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 			@Override
 			public void cancel() {
 			}
-		}).extractTimestamp(new Tuple3TimestampExtractor());
+		}).assignTimestamps(new Tuple3TimestampExtractor());
 
 		source1.join(source1)
 				.where(new Tuple3KeyExtractor())
@@ -323,7 +323,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
+		public long extractWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
 			return element.f1 - 1;
 		}
 
@@ -342,7 +342,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public long emitWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) {
+		public long extractWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) {
 			return element.f2 - 1;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
index 45649bd..fb7142b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
@@ -74,7 +74,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 			@Override
 			public void cancel() {
 			}
-		}).extractTimestamp(new Tuple2TimestampExtractor());
+		}).assignTimestamps(new Tuple2TimestampExtractor());
 
 		source1
 				.keyBy(0)
@@ -138,7 +138,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 			@Override
 			public void cancel() {
 			}
-		}).extractTimestamp(new Tuple2TimestampExtractor());
+		}).assignTimestamps(new Tuple2TimestampExtractor());
 
 		source1
 				.windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
@@ -179,7 +179,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
+		public long extractWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
 			return element.f1 - 1;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index d6ff5ce..a88aa1a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.taskmanager.MultiShotLatch;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.functions.TimestampExtractor;
@@ -264,7 +263,7 @@ public class TimestampITCase {
 			}
 		});
 
-		DataStream<Integer> extractOp = source1.extractTimestamp(
+		DataStream<Integer> extractOp = source1.assignTimestamps(
 				new AscendingTimestampExtractor<Integer>() {
 					@Override
 					public long extractAscendingTimestamp(Integer element, long currentTimestamp) {
@@ -326,14 +325,14 @@ public class TimestampITCase {
 			}
 		});
 
-		source1.extractTimestamp(new TimestampExtractor<Integer>() {
+		source1.assignTimestamps(new TimestampExtractor<Integer>() {
 			@Override
 			public long extractTimestamp(Integer element, long currentTimestamp) {
 				return element;
 			}
 
 			@Override
-			public long emitWatermark(Integer element, long currentTimestamp) {
+			public long extractWatermark(Integer element, long currentTimestamp) {
 				return element - 1;
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 5915a7a..3355f1c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -73,8 +73,8 @@ public class WindowJoin {
 		DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
 
 		// extract the timestamps
-		grades = grades.extractTimestamp(new MyTimestampExtractor());
-		salaries = salaries.extractTimestamp(new MyTimestampExtractor());
+		grades = grades.assignTimestamps(new MyTimestampExtractor());
+		salaries = salaries.assignTimestamps(new MyTimestampExtractor());
 
 		// apply a temporal join over the two stream based on the names over one
 		// second windows
@@ -220,7 +220,7 @@ public class WindowJoin {
 		}
 
 		@Override
-		public long emitWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+		public long extractWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
 			return element.f0 - 1;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 4c73e44..ce227e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -71,7 +71,7 @@ public class IncrementalLearningSkeleton {
 
 		// build new model on every second of new data
 		DataStream<Double[]> model = trainingData
-				.extractTimestamp(new LinearTimestamp())
+				.assignTimestamps(new LinearTimestamp())
 				.timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
 				.apply(new PartialModelBuilder());
 
@@ -158,7 +158,7 @@ public class IncrementalLearningSkeleton {
 		}
 
 		@Override
-		public long emitWatermark(Integer element, long currentTimestamp) {
+		public long extractWatermark(Integer element, long currentTimestamp) {
 			return counter - 1;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index a46ffd9..df3402e 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -68,7 +68,7 @@ public class TopSpeedWindowing {
 		}
 
 		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
-				.extractTimestamp(new CarTimestamp())
+				.assignTimestamps(new CarTimestamp())
 				.keyBy(0)
 				.window(GlobalWindows.create())
 				.evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
@@ -169,7 +169,7 @@ public class TopSpeedWindowing {
 		}
 
 		@Override
-		public long emitWatermark(Tuple4<Integer, Integer, Double, Long> element,
+		public long extractWatermark(Tuple4<Integer, Integer, Double, Long> element,
 				long currentTimestamp) {
 			return element.f3 - 1;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 1419afd..f26f32c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -66,7 +66,7 @@ object TopSpeedWindowing {
     val cars = setCarsInput(env)
 
     val topSeed = cars
-      .extractAscendingTimestamp( _.time )
+      .assignAscendingTimestamps( _.time )
       .keyBy("carId")
       .window(GlobalWindows.create)
       .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 22abbdf..afd0700 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -639,8 +639,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * @see org.apache.flink.streaming.api.watermark.Watermark
    */
-  def extractTimestamp(extractor: TimestampExtractor[T]): DataStream[T] = {
-    javaStream.extractTimestamp(clean(extractor))
+  def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
+    javaStream.assignTimestamps(clean(extractor))
   }
 
   /**
@@ -654,14 +654,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * @see org.apache.flink.streaming.api.watermark.Watermark
    */
-  def extractAscendingTimestamp(extractor: T => Long): DataStream[T] = {
+  def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = {
     val cleanExtractor = clean(extractor)
     val extractorFunction = new AscendingTimestampExtractor[T] {
       def extractAscendingTimestamp(element: T, currentTimestamp: Long): Long = {
         cleanExtractor(element)
       }
     }
-    javaStream.extractTimestamp(extractorFunction)
+    javaStream.assignTimestamps(extractorFunction)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index 3f6e10f..3c1e9c3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -57,7 +57,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor)
+    }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
 
     val source2 = env.addSource(new SourceFunction[(String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
@@ -71,7 +71,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor)
+    }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
 
     source1.coGroup(source2)
       .where(_._1)
@@ -121,7 +121,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+    }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
 
     val source2 = env.addSource(new SourceFunction[(String, String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
@@ -137,7 +137,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+    }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
 
     source1.join(source2)
       .where(_._1)
@@ -197,7 +197,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+    }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
 
     source1.join(source1)
       .where(_._1)
@@ -250,7 +250,7 @@ object CoGroupJoinITCase {
       element._2
     }
 
-    def emitWatermark(element: (String, Int), currentTimestamp: Long): Long = {
+    def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
       element._2 - 1
     }
 
@@ -264,7 +264,7 @@ object CoGroupJoinITCase {
       element._3
     }
 
-    def emitWatermark(element: (String, String, Int), currentTimestamp: Long): Long = {
+    def extractWatermark(element: (String, String, Int), currentTimestamp: Long): Long = {
       element._3 - 1
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index dd098a0..d4e8bb2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -61,7 +61,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).extractTimestamp(new WindowFoldITCase.Tuple2TimestampExtractor)
+    }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
 
     source1
       .keyBy(0)
@@ -106,7 +106,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
       def cancel() {
       }
-    }).extractTimestamp(new WindowFoldITCase.Tuple2TimestampExtractor)
+    }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
 
     source1
       .windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
@@ -137,7 +137,7 @@ object WindowFoldITCase {
       element._2
     }
 
-    def emitWatermark(element: (String, Int), currentTimestamp: Long): Long = {
+    def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
       element._2 - 1
     }