You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:26:31 UTC
flink git commit: [FLINK-2550] Rename extractTimestamp() to
assignTimestamps()
Repository: flink
Updated Branches:
refs/heads/master 0c1141abc -> 9a21ab117
[FLINK-2550] Rename extractTimestamp() to assignTimestamps()
This also changed TimestampExtractor.emitWatermark() to
TimestampExtractor.extractWatermark().
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a21ab11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a21ab11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a21ab11
Branch: refs/heads/master
Commit: 9a21ab1175c2f04446a44684bfd8b5e74404f4fe
Parents: 0c1141a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 9 12:25:10 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 9 12:25:10 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/datastream/DataStream.java | 2 +-
.../api/functions/AscendingTimestampExtractor.java | 2 +-
.../streaming/api/functions/TimestampExtractor.java | 2 +-
.../runtime/operators/ExtractTimestampsOperator.java | 2 +-
.../streaming/api/complex/ComplexIntegrationTest.java | 8 ++++----
.../operators/windowing/CoGroupJoinITCase.java | 14 +++++++-------
.../runtime/operators/windowing/WindowFoldITCase.java | 6 +++---
.../flink/streaming/timestamp/TimestampITCase.java | 7 +++----
.../flink/streaming/examples/join/WindowJoin.java | 6 +++---
.../examples/ml/IncrementalLearningSkeleton.java | 4 ++--
.../examples/windowing/TopSpeedWindowing.java | 4 ++--
.../scala/examples/windowing/TopSpeedWindowing.scala | 2 +-
.../apache/flink/streaming/api/scala/DataStream.scala | 8 ++++----
.../flink/streaming/api/scala/CoGroupJoinITCase.scala | 14 +++++++-------
.../flink/streaming/api/scala/WindowFoldITCase.scala | 6 +++---
15 files changed, 43 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 80e0e47..f56e53b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -689,7 +689,7 @@ public class DataStream<T> {
*
* @param extractor The TimestampExtractor that is called for each element of the DataStream.
*/
- public SingleOutputStreamOperator<T, ?> extractTimestamp(TimestampExtractor<T> extractor) {
+ public SingleOutputStreamOperator<T, ?> assignTimestamps(TimestampExtractor<T> extractor) {
// match parallelism to input, otherwise dop=1 sources could lead to some strange
// behaviour: the watermark will creep along very slowly because the elements
// from the source go to each extraction operator round robin.
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
index bdead7c..85433f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
@@ -48,7 +48,7 @@ public abstract class AscendingTimestampExtractor<T> implements TimestampExtract
}
@Override
- public final long emitWatermark(T element, long currentTimestamp) {
+ public final long extractWatermark(T element, long currentTimestamp) {
return Long.MIN_VALUE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
index 29603d9..7fd7b63 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
@@ -57,7 +57,7 @@ public interface TimestampExtractor<T> extends Function {
* @return {@code Long.MIN_VALUE} if no watermark should be emitted, positive value for
* emitting this value as a watermark.
*/
- long emitWatermark(T element, long currentTimestamp);
+ long extractWatermark(T element, long currentTimestamp);
/**
* Returns the current watermark. This is periodically called by the system to determine
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 864ca76..ddfc6a1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -68,7 +68,7 @@ public class ExtractTimestampsOperator<T>
public void processElement(StreamRecord<T> element) throws Exception {
long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
output.collect(element.replace(element.getValue(), newTimestamp));
- long watermark = userFunction.emitWatermark(element.getValue(), newTimestamp);
+ long watermark = userFunction.extractWatermark(element.getValue(), newTimestamp);
if (watermark > currentWatermark) {
currentWatermark = watermark;
output.emitWatermark(new Watermark(currentWatermark));
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 2775299..020dda3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -199,7 +199,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
sourceStream21
- .extractTimestamp(new MyTimestampExtractor())
+ .assignTimestamps(new MyTimestampExtractor())
.keyBy(2, 2)
.timeWindow(Time.of(10, TimeUnit.MILLISECONDS), Time.of(4, TimeUnit.MILLISECONDS))
.maxBy(3)
@@ -467,7 +467,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
sourceStream6
- .extractTimestamp(new Timestamp6())
+ .assignTimestamps(new Timestamp6())
.timeWindowAll(Time.of(1, TimeUnit.MILLISECONDS))
.reduce(new SalesReduceFunction())
.flatMap(new FlatMapFunction6())
@@ -551,7 +551,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
}
@Override
- public long emitWatermark(Tuple5<Integer, String, Character, Double, Boolean> value,
+ public long extractWatermark(Tuple5<Integer, String, Character, Double, Boolean> value,
long currentTimestamp) {
return (long) value.f0 - 1;
}
@@ -705,7 +705,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
}
@Override
- public long emitWatermark(Tuple2<Date, HashMap<Character, Integer>> value,
+ public long extractWatermark(Tuple2<Date, HashMap<Character, Integer>> value,
long currentTimestamp) {
Calendar cal = Calendar.getInstance();
cal.setTime(value.f0);
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
index bb79e5e..cfae026 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
@@ -74,7 +74,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
@Override
public void cancel() {
}
- }).extractTimestamp(new Tuple2TimestampExtractor());
+ }).assignTimestamps(new Tuple2TimestampExtractor());
DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@@ -94,7 +94,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
@Override
public void cancel() {
}
- }).extractTimestamp(new Tuple2TimestampExtractor());
+ }).assignTimestamps(new Tuple2TimestampExtractor());
source1.coGroup(source2)
@@ -168,7 +168,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
@Override
public void cancel() {
}
- }).extractTimestamp(new Tuple3TimestampExtractor());
+ }).assignTimestamps(new Tuple3TimestampExtractor());
DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
private static final long serialVersionUID = 1L;
@@ -188,7 +188,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
@Override
public void cancel() {
}
- }).extractTimestamp(new Tuple3TimestampExtractor());
+ }).assignTimestamps(new Tuple3TimestampExtractor());
source1.join(source2)
@@ -263,7 +263,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
@Override
public void cancel() {
}
- }).extractTimestamp(new Tuple3TimestampExtractor());
+ }).assignTimestamps(new Tuple3TimestampExtractor());
source1.join(source1)
.where(new Tuple3KeyExtractor())
@@ -323,7 +323,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
}
@Override
- public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
+ public long extractWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
return element.f1 - 1;
}
@@ -342,7 +342,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
}
@Override
- public long emitWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) {
+ public long extractWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) {
return element.f2 - 1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
index 45649bd..fb7142b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
@@ -74,7 +74,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
@Override
public void cancel() {
}
- }).extractTimestamp(new Tuple2TimestampExtractor());
+ }).assignTimestamps(new Tuple2TimestampExtractor());
source1
.keyBy(0)
@@ -138,7 +138,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
@Override
public void cancel() {
}
- }).extractTimestamp(new Tuple2TimestampExtractor());
+ }).assignTimestamps(new Tuple2TimestampExtractor());
source1
.windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
@@ -179,7 +179,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
}
@Override
- public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
+ public long extractWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
return element.f1 - 1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index d6ff5ce..a88aa1a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.taskmanager.MultiShotLatch;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
@@ -264,7 +263,7 @@ public class TimestampITCase {
}
});
- DataStream<Integer> extractOp = source1.extractTimestamp(
+ DataStream<Integer> extractOp = source1.assignTimestamps(
new AscendingTimestampExtractor<Integer>() {
@Override
public long extractAscendingTimestamp(Integer element, long currentTimestamp) {
@@ -326,14 +325,14 @@ public class TimestampITCase {
}
});
- source1.extractTimestamp(new TimestampExtractor<Integer>() {
+ source1.assignTimestamps(new TimestampExtractor<Integer>() {
@Override
public long extractTimestamp(Integer element, long currentTimestamp) {
return element;
}
@Override
- public long emitWatermark(Integer element, long currentTimestamp) {
+ public long extractWatermark(Integer element, long currentTimestamp) {
return element - 1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 5915a7a..3355f1c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -73,8 +73,8 @@ public class WindowJoin {
DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
// extract the timestamps
- grades = grades.extractTimestamp(new MyTimestampExtractor());
- salaries = salaries.extractTimestamp(new MyTimestampExtractor());
+ grades = grades.assignTimestamps(new MyTimestampExtractor());
+ salaries = salaries.assignTimestamps(new MyTimestampExtractor());
// apply a temporal join over the two stream based on the names over one
// second windows
@@ -220,7 +220,7 @@ public class WindowJoin {
}
@Override
- public long emitWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+ public long extractWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
return element.f0 - 1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 4c73e44..ce227e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -71,7 +71,7 @@ public class IncrementalLearningSkeleton {
// build new model on every second of new data
DataStream<Double[]> model = trainingData
- .extractTimestamp(new LinearTimestamp())
+ .assignTimestamps(new LinearTimestamp())
.timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
.apply(new PartialModelBuilder());
@@ -158,7 +158,7 @@ public class IncrementalLearningSkeleton {
}
@Override
- public long emitWatermark(Integer element, long currentTimestamp) {
+ public long extractWatermark(Integer element, long currentTimestamp) {
return counter - 1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index a46ffd9..df3402e 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -68,7 +68,7 @@ public class TopSpeedWindowing {
}
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
- .extractTimestamp(new CarTimestamp())
+ .assignTimestamps(new CarTimestamp())
.keyBy(0)
.window(GlobalWindows.create())
.evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
@@ -169,7 +169,7 @@ public class TopSpeedWindowing {
}
@Override
- public long emitWatermark(Tuple4<Integer, Integer, Double, Long> element,
+ public long extractWatermark(Tuple4<Integer, Integer, Double, Long> element,
long currentTimestamp) {
return element.f3 - 1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 1419afd..f26f32c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -66,7 +66,7 @@ object TopSpeedWindowing {
val cars = setCarsInput(env)
val topSeed = cars
- .extractAscendingTimestamp( _.time )
+ .assignAscendingTimestamps( _.time )
.keyBy("carId")
.window(GlobalWindows.create)
.evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 22abbdf..afd0700 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -639,8 +639,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
* @see org.apache.flink.streaming.api.watermark.Watermark
*/
- def extractTimestamp(extractor: TimestampExtractor[T]): DataStream[T] = {
- javaStream.extractTimestamp(clean(extractor))
+ def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
+ javaStream.assignTimestamps(clean(extractor))
}
/**
@@ -654,14 +654,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
* @see org.apache.flink.streaming.api.watermark.Watermark
*/
- def extractAscendingTimestamp(extractor: T => Long): DataStream[T] = {
+ def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = {
val cleanExtractor = clean(extractor)
val extractorFunction = new AscendingTimestampExtractor[T] {
def extractAscendingTimestamp(element: T, currentTimestamp: Long): Long = {
cleanExtractor(element)
}
}
- javaStream.extractTimestamp(extractorFunction)
+ javaStream.assignTimestamps(extractorFunction)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index 3f6e10f..3c1e9c3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -57,7 +57,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
def cancel() {
}
- }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor)
+ }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
val source2 = env.addSource(new SourceFunction[(String, Int)]() {
def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
@@ -71,7 +71,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
def cancel() {
}
- }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor)
+ }).assignTimestamps(new CoGroupJoinITCase.Tuple2TimestampExtractor)
source1.coGroup(source2)
.where(_._1)
@@ -121,7 +121,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
def cancel() {
}
- }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+ }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
val source2 = env.addSource(new SourceFunction[(String, String, Int)]() {
def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
@@ -137,7 +137,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
def cancel() {
}
- }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+ }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
source1.join(source2)
.where(_._1)
@@ -197,7 +197,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
def cancel() {
}
- }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor)
+ }).assignTimestamps(new CoGroupJoinITCase.Tuple3TimestampExtractor)
source1.join(source1)
.where(_._1)
@@ -250,7 +250,7 @@ object CoGroupJoinITCase {
element._2
}
- def emitWatermark(element: (String, Int), currentTimestamp: Long): Long = {
+ def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
element._2 - 1
}
@@ -264,7 +264,7 @@ object CoGroupJoinITCase {
element._3
}
- def emitWatermark(element: (String, String, Int), currentTimestamp: Long): Long = {
+ def extractWatermark(element: (String, String, Int), currentTimestamp: Long): Long = {
element._3 - 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a21ab11/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index dd098a0..d4e8bb2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -61,7 +61,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
def cancel() {
}
- }).extractTimestamp(new WindowFoldITCase.Tuple2TimestampExtractor)
+ }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
source1
.keyBy(0)
@@ -106,7 +106,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
def cancel() {
}
- }).extractTimestamp(new WindowFoldITCase.Tuple2TimestampExtractor)
+ }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
source1
.windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
@@ -137,7 +137,7 @@ object WindowFoldITCase {
element._2
}
- def emitWatermark(element: (String, Int), currentTimestamp: Long): Long = {
+ def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
element._2 - 1
}