You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/11/16 02:53:15 UTC

flink git commit: [FLINK-7962] Add built-in support for min/max aggregation for Timestamp

Repository: flink
Updated Branches:
  refs/heads/master b6a2dc345 -> d0b2aa28d


[FLINK-7962] Add built-in support for min/max aggregation for Timestamp

This closes #4936


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0b2aa28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0b2aa28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0b2aa28

Branch: refs/heads/master
Commit: d0b2aa28db529a1c982b3cf61242520fb67efd6a
Parents: b6a2dc3
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Thu Nov 2 11:26:25 2017 +0800
Committer: Jark Wu <ja...@apache.org>
Committed: Thu Nov 16 10:51:29 2017 +0800

----------------------------------------------------------------------
 .../functions/aggfunctions/MaxAggFunction.scala | 11 +++++++
 .../MaxAggFunctionWithRetract.scala             | 11 +++++++
 .../functions/aggfunctions/MinAggFunction.scala | 11 +++++++
 .../MinAggFunctionWithRetract.scala             | 11 +++++++
 .../table/functions/aggfunctions/Ordering.scala | 27 +++++++++++++++++
 .../table/runtime/aggregate/AggregateUtil.scala |  8 +++++
 .../aggfunctions/MaxAggFunctionTest.scala       | 27 +++++++++++++++++
 .../MaxWithRetractAggFunctionTest.scala         | 30 ++++++++++++++++++
 .../aggfunctions/MinAggFunctionTest.scala       | 27 +++++++++++++++++
 .../MinWithRetractAggFunctionTest.scala         | 32 ++++++++++++++++++++
 10 files changed, 195 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
index 0789bee..9097eba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
@@ -19,10 +19,13 @@ package org.apache.flink.table.functions.aggfunctions
 
 import java.math.BigDecimal
 import java.lang.{Iterable => JIterable}
+import java.sql.Timestamp
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Max aggregate function */
@@ -159,3 +162,11 @@ class StringMaxAggFunction extends MaxAggFunction[String] {
   override def getInitValue = ""
   override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
 }
+
+/**
+  * Built-in Max aggregate function for java.sql.Timestamp values.
+  */
+class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] {
+  override def getInitValue: Timestamp = new Timestamp(0) // epoch: 0 ms since 1970-01-01 GMT
+  override def getValueTypeInfo = Types.SQL_TIMESTAMP // value type: SQL TIMESTAMP
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
index c79c06a..fdbfef3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
@@ -20,10 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
 import java.math.BigDecimal
 import java.util.{HashMap => JHashMap}
 import java.lang.{Iterable => JIterable}
+import java.sql.Timestamp
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Max with retraction aggregate function */
@@ -216,3 +219,11 @@ class StringMaxWithRetractAggFunction extends MaxWithRetractAggFunction[String]
   override def getInitValue: String = ""
   override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
 }
+
+/**
+  * Built-in Max aggregate function with retraction for java.sql.Timestamp values.
+  */
+class TimestampMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Timestamp] {
+  override def getInitValue: Timestamp = new Timestamp(0) // epoch: 0 ms since 1970-01-01 GMT
+  override def getValueTypeInfo = Types.SQL_TIMESTAMP // value type: SQL TIMESTAMP
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
index d2132c2..1cb1ab0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
@@ -19,10 +19,13 @@ package org.apache.flink.table.functions.aggfunctions
 
 import java.math.BigDecimal
 import java.lang.{Iterable => JIterable}
+import java.sql.Timestamp
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Min aggregate function */
@@ -159,3 +162,11 @@ class StringMinAggFunction extends MinAggFunction[String] {
   override def getInitValue = ""
   override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
 }
+
+/**
+  * Built-in Min aggregate function for java.sql.Timestamp values.
+  */
+class TimestampMinAggFunction extends MinAggFunction[Timestamp] {
+  override def getInitValue: Timestamp = new Timestamp(0) // epoch: 0 ms since 1970-01-01 GMT
+  override def getValueTypeInfo = Types.SQL_TIMESTAMP // value type: SQL TIMESTAMP
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
index faa6725..44fa37f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
@@ -20,10 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
 import java.math.BigDecimal
 import java.util.{HashMap => JHashMap}
 import java.lang.{Iterable => JIterable}
+import java.sql.Timestamp
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Min with retraction aggregate function */
@@ -216,3 +219,11 @@ class StringMinWithRetractAggFunction extends MinWithRetractAggFunction[String]
   override def getInitValue: String = ""
   override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
 }
+
+/**
+  * Built-in Min aggregate function with retraction for java.sql.Timestamp values.
+  */
+class TimestampMinWithRetractAggFunction extends MinWithRetractAggFunction[Timestamp] {
+  override def getInitValue: Timestamp = new Timestamp(0) // epoch: 0 ms since 1970-01-01 GMT
+  override def getValueTypeInfo = Types.SQL_TIMESTAMP // value type: SQL TIMESTAMP
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
new file mode 100644
index 0000000..15ea2e3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.sql.Timestamp
+
+object Ordering { // holds the implicit ordering imported by the Min/Max aggregate function files
+  implicit object TimestampOrdering extends Ordering[Timestamp] { // delegates to compareTo
+    override def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index a867b1c..79cc258 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1267,6 +1267,8 @@ object AggregateUtil {
                   new BooleanMinWithRetractAggFunction
                 case VARCHAR | CHAR =>
                   new StringMinWithRetractAggFunction
+                case TIMESTAMP =>
+                  new TimestampMinWithRetractAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(
                     s"Min with retract aggregate does no support type: '$sqlType'")
@@ -1291,6 +1293,8 @@ object AggregateUtil {
                   new BooleanMinAggFunction
                 case VARCHAR | CHAR =>
                   new StringMinAggFunction
+                case TIMESTAMP =>
+                  new TimestampMinAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(s"Min aggregate does no support type: '$sqlType'")
               }
@@ -1316,6 +1320,8 @@ object AggregateUtil {
                   new BooleanMaxWithRetractAggFunction
                 case VARCHAR | CHAR =>
                   new StringMaxWithRetractAggFunction
+                case TIMESTAMP =>
+                  new TimestampMaxWithRetractAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(
                     s"Max with retract aggregate does no support type: '$sqlType'")
@@ -1340,6 +1346,8 @@ object AggregateUtil {
                   new BooleanMaxAggFunction
                 case VARCHAR | CHAR =>
                   new StringMaxAggFunction
+                case TIMESTAMP =>
+                  new TimestampMaxAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(s"Max aggregate does no support type: '$sqlType'")
               }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
index 8a46ec5..03faa80 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.aggfunctions
 
 import java.math.BigDecimal
+import java.sql.Timestamp
 
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions._
@@ -230,3 +231,29 @@ class StringMaxAggFunctionTest extends AggFunctionTestBase[String, MaxAccumulato
   override def aggregator: AggregateFunction[String, MaxAccumulator[String]] =
     new StringMaxAggFunction()
 }
+
+class TimestampMaxAggFunctionTest // test data for TimestampMaxAggFunction
+  extends AggFunctionTestBase[Timestamp, MaxAccumulator[Timestamp]] {
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq( // unordered values with one null in between
+      new Timestamp(0),
+      new Timestamp(1000),
+      new Timestamp(100),
+      null.asInstanceOf[Timestamp],
+      new Timestamp(10)
+    ),
+    Seq( // only nulls
+      null.asInstanceOf[Timestamp],
+      null.asInstanceOf[Timestamp],
+      null.asInstanceOf[Timestamp]
+    )
+  )
+
+  override def expectedResults: Seq[Timestamp] = Seq( // assumed positional match to the sets above
+    new Timestamp(1000), // largest value of the first input set
+    null.asInstanceOf[Timestamp] // expected for the all-null set
+  )
+
+  override def aggregator: AggregateFunction[Timestamp, MaxAccumulator[Timestamp]] =
+    new TimestampMaxAggFunction()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
index 246d964..eb620b4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.aggfunctions
 
 import java.math.BigDecimal
+import java.sql.Timestamp
 
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions._
@@ -242,3 +243,32 @@ class StringMaxWithRetractAggFunctionTest
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
 }
+
+class TimestampMaxWithRetractAggFunctionTest // test data for TimestampMaxWithRetractAggFunction
+  extends AggFunctionTestBase[Timestamp, MaxWithRetractAccumulator[Timestamp]] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq( // unordered values with one null in between
+      new Timestamp(0),
+      new Timestamp(1000),
+      new Timestamp(100),
+      null.asInstanceOf[Timestamp],
+      new Timestamp(10)
+    ),
+    Seq( // only nulls
+      null.asInstanceOf[Timestamp],
+      null.asInstanceOf[Timestamp],
+      null.asInstanceOf[Timestamp]
+    )
+  )
+
+  override def expectedResults: Seq[Timestamp] = Seq( // assumed positional match to the sets above
+    new Timestamp(1000), // largest value of the first input set
+    null.asInstanceOf[Timestamp] // expected for the all-null set
+  )
+
+  override def aggregator: AggregateFunction[Timestamp, MaxWithRetractAccumulator[Timestamp]] =
+    new TimestampMaxWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
index 80fcace..992d1fc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.aggfunctions
 
 import java.math.BigDecimal
+import java.sql.Timestamp
 
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions._
@@ -231,3 +232,29 @@ class StringMinAggFunctionTest
   override def aggregator: AggregateFunction[String, MinAccumulator[String]] =
     new StringMinAggFunction()
 }
+
+class TimestampMinAggFunctionTest // test data for TimestampMinAggFunction
+  extends AggFunctionTestBase[Timestamp, MinAccumulator[Timestamp]] {
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq( // unordered values with one null in between
+      new Timestamp(0),
+      new Timestamp(1000),
+      new Timestamp(100),
+      null.asInstanceOf[Timestamp],
+      new Timestamp(10)
+    ),
+    Seq( // only nulls
+      null.asInstanceOf[Timestamp],
+      null.asInstanceOf[Timestamp],
+      null.asInstanceOf[Timestamp]
+    )
+  )
+
+  override def expectedResults: Seq[Timestamp] = Seq( // assumed positional match to the sets above
+    new Timestamp(0), // smallest value of the first input set
+    null.asInstanceOf[Timestamp] // expected for the all-null set
+  )
+
+  override def aggregator: AggregateFunction[Timestamp, MinAccumulator[Timestamp]] =
+    new TimestampMinAggFunction()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
index c4273f6..323a651 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.aggfunctions
 
 import java.math.BigDecimal
+import java.sql.Timestamp
 
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions._
@@ -242,3 +243,34 @@ class StringMinWithRetractAggFunctionTest
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
 }
+
+class TimestampMinWithRetractAggFunctionTest // test data for TimestampMinWithRetractAggFunction
+  extends AggFunctionTestBase[Timestamp, MinWithRetractAccumulator[Timestamp]] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq( // unordered values with one null in between
+      new Timestamp(0),
+      new Timestamp(1000),
+      new Timestamp(100),
+      null.asInstanceOf[Timestamp],
+      new Timestamp(10)
+    ),
+    Seq( // only nulls; written as bare null literals, unlike the typed null in the first set
+      null,
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  override def expectedResults: Seq[Timestamp] = Seq( // assumed positional match to the sets above
+    new Timestamp(0), // smallest value of the first input set
+    null // expected for the all-null set
+  )
+
+  override def aggregator: AggregateFunction[Timestamp, MinWithRetractAccumulator[Timestamp]] =
+    new TimestampMinWithRetractAggFunction()
+
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}