You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/04 17:10:39 UTC

flink git commit: [FLINK-2666] Add timestamp extraction operator

Repository: flink
Updated Branches:
  refs/heads/master 88a977689 -> 54668242b


[FLINK-2666] Add timestamp extraction operator

This adds a user function TimestampExtractor and an operator
ExtractTimestampsOperator that can be used to extract timestamps and
attach them to elements to do event-time windowing.

Users can either use an AscendingTimestampExtractor that assumes that
timestamps are monotonically increasing. (This allows it to derive the
watermark very easily.) Or they use a TimestampExtractor, where they
also have to provide the watermark.

The ExtractTimestampOperator periodically (on the auto watermark
interval) calls the extractor to get the current watermark and forwards
it.

This also adds an ITCase for this behaviour.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54668242
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54668242
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54668242

Branch: refs/heads/master
Commit: 54668242be36ca7d40eed0a187521a2c9a8ad930
Parents: 88a9776
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 30 15:05:13 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Oct 4 15:53:55 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  26 ++++
 .../functions/AscendingTimestampExtractor.java  |  59 ++++++++
 .../api/functions/TimestampExtractor.java       |  69 +++++++++
 .../operators/ExtractTimestampsOperator.java    |  98 ++++++++++++
 .../streaming/timestamp/TimestampITCase.java    | 150 ++++++++++++++++++-
 .../flink/streaming/api/scala/DataStream.scala  |  41 ++++-
 6 files changed, 440 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54668242/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index e3f7f3e..c2be055 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.temporal.StreamCrossOperator;
 import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -76,6 +77,7 @@ import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.time.EventTime;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -882,6 +884,30 @@ public class DataStream<T> {
 	}
 
 	/**
+	 * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
+	 * The internal timestamps are, for example, used to to event-time window operations.
+	 *
+	 * <p>
+	 * If you know that the timestamps are strictly increasing you can use an
+	 * {@link org.apache.flink.streaming.api.functions.AscendingTimestampExtractor}. Otherwise,
+	 * you should provide a {@link TimestampExtractor} that also implements
+	 * {@link TimestampExtractor#getCurrentWatermark()} to keep track of watermarks.
+	 *
+	 * @see org.apache.flink.streaming.api.watermark.Watermark
+	 *
+	 * @param extractor The TimestampExtractor that is called for each element of the DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> extractTimestamp(TimestampExtractor<T> extractor) {
+		// match parallelism to input, otherwise dop=1 sources could lead to some strange
+		// behaviour: the watermark will creep along very slowly because the elements
+		// from the source go to each extraction operator round robin.
+		int inputParallelism = getTransformation().getParallelism();
+		ExtractTimestampsOperator<T> operator = new ExtractTimestampsOperator<>(clean(extractor));
+		return transform("ExtractTimestamps", getTransformation().getOutputType(), operator)
+				.setParallelism(inputParallelism);
+	}
+
+	/**
 	 * Writes a DataStream to the standard output stream (stdout).
 	 *
 	 * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/54668242/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
new file mode 100644
index 0000000..bdead7c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions;
+
+/**
+ * Interface for user functions that extract timestamps from elements. The extracting timestamps
+ * must be monotonically increasing.
+ *
+ * @param <T> The type of the elements that this function can extract timestamps from
+ */
+public abstract class AscendingTimestampExtractor<T> implements TimestampExtractor<T> {
+
+	long currentTimestamp = 0;
+
+	/**
+	 * Extracts a timestamp from an element. The timestamp must be monotonically increasing.
+	 *
+	 * @param element The element that the timestamp is extracted from.
+	 * @param currentTimestamp The current internal timestamp of the element.
+	 * @return The new timestamp.
+	 */
+	public abstract long extractAscendingTimestamp(T element, long currentTimestamp);
+
+	@Override
+	public final long extractTimestamp(T element, long currentTimestamp) {
+		long newTimestamp = extractAscendingTimestamp(element, currentTimestamp);
+		if (newTimestamp < this.currentTimestamp) {
+			throw new RuntimeException("Timestamp is lower than previously extracted timestamp. " +
+					"You should implement a custom TimestampExtractor.");
+		}
+		this.currentTimestamp = newTimestamp;
+		return this.currentTimestamp;
+	}
+
+	@Override
+	public final long emitWatermark(T element, long currentTimestamp) {
+		return Long.MIN_VALUE;
+	}
+
+	@Override
+	public final long getCurrentWatermark() {
+		return currentTimestamp - 1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54668242/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
new file mode 100644
index 0000000..29603d9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Interface for user functions that extract timestamps from elements.
+ *
+ * <p>
+ * The extractor must also keep track of the current watermark. The system will periodically
+ * retrieve this watermark using {@link #getCurrentWatermark()} and submit it throughout the topology.
+ *
+ * <p>
+ * Note: If you know that timestamps are monotonically increasing you can use
+ * {@link org.apache.flink.streaming.api.functions.AscendingTimestampExtractor}. This will
+ * keep track of watermarks.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ *
+ * @param <T> The type of the elements that this function can extract timestamps from
+ */
+public interface TimestampExtractor<T> extends Function {
+
+	/**
+	 * Extracts a timestamp from an element.
+	 *
+	 * @param element The element that the timestamp is extracted from.
+	 * @param currentTimestamp The current internal timestamp of the element.
+	 * @return The new timestamp.
+	 */
+	long extractTimestamp(T element, long currentTimestamp);
+
+	/**
+	 * Asks the extractor if it wants to emit a watermark now that it has seen the given element.
+	 * This is called right after {@link #extractTimestamp}. With the same element. The method
+	 * can return {@code Long.MIN_VALUE} to indicate that no watermark should be emitted, a value of 0 or
+	 * greater will be emitted as a watermark if it is higher than the last-emitted watermark.
+	 *
+	 * @param element The element that we last saw.
+	 * @param currentTimestamp The current timestamp of the element that we last saw.
+	 * @return {@code Long.MIN_VALUE} if no watermark should be emitted, positive value for
+	 *          emitting this value as a watermark.
+	 */
+	long emitWatermark(T element, long currentTimestamp);
+
+	/**
+	 * Returns the current watermark. This is periodically called by the system to determine
+	 * the current watermark and forward it.
+	 *
+	 * @see org.apache.flink.streaming.api.watermark.Watermark
+	 */
+	long getCurrentWatermark();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54668242/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
new file mode 100644
index 0000000..f79b4c3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for extracting timestamps
+ * from user elements and assigning them as the internal timestamp of the {@link StreamRecord}.
+ *
+ * @param <T> The type of the input elements
+ */
+public class ExtractTimestampsOperator<T>
+		extends AbstractUdfStreamOperator<T, TimestampExtractor<T>>
+		implements OneInputStreamOperator<T, T>, Triggerable {
+
+	private static final long serialVersionUID = 1L;
+
+	transient long watermarkInterval;
+
+	transient long currentWatermark;
+
+	public ExtractTimestampsOperator(TimestampExtractor<T> extractor) {
+		super(extractor);
+		chainingStrategy = ChainingStrategy.ALWAYS;
+
+		// we don't give the element to any user code, so input copy is not necessary
+		disableInputCopy();
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+		if (watermarkInterval > 0) {
+			getRuntimeContext().registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+		}
+
+		currentWatermark = Long.MIN_VALUE;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+
+		// emit a final +Inf watermark, just like the sources
+		output.emitWatermark(new Watermark(Long.MAX_VALUE));
+	}
+
+	@Override
+	public void processElement(StreamRecord<T> element) throws Exception {
+		long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
+		output.collect(element.replace(element.getValue(), newTimestamp));
+		long watermark = userFunction.emitWatermark(element.getValue(), newTimestamp);
+		if (watermark > currentWatermark) {
+			currentWatermark = watermark;
+			output.emitWatermark(new Watermark(currentWatermark));
+		}
+	}
+
+	@Override
+	public void trigger(long timestamp) throws Exception {
+		// register next timer
+		getRuntimeContext().registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+		long lastWatermark = currentWatermark;
+		currentWatermark = userFunction.getCurrentWatermark();
+
+		if (currentWatermark > lastWatermark) {
+			// emit watermark
+			output.emitWatermark(new Watermark(currentWatermark));
+		}
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		// ingore them, since we are basically a watermark source
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54668242/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 5dc6b12..d6ff5ce 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -22,8 +22,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskmanager.MultiShotLatch;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -35,6 +39,7 @@ import org.apache.flink.streaming.util.NoOpSink;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -53,8 +58,19 @@ public class TimestampITCase {
 	private static final int NUM_TASK_SLOTS = 3;
 	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
+	// this is used in some tests to synchronize
+	static MultiShotLatch latch;
+
+
 	private static ForkableFlinkMiniCluster cluster;
 
+	@Before
+	public void setupLatch() {
+		// ensure that we get a fresh latch for each test
+		latch = new MultiShotLatch();
+	}
+
+
 	@BeforeClass
 	public static void startCluster() {
 		try {
@@ -196,7 +212,9 @@ public class TimestampITCase {
 				"localhost", cluster.getLeaderRPCPort());
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
-		Assert.assertEquals("Timestamps are not disabled by default.", false, env.getConfig().areTimestampsEnabled());
+		Assert.assertEquals("Timestamps are not disabled by default.",
+				false,
+				env.getConfig().areTimestampsEnabled());
 		env.getConfig().disableTimestamps();
 
 
@@ -214,6 +232,134 @@ public class TimestampITCase {
 	}
 
 	/**
+	 * This thests whether timestamps are properly extracted in the timestamp
+	 * extractor and whether watermarks are also correctly forwared from this with the auto watermark
+	 * interval.
+	 */
+	@Test
+	public void testTimestampExtractorWithAutoInterval() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		env.setParallelism(1);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().enableTimestamps();
+		env.getConfig().setAutoWatermarkInterval(10);
+
+
+		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 0;
+				while (index < NUM_ELEMENTS) {
+					ctx.collect(index);
+					latch.await();
+					index++;
+				}
+			}
+
+			@Override
+			public void cancel() {
+
+			}
+		});
+
+		DataStream<Integer> extractOp = source1.extractTimestamp(
+				new AscendingTimestampExtractor<Integer>() {
+					@Override
+					public long extractAscendingTimestamp(Integer element, long currentTimestamp) {
+						return element;
+					}
+				});
+
+		extractOp
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+				.transform("Timestamp Check",
+						BasicTypeInfo.INT_TYPE_INFO,
+						new TimestampCheckingOperator());
+
+		// verify that extractor picks up source parallelism
+		Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());
+
+		env.execute();
+
+		// verify that we get NUM_ELEMENTS watermarks
+		for (int j = 0; j < NUM_ELEMENTS; j++) {
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
+				Assert.fail("Wrong watermark.");
+			}
+		}
+		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
+			Assert.fail("Wrong watermark.");
+		}
+	}
+
+	/**
+	 * This thests whether timestamps are properly extracted in the timestamp
+	 * extractor and whether watermark are correctly forwarded from the custom watermark emit
+	 * function.
+	 */
+	@Test
+	public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
+		final int NUM_ELEMENTS = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		env.setParallelism(1);
+		env.getConfig().disableSysoutLogging();
+		env.getConfig().enableTimestamps();
+
+
+		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+				int index = 0;
+				while (index < NUM_ELEMENTS) {
+					ctx.collect(index);
+					latch.await();
+					index++;
+				}
+			}
+
+			@Override
+			public void cancel() {
+
+			}
+		});
+
+		source1.extractTimestamp(new TimestampExtractor<Integer>() {
+			@Override
+			public long extractTimestamp(Integer element, long currentTimestamp) {
+				return element;
+			}
+
+			@Override
+			public long emitWatermark(Integer element, long currentTimestamp) {
+				return element - 1;
+			}
+
+			@Override
+			public long getCurrentWatermark() {
+				return Long.MIN_VALUE;
+			}
+		})
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
+
+
+		env.execute();
+
+		// verify that we get NUM_ELEMENTS watermarks
+		for (int j = 0; j < NUM_ELEMENTS; j++) {
+			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
+				Assert.fail("Wrong watermark.");
+			}
+		}
+		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
+			Assert.fail("Wrong watermark.");
+		}
+	}
+
+	/**
 	 * This tests whether the program throws an exception when an event-time source tries
 	 * to emit without timestamp.
 	 */
@@ -289,6 +435,8 @@ public class TimestampITCase {
 		@Override
 		public void processWatermark(Watermark mark) throws Exception {
 			watermarks.add(mark);
+			latch.trigger();
+			output.emitWatermark(mark);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/54668242/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 2f4bd23..9f4f52a 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
@@ -29,14 +31,13 @@ import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator, KeyedDataStream}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
-import org.apache.flink.streaming.api.datastream.KeyedDataStream
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
 
 class DataStream[T](javaStream: JavaStream[T]) {
@@ -635,6 +636,42 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream.every(windowingHelper)
 
   /**
+   * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
+   * The internal timestamps are, for example, used to to event-time window operations.
+   *
+   * If you know that the timestamps are strictly increasing you can use an
+   * [[org.apache.flink.streaming.api.functions.AscendingTimestampExtractor]]. Otherwise,
+   * you should provide a [[TimestampExtractor]] that also implements
+   * [[TimestampExtractor#getCurrentWatermark]] to keep track of watermarks.
+   *
+   * @see org.apache.flink.streaming.api.watermark.Watermark
+   */
+  def extractTimestamp(extractor: TimestampExtractor[T]): DataStream[T] = {
+    javaStream.extractTimestamp(clean(extractor))
+  }
+
+  /**
+   * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
+   * The internal timestamps are, for example, used to to event-time window operations.
+   *
+   * If you know that the timestamps are strictly increasing you can use an
+   * [[org.apache.flink.streaming.api.functions.AscendingTimestampExtractor]]. Otherwise,
+   * you should provide a [[TimestampExtractor]] that also implements
+   * [[TimestampExtractor#getCurrentWatermark]] to keep track of watermarks.
+   *
+   * @see org.apache.flink.streaming.api.watermark.Watermark
+   */
+  def extractAscendingTimestamp(extractor: T => Long): DataStream[T] = {
+    val cleanExtractor = clean(extractor)
+    val extractorFunction = new AscendingTimestampExtractor[T] {
+      def extractAscendingTimestamp(element: T, currentTimestamp: Long): Long = {
+        cleanExtractor(element)
+      }
+    }
+    javaStream.extractTimestamp(extractorFunction)
+  }
+
+  /**
    *
    * Operator used for directing tuples to specific named outputs using an
    * OutputSelector. Calling this method on an operator creates a new