You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 15:08:23 UTC
[flink] branch release-1.9 updated: [FLINK-13315][api-java] Port
wmstrategies to api-java-bridge
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 66edf7d [FLINK-13315][api-java] Port wmstrategies to api-java-bridge
66edf7d is described below
commit 66edf7d386817c7939fedcf73c926e94034eaa0a
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 17 20:54:30 2019 +0800
[FLINK-13315][api-java] Port wmstrategies to api-java-bridge
This closes #9153.
---
.../sources/wmstrategies/AscendingTimestamps.java | 71 +++++++++++++++++
.../wmstrategies/BoundedOutOfOrderTimestamps.java | 88 ++++++++++++++++++++++
.../wmstrategies/PeriodicWatermarkAssigner.java | 44 +++++++++++
.../wmstrategies/PunctuatedWatermarkAssigner.java | 40 ++++++++++
.../sources/wmstrategies/PreserveWatermarks.java | 53 +++++++++++++
.../stream/StreamExecTableSourceScan.scala | 23 ++++--
.../sources/wmstrategies/AscendingTimestamps.scala | 49 ------------
.../sources/wmstrategies/watermarkStrategies.scala | 81 --------------------
.../sources/wmstrategies/AscendingTimestamps.scala | 60 ---------------
.../wmstrategies/BoundedOutOfOrderTimestamps.scala | 63 ----------------
.../sources/wmstrategies/watermarkStrategies.scala | 80 --------------------
.../table/dataformat/DataFormatConverters.java | 1 -
12 files changed, 313 insertions(+), 340 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java
new file mode 100644
index 0000000..4b85007
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.descriptors.Rowtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Watermark strategy for rowtime attributes whose values arrive in ascending order.
+ *
+ * <p>The emitted watermark is always the largest timestamp observed so far minus 1,
+ * so rows carrying a timestamp equal to the current maximum are not late.
+ */
+@PublicEvolving
+public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Largest timestamp seen so far; the start value leaves room for the "minus 1" below. */
+    private long maxTimestamp = Long.MIN_VALUE + 1;
+
+    @Override
+    public void nextTimestamp(long timestamp) {
+        maxTimestamp = Math.max(maxTimestamp, timestamp);
+    }
+
+    @Override
+    public Watermark getWatermark() {
+        return new Watermark(maxTimestamp - 1);
+    }
+
+    @Override
+    public Map<String, String> toProperties() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(
+            Rowtime.ROWTIME_WATERMARKS_TYPE,
+            Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING);
+        return properties;
+    }
+
+    /** All instances are interchangeable, so any two of them are equal. */
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof AscendingTimestamps;
+    }
+
+    @Override
+    public int hashCode() {
+        return AscendingTimestamps.class.hashCode();
+    }
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
new file mode 100644
index 0000000..725f534
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.descriptors.Rowtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+ *
+ * <p>Emits watermarks which are the maximum observed timestamp minus the specified delay.
+ * Before any timestamp has been observed the emitted watermark is {@code Long.MIN_VALUE}.
+ */
+@PublicEvolving
+public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final long delay;
+    private long maxTimestamp;
+
+    /**
+     * @param delay The delay by which watermarks are behind the maximum observed timestamp.
+     */
+    public BoundedOutOfOrderTimestamps(long delay) {
+        this.delay = delay;
+        // Start at MIN_VALUE + delay (not MIN_VALUE + 1): getWatermark() subtracts the delay,
+        // and with a smaller start value that subtraction would underflow and wrap around
+        // to a huge positive watermark before the first timestamp has been seen.
+        this.maxTimestamp = Long.MIN_VALUE + delay;
+    }
+
+    /**
+     * Records the timestamp if it exceeds the maximum observed so far.
+     *
+     * @param timestamp The next timestamp to update the assigner.
+     */
+    @Override
+    public void nextTimestamp(long timestamp) {
+        if (timestamp > maxTimestamp) {
+            maxTimestamp = timestamp;
+        }
+    }
+
+    /**
+     * Returns the maximum observed timestamp minus the configured delay.
+     *
+     * @return The current watermark.
+     */
+    @Override
+    public Watermark getWatermark() {
+        return new Watermark(maxTimestamp - delay);
+    }
+
+    @Override
+    public Map<String, String> toProperties() {
+        Map<String, String> map = new HashMap<>();
+        map.put(
+            Rowtime.ROWTIME_WATERMARKS_TYPE,
+            Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED);
+        map.put(Rowtime.ROWTIME_WATERMARKS_DELAY, String.valueOf(delay));
+        return map;
+    }
+
+    /** Two strategies are equal when they use the same delay; observed timestamps are ignored. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        BoundedOutOfOrderTimestamps that = (BoundedOutOfOrderTimestamps) o;
+
+        return delay == that.delay;
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(delay);
+    }
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PeriodicWatermarkAssigner.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PeriodicWatermarkAssigner.java
new file mode 100644
index 0000000..4b35e2e
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PeriodicWatermarkAssigner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * Base class for watermark strategies that are fed timestamps one at a time and can be
+ * asked for their current watermark.
+ */
+@PublicEvolving
+public abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
+
+    /**
+     * Feeds the next timestamp into the assigner.
+     *
+     * @param timestamp The next timestamp to update the assigner.
+     */
+    public abstract void nextTimestamp(long timestamp);
+
+    /**
+     * Reports the watermark the assigner currently stands at.
+     *
+     * @return The current watermark.
+     */
+    public abstract Watermark getWatermark();
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PunctuatedWatermarkAssigner.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PunctuatedWatermarkAssigner.java
new file mode 100644
index 0000000..210d829
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PunctuatedWatermarkAssigner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.types.Row;
+
+/**
+ * Base class for watermark strategies that decide per row whether a watermark is emitted.
+ */
+@PublicEvolving
+public abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
+
+    /**
+     * Computes the watermark for the given row, if any.
+     *
+     * @param row The current row.
+     * @param timestamp The value of the timestamp attribute for the row.
+     * @return The watermark for this row or null if no watermark should be generated.
+     */
+    public abstract Watermark getWatermark(Row row, long timestamp);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/wmstrategies/PreserveWatermarks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/wmstrategies/PreserveWatermarks.java
new file mode 100644
index 0000000..34ac1e7
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/wmstrategies/PreserveWatermarks.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.descriptors.Rowtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Strategy signalling that the watermarks of the underlying datastream should be kept as they are.
+ */
+@PublicEvolving
+public final class PreserveWatermarks extends WatermarkStrategy {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Shared instance for callers that do not need their own object. */
+    public static final PreserveWatermarks INSTANCE = new PreserveWatermarks();
+
+    @Override
+    public Map<String, String> toProperties() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(Rowtime.ROWTIME_WATERMARKS_TYPE, Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE);
+        return properties;
+    }
+
+    /** The class carries no state, so any two instances are equal. */
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof PreserveWatermarks;
+    }
+
+    @Override
+    public int hashCode() {
+        return PreserveWatermarks.class.hashCode();
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index dc7e7ab..bed598a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -24,10 +24,11 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{DataTypes, TableException}
import org.apache.flink.table.codegen.CodeGeneratorContext
import org.apache.flink.table.codegen.OperatorCodeGenerator._
-import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.dataformat.DataFormatConverters.DataFormatConverter
+import org.apache.flink.table.dataformat.{BaseRow, DataFormatConverters}
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.FlinkRelOptTable
@@ -36,7 +37,9 @@ import org.apache.flink.table.planner.StreamPlanner
import org.apache.flink.table.runtime.AbstractProcessStreamOperator
import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner}
import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, StreamTableSource, TableSourceUtil}
-import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType}
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.types.{DataType, FieldsDataType}
+import org.apache.flink.types.Row
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
@@ -173,7 +176,8 @@ class StreamExecTableSourceScan(
val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
case p: PunctuatedWatermarkAssigner =>
- val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+ val watermarkGenerator =
+ new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, producedDataType)
ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
case _: PreserveWatermarks =>
// The watermarks have already been provided by the underlying DataStream.
@@ -229,12 +233,19 @@ private class PeriodicWatermarkAssignerWrapper(
*/
private class PunctuatedWatermarkAssignerWrapper(
timeFieldIdx: Int,
- assigner: PunctuatedWatermarkAssigner)
+ assigner: PunctuatedWatermarkAssigner,
+ sourceType: DataType)
extends AssignerWithPunctuatedWatermarks[BaseRow] {
+ private val converter =
+ DataFormatConverters.getConverterForDataType((sourceType match {
+ case _: FieldsDataType => sourceType
+ case _ => DataTypes.ROW(DataTypes.FIELD("f0", sourceType))
+ }).bridgedTo(classOf[Row])).asInstanceOf[DataFormatConverter[BaseRow, Row]]
+
override def checkAndGetNextWatermark(row: BaseRow, ts: Long): Watermark = {
val timestamp: Long = row.getLong(timeFieldIdx)
- assigner.getWatermark(row, timestamp)
+ assigner.getWatermark(converter.toExternal(row), timestamp)
}
override def extractTimestamp(element: BaseRow, previousElementTimestamp: Long): Long = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
deleted file mode 100644
index 8dbe0e1..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import org.apache.flink.streaming.api.watermark.Watermark
-
-/**
- * A watermark strategy for ascending rowtime attributes.
- *
- * Emits a watermark of the maximum observed timestamp so far minus 1.
- * Rows that have a timestamp equal to the max timestamp are not late.
- */
-final class AscendingTimestamps extends PeriodicWatermarkAssigner {
-
- var maxTimestamp: Long = Long.MinValue + 1
-
- override def nextTimestamp(timestamp: Long): Unit = {
- if (timestamp > maxTimestamp) {
- maxTimestamp = timestamp
- }
- }
-
- override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
-
- override def equals(obj: Any): Boolean = obj match {
- case _: AscendingTimestamps => true
- case _ => false
- }
-
- override def hashCode(): Int = {
- classOf[AscendingTimestamps].hashCode()
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
deleted file mode 100644
index 088ac80..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.descriptors.Rowtime
-
-import java.util
-
-/** A periodic watermark assigner. */
-abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
-
- /**
- * Updates the assigner with the next timestamp.
- *
- * @param timestamp The next timestamp to update the assigner.
- */
- def nextTimestamp(timestamp: Long): Unit
-
- /**
- * Returns the current watermark.
- *
- * @return The current watermark.
- */
- def getWatermark: Watermark
-}
-
-/** A punctuated watermark assigner. */
-abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
-
- /**
- * Returns the watermark for the current row or null if no watermark should be generated.
- *
- * @param row The current row.
- * @param timestamp The value of the timestamp attribute for the row.
- * @return The watermark for this row or null if no watermark should be generated.
- */
- def getWatermark(row: BaseRow, timestamp: Long): Watermark
-}
-
-/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
-final class PreserveWatermarks extends WatermarkStrategy {
-
- override def equals(obj: scala.Any): Boolean = {
- obj match {
- case _: PreserveWatermarks => true
- case _ => false
- }
- }
-
- override def hashCode(): Int = {
- classOf[PreserveWatermarks].hashCode()
- }
-
- override def toProperties: util.Map[String, String] = {
- val javaMap = new util.HashMap[String, String]()
- javaMap.put(Rowtime.ROWTIME_WATERMARKS_TYPE, Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE)
- javaMap
- }
-}
-
-object PreserveWatermarks {
- val INSTANCE: PreserveWatermarks = new PreserveWatermarks
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
deleted file mode 100644
index 4a3630d..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import java.util
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.descriptors.Rowtime
-
-/**
- * A watermark strategy for ascending rowtime attributes.
- *
- * Emits a watermark of the maximum observed timestamp so far minus 1.
- * Rows that have a timestamp equal to the max timestamp are not late.
- */
-final class AscendingTimestamps extends PeriodicWatermarkAssigner {
-
- var maxTimestamp: Long = Long.MinValue + 1
-
- override def nextTimestamp(timestamp: Long): Unit = {
- if (timestamp > maxTimestamp) {
- maxTimestamp = timestamp
- }
- }
-
- override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
-
- override def equals(obj: Any): Boolean = obj match {
- case _: AscendingTimestamps => true
- case _ => false
- }
-
- override def hashCode(): Int = {
- classOf[AscendingTimestamps].hashCode()
- }
-
- override def toProperties: util.Map[String, String] = {
- val javaMap = new util.HashMap[String, String]()
- javaMap.put(
- Rowtime.ROWTIME_WATERMARKS_TYPE,
- Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING)
- javaMap
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
deleted file mode 100644
index 90972b2..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import java.util
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.descriptors.Rowtime
-
-/**
- * A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
- *
- * Emits watermarks which are the maximum observed timestamp minus the specified delay.
- *
- * @param delay The delay by which watermarks are behind the maximum observed timestamp.
- */
-final class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner {
-
- var maxTimestamp: Long = Long.MinValue + delay
-
- override def nextTimestamp(timestamp: Long): Unit = {
- if (timestamp > maxTimestamp) {
- maxTimestamp = timestamp
- }
- }
-
- override def getWatermark: Watermark = new Watermark(maxTimestamp - delay)
-
- override def equals(other: Any): Boolean = other match {
- case that: BoundedOutOfOrderTimestamps =>
- delay == that.delay
- case _ => false
- }
-
- override def hashCode(): Int = {
- delay.hashCode()
- }
-
- override def toProperties: util.Map[String, String] = {
- val javaMap = new util.HashMap[String, String]()
- javaMap.put(
- Rowtime.ROWTIME_WATERMARKS_TYPE,
- Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED)
- javaMap.put(Rowtime.ROWTIME_WATERMARKS_DELAY, delay.toString)
- javaMap
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
deleted file mode 100644
index 536eed4..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import java.util
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.descriptors.Rowtime
-import org.apache.flink.types.Row
-
-/** A periodic watermark assigner. */
-abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
-
- /**
- * Updates the assigner with the next timestamp.
- *
- * @param timestamp The next timestamp to update the assigner.
- */
- def nextTimestamp(timestamp: Long): Unit
-
- /**
- * Returns the current watermark.
- *
- * @return The current watermark.
- */
- def getWatermark: Watermark
-}
-
-/** A punctuated watermark assigner. */
-abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
-
- /**
- * Returns the watermark for the current row or null if no watermark should be generated.
- *
- * @param row The current row.
- * @param timestamp The value of the timestamp attribute for the row.
- * @return The watermark for this row or null if no watermark should be generated.
- */
- def getWatermark(row: Row, timestamp: Long): Watermark
-}
-
-/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
-final class PreserveWatermarks extends WatermarkStrategy {
-
- override def equals(obj: scala.Any): Boolean = {
- obj match {
- case _: PreserveWatermarks => true
- case _ => false
- }
- }
-
- override def hashCode(): Int = {
- classOf[PreserveWatermarks].hashCode()
- }
-
- override def toProperties: util.Map[String, String] = {
- val javaMap = new util.HashMap[String, String]()
- javaMap.put(Rowtime.ROWTIME_WATERMARKS_TYPE, Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE)
- javaMap
- }
-}
-object PreserveWatermarks {
- val INSTANCE: PreserveWatermarks = new PreserveWatermarks
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index a383682..cf7d882 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -138,7 +138,6 @@ public class DataFormatConverters {
* lost its specific Java format. Only DataType retains all its
* Java format information.
*/
- @Deprecated
public static DataFormatConverter getConverterForDataType(DataType originDataType) {
DataType dataType = originDataType.nullable();
DataFormatConverter converter = TYPE_TO_CONVERTER.get(dataType);