Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/03 08:43:20 UTC

[1/3] flink git commit: [FLINK-6226] [table] Add tests for UDFs with Byte, Short, and Float arguments.

Repository: flink
Updated Branches:
  refs/heads/master 37df826e4 -> 16b088218


[FLINK-6226] [table] Add tests for UDFs with Byte, Short, and Float arguments.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e118d1d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e118d1d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e118d1d

Branch: refs/heads/master
Commit: 6e118d1dc97b3a8c0b013d2002fad80219751253
Parents: 37df826
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Nov 2 21:10:03 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Nov 2 23:10:09 2017 +0100

----------------------------------------------------------------------
 .../UserDefinedScalarFunctionTest.scala         | 28 ++++++++++++++++++--
 .../utils/userDefinedScalarFunctions.scala      |  6 +++++
 .../runtime/batch/table/CorrelateITCase.scala   | 23 +++++++++++++++-
 .../table/utils/UserDefinedTableFunctions.scala | 12 ++++++++-
 4 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
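
For context, a minimal sketch of how an overloaded scalar UDF such as Func1 is called
from the Scala Table API (this example is not part of the commit; the table, column
names, and input data are made up for illustration):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Overloads for the small numeric types, mirroring the Func1 overloads added here.
object PlusOne extends ScalarFunction {
  def eval(b: Byte): Byte = (b + 1).toByte
  def eval(s: Short): Short = (s + 1).toShort
  def eval(f: Float): Float = f + 1
}

object PlusOneExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // The planner resolves the overload from the column type (TINYINT/SMALLINT/FLOAT).
    val t = env.fromElements((1.toByte, 2.toShort, 3.0f)).toTable(tEnv, 'b, 's, 'f)
    val result = t.select(PlusOne('b), PlusOne('s), PlusOne('f))

    result.toDataSet[Row].print()
  }
}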


http://git-wip-us.apache.org/repos/asf/flink/blob/6e118d1d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index 71ff70d..a3b2f07 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -48,6 +48,24 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       "43")
 
     testAllApis(
+      Func1('f11),
+      "Func1(f11)",
+      "Func1(f11)",
+      "4")
+
+    testAllApis(
+      Func1('f12),
+      "Func1(f12)",
+      "Func1(f12)",
+      "4")
+
+    testAllApis(
+      Func1('f13),
+      "Func1(f13)",
+      "Func1(f13)",
+      "4.0")
+
+    testAllApis(
       Func2('f0, 'f1, 'f3),
       "Func2(f0, f1, f3)",
       "Func2(f0, f1, f3)",
@@ -360,7 +378,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   override def testData: Any = {
-    val testData = new Row(11)
+    val testData = new Row(14)
     testData.setField(0, 42)
     testData.setField(1, "Test")
     testData.setField(2, null)
@@ -372,6 +390,9 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
     testData.setField(8, 1000L)
     testData.setField(9, Seq("Hello", "World"))
     testData.setField(10, Array[Integer](1, 2, null))
+    testData.setField(11, 3.toByte)
+    testData.setField(12, 3.toShort)
+    testData.setField(13, 3.toFloat)
     testData
   }
 
@@ -387,7 +408,10 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       Types.INTERVAL_MONTHS,
       Types.INTERVAL_MILLIS,
       TypeInformation.of(classOf[Seq[String]]),
-      BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+      BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO,
+      Types.BYTE,
+      Types.SHORT,
+      Types.FLOAT
     ).asInstanceOf[TypeInformation[Any]]
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e118d1d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 5285569..9535cdf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -41,6 +41,12 @@ object Func1 extends ScalarFunction {
   def eval(index: Integer): Integer = {
     index + 1
   }
+
+  def eval(b: Byte): Byte = (b + 1).toByte
+
+  def eval(s: Short): Short = (s + 1).toShort
+
+  def eval(f: Float): Float = f + 1
 }
 
 object Func2 extends ScalarFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e118d1d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index b109752..79243dd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
 import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.{Func1, Func13, Func18, RichFunc2}
@@ -231,6 +231,27 @@ class CorrelateITCase(
   }
 
   @Test
+  def testByteShortFloatArguments(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+    val tFunc = new TableFunc4
+
+    val result = in
+      .select('a.cast(Types.BYTE) as 'a, 'a.cast(Types.SHORT) as 'b, 'b.cast(Types.FLOAT) as 'c)
+      .join(tFunc('a, 'b, 'c) as ('a2, 'b2, 'c2))
+      .toDataSet[Row]
+
+    val results = result.collect()
+    val expected = Seq(
+      "1,1,1.0,Byte=1,Short=1,Float=1.0",
+      "2,2,2.0,Byte=2,Short=2,Float=2.0",
+      "3,3,2.0,Byte=3,Short=3,Float=2.0",
+      "4,4,3.0,Byte=4,Short=4,Float=3.0").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testUserDefinedTableFunctionWithParameter(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/6e118d1d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
index d0ffade..e1af23b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
@@ -22,7 +22,7 @@ import java.lang.Boolean
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple3
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.functions.{FunctionContext, TableFunction}
 import org.apache.flink.types.Row
 import org.junit.Assert
@@ -109,6 +109,16 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
   }
 }
 
+class TableFunc4 extends TableFunction[Row] {
+  def eval(b: Byte, s: Short, f: Float): Unit = {
+    collect(Row.of("Byte=" + b, "Short=" + s, "Float=" + f))
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING)
+  }
+}
+
 class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
   def eval(user: String) {
     if (user.contains("#")) {


[3/3] flink git commit: [FLINK-7338] [table] Fix retrieval of OVER window lower bound.

Posted by fh...@apache.org.
[FLINK-7338] [table] Fix retrieval of OVER window lower bound.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16b08821
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16b08821
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16b08821

Branch: refs/heads/master
Commit: 16b088218435dc64848ad641a383f9ce808f07c2
Parents: 78c8ea2
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Nov 2 23:09:36 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Nov 3 00:01:44 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/nodes/OverAggregate.scala  |  21 ++--
 .../runtime/stream/sql/OverWindowITCase.scala   | 119 ++++++++++++-------
 2 files changed, 85 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
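
The core of this fix is the index computation in getLowerBoundary (see the diff of
OverAggregate.scala below): Calcite addresses the OVER window constants with
RexInputRefs whose indexes start right after the input fields, so the position in
the constants list is ref.getIndex - fieldCount, not fieldCount - ref.getIndex.
A small sketch of the arithmetic (the concrete numbers are assumptions chosen for
illustration, not taken from the commit):

object LowerBoundIndexExample extends App {
  val inputFieldCount = 3            // e.g. fields a, c, rowtime
  val constants = Seq(4L, 1000L)     // e.g. the CAST('4' AS BIGINT) literal and a 1s bound
  val refIndex = 4                   // RexInputRef pointing at the second constant

  val oldIndex = inputFieldCount - refIndex   // 3 - 4 = -1: wrong, out of range
  val newIndex = refIndex - inputFieldCount   // 4 - 3 = 1: selects the bound constant

  println(s"old=$oldIndex, new=$newIndex -> ${constants(newIndex)}")
}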


http://git-wip-us.apache.org/repos/asf/flink/blob/16b08821/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
index 87ebd86..f9bf803 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
@@ -35,8 +35,8 @@ trait OverAggregate {
   }
 
   private[flink] def orderingToString(
-    inputType: RelDataType,
-    orderFields: java.util.List[RelFieldCollation]): String = {
+      inputType: RelDataType,
+      orderFields: java.util.List[RelFieldCollation]): String = {
 
     val inFields = inputType.getFieldList.asScala
 
@@ -48,9 +48,9 @@ trait OverAggregate {
   }
 
   private[flink] def windowRange(
-    logicWindow: Window,
-    overWindow: Group,
-    input: RelNode): String = {
+      logicWindow: Window,
+      overWindow: Group,
+      input: RelNode): String = {
     if (overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded) {
       s"BETWEEN ${getLowerBoundary(logicWindow, overWindow, input)} PRECEDING " +
           s"AND ${overWindow.upperBound}"
@@ -63,8 +63,7 @@ trait OverAggregate {
       inputType: RelDataType,
       constants: Seq[RexLiteral],
       rowType: RelDataType,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]])
-    : String = {
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = {
 
     val inFields = inputType.getFieldNames.asScala
     val outFields = rowType.getFieldNames.asScala
@@ -97,12 +96,12 @@ trait OverAggregate {
   }
 
   private[flink] def getLowerBoundary(
-    logicWindow: Window,
-    overWindow: Group,
-    input: RelNode): Long = {
+      logicWindow: Window,
+      overWindow: Group,
+      input: RelNode): Long = {
 
     val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
-    val lowerBoundIndex = input.getRowType.getFieldCount - ref.getIndex
+    val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
     val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
     lowerBound match {
       case x: java.math.BigDecimal => x.asInstanceOf[java.math.BigDecimal].longValue()

http://git-wip-us.apache.org/repos/asf/flink/blob/16b08821/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
index 4884513..9bfdc4c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.stream.sql
 
 import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.java.tuple.Tuple1
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -28,6 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -293,13 +295,16 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
 
     val sqlQuery = "SELECT " +
       "  c, b, " +
+      "  LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
       "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
       "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
       " FROM T1"
 
     val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
@@ -307,16 +312,17 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     env.execute()
 
     val expected = List(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
+      "Hello,1,0,1,1", "Hello,15,0,2,2", "Hello,16,0,3,3",
+      "Hello,2,0,6,9", "Hello,3,0,6,9", "Hello,2,0,6,9",
+      "Hello,3,0,4,9",
+      "Hello,4,0,2,7",
+      "Hello,5,1,2,9",
+      "Hello,6,2,2,11", "Hello,65,2,2,12",
+      "Hello,9,2,2,12", "Hello,9,2,2,12", "Hello,18,3,3,18",
+      "Hello World,7,1,1,7", "Hello World,17,3,3,21", "Hello World,77,3,3,21",
+      "Hello World,18,1,1,7",
+      "Hello World,8,2,2,15",
+      "Hello World,20,1,1,20")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -354,9 +360,12 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
 
     val sqlQuery = "SELECT " +
       " c, a, " +
+      "  LTCNT(a, CAST('4' AS BIGINT)) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
       "  COUNT(a) " +
       "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
       "  SUM(a) " +
@@ -368,12 +377,12 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     env.execute()
 
     val expected = List(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15",
-      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
-      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
+      "Hello,1,0,1,1", "Hello,1,0,2,2", "Hello,1,0,3,3",
+      "Hello,2,0,3,4", "Hello,2,0,3,5", "Hello,2,0,3,6",
+      "Hello,3,0,3,7", "Hello,4,0,3,9", "Hello,5,1,3,12",
+      "Hello,6,2,3,15",
+      "Hello World,7,1,1,7", "Hello World,7,2,2,14", "Hello World,7,3,3,21",
+      "Hello World,7,3,3,21", "Hello World,8,3,3,22", "Hello World,20,3,3,35")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -518,6 +527,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     StreamITCase.clear
 
     val sqlQuery = "SELECT a, b, c, " +
+      "  LTCNT(b, CAST('4' AS BIGINT)) OVER(" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
       "  SUM(b) OVER (" +
       "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
       "  COUNT(b) OVER (" +
@@ -552,25 +563,26 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
 
     val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = List(
-      "1,1,Hello,6,3,2,3,1",
-      "1,2,Hello,6,3,2,3,1",
-      "1,3,Hello world,6,3,2,3,1",
-      "1,1,Hi,7,4,1,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,6,3,2,3,1",
-      "2,3,Hello world,6,3,2,3,1",
-      "1,4,Hello world,11,5,2,4,1",
-      "1,5,Hello world,29,8,3,7,1",
-      "1,6,Hello world,29,8,3,7,1",
-      "1,7,Hello world,29,8,3,7,1",
-      "2,4,Hello world,15,5,3,5,1",
-      "2,5,Hello world,15,5,3,5,1")
+      "1,1,Hello,0,6,3,2,3,1",
+      "1,2,Hello,0,6,3,2,3,1",
+      "1,3,Hello world,0,6,3,2,3,1",
+      "1,1,Hi,0,7,4,1,3,1",
+      "2,1,Hello,0,1,1,1,1,1",
+      "2,2,Hello world,0,6,3,2,3,1",
+      "2,3,Hello world,0,6,3,2,3,1",
+      "1,4,Hello world,0,11,5,2,4,1",
+      "1,5,Hello world,3,29,8,3,7,1",
+      "1,6,Hello world,3,29,8,3,7,1",
+      "1,7,Hello world,3,29,8,3,7,1",
+      "2,4,Hello world,1,15,5,3,5,1",
+      "2,5,Hello world,1,15,5,3,5,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -583,6 +595,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     StreamITCase.testResults = mutable.MutableList()
 
     val sqlQuery = "SELECT a, b, c, " +
+      "LTCNT(b, CAST('4' AS BIGINT)) over(" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "SUM(b) over (" +
       "partition by a order by rowtime rows between unbounded preceding and current row), " +
       "count(b) over (" +
@@ -618,26 +632,27 @@ class OverWindowITCase extends StreamingWithStateTestBase {
              .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
 
     val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
-      "1,2,Hello,2,1,2,2,2",
-      "1,3,Hello world,5,2,2,3,2",
-      "1,1,Hi,6,3,2,3,1",
-      "2,1,Hello,1,1,1,1,1",
-      "2,2,Hello world,3,2,1,2,1",
-      "3,1,Hello,1,1,1,1,1",
-      "3,2,Hello world,3,2,1,2,1",
-      "1,5,Hello world,11,4,2,5,1",
-      "1,6,Hello world,17,5,3,6,1",
-      "1,9,Hello world,26,6,4,9,1",
-      "1,8,Hello world,34,7,4,9,1",
-      "1,7,Hello world,41,8,5,9,1",
-      "2,5,Hello world,8,3,2,5,1",
-      "3,5,Hello world,8,3,2,5,1")
+      "1,2,Hello,0,2,1,2,2,2",
+      "1,3,Hello world,0,5,2,2,3,2",
+      "1,1,Hi,0,6,3,2,3,1",
+      "2,1,Hello,0,1,1,1,1,1",
+      "2,2,Hello world,0,3,2,1,2,1",
+      "3,1,Hello,0,1,1,1,1,1",
+      "3,2,Hello world,0,3,2,1,2,1",
+      "1,5,Hello world,1,11,4,2,5,1",
+      "1,6,Hello world,2,17,5,3,6,1",
+      "1,9,Hello world,3,26,6,4,9,1",
+      "1,8,Hello world,4,34,7,4,9,1",
+      "1,7,Hello world,5,41,8,5,9,1",
+      "2,5,Hello world,1,8,3,2,5,1",
+      "3,5,Hello world,1,8,3,2,5,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -852,3 +867,19 @@ object OverWindowITCase {
     override def cancel(): Unit = ???
   }
 }
+
+/** Counts how often the first argument was larger than the second argument. */
+class LargerThanCount extends AggregateFunction[Long, Tuple1[Long]] {
+
+  def accumulate(acc: Tuple1[Long], a: Long, b: Long): Unit = {
+    if (a > b) acc.f0 += 1
+  }
+
+  def retract(acc: Tuple1[Long], a: Long, b: Long): Unit = {
+    if (a > b) acc.f0 -= 1
+  }
+
+  override def createAccumulator(): Tuple1[Long] = Tuple1.of(0L)
+
+  override def getValue(acc: Tuple1[Long]): Long = acc.f0
+}


[2/3] flink git commit: [FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema + AvroRowDeserializationSchema.

Posted by fh...@apache.org.
[FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema + AvroRowDeserializationSchema.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78c8ea20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78c8ea20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78c8ea20

Branch: refs/heads/master
Commit: 78c8ea2085439b0d4fe0d7187044b44b6f110aff
Parents: 6e118d1
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Nov 2 21:40:26 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Nov 3 00:01:40 2017 +0100

----------------------------------------------------------------------
 .../AvroRowDeserializationSchema.java           | 30 +++++++++++++++++---
 .../AvroRowSerializationSchema.java             | 29 ++++++++++++++++---
 .../kafka/AvroRowDeSerializationSchemaTest.java | 24 ++++++++++++++++
 3 files changed, 75 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
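
The pattern applied in this commit: the Avro record Class reference is the only state
written out in writeObject(), while the Avro schema, reader/writer, and streams are
marked transient and rebuilt in readObject(). A generic sketch of the same pattern
(illustrative only, not the actual Flink classes):

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.avro.Schema
import org.apache.avro.specific.{SpecificData, SpecificRecord}

class AvroSchemaHolder(private var recordClazz: Class[_ <: SpecificRecord]) extends Serializable {

  // Derived state; marked transient and rebuilt on deserialization instead of being serialized.
  @transient private var schema: Schema = SpecificData.get().getSchema(recordClazz)

  private def writeObject(oos: ObjectOutputStream): Unit = {
    oos.writeObject(recordClazz)                         // ship only the class reference
  }

  private def readObject(ois: ObjectInputStream): Unit = {
    recordClazz = ois.readObject().asInstanceOf[Class[_ <: SpecificRecord]]
    schema = SpecificData.get().getSchema(recordClazz)   // restore the transient state
  }
}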


http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
index 0713738..c7f1d82 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
@@ -32,6 +32,8 @@ import org.apache.avro.util.Utf8;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.List;
 
 /**
@@ -44,24 +46,29 @@ import java.util.List;
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
 	/**
+	 * Avro record class.
+	 */
+	private Class<? extends SpecificRecord> recordClazz;
+
+	/**
 	 * Schema for deterministic field order.
 	 */
-	private final Schema schema;
+	private transient Schema schema;
 
 	/**
 	 * Reader that deserializes byte array into a record.
 	 */
-	private final DatumReader<SpecificRecord> datumReader;
+	private transient DatumReader<SpecificRecord> datumReader;
 
 	/**
 	 * Input stream to read message from.
 	 */
-	private final MutableByteArrayInputStream inputStream;
+	private transient MutableByteArrayInputStream inputStream;
 
 	/**
 	 * Avro decoder that decodes binary data.
 	 */
-	private final Decoder decoder;
+	private transient Decoder decoder;
 
 	/**
 	 * Record to deserialize byte array to.
@@ -75,6 +82,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 	 */
 	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
 		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
 		this.schema = SpecificData.get().getSchema(recordClazz);
 		this.datumReader = new SpecificDatumReader<>(schema);
 		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
@@ -97,6 +105,20 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		return (Row) row;
 	}
 
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.writeObject(recordClazz);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumReader = new SpecificDatumReader<>(schema);
+		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
 	/**
 	 * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
 	 * Avro's {@link Utf8} fields are converted into regular Java strings.

http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
index 450c78f..09acc6a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.List;
 
 /**
@@ -42,24 +44,29 @@ import java.util.List;
 public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 
 	/**
+	 * Avro record class.
+	 */
+	private Class<? extends SpecificRecord> recordClazz;
+
+	/**
 	 * Avro serialization schema.
 	 */
-	private final Schema schema;
+	private transient Schema schema;
 
 	/**
 	 * Writer to serialize Avro record into a byte array.
 	 */
-	private final DatumWriter<GenericRecord> datumWriter;
+	private transient DatumWriter<GenericRecord> datumWriter;
 
 	/**
 	 * Output stream to serialize records into byte array.
 	 */
-	private final ByteArrayOutputStream arrayOutputStream =  new ByteArrayOutputStream();
+	private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
 
 	/**
 	 * Low-level class for serialization of Avro values.
 	 */
-	private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
 
 	/**
 	 * Creates a Avro serialization schema for the given schema.
@@ -68,6 +75,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 	 */
 	public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
 		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
 		this.schema = SpecificData.get().getSchema(recordClazz);
 		this.datumWriter = new SpecificDatumWriter<>(schema);
 	}
@@ -89,6 +97,19 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 		}
 	}
 
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.writeObject(recordClazz);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumWriter = new SpecificDatumWriter<>(schema);
+		this.arrayOutputStream = new ByteArrayOutputStream();
+		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	}
+
 	/**
 	 * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
 	 * Strings are converted into Avro's {@link Utf8} fields.

http://git-wip-us.apache.org/repos/asf/flink/blob/78c8ea20/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
index d5be274..28f2ed3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.avro.specific.SpecificRecord;
 import org.junit.Test;
@@ -121,4 +122,27 @@ public class AvroRowDeSerializationSchemaTest {
 
 		assertEquals(testData.f2, actual);
 	}
+
+	@Test
+	public void testSerializability() throws IOException, ClassNotFoundException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+
+		final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);
+
+		byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
+		byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig);
+
+		AvroRowSerializationSchema serCopy =
+			InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader());
+		AvroRowDeserializationSchema deserCopy =
+			InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader());
+
+		final byte[] bytes = serCopy.serialize(testData.f2);
+		deserCopy.deserialize(bytes);
+		deserCopy.deserialize(bytes);
+		final Row actual = deserCopy.deserialize(bytes);
+
+		assertEquals(testData.f2, actual);
+	}
 }