You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 15:08:23 UTC

[flink] branch release-1.9 updated: [FLINK-13315][api-java] Port wmstrategies to api-java-bridge

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 66edf7d  [FLINK-13315][api-java] Port wmstrategies to api-java-bridge
66edf7d is described below

commit 66edf7d386817c7939fedcf73c926e94034eaa0a
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 17 20:54:30 2019 +0800

    [FLINK-13315][api-java] Port wmstrategies to api-java-bridge
    
    This closes #9153.
---
 .../sources/wmstrategies/AscendingTimestamps.java  | 71 +++++++++++++++++
 .../wmstrategies/BoundedOutOfOrderTimestamps.java  | 88 ++++++++++++++++++++++
 .../wmstrategies/PeriodicWatermarkAssigner.java    | 44 +++++++++++
 .../wmstrategies/PunctuatedWatermarkAssigner.java  | 40 ++++++++++
 .../sources/wmstrategies/PreserveWatermarks.java   | 53 +++++++++++++
 .../stream/StreamExecTableSourceScan.scala         | 23 ++++--
 .../sources/wmstrategies/AscendingTimestamps.scala | 49 ------------
 .../sources/wmstrategies/watermarkStrategies.scala | 81 --------------------
 .../sources/wmstrategies/AscendingTimestamps.scala | 60 ---------------
 .../wmstrategies/BoundedOutOfOrderTimestamps.scala | 63 ----------------
 .../sources/wmstrategies/watermarkStrategies.scala | 80 --------------------
 .../table/dataformat/DataFormatConverters.java     |  1 -
 12 files changed, 313 insertions(+), 340 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java
new file mode 100644
index 0000000..4b85007
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.descriptors.Rowtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A watermark strategy for ascending rowtime attributes.
+ *
+ * <p>Emits a watermark of the maximum observed timestamp so far minus 1.
+ * Rows that have a timestamp equal to the max timestamp are not late.
+ */
+@PublicEvolving
+public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
+
+	private static final long serialVersionUID = 1L;
+
+	private long maxTimestamp = Long.MIN_VALUE + 1;
+
+	@Override
+	public void nextTimestamp(long timestamp) {
+		if (timestamp > maxTimestamp) {
+			maxTimestamp = timestamp;
+		}
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		Map<String, String> map = new HashMap<>();
+		map.put(
+				Rowtime.ROWTIME_WATERMARKS_TYPE,
+				Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING);
+		return map;
+	}
+
+	@Override
+	public int hashCode() {
+		return AscendingTimestamps.class.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof AscendingTimestamps;
+	}
+
+	@Override
+	public Watermark getWatermark() {
+		return new Watermark(maxTimestamp - 1);
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
new file mode 100644
index 0000000..725f534
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.descriptors.Rowtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+ *
+ * <p>Emits watermarks which are the maximum observed timestamp minus the specified delay.
+ */
+@PublicEvolving
+public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long delay;
+	private long maxTimestamp = Long.MIN_VALUE + 1;
+
+	/**
+	 * @param delay The delay by which watermarks are behind the maximum observed timestamp.
+	 */
+	public BoundedOutOfOrderTimestamps(long delay) {
+		this.delay = delay;
+	}
+
+	@Override
+	public void nextTimestamp(long timestamp) {
+		if (timestamp > maxTimestamp) {
+			maxTimestamp = timestamp;
+		}
+	}
+
+	@Override
+	public Watermark getWatermark() {
+		return new Watermark(maxTimestamp - delay);
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		Map<String, String> map = new HashMap<>();
+		map.put(
+				Rowtime.ROWTIME_WATERMARKS_TYPE,
+				Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED);
+		map.put(Rowtime.ROWTIME_WATERMARKS_DELAY, String.valueOf(delay));
+		return map;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		BoundedOutOfOrderTimestamps that = (BoundedOutOfOrderTimestamps) o;
+
+		return delay == that.delay;
+	}
+
+	@Override
+	public int hashCode() {
+		return Long.hashCode(delay);
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PeriodicWatermarkAssigner.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PeriodicWatermarkAssigner.java
new file mode 100644
index 0000000..4b35e2e
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PeriodicWatermarkAssigner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A periodic watermark assigner.
+ */
+@PublicEvolving
+public abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
+
+	/**
+	 * Updates the assigner with the next timestamp.
+	 *
+	 * @param timestamp The next timestamp to update the assigner.
+	 */
+	public abstract void nextTimestamp(long timestamp);
+
+	/**
+	 * Returns the current watermark.
+	 *
+	 * @return The current watermark.
+	 */
+	public abstract Watermark getWatermark();
+
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PunctuatedWatermarkAssigner.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PunctuatedWatermarkAssigner.java
new file mode 100644
index 0000000..210d829
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/PunctuatedWatermarkAssigner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.types.Row;
+
+/**
+ * A punctuated watermark assigner.
+ */
+@PublicEvolving
+public abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
+
+	/**
+	 * Returns the watermark for the current row or null if no watermark should be generated.
+	 *
+	 * @param row The current row.
+	 * @param timestamp The value of the timestamp attribute for the row.
+	 * @return The watermark for this row or null if no watermark should be generated.
+	 */
+	public abstract Watermark getWatermark(Row row, long timestamp);
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/wmstrategies/PreserveWatermarks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/wmstrategies/PreserveWatermarks.java
new file mode 100644
index 0000000..34ac1e7
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/wmstrategies/PreserveWatermarks.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.descriptors.Rowtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A strategy which indicates the watermarks should be preserved from the underlying datastream.
+ */
+@PublicEvolving
+public final class PreserveWatermarks extends WatermarkStrategy {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final PreserveWatermarks INSTANCE = new PreserveWatermarks();
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof PreserveWatermarks;
+	}
+
+	@Override
+	public int hashCode() {
+		return PreserveWatermarks.class.hashCode();
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		Map<String, String> map = new HashMap<>();
+		map.put(Rowtime.ROWTIME_WATERMARKS_TYPE, Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE);
+		return map;
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index dc7e7ab..bed598a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -24,10 +24,11 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{DataTypes, TableException}
 import org.apache.flink.table.codegen.CodeGeneratorContext
 import org.apache.flink.table.codegen.OperatorCodeGenerator._
-import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.dataformat.DataFormatConverters.DataFormatConverter
+import org.apache.flink.table.dataformat.{BaseRow, DataFormatConverters}
 import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
@@ -36,7 +37,9 @@ import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.runtime.AbstractProcessStreamOperator
 import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner}
 import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, StreamTableSource, TableSourceUtil}
-import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType}
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.types.{DataType, FieldsDataType}
+import org.apache.flink.types.Row
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
@@ -173,7 +176,8 @@ class StreamExecTableSourceScan(
           val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
           ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
         case p: PunctuatedWatermarkAssigner =>
-          val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          val watermarkGenerator =
+            new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, producedDataType)
           ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
         case _: PreserveWatermarks =>
           // The watermarks have already been provided by the underlying DataStream.
@@ -229,12 +233,19 @@ private class PeriodicWatermarkAssignerWrapper(
   */
 private class PunctuatedWatermarkAssignerWrapper(
     timeFieldIdx: Int,
-    assigner: PunctuatedWatermarkAssigner)
+    assigner: PunctuatedWatermarkAssigner,
+    sourceType: DataType)
   extends AssignerWithPunctuatedWatermarks[BaseRow] {
 
+  private val converter =
+    DataFormatConverters.getConverterForDataType((sourceType match {
+      case _: FieldsDataType => sourceType
+      case _ => DataTypes.ROW(DataTypes.FIELD("f0", sourceType))
+    }).bridgedTo(classOf[Row])).asInstanceOf[DataFormatConverter[BaseRow, Row]]
+
   override def checkAndGetNextWatermark(row: BaseRow, ts: Long): Watermark = {
     val timestamp: Long = row.getLong(timeFieldIdx)
-    assigner.getWatermark(row, timestamp)
+    assigner.getWatermark(converter.toExternal(row), timestamp)
   }
 
   override def extractTimestamp(element: BaseRow, previousElementTimestamp: Long): Long = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
deleted file mode 100644
index 8dbe0e1..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import org.apache.flink.streaming.api.watermark.Watermark
-
-/**
-  * A watermark strategy for ascending rowtime attributes.
-  *
-  * Emits a watermark of the maximum observed timestamp so far minus 1.
-  * Rows that have a timestamp equal to the max timestamp are not late.
-  */
-final class AscendingTimestamps extends PeriodicWatermarkAssigner {
-
-  var maxTimestamp: Long = Long.MinValue + 1
-
-  override def nextTimestamp(timestamp: Long): Unit = {
-    if (timestamp > maxTimestamp) {
-      maxTimestamp = timestamp
-    }
-  }
-
-  override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
-
-  override def equals(obj: Any): Boolean = obj match {
-    case _: AscendingTimestamps => true
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    classOf[AscendingTimestamps].hashCode()
-  }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
deleted file mode 100644
index 088ac80..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.descriptors.Rowtime
-
-import java.util
-
-/** A periodic watermark assigner. */
-abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
-
-  /**
-    * Updates the assigner with the next timestamp.
-    *
-    * @param timestamp The next timestamp to update the assigner.
-    */
-  def nextTimestamp(timestamp: Long): Unit
-
-  /**
-    * Returns the current watermark.
-    *
-    * @return The current watermark.
-    */
-  def getWatermark: Watermark
-}
-
-/** A punctuated watermark assigner. */
-abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
-
-  /**
-    * Returns the watermark for the current row or null if no watermark should be generated.
-    *
-    * @param row The current row.
-    * @param timestamp The value of the timestamp attribute for the row.
-    * @return The watermark for this row or null if no watermark should be generated.
-    */
-  def getWatermark(row: BaseRow, timestamp: Long): Watermark
-}
-
-/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
-final class PreserveWatermarks extends WatermarkStrategy {
-
-  override def equals(obj: scala.Any): Boolean =  {
-    obj match {
-      case _: PreserveWatermarks => true
-      case _ => false
-    }
-  }
-
-  override def hashCode(): Int =  {
-    classOf[PreserveWatermarks].hashCode()
-  }
-
-  override def toProperties: util.Map[String, String] = {
-    val javaMap = new util.HashMap[String, String]()
-    javaMap.put(Rowtime.ROWTIME_WATERMARKS_TYPE, Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE)
-    javaMap
-  }
-}
-
-object PreserveWatermarks {
-  val INSTANCE: PreserveWatermarks = new PreserveWatermarks
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
deleted file mode 100644
index 4a3630d..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import java.util
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.descriptors.Rowtime
-
-/**
-  * A watermark strategy for ascending rowtime attributes.
-  *
-  * Emits a watermark of the maximum observed timestamp so far minus 1.
-  * Rows that have a timestamp equal to the max timestamp are not late.
-  */
-final class AscendingTimestamps extends PeriodicWatermarkAssigner {
-
-  var maxTimestamp: Long = Long.MinValue + 1
-
-  override def nextTimestamp(timestamp: Long): Unit = {
-    if (timestamp > maxTimestamp) {
-      maxTimestamp = timestamp
-    }
-  }
-
-  override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
-
-  override def equals(obj: Any): Boolean = obj match {
-    case _: AscendingTimestamps => true
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    classOf[AscendingTimestamps].hashCode()
-  }
-
-  override def toProperties: util.Map[String, String] = {
-    val javaMap = new util.HashMap[String, String]()
-    javaMap.put(
-      Rowtime.ROWTIME_WATERMARKS_TYPE,
-      Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING)
-    javaMap
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
deleted file mode 100644
index 90972b2..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import java.util
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.descriptors.Rowtime
-
-/**
-  * A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
-  *
-  * Emits watermarks which are the maximum observed timestamp minus the specified delay.
-  *
-  * @param delay The delay by which watermarks are behind the maximum observed timestamp.
-  */
-final class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner {
-
-  var maxTimestamp: Long = Long.MinValue + delay
-
-  override def nextTimestamp(timestamp: Long): Unit = {
-    if (timestamp > maxTimestamp) {
-      maxTimestamp = timestamp
-    }
-  }
-
-  override def getWatermark: Watermark = new Watermark(maxTimestamp - delay)
-
-  override def equals(other: Any): Boolean = other match {
-    case that: BoundedOutOfOrderTimestamps =>
-      delay == that.delay
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    delay.hashCode()
-  }
-
-  override def toProperties: util.Map[String, String] = {
-    val javaMap = new util.HashMap[String, String]()
-    javaMap.put(
-      Rowtime.ROWTIME_WATERMARKS_TYPE,
-      Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED)
-    javaMap.put(Rowtime.ROWTIME_WATERMARKS_DELAY, delay.toString)
-    javaMap
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
deleted file mode 100644
index 536eed4..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.wmstrategies
-
-import java.util
-
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.descriptors.Rowtime
-import org.apache.flink.types.Row
-
-/** A periodic watermark assigner. */
-abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
-
-  /**
-    * Updates the assigner with the next timestamp.
-    *
-    * @param timestamp The next timestamp to update the assigner.
-    */
-  def nextTimestamp(timestamp: Long): Unit
-
-  /**
-    * Returns the current watermark.
-    *
-    * @return The current watermark.
-    */
-  def getWatermark: Watermark
-}
-
-/** A punctuated watermark assigner. */
-abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
-
-  /**
-    * Returns the watermark for the current row or null if no watermark should be generated.
-    *
-    * @param row The current row.
-    * @param timestamp The value of the timestamp attribute for the row.
-    * @return The watermark for this row or null if no watermark should be generated.
-    */
-  def getWatermark(row: Row, timestamp: Long): Watermark
-}
-
-/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
-final class PreserveWatermarks extends WatermarkStrategy {
-
-  override def equals(obj: scala.Any): Boolean =  {
-    obj match {
-      case _: PreserveWatermarks => true
-      case _ => false
-    }
-  }
-
-  override def hashCode(): Int =  {
-    classOf[PreserveWatermarks].hashCode()
-  }
-
-  override def toProperties: util.Map[String, String] = {
-    val javaMap = new util.HashMap[String, String]()
-    javaMap.put(Rowtime.ROWTIME_WATERMARKS_TYPE, Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE)
-    javaMap
-  }
-}
-object PreserveWatermarks {
-  val INSTANCE: PreserveWatermarks = new PreserveWatermarks
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index a383682..cf7d882 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -138,7 +138,6 @@ public class DataFormatConverters {
 	 *                   lost its specific Java format. Only DataType retains all its
 	 *                   Java format information.
 	 */
-	@Deprecated
 	public static DataFormatConverter getConverterForDataType(DataType originDataType) {
 		DataType dataType = originDataType.nullable();
 		DataFormatConverter converter = TYPE_TO_CONVERTER.get(dataType);