You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/14 07:29:17 UTC
[flink] 01/02: [FLINK-17655] Remove old and long deprecated TimestampExtractor
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bc369e2f39c839d1c2b9c3db7970521748a22079
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Nov 11 11:42:33 2019 +0100
[FLINK-17655] Remove old and long deprecated TimestampExtractor
---
flink-streaming-java/pom.xml | 7 ++
.../flink/streaming/api/datastream/DataStream.java | 31 -------
.../api/functions/TimestampExtractor.java | 74 ----------------
.../operators/ExtractTimestampsOperator.java | 98 ----------------------
flink-streaming-scala/pom.xml | 1 +
.../flink/streaming/api/scala/DataStream.scala | 18 +---
6 files changed, 9 insertions(+), 220 deletions(-)
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 41f3381..b4b780a 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -98,6 +98,13 @@ under the License.
<plugin>
<groupId>com.github.siom79.japicmp</groupId>
<artifactId>japicmp-maven-plugin</artifactId>
+ <configuration>
+ <parameter>
+ <excludes combine.children="append">
+ <exclude>org.apache.flink.streaming.api.datastream.DataStream#assignTimestamps(org.apache.flink.streaming.api.functions.TimestampExtractor)</exclude>
+ </excludes>
+ </parameter>
+ </configuration>
</plugin>
<!-- disable fork reuse for the streaming project, because of
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b10b451..ff51975 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -52,12 +52,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
@@ -84,7 +82,6 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
import org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator;
import org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
@@ -874,34 +871,6 @@ public class DataStream<T> {
// ------------------------------------------------------------------------
/**
- * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
- * The internal timestamps are, for example, used to to event-time window operations.
- *
- * <p>If you know that the timestamps are strictly increasing you can use an
- * {@link AscendingTimestampExtractor}. Otherwise,
- * you should provide a {@link TimestampExtractor} that also implements
- * {@link TimestampExtractor#getCurrentWatermark()} to keep track of watermarks.
- *
- * @param extractor The TimestampExtractor that is called for each element of the DataStream.
- *
- * @deprecated Please use {@link #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)}
- * of {@link #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)}
- * instead.
- * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
- * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
- */
- @Deprecated
- public SingleOutputStreamOperator<T> assignTimestamps(TimestampExtractor<T> extractor) {
- // match parallelism to input, otherwise dop=1 sources could lead to some strange
- // behaviour: the watermark will creep along very slowly because the elements
- // from the source go to each extraction operator round robin.
- int inputParallelism = getTransformation().getParallelism();
- ExtractTimestampsOperator<T> operator = new ExtractTimestampsOperator<>(clean(extractor));
- return transform("ExtractTimestamps", getTransformation().getOutputType(), operator)
- .setParallelism(inputParallelism);
- }
-
- /**
* Assigns timestamps to the elements in the data stream and periodically creates
* watermarks to signal event time progress.
*
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
deleted file mode 100644
index c71e10b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
-
-/**
- * Interface for user functions that extract timestamps from elements.
- *
- * <p>The extractor must also keep track of the current watermark. The system will periodically
- * retrieve this watermark using {@link #getCurrentWatermark()} and submit it throughout the
- * topology.
- *
- * <p>Note: If you know that timestamps are monotonically increasing you can use
- * {@link AscendingTimestampExtractor}. This will keep track of watermarks.
- *
- * @param <T> The type of the elements that this function can extract timestamps from
- *
- * @deprecated This class has been replaced by {@link AssignerWithPeriodicWatermarks} and
- * {@link AssignerWithPunctuatedWatermarks}
- *
- * @see AssignerWithPeriodicWatermarks
- * @see AssignerWithPunctuatedWatermarks
- */
-@Deprecated
-public interface TimestampExtractor<T> extends Function {
-
- /**
- * Extracts a timestamp from an element.
- *
- * @param element The element that the timestamp is extracted from.
- * @param currentTimestamp The current internal timestamp of the element.
- * @return The new timestamp.
- */
- long extractTimestamp(T element, long currentTimestamp);
-
- /**
- * Asks the extractor if it wants to emit a watermark now that it has seen the given element.
- * This is called right after {@link #extractTimestamp}. With the same element. The method
- * can return {@code Long.MIN_VALUE} to indicate that no watermark should be emitted, a value of 0 or
- * greater will be emitted as a watermark if it is higher than the last-emitted watermark.
- *
- * @param element The element that we last saw.
- * @param currentTimestamp The current timestamp of the element that we last saw.
- * @return {@code Long.MIN_VALUE} if no watermark should be emitted, positive value for
- * emitting this value as a watermark.
- */
- long extractWatermark(T element, long currentTimestamp);
-
- /**
- * Returns the current watermark. This is periodically called by the system to determine
- * the current watermark and forward it.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
- long getCurrentWatermark();
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
deleted file mode 100644
index cec1f06..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-
-/**
- * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for extracting timestamps
- * from user elements and assigning them as the internal timestamp of the {@link StreamRecord}.
- *
- * @param <T> The type of the input elements
- *
- * @deprecated Subsumed by {@link TimestampsAndPeriodicWatermarksOperator} and
- * {@link TimestampsAndPunctuatedWatermarksOperator}.
- */
-@Deprecated
-public class ExtractTimestampsOperator<T>
- extends AbstractUdfStreamOperator<T, TimestampExtractor<T>>
- implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
-
- private static final long serialVersionUID = 1L;
-
- private transient long watermarkInterval;
-
- private transient long currentWatermark;
-
- public ExtractTimestampsOperator(TimestampExtractor<T> extractor) {
- super(extractor);
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
- if (watermarkInterval > 0) {
- long now = getProcessingTimeService().getCurrentProcessingTime();
- getProcessingTimeService().registerTimer(now + watermarkInterval, this);
- }
- currentWatermark = Long.MIN_VALUE;
- }
-
- @Override
- public void processElement(StreamRecord<T> element) throws Exception {
- long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
- output.collect(element.replace(element.getValue(), newTimestamp));
- long watermark = userFunction.extractWatermark(element.getValue(), newTimestamp);
- if (watermark > currentWatermark) {
- currentWatermark = watermark;
- output.emitWatermark(new Watermark(currentWatermark));
- }
- }
-
- @Override
- public void onProcessingTime(long timestamp) throws Exception {
- // register next timer
- long newWatermark = userFunction.getCurrentWatermark();
- if (newWatermark > currentWatermark) {
- currentWatermark = newWatermark;
- // emit watermark
- output.emitWatermark(new Watermark(currentWatermark));
- }
-
- long now = getProcessingTimeService().getCurrentProcessingTime();
- getProcessingTimeService().registerTimer(now + watermarkInterval, this);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- // if we receive a Long.MAX_VALUE watermark we forward it since it is used
- // to signal the end of input and to not block watermark progress downstream
- if (mark.getTimestamp() == Long.MAX_VALUE && mark.getTimestamp() > currentWatermark) {
- currentWatermark = Long.MAX_VALUE;
- output.emitWatermark(mark);
- }
- }
-}
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index aefa26b..4d575ad 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -252,6 +252,7 @@ under the License.
<!-- Ignore method which was created automatically by Scala for default value calculation.
Can be removed once https://github.com/siom79/japicmp/issues/176 will be fixed -->
<exclude>org.apache.flink.streaming.api.scala.DataStream#iterate\$default\$3()</exclude>
+ <exclude>org.apache.flink.streaming.api.scala.DataStream#assignTimestamps(org.apache.flink.streaming.api.functions.TimestampExtractor)</exclude>
</excludes>
</parameter>
</configuration>
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 5c4f04a..ca43184 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.timestamps.{AscendingTimestampExtractor, BoundedOutOfOrdernessTimestampExtractor}
-import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, ProcessFunction, TimestampExtractor}
+import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, ProcessFunction}
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
@@ -812,22 +812,6 @@ class DataStream[T](stream: JavaStream[T]) {
def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = {
new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, assigner))
}
-
- /**
- * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
- * The internal timestamps are, for example, used to to event-time window operations.
- *
- * If you know that the timestamps are strictly increasing you can use an
- * [[AscendingTimestampExtractor]]. Otherwise,
- * you should provide a [[TimestampExtractor]] that also implements
- * [[TimestampExtractor#getCurrentWatermark]] to keep track of watermarks.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
- @deprecated
- def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
- asScalaStream(stream.assignTimestamps(clean(extractor)))
- }
/**
* Assigns timestamps to the elements in the data stream and periodically creates