You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/13 15:39:12 UTC

[flink] branch master updated: [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e8df4fb  [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common
e8df4fb is described below

commit e8df4fb1e50ffc353e807eaf1aa3ee106b16559a
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue May 7 18:09:37 2019 +0200

    [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common
    
    This closes #8363.
---
 .../table/typeutils/TimeIndicatorTypeInfo.java     | 77 ++++++++++++++++++++++
 .../table/typeutils/TimeIndicatorTypeInfo.scala    | 59 -----------------
 2 files changed, 77 insertions(+), 59 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
new file mode 100644
index 0000000..e46bf3f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
+
+import java.sql.Timestamp;
+
+/**
+ * Type information for indicating event time (rowtime) or processing time (proctime). It behaves
+ * like a regular SQL timestamp type, but its values are serialized as Long.
+ */
+@Internal
+public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo<Timestamp> {
+
+	private final boolean isEventTime; // true: rowtime indicator, false: proctime indicator
+
+	public static final int ROWTIME_STREAM_MARKER = -1;
+	public static final int PROCTIME_STREAM_MARKER = -2;
+
+	public static final int ROWTIME_BATCH_MARKER = -3;
+	public static final int PROCTIME_BATCH_MARKER = -4;
+
+	public static final TimeIndicatorTypeInfo ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true);
+	public static final TimeIndicatorTypeInfo PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false);
+
+	@SuppressWarnings("unchecked") // the comparator class is passed through a raw Class cast
+	protected TimeIndicatorTypeInfo(boolean isEventTime) {
+		super(Timestamp.class, SqlTimestampSerializer.INSTANCE, (Class) SqlTimestampComparator.class);
+		this.isEventTime = isEventTime;
+	}
+
+	// Replaces the effective serializer with a LongSerializer.
+	// This is a hacky but efficient way to keep the object creation overhead low while still
+	// being compatible with the corresponding SqlTimestampTypeInfo.
+	@Override
+	@SuppressWarnings("unchecked") // LongSerializer is returned through a raw TypeSerializer cast
+	public TypeSerializer<Timestamp> createSerializer(ExecutionConfig executionConfig) {
+		return (TypeSerializer) LongSerializer.INSTANCE;
+	}
+
+	public boolean isEventTime() {
+		return isEventTime;
+	}
+
+	@Override
+	public String toString() {
+		if (isEventTime) {
+			return "TimeIndicatorTypeInfo(rowtime)";
+		} else {
+			return "TimeIndicatorTypeInfo(proctime)";
+		}
+	}
+}
+
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
deleted file mode 100644
index ad82d52..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils
-
-import java.sql.Timestamp
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.api.common.typeutils.base.{LongSerializer, SqlTimestampComparator, SqlTimestampSerializer}
-
-/**
-  * Type information for indicating event or processing time. However, it behaves like a
-  * regular SQL timestamp but is serialized as Long.
-  */
-class TimeIndicatorTypeInfo(val isEventTime: Boolean)
-  extends SqlTimeTypeInfo[Timestamp](
-    classOf[Timestamp],
-    SqlTimestampSerializer.INSTANCE,
-    classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) {
-
-  // this replaces the effective serializer by a LongSerializer
-  // it is a hacky but efficient solution to keep the object creation overhead low but still
-  // be compatible with the corresponding SqlTimestampTypeInfo
-  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Timestamp] =
-    LongSerializer.INSTANCE.asInstanceOf[TypeSerializer[Timestamp]]
-
-  override def toString: String =
-    s"TimeIndicatorTypeInfo(${if (isEventTime) "rowtime" else "proctime" })"
-}
-
-object TimeIndicatorTypeInfo {
-
-  val ROWTIME_STREAM_MARKER: Int = -1
-  val PROCTIME_STREAM_MARKER: Int = -2
-
-  val ROWTIME_BATCH_MARKER: Int = -3
-  val PROCTIME_BATCH_MARKER: Int = -4
-
-  val ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true)
-  val PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false)
-
-}