Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/03 08:43:20 UTC
[1/3] flink git commit: [FLINK-6226] [table] Add tests for UDFs with Byte, Short, and Float arguments.
Repository: flink
Updated Branches:
refs/heads/master 37df826e4 -> 16b088218
[FLINK-6226] [table] Add tests for UDFs with Byte, Short, and Float arguments.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e118d1d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e118d1d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e118d1d
Branch: refs/heads/master
Commit: 6e118d1dc97b3a8c0b013d2002fad80219751253
Parents: 37df826
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Nov 2 21:10:03 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Nov 2 23:10:09 2017 +0100
----------------------------------------------------------------------
.../UserDefinedScalarFunctionTest.scala | 28 ++++++++++++++++++--
.../utils/userDefinedScalarFunctions.scala | 6 +++++
.../runtime/batch/table/CorrelateITCase.scala | 23 +++++++++++++++-
.../table/utils/UserDefinedTableFunctions.scala | 12 ++++++++-
4 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
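The diff below adds three primitive-type overloads to Func1 plus matching test rows. The Table API dispatches a UDF call to the eval method whose signature matches the argument type, so each primitive needs its own overload. As a minimal standalone sketch of that pattern (AddOne is a hypothetical name; the real object is Func1 in the diff):

    import org.apache.flink.table.functions.ScalarFunction

    // Hypothetical UDF showing the overload pattern under test: the planner
    // picks the eval method whose parameter type matches the call argument.
    object AddOne extends ScalarFunction {
      def eval(i: Integer): Integer = i + 1
      // Byte/Short arithmetic widens to Int in Scala, hence the casts back.
      def eval(b: Byte): Byte = (b + 1).toByte
      def eval(s: Short): Short = (s + 1).toShort
      def eval(f: Float): Float = f + 1
    }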
http://git-wip-us.apache.org/repos/asf/flink/blob/6e118d1d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index 71ff70d..a3b2f07 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -48,6 +48,24 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
"43")
testAllApis(
+ Func1('f11),
+ "Func1(f11)",
+ "Func1(f11)",
+ "4")
+
+ testAllApis(
+ Func1('f12),
+ "Func1(f12)",
+ "Func1(f12)",
+ "4")
+
+ testAllApis(
+ Func1('f13),
+ "Func1(f13)",
+ "Func1(f13)",
+ "4.0")
+
+ testAllApis(
Func2('f0, 'f1, 'f3),
"Func2(f0, f1, f3)",
"Func2(f0, f1, f3)",
@@ -360,7 +378,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
// ----------------------------------------------------------------------------------------------
override def testData: Any = {
- val testData = new Row(11)
+ val testData = new Row(14)
testData.setField(0, 42)
testData.setField(1, "Test")
testData.setField(2, null)
@@ -372,6 +390,9 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
testData.setField(8, 1000L)
testData.setField(9, Seq("Hello", "World"))
testData.setField(10, Array[Integer](1, 2, null))
+ testData.setField(11, 3.toByte)
+ testData.setField(12, 3.toShort)
+ testData.setField(13, 3.toFloat)
testData
}
@@ -387,7 +408,10 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
Types.INTERVAL_MONTHS,
Types.INTERVAL_MILLIS,
TypeInformation.of(classOf[Seq[String]]),
- BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+ BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO,
+ Types.BYTE,
+ Types.SHORT,
+ Types.FLOAT
).asInstanceOf[TypeInformation[Any]]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e118d1d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 5285569..9535cdf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -41,6 +41,12 @@ object Func1 extends ScalarFunction {
def eval(index: Integer): Integer = {
index + 1
}
+
+ def eval(b: Byte): Byte = (b + 1).toByte
+
+ def eval(s: Short): Short = (s + 1).toShort
+
+ def eval(f: Float): Float = f + 1
}
object Func2 extends ScalarFunction {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e118d1d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index b109752..79243dd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.utils.{Func1, Func13, Func18, RichFunc2}
@@ -231,6 +231,27 @@ class CorrelateITCase(
}
@Test
+ def testByteShortFloatArguments(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+ val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+ val tFunc = new TableFunc4
+
+ val result = in
+ .select('a.cast(Types.BYTE) as 'a, 'a.cast(Types.SHORT) as 'b, 'b.cast(Types.FLOAT) as 'c)
+ .join(tFunc('a, 'b, 'c) as ('a2, 'b2, 'c2))
+ .toDataSet[Row]
+
+ val results = result.collect()
+ val expected = Seq(
+ "1,1,1.0,Byte=1,Short=1,Float=1.0",
+ "2,2,2.0,Byte=2,Short=2,Float=2.0",
+ "3,3,2.0,Byte=3,Short=3,Float=2.0",
+ "4,4,3.0,Byte=4,Short=4,Float=3.0").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
def testUserDefinedTableFunctionWithParameter(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
http://git-wip-us.apache.org/repos/asf/flink/blob/6e118d1d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
index d0ffade..e1af23b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
@@ -22,7 +22,7 @@ import java.lang.Boolean
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple3
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{Types, ValidationException}
import org.apache.flink.table.functions.{FunctionContext, TableFunction}
import org.apache.flink.types.Row
import org.junit.Assert
@@ -109,6 +109,16 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
}
}
+class TableFunc4 extends TableFunction[Row] {
+ def eval(b: Byte, s: Short, f: Float): Unit = {
+ collect(Row.of("Byte=" + b, "Short=" + s, "Float=" + f))
+ }
+
+ override def getResultType: TypeInformation[Row] = {
+ new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING)
+ }
+}
+
class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
def eval(user: String) {
if (user.contains("#")) {
[3/3] flink git commit: [FLINK-7338] [table] Fix retrieval of OVER window lower bound.
Posted by fh...@apache.org.
[FLINK-7338] [table] Fix retrieval of OVER window lower bound.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16b08821
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16b08821
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16b08821
Branch: refs/heads/master
Commit: 16b088218435dc64848ad641a383f9ce808f07c2
Parents: 78c8ea2
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Nov 2 23:09:36 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Nov 3 00:01:44 2017 +0100
----------------------------------------------------------------------
.../flink/table/plan/nodes/OverAggregate.scala | 21 ++--
.../runtime/stream/sql/OverWindowITCase.scala | 119 ++++++++++++-------
2 files changed, 85 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/16b08821/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
index 87ebd86..f9bf803 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
@@ -35,8 +35,8 @@ trait OverAggregate {
}
private[flink] def orderingToString(
- inputType: RelDataType,
- orderFields: java.util.List[RelFieldCollation]): String = {
+ inputType: RelDataType,
+ orderFields: java.util.List[RelFieldCollation]): String = {
val inFields = inputType.getFieldList.asScala
@@ -48,9 +48,9 @@ trait OverAggregate {
}
private[flink] def windowRange(
- logicWindow: Window,
- overWindow: Group,
- input: RelNode): String = {
+ logicWindow: Window,
+ overWindow: Group,
+ input: RelNode): String = {
if (overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded) {
s"BETWEEN ${getLowerBoundary(logicWindow, overWindow, input)} PRECEDING " +
s"AND ${overWindow.upperBound}"
@@ -63,8 +63,7 @@ trait OverAggregate {
inputType: RelDataType,
constants: Seq[RexLiteral],
rowType: RelDataType,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]])
- : String = {
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = {
val inFields = inputType.getFieldNames.asScala
val outFields = rowType.getFieldNames.asScala
@@ -97,12 +96,12 @@ trait OverAggregate {
}
private[flink] def getLowerBoundary(
- logicWindow: Window,
- overWindow: Group,
- input: RelNode): Long = {
+ logicWindow: Window,
+ overWindow: Group,
+ input: RelNode): Long = {
val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
- val lowerBoundIndex = input.getRowType.getFieldCount - ref.getIndex
+ val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
lowerBound match {
case x: java.math.BigDecimal => x.asInstanceOf[java.math.BigDecimal].longValue()
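The one-line change in getLowerBoundary is the core of FLINK-7338: Calcite represents the lower-bound offset as a RexInputRef that indexes the concatenation of the input fields followed by the window constants, so the position inside logicWindow.constants is ref.getIndex - inputFieldCount, not the reverse. A hedged worked example of the arithmetic (the field and constant counts are made up for illustration):

    // Suppose the input row type has 4 fields and the window carries 2
    // constants; the combined index space is [0..3] fields, [4..5] constants.
    val inputFieldCount = 4                 // input.getRowType.getFieldCount
    val refIndex = 5                        // ref.getIndex, points into the constants
    val lowerBoundIndex = refIndex - inputFieldCount   // = 1, the second constant
    // The previous expression, inputFieldCount - refIndex, picks the wrong
    // entry (or an invalid index) once more than one constant is attached.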
http://git-wip-us.apache.org/repos/asf/flink/blob/16b08821/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
index 4884513..9bfdc4c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.stream.sql
import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.java.tuple.Tuple1
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -28,6 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
import org.junit.Assert._
@@ -293,13 +295,16 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
+ tEnv.registerFunction("LTCNT", new LargerThanCount)
val sqlQuery = "SELECT " +
" c, b, " +
+ " LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+ " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
" COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
" BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
" SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
- " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
+ " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
" FROM T1"
val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
@@ -307,16 +312,17 @@ class OverWindowITCase extends StreamingWithStateTestBase {
env.execute()
val expected = List(
- "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
- "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
- "Hello,3,4,9",
- "Hello,4,2,7",
- "Hello,5,2,9",
- "Hello,6,2,11", "Hello,65,2,12",
- "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
- "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
- "Hello World,8,2,15",
- "Hello World,20,1,20")
+ "Hello,1,0,1,1", "Hello,15,0,2,2", "Hello,16,0,3,3",
+ "Hello,2,0,6,9", "Hello,3,0,6,9", "Hello,2,0,6,9",
+ "Hello,3,0,4,9",
+ "Hello,4,0,2,7",
+ "Hello,5,1,2,9",
+ "Hello,6,2,2,11", "Hello,65,2,2,12",
+ "Hello,9,2,2,12", "Hello,9,2,2,12", "Hello,18,3,3,18",
+ "Hello World,7,1,1,7", "Hello World,17,3,3,21", "Hello World,77,3,3,21",
+ "Hello World,18,1,1,7",
+ "Hello World,8,2,2,15",
+ "Hello World,20,1,1,20")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -354,9 +360,12 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
+ tEnv.registerFunction("LTCNT", new LargerThanCount)
val sqlQuery = "SELECT " +
" c, a, " +
+ " LTCNT(a, CAST('4' AS BIGINT)) " +
+ " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
" COUNT(a) " +
" OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
" SUM(a) " +
@@ -368,12 +377,12 @@ class OverWindowITCase extends StreamingWithStateTestBase {
env.execute()
val expected = List(
- "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
- "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
- "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
- "Hello,6,3,15",
- "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
- "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
+ "Hello,1,0,1,1", "Hello,1,0,2,2", "Hello,1,0,3,3",
+ "Hello,2,0,3,4", "Hello,2,0,3,5", "Hello,2,0,3,6",
+ "Hello,3,0,3,7", "Hello,4,0,3,9", "Hello,5,1,3,12",
+ "Hello,6,2,3,15",
+ "Hello World,7,1,1,7", "Hello World,7,2,2,14", "Hello World,7,3,3,21",
+ "Hello World,7,3,3,21", "Hello World,8,3,3,22", "Hello World,20,3,3,35")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -518,6 +527,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
StreamITCase.clear
val sqlQuery = "SELECT a, b, c, " +
+ " LTCNT(b, CAST('4' AS BIGINT)) OVER(" +
+ " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
" SUM(b) OVER (" +
" PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
" COUNT(b) OVER (" +
@@ -552,25 +563,26 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
+ tEnv.registerFunction("LTCNT", new LargerThanCount)
val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
- "1,1,Hello,6,3,2,3,1",
- "1,2,Hello,6,3,2,3,1",
- "1,3,Hello world,6,3,2,3,1",
- "1,1,Hi,7,4,1,3,1",
- "2,1,Hello,1,1,1,1,1",
- "2,2,Hello world,6,3,2,3,1",
- "2,3,Hello world,6,3,2,3,1",
- "1,4,Hello world,11,5,2,4,1",
- "1,5,Hello world,29,8,3,7,1",
- "1,6,Hello world,29,8,3,7,1",
- "1,7,Hello world,29,8,3,7,1",
- "2,4,Hello world,15,5,3,5,1",
- "2,5,Hello world,15,5,3,5,1")
+ "1,1,Hello,0,6,3,2,3,1",
+ "1,2,Hello,0,6,3,2,3,1",
+ "1,3,Hello world,0,6,3,2,3,1",
+ "1,1,Hi,0,7,4,1,3,1",
+ "2,1,Hello,0,1,1,1,1,1",
+ "2,2,Hello world,0,6,3,2,3,1",
+ "2,3,Hello world,0,6,3,2,3,1",
+ "1,4,Hello world,0,11,5,2,4,1",
+ "1,5,Hello world,3,29,8,3,7,1",
+ "1,6,Hello world,3,29,8,3,7,1",
+ "1,7,Hello world,3,29,8,3,7,1",
+ "2,4,Hello world,1,15,5,3,5,1",
+ "2,5,Hello world,1,15,5,3,5,1")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -583,6 +595,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
StreamITCase.testResults = mutable.MutableList()
val sqlQuery = "SELECT a, b, c, " +
+ "LTCNT(b, CAST('4' AS BIGINT)) over(" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
"SUM(b) over (" +
"partition by a order by rowtime rows between unbounded preceding and current row), " +
"count(b) over (" +
@@ -618,26 +632,27 @@ class OverWindowITCase extends StreamingWithStateTestBase {
.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
+ tEnv.registerFunction("LTCNT", new LargerThanCount)
val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = mutable.MutableList(
- "1,2,Hello,2,1,2,2,2",
- "1,3,Hello world,5,2,2,3,2",
- "1,1,Hi,6,3,2,3,1",
- "2,1,Hello,1,1,1,1,1",
- "2,2,Hello world,3,2,1,2,1",
- "3,1,Hello,1,1,1,1,1",
- "3,2,Hello world,3,2,1,2,1",
- "1,5,Hello world,11,4,2,5,1",
- "1,6,Hello world,17,5,3,6,1",
- "1,9,Hello world,26,6,4,9,1",
- "1,8,Hello world,34,7,4,9,1",
- "1,7,Hello world,41,8,5,9,1",
- "2,5,Hello world,8,3,2,5,1",
- "3,5,Hello world,8,3,2,5,1")
+ "1,2,Hello,0,2,1,2,2,2",
+ "1,3,Hello world,0,5,2,2,3,2",
+ "1,1,Hi,0,6,3,2,3,1",
+ "2,1,Hello,0,1,1,1,1,1",
+ "2,2,Hello world,0,3,2,1,2,1",
+ "3,1,Hello,0,1,1,1,1,1",
+ "3,2,Hello world,0,3,2,1,2,1",
+ "1,5,Hello world,1,11,4,2,5,1",
+ "1,6,Hello world,2,17,5,3,6,1",
+ "1,9,Hello world,3,26,6,4,9,1",
+ "1,8,Hello world,4,34,7,4,9,1",
+ "1,7,Hello world,5,41,8,5,9,1",
+ "2,5,Hello world,1,8,3,2,5,1",
+ "3,5,Hello world,1,8,3,2,5,1")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -852,3 +867,19 @@ object OverWindowITCase {
override def cancel(): Unit = ???
}
}
+
+/** Counts how often the first argument was larger than the second argument. */
+class LargerThanCount extends AggregateFunction[Long, Tuple1[Long]] {
+
+ def accumulate(acc: Tuple1[Long], a: Long, b: Long): Unit = {
+ if (a > b) acc.f0 += 1
+ }
+
+ def retract(acc: Tuple1[Long], a: Long, b: Long): Unit = {
+ if (a > b) acc.f0 -= 1
+ }
+
+ override def createAccumulator(): Tuple1[Long] = Tuple1.of(0L)
+
+ override def getValue(acc: Tuple1[Long]): Long = acc.f0
+}
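LargerThanCount follows the AggregateFunction contract: createAccumulator initializes the state, accumulate updates it per row, retract undoes a row when it leaves a bounded OVER window, and getValue emits the result. Its use in the updated tests, as shown in the hunks above:

    tEnv.registerFunction("LTCNT", new LargerThanCount)
    // e.g. in SQL: LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c
    //   ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)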
[2/3] flink git commit: [FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema + AvroRowDeserializationSchema.
Posted by fh...@apache.org.
[FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema + AvroRowDeserializationSchema.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78c8ea20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78c8ea20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78c8ea20
Branch: refs/heads/master
Commit: 78c8ea2085439b0d4fe0d7187044b44b6f110aff
Parents: 6e118d1
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Nov 2 21:40:26 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Nov 3 00:01:40 2017 +0100
----------------------------------------------------------------------
.../AvroRowDeserializationSchema.java | 30 +++++++++++++++++---
.../AvroRowSerializationSchema.java | 29 ++++++++++++++++---
.../kafka/AvroRowDeSerializationSchemaTest.java | 24 ++++++++++++++++
3 files changed, 75 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
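Both schema classes become serializable via the standard Java serialization hooks: only the Avro record class (itself Serializable) is written out, the Avro runtime objects are marked transient, and readObject rebuilds them. A minimal standalone sketch of that pattern in Scala (class and field names are hypothetical; the real classes rebuild Avro readers, writers, streams, and coders):

    import java.io.{ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    // Hypothetical class showing the transient-plus-readObject pattern.
    class RebuildOnDeserialize(private var recordClazz: Class[_]) extends Serializable {

      @transient private var buffer = new ByteArrayOutputStream()

      // Persist only the class handle; the transient buffer is skipped.
      private def writeObject(oos: ObjectOutputStream): Unit =
        oos.writeObject(recordClazz)

      // Restore the handle, then rebuild all transient state from it.
      private def readObject(ois: ObjectInputStream): Unit = {
        recordClazz = ois.readObject().asInstanceOf[Class[_]]
        buffer = new ByteArrayOutputStream()
      }
    }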
http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
index 0713738..c7f1d82 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
@@ -32,6 +32,8 @@ import org.apache.avro.util.Utf8;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.List;
/**
@@ -44,24 +46,29 @@ import java.util.List;
public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
/**
+ * Avro record class.
+ */
+ private Class<? extends SpecificRecord> recordClazz;
+
+ /**
* Schema for deterministic field order.
*/
- private final Schema schema;
+ private transient Schema schema;
/**
* Reader that deserializes byte array into a record.
*/
- private final DatumReader<SpecificRecord> datumReader;
+ private transient DatumReader<SpecificRecord> datumReader;
/**
* Input stream to read message from.
*/
- private final MutableByteArrayInputStream inputStream;
+ private transient MutableByteArrayInputStream inputStream;
/**
* Avro decoder that decodes binary data.
*/
- private final Decoder decoder;
+ private transient Decoder decoder;
/**
* Record to deserialize byte array to.
@@ -75,6 +82,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
*/
public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.recordClazz = recordClazz;
this.schema = SpecificData.get().getSchema(recordClazz);
this.datumReader = new SpecificDatumReader<>(schema);
this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
@@ -97,6 +105,20 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
return (Row) row;
}
+ private void writeObject(ObjectOutputStream oos) throws IOException {
+ oos.writeObject(recordClazz);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+ this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumReader = new SpecificDatumReader<>(schema);
+ this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+ this.inputStream = new MutableByteArrayInputStream();
+ this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ }
+
/**
* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
* Avro's {@link Utf8} fields are converted into regular Java strings.
http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
index 450c78f..09acc6a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.List;
/**
@@ -42,24 +44,29 @@ import java.util.List;
public class AvroRowSerializationSchema implements SerializationSchema<Row> {
/**
+ * Avro record class.
+ */
+ private Class<? extends SpecificRecord> recordClazz;
+
+ /**
* Avro serialization schema.
*/
- private final Schema schema;
+ private transient Schema schema;
/**
* Writer to serialize Avro record into a byte array.
*/
- private final DatumWriter<GenericRecord> datumWriter;
+ private transient DatumWriter<GenericRecord> datumWriter;
/**
* Output stream to serialize records into byte array.
*/
- private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+ private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
/**
* Low-level class for serialization of Avro values.
*/
- private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+ private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
/**
* Creates a Avro serialization schema for the given schema.
@@ -68,6 +75,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
*/
public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+ this.recordClazz = recordClazz;
this.schema = SpecificData.get().getSchema(recordClazz);
this.datumWriter = new SpecificDatumWriter<>(schema);
}
@@ -89,6 +97,19 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
}
}
+ private void writeObject(ObjectOutputStream oos) throws IOException {
+ oos.writeObject(recordClazz);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+ this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+ this.schema = SpecificData.get().getSchema(recordClazz);
+ this.datumWriter = new SpecificDatumWriter<>(schema);
+ this.arrayOutputStream = new ByteArrayOutputStream();
+ this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+ }
+
/**
* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
* Strings are converted into Avro's {@link Utf8} fields.
http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
index d5be274..28f2ed3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.avro.specific.SpecificRecord;
import org.junit.Test;
@@ -121,4 +122,27 @@ public class AvroRowDeSerializationSchemaTest {
assertEquals(testData.f2, actual);
}
+
+ @Test
+ public void testSerializability() throws IOException, ClassNotFoundException {
+ final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+
+ final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
+ final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);
+
+ byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
+ byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig);
+
+ AvroRowSerializationSchema serCopy =
+ InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader());
+ AvroRowDeserializationSchema deserCopy =
+ InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader());
+
+ final byte[] bytes = serCopy.serialize(testData.f2);
+ deserCopy.deserialize(bytes);
+ deserCopy.deserialize(bytes);
+ final Row actual = deserCopy.deserialize(bytes);
+
+ assertEquals(testData.f2, actual);
+ }
}
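The new test round-trips both schemas through InstantiationUtil and then deserializes the same bytes three times, presumably to confirm that the transient input stream and decoder rebuilt in readObject can be reused across records.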