You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/14 07:29:17 UTC

[flink] 01/02: [FLINK-17655] Remove old and long deprecated TimestampExtractor

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc369e2f39c839d1c2b9c3db7970521748a22079
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Nov 11 11:42:33 2019 +0100

    [FLINK-17655] Remove old and long deprecated TimestampExtractor
---
 flink-streaming-java/pom.xml                       |  7 ++
 .../flink/streaming/api/datastream/DataStream.java | 31 -------
 .../api/functions/TimestampExtractor.java          | 74 ----------------
 .../operators/ExtractTimestampsOperator.java       | 98 ----------------------
 flink-streaming-scala/pom.xml                      |  1 +
 .../flink/streaming/api/scala/DataStream.scala     | 18 +---
 6 files changed, 9 insertions(+), 220 deletions(-)

diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 41f3381..b4b780a 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -98,6 +98,13 @@ under the License.
 			<plugin>
 				<groupId>com.github.siom79.japicmp</groupId>
 				<artifactId>japicmp-maven-plugin</artifactId>
+				<configuration>
+					<parameter>
+						<excludes combine.children="append">
+							<exclude>org.apache.flink.streaming.api.datastream.DataStream#assignTimestamps(org.apache.flink.streaming.api.functions.TimestampExtractor)</exclude>
+						</excludes>
+					</parameter>
+				</configuration>
 			</plugin>
 
 			<!-- disable fork reuse for the streaming project, because of
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b10b451..ff51975 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -52,12 +52,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
@@ -84,7 +82,6 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
 import org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator;
 import org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
@@ -874,34 +871,6 @@ public class DataStream<T> {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
-	 * The internal timestamps are, for example, used to to event-time window operations.
-	 *
-	 * <p>If you know that the timestamps are strictly increasing you can use an
-	 * {@link AscendingTimestampExtractor}. Otherwise,
-	 * you should provide a {@link TimestampExtractor} that also implements
-	 * {@link TimestampExtractor#getCurrentWatermark()} to keep track of watermarks.
-	 *
-	 * @param extractor The TimestampExtractor that is called for each element of the DataStream.
-	 *
-	 * @deprecated Please use {@link #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)}
-	 *             of {@link #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)}
-	 *             instead.
-	 * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
-	 * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
-	 */
-	@Deprecated
-	public SingleOutputStreamOperator<T> assignTimestamps(TimestampExtractor<T> extractor) {
-		// match parallelism to input, otherwise dop=1 sources could lead to some strange
-		// behaviour: the watermark will creep along very slowly because the elements
-		// from the source go to each extraction operator round robin.
-		int inputParallelism = getTransformation().getParallelism();
-		ExtractTimestampsOperator<T> operator = new ExtractTimestampsOperator<>(clean(extractor));
-		return transform("ExtractTimestamps", getTransformation().getOutputType(), operator)
-				.setParallelism(inputParallelism);
-	}
-
-	/**
 	 * Assigns timestamps to the elements in the data stream and periodically creates
 	 * watermarks to signal event time progress.
 	 *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
deleted file mode 100644
index c71e10b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
-
-/**
- * Interface for user functions that extract timestamps from elements.
- *
- * <p>The extractor must also keep track of the current watermark. The system will periodically
- * retrieve this watermark using {@link #getCurrentWatermark()} and submit it throughout the
- * topology.
- *
- * <p>Note: If you know that timestamps are monotonically increasing you can use
- * {@link AscendingTimestampExtractor}. This will keep track of watermarks.
- *
- * @param <T> The type of the elements that this function can extract timestamps from
- *
- * @deprecated This class has been replaced by {@link AssignerWithPeriodicWatermarks} and
- *             {@link AssignerWithPunctuatedWatermarks}
- *
- * @see AssignerWithPeriodicWatermarks
- * @see AssignerWithPunctuatedWatermarks
- */
-@Deprecated
-public interface TimestampExtractor<T> extends Function {
-
-	/**
-	 * Extracts a timestamp from an element.
-	 *
-	 * @param element The element that the timestamp is extracted from.
-	 * @param currentTimestamp The current internal timestamp of the element.
-	 * @return The new timestamp.
-	 */
-	long extractTimestamp(T element, long currentTimestamp);
-
-	/**
-	 * Asks the extractor if it wants to emit a watermark now that it has seen the given element.
-	 * This is called right after {@link #extractTimestamp}. With the same element. The method
-	 * can return {@code Long.MIN_VALUE} to indicate that no watermark should be emitted, a value of 0 or
-	 * greater will be emitted as a watermark if it is higher than the last-emitted watermark.
-	 *
-	 * @param element The element that we last saw.
-	 * @param currentTimestamp The current timestamp of the element that we last saw.
-	 * @return {@code Long.MIN_VALUE} if no watermark should be emitted, positive value for
-	 *          emitting this value as a watermark.
-	 */
-	long extractWatermark(T element, long currentTimestamp);
-
-	/**
-	 * Returns the current watermark. This is periodically called by the system to determine
-	 * the current watermark and forward it.
-	 *
-	 * @see org.apache.flink.streaming.api.watermark.Watermark
-	 */
-	long getCurrentWatermark();
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
deleted file mode 100644
index cec1f06..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-
-/**
- * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for extracting timestamps
- * from user elements and assigning them as the internal timestamp of the {@link StreamRecord}.
- *
- * @param <T> The type of the input elements
- *
- * @deprecated Subsumed by {@link TimestampsAndPeriodicWatermarksOperator} and
- *             {@link TimestampsAndPunctuatedWatermarksOperator}.
- */
-@Deprecated
-public class ExtractTimestampsOperator<T>
-		extends AbstractUdfStreamOperator<T, TimestampExtractor<T>>
-		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient long watermarkInterval;
-
-	private transient long currentWatermark;
-
-	public ExtractTimestampsOperator(TimestampExtractor<T> extractor) {
-		super(extractor);
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
-		if (watermarkInterval > 0) {
-			long now = getProcessingTimeService().getCurrentProcessingTime();
-			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
-		}
-		currentWatermark = Long.MIN_VALUE;
-	}
-
-	@Override
-	public void processElement(StreamRecord<T> element) throws Exception {
-		long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
-		output.collect(element.replace(element.getValue(), newTimestamp));
-		long watermark = userFunction.extractWatermark(element.getValue(), newTimestamp);
-		if (watermark > currentWatermark) {
-			currentWatermark = watermark;
-			output.emitWatermark(new Watermark(currentWatermark));
-		}
-	}
-
-	@Override
-	public void onProcessingTime(long timestamp) throws Exception {
-		// register next timer
-		long newWatermark = userFunction.getCurrentWatermark();
-		if (newWatermark > currentWatermark) {
-			currentWatermark = newWatermark;
-			// emit watermark
-			output.emitWatermark(new Watermark(currentWatermark));
-		}
-
-		long now = getProcessingTimeService().getCurrentProcessingTime();
-		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
-		// to signal the end of input and to not block watermark progress downstream
-		if (mark.getTimestamp() == Long.MAX_VALUE && mark.getTimestamp() > currentWatermark) {
-			currentWatermark = Long.MAX_VALUE;
-			output.emitWatermark(mark);
-		}
-	}
-}
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index aefa26b..4d575ad 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -252,6 +252,7 @@ under the License.
 							<!-- Ignore method which was created automatically by Scala for default value calculation.
 							Can be removed once https://github.com/siom79/japicmp/issues/176 will be fixed -->
 							<exclude>org.apache.flink.streaming.api.scala.DataStream#iterate\$default\$3()</exclude>
+							<exclude>org.apache.flink.streaming.api.scala.DataStream#assignTimestamps(org.apache.flink.streaming.api.functions.TimestampExtractor)</exclude>
 						</excludes>
 					</parameter>
 				</configuration>
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 5c4f04a..ca43184 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.timestamps.{AscendingTimestampExtractor, BoundedOutOfOrdernessTimestampExtractor}
-import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, ProcessFunction, TimestampExtractor}
+import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, ProcessFunction}
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -812,22 +812,6 @@ class DataStream[T](stream: JavaStream[T]) {
   def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = {
     new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, assigner))
   }
-  
-  /**
-   * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
-   * The internal timestamps are, for example, used to to event-time window operations.
-   *
-   * If you know that the timestamps are strictly increasing you can use an
-   * [[AscendingTimestampExtractor]]. Otherwise,
-   * you should provide a [[TimestampExtractor]] that also implements
-   * [[TimestampExtractor#getCurrentWatermark]] to keep track of watermarks.
-   *
-   * @see org.apache.flink.streaming.api.watermark.Watermark
-   */
-  @deprecated
-  def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
-    asScalaStream(stream.assignTimestamps(clean(extractor)))
-  }
 
   /**
    * Assigns timestamps to the elements in the data stream and periodically creates