You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/11/16 02:53:15 UTC
flink git commit: [FLINK-7962] Add built-in support for min/max aggregation for Timestamp
Repository: flink
Updated Branches:
refs/heads/master b6a2dc345 -> d0b2aa28d
[FLINK-7962] Add built-in support for min/max aggregation for Timestamp
This closes #4936
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0b2aa28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0b2aa28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0b2aa28
Branch: refs/heads/master
Commit: d0b2aa28db529a1c982b3cf61242520fb67efd6a
Parents: b6a2dc3
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Thu Nov 2 11:26:25 2017 +0800
Committer: Jark Wu <ja...@apache.org>
Committed: Thu Nov 16 10:51:29 2017 +0800
----------------------------------------------------------------------
.../functions/aggfunctions/MaxAggFunction.scala | 11 +++++++
.../MaxAggFunctionWithRetract.scala | 11 +++++++
.../functions/aggfunctions/MinAggFunction.scala | 11 +++++++
.../MinAggFunctionWithRetract.scala | 11 +++++++
.../table/functions/aggfunctions/Ordering.scala | 27 +++++++++++++++++
.../table/runtime/aggregate/AggregateUtil.scala | 8 +++++
.../aggfunctions/MaxAggFunctionTest.scala | 27 +++++++++++++++++
.../MaxWithRetractAggFunctionTest.scala | 30 ++++++++++++++++++
.../aggfunctions/MinAggFunctionTest.scala | 27 +++++++++++++++++
.../MinWithRetractAggFunctionTest.scala | 32 ++++++++++++++++++++
10 files changed, 195 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
index 0789bee..9097eba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
@@ -19,10 +19,13 @@ package org.apache.flink.table.functions.aggfunctions
import java.math.BigDecimal
import java.lang.{Iterable => JIterable}
+import java.sql.Timestamp
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.AggregateFunction
/** The initial accumulator for Max aggregate function */
@@ -159,3 +162,11 @@ class StringMaxAggFunction extends MaxAggFunction[String] {
override def getInitValue = ""
override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
}
+
+/**
+ * Built-in Timestamp Max aggregate function
+ */
+class TimestampMaxAggFunction extends MaxAggFunction[Timestamp] {
+ override def getInitValue: Timestamp = new Timestamp(0)
+ override def getValueTypeInfo = Types.SQL_TIMESTAMP
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
index c79c06a..fdbfef3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
@@ -20,10 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
import java.math.BigDecimal
import java.util.{HashMap => JHashMap}
import java.lang.{Iterable => JIterable}
+import java.sql.Timestamp
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.AggregateFunction
/** The initial accumulator for Max with retraction aggregate function */
@@ -216,3 +219,11 @@ class StringMaxWithRetractAggFunction extends MaxWithRetractAggFunction[String]
override def getInitValue: String = ""
override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
}
+
+/**
+ * Built-in Timestamp Max with retraction aggregate function
+ */
+class TimestampMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Timestamp] {
+ override def getInitValue: Timestamp = new Timestamp(0)
+ override def getValueTypeInfo = Types.SQL_TIMESTAMP
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
index d2132c2..1cb1ab0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
@@ -19,10 +19,13 @@ package org.apache.flink.table.functions.aggfunctions
import java.math.BigDecimal
import java.lang.{Iterable => JIterable}
+import java.sql.Timestamp
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.AggregateFunction
/** The initial accumulator for Min aggregate function */
@@ -159,3 +162,11 @@ class StringMinAggFunction extends MinAggFunction[String] {
override def getInitValue = ""
override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
}
+
+/**
+ * Built-in Timestamp Min aggregate function
+ */
+class TimestampMinAggFunction extends MinAggFunction[Timestamp] {
+ override def getInitValue: Timestamp = new Timestamp(0)
+ override def getValueTypeInfo = Types.SQL_TIMESTAMP
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
index faa6725..44fa37f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
@@ -20,10 +20,13 @@ package org.apache.flink.table.functions.aggfunctions
import java.math.BigDecimal
import java.util.{HashMap => JHashMap}
import java.lang.{Iterable => JIterable}
+import java.sql.Timestamp
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.aggfunctions.Ordering.TimestampOrdering
import org.apache.flink.table.functions.AggregateFunction
/** The initial accumulator for Min with retraction aggregate function */
@@ -216,3 +219,11 @@ class StringMinWithRetractAggFunction extends MinWithRetractAggFunction[String]
override def getInitValue: String = ""
override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
}
+
+/**
+ * Built-in Timestamp Min with retraction aggregate function
+ */
+class TimestampMinWithRetractAggFunction extends MinWithRetractAggFunction[Timestamp] {
+ override def getInitValue: Timestamp = new Timestamp(0)
+ override def getValueTypeInfo = Types.SQL_TIMESTAMP
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
new file mode 100644
index 0000000..15ea2e3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Ordering.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.sql.Timestamp
+
+object Ordering {
+ implicit object TimestampOrdering extends Ordering[Timestamp] {
+ override def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index a867b1c..79cc258 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1267,6 +1267,8 @@ object AggregateUtil {
new BooleanMinWithRetractAggFunction
case VARCHAR | CHAR =>
new StringMinWithRetractAggFunction
+ case TIMESTAMP =>
+ new TimestampMinWithRetractAggFunction
case sqlType: SqlTypeName =>
throw new TableException(
s"Min with retract aggregate does no support type: '$sqlType'")
@@ -1291,6 +1293,8 @@ object AggregateUtil {
new BooleanMinAggFunction
case VARCHAR | CHAR =>
new StringMinAggFunction
+ case TIMESTAMP =>
+ new TimestampMinAggFunction
case sqlType: SqlTypeName =>
throw new TableException(s"Min aggregate does no support type: '$sqlType'")
}
@@ -1316,6 +1320,8 @@ object AggregateUtil {
new BooleanMaxWithRetractAggFunction
case VARCHAR | CHAR =>
new StringMaxWithRetractAggFunction
+ case TIMESTAMP =>
+ new TimestampMaxWithRetractAggFunction
case sqlType: SqlTypeName =>
throw new TableException(
s"Max with retract aggregate does no support type: '$sqlType'")
@@ -1340,6 +1346,8 @@ object AggregateUtil {
new BooleanMaxAggFunction
case VARCHAR | CHAR =>
new StringMaxAggFunction
+ case TIMESTAMP =>
+ new TimestampMaxAggFunction
case sqlType: SqlTypeName =>
throw new TableException(s"Max aggregate does no support type: '$sqlType'")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
index 8a46ec5..03faa80 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions
import java.math.BigDecimal
+import java.sql.Timestamp
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
@@ -230,3 +231,29 @@ class StringMaxAggFunctionTest extends AggFunctionTestBase[String, MaxAccumulato
override def aggregator: AggregateFunction[String, MaxAccumulator[String]] =
new StringMaxAggFunction()
}
+
+class TimestampMaxAggFunctionTest
+ extends AggFunctionTestBase[Timestamp, MaxAccumulator[Timestamp]] {
+ override def inputValueSets: Seq[Seq[_]] = Seq(
+ Seq(
+ new Timestamp(0),
+ new Timestamp(1000),
+ new Timestamp(100),
+ null.asInstanceOf[Timestamp],
+ new Timestamp(10)
+ ),
+ Seq(
+ null.asInstanceOf[Timestamp],
+ null.asInstanceOf[Timestamp],
+ null.asInstanceOf[Timestamp]
+ )
+ )
+
+ override def expectedResults: Seq[Timestamp] = Seq(
+ new Timestamp(1000),
+ null.asInstanceOf[Timestamp]
+ )
+
+ override def aggregator: AggregateFunction[Timestamp, MaxAccumulator[Timestamp]] =
+ new TimestampMaxAggFunction()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
index 246d964..eb620b4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions
import java.math.BigDecimal
+import java.sql.Timestamp
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
@@ -242,3 +243,32 @@ class StringMaxWithRetractAggFunctionTest
override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
+
+class TimestampMaxWithRetractAggFunctionTest
+ extends AggFunctionTestBase[Timestamp, MaxWithRetractAccumulator[Timestamp]] {
+
+ override def inputValueSets: Seq[Seq[_]] = Seq(
+ Seq(
+ new Timestamp(0),
+ new Timestamp(1000),
+ new Timestamp(100),
+ null.asInstanceOf[Timestamp],
+ new Timestamp(10)
+ ),
+ Seq(
+ null.asInstanceOf[Timestamp],
+ null.asInstanceOf[Timestamp],
+ null.asInstanceOf[Timestamp]
+ )
+ )
+
+ override def expectedResults: Seq[Timestamp] = Seq(
+ new Timestamp(1000),
+ null.asInstanceOf[Timestamp]
+ )
+
+ override def aggregator: AggregateFunction[Timestamp, MaxWithRetractAccumulator[Timestamp]] =
+ new TimestampMaxWithRetractAggFunction()
+
+ override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
index 80fcace..992d1fc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions
import java.math.BigDecimal
+import java.sql.Timestamp
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
@@ -231,3 +232,29 @@ class StringMinAggFunctionTest
override def aggregator: AggregateFunction[String, MinAccumulator[String]] =
new StringMinAggFunction()
}
+
+class TimestampMinAggFunctionTest
+ extends AggFunctionTestBase[Timestamp, MinAccumulator[Timestamp]] {
+ override def inputValueSets: Seq[Seq[_]] = Seq(
+ Seq(
+ new Timestamp(0),
+ new Timestamp(1000),
+ new Timestamp(100),
+ null.asInstanceOf[Timestamp],
+ new Timestamp(10)
+ ),
+ Seq(
+ null.asInstanceOf[Timestamp],
+ null.asInstanceOf[Timestamp],
+ null.asInstanceOf[Timestamp]
+ )
+ )
+
+ override def expectedResults: Seq[Timestamp] = Seq(
+ new Timestamp(0),
+ null.asInstanceOf[Timestamp]
+ )
+
+ override def aggregator: AggregateFunction[Timestamp, MinAccumulator[Timestamp]] =
+ new TimestampMinAggFunction()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0b2aa28/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
index c4273f6..323a651 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.aggfunctions
import java.math.BigDecimal
+import java.sql.Timestamp
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.aggfunctions._
@@ -242,3 +243,34 @@ class StringMinWithRetractAggFunctionTest
override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
+
+class TimestampMinWithRetractAggFunctionTest
+ extends AggFunctionTestBase[Timestamp, MinWithRetractAccumulator[Timestamp]] {
+
+ override def inputValueSets: Seq[Seq[_]] = Seq(
+ Seq(
+ new Timestamp(0),
+ new Timestamp(1000),
+ new Timestamp(100),
+ null.asInstanceOf[Timestamp],
+ new Timestamp(10)
+ ),
+ Seq(
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ )
+
+ override def expectedResults: Seq[Timestamp] = Seq(
+ new Timestamp(0),
+ null
+ )
+
+ override def aggregator: AggregateFunction[Timestamp, MinWithRetractAccumulator[Timestamp]] =
+ new TimestampMinWithRetractAggFunction()
+
+ override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}