You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/12/01 16:48:48 UTC

flink git commit: [FLINK-8173] [table] Fix input unboxing and support Avro Utf8

Repository: flink
Updated Branches:
  refs/heads/master deea4b32b -> 6aced1251


[FLINK-8173] [table] Fix input unboxing and support Avro Utf8

This closes #5111.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6aced125
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6aced125
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6aced125

Branch: refs/heads/master
Commit: 6aced1251878a4a98f66732d14336f3c86ec9d98
Parents: deea4b3
Author: twalthr <tw...@apache.org>
Authored: Fri Dec 1 15:37:23 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Fri Dec 1 17:43:50 2017 +0100

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |  17 ++
 .../flink/table/codegen/CodeGenerator.scala     |   6 +-
 .../table/runtime/batch/AvroTypesITCase.scala   | 188 +++++++++++++++++++
 .../table/runtime/harness/JoinHarnessTest.scala |   1 -
 4 files changed, 207 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6aced125/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index fdd6df3..374ea06 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -157,6 +157,7 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -164,6 +165,7 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
@@ -171,6 +173,21 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/6aced125/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index d15c973..b253a27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -1220,10 +1220,8 @@ abstract class CodeGenerator(
       fieldType: TypeInformation[_],
       fieldTerm: String)
     : GeneratedExpression = {
-    val tmpTerm = newName("tmp")
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
-    val tmpTypeTerm = boxedTypeTermForTypeInfo(fieldType)
     val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
     val defaultValue = primitiveDefaultValue(fieldType)
 
@@ -1254,12 +1252,12 @@ abstract class CodeGenerator(
         |  $resultTerm = $defaultValue;
         |}
         |else {
-        |  $resultTerm = $unboxedFieldCode;
+        |  $resultTerm = ($resultTypeTerm) $unboxedFieldCode;
         |}
         |""".stripMargin
     } else {
       s"""
-        |$resultTypeTerm $resultTerm = $unboxedFieldCode;
+        |$resultTypeTerm $resultTerm = ($resultTypeTerm) $unboxedFieldCode;
         |""".stripMargin
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6aced125/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
new file mode 100644
index 0000000..1983554
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch
+
+import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.util
+import java.util.Collections
+
+import org.apache.avro.util.Utf8
+import org.apache.flink.api.scala._
+import org.apache.flink.formats.avro.generated.{Address, Colors, Fixed16, User}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Tests for interoperability with Avro types: Avro User records in, Row/Utf8/Address/User out.
+  */
+@RunWith(classOf[Parameterized])
+class AvroTypesITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsClusterTestBase(mode, configMode) {
+
+  @Test
+  def testAvroToRow(): Unit = { // selects every User field and compares the rows as text
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = testData(env).toTable(tEnv)
+
+    val result = t.select('*)
+
+    val results = result.toDataSet[Row].collect()
+    val expected = "black,null,Whatever,[true],[hello],true,0.0,GREEN," +
+      "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,null\n" +
+      "blue,null,Charlie,[],[],false,1.337,RED," +
+      "null,1337,{},{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", " +
+      "\"state\": \"Berlin\", \"zip\": \"12049\"},null,null,null\n" +
+      "yellow,null,Terminator,[false],[world],false,0.0,GREEN," +
+      "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,null"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAvroStringAccess(): Unit = { // the 'name column is collected as Avro Utf8 values
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = testData(env).toTable(tEnv)
+
+    val result = t.select('name)
+    val results = result.toDataSet[Utf8].collect()
+    val expected = "Charlie\n" +
+      "Terminator\n" +
+      "Whatever"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAvroObjectAccess(): Unit = { // flattened type_nested is converted back to Address
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = testData(env).toTable(tEnv)
+
+    val result = t
+      .filter('type_nested.isNotNull) // of the fixtures, only USER_1 sets a non-null type_nested
+      .select('type_nested.flatten()).as('city, 'num, 'state, 'street, 'zip)
+
+    val results = result.toDataSet[Address].collect()
+    val expected = AvroTypesITCase.USER_1.getTypeNested.toString
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAvroToAvro(): Unit = { // full round trip: User -> Table -> User
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = testData(env).toTable(tEnv)
+
+    val result = t.select('*).toDataSet[User].collect()
+    val expected = AvroTypesITCase.USER_1 + "\n" +
+      AvroTypesITCase.USER_2 + "\n" +
+      AvroTypesITCase.USER_3
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  private def testData(env: ExecutionEnvironment): DataSet[User] = { // the three fixtures
+
+    val data = new mutable.MutableList[User]
+
+    data.+=(AvroTypesITCase.USER_1)
+    data.+=(AvroTypesITCase.USER_2)
+    data.+=(AvroTypesITCase.USER_3)
+
+    env.fromCollection(data)
+  }
+}
+
+object AvroTypesITCase { // Avro User fixtures shared by the tests in the companion class
+
+  val USER_1: User = User.newBuilder() // nested Address set; typeFixed null; empty arrays
+    .setName("Charlie")
+    .setFavoriteColor("blue")
+    .setFavoriteNumber(null)
+    .setTypeBoolTest(false)
+    .setTypeDoubleTest(1.337d)
+    .setTypeNullTest(null)
+    .setTypeLongTest(1337L)
+    .setTypeArrayString(new util.ArrayList[CharSequence])
+    .setTypeArrayBoolean(new util.ArrayList[JBoolean]())
+    .setTypeNullableArray(null)
+    .setTypeEnum(Colors.RED)
+    .setTypeMap(new util.HashMap[CharSequence, JLong])
+    .setTypeFixed(null)
+    .setTypeUnion(null)
+    .setTypeNested(
+      Address.newBuilder
+      .setNum(42)
+      .setStreet("Bakerstreet")
+      .setCity("Berlin")
+      .setState("Berlin")
+      .setZip("12049")
+      .build)
+    .build
+
+  val USER_2: User = User.newBuilder() // typeFixed set; typeNested null; single-element arrays
+    .setName("Whatever")
+    .setFavoriteNumber(null)
+    .setFavoriteColor("black")
+    .setTypeLongTest(42L)
+    .setTypeDoubleTest(0.0)
+    .setTypeNullTest(null)
+    .setTypeBoolTest(true)
+    .setTypeArrayString(Collections.singletonList("hello"))
+    .setTypeArrayBoolean(Collections.singletonList(true))
+    .setTypeEnum(Colors.GREEN)
+    .setTypeMap(new util.HashMap[CharSequence, JLong])
+    .setTypeFixed(new Fixed16())
+    .setTypeUnion(null)
+    .setTypeNested(null)
+    .build()
+
+  val USER_3: User = User.newBuilder() // same setters as USER_2 with different values
+    .setName("Terminator")
+    .setFavoriteNumber(null)
+    .setFavoriteColor("yellow")
+    .setTypeLongTest(1L)
+    .setTypeDoubleTest(0.0)
+    .setTypeNullTest(null)
+    .setTypeBoolTest(false)
+    .setTypeArrayString(Collections.singletonList("world"))
+    .setTypeArrayBoolean(Collections.singletonList(false))
+    .setTypeEnum(Colors.GREEN)
+    .setTypeMap(new util.HashMap[CharSequence, JLong])
+    .setTypeFixed(new Fixed16())
+    .setTypeUnion(null)
+    .setTypeNested(null)
+    .build()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6aced125/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index 43397ae..facdbd4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -334,7 +334,6 @@ class JoinHarnessTest extends HarnessTestBase {
     expectedOutput.add(new Watermark(41))
 
     val result = testHarness.getOutput
-    println(result)
     verify(
       expectedOutput,
       result,