You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/12/01 16:48:48 UTC
flink git commit: [FLINK-8173] [table] Fix input unboxing and support
Avro Utf8
Repository: flink
Updated Branches:
refs/heads/master deea4b32b -> 6aced1251
[FLINK-8173] [table] Fix input unboxing and support Avro Utf8
This closes #5111.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6aced125
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6aced125
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6aced125
Branch: refs/heads/master
Commit: 6aced1251878a4a98f66732d14336f3c86ec9d98
Parents: deea4b3
Author: twalthr <tw...@apache.org>
Authored: Fri Dec 1 15:37:23 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Fri Dec 1 17:43:50 2017 +0100
----------------------------------------------------------------------
flink-libraries/flink-table/pom.xml | 17 ++
.../flink/table/codegen/CodeGenerator.scala | 6 +-
.../table/runtime/batch/AvroTypesITCase.scala | 188 +++++++++++++++++++
.../table/runtime/harness/JoinHarnessTest.scala | 1 -
4 files changed, 207 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6aced125/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index fdd6df3..374ea06 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -157,6 +157,7 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -164,6 +165,7 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
@@ -171,6 +173,21 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/6aced125/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index d15c973..b253a27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -1220,10 +1220,8 @@ abstract class CodeGenerator(
fieldType: TypeInformation[_],
fieldTerm: String)
: GeneratedExpression = {
- val tmpTerm = newName("tmp")
val resultTerm = newName("result")
val nullTerm = newName("isNull")
- val tmpTypeTerm = boxedTypeTermForTypeInfo(fieldType)
val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
val defaultValue = primitiveDefaultValue(fieldType)
@@ -1254,12 +1252,12 @@ abstract class CodeGenerator(
| $resultTerm = $defaultValue;
|}
|else {
- | $resultTerm = $unboxedFieldCode;
+ | $resultTerm = ($resultTypeTerm) $unboxedFieldCode;
|}
|""".stripMargin
} else {
s"""
- |$resultTypeTerm $resultTerm = $unboxedFieldCode;
+ |$resultTypeTerm $resultTerm = ($resultTypeTerm) $unboxedFieldCode;
|""".stripMargin
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6aced125/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
new file mode 100644
index 0000000..1983554
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/AvroTypesITCase.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch
+
+import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.util
+import java.util.Collections
+
+import org.apache.avro.util.Utf8
+import org.apache.flink.api.scala._
+import org.apache.flink.formats.avro.generated.{Address, Colors, Fixed16, User}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Tests for interoperability with Avro types.
+ */
+@RunWith(classOf[Parameterized])
+class AvroTypesITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsClusterTestBase(mode, configMode) {
+
+ @Test
+ def testAvroToRow(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = testData(env).toTable(tEnv)
+
+ val result = t.select('*)
+
+ val results = result.toDataSet[Row].collect()
+ val expected = "black,null,Whatever,[true],[hello],true,0.0,GREEN," +
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,null\n" +
+ "blue,null,Charlie,[],[],false,1.337,RED," +
+ "null,1337,{},{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", " +
+ "\"state\": \"Berlin\", \"zip\": \"12049\"},null,null,null\n" +
+ "yellow,null,Terminator,[false],[world],false,0.0,GREEN," +
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,null"
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAvroStringAccess(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = testData(env).toTable(tEnv)
+
+ val result = t.select('name)
+ val results = result.toDataSet[Utf8].collect()
+ val expected = "Charlie\n" +
+ "Terminator\n" +
+ "Whatever"
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAvroObjectAccess(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = testData(env).toTable(tEnv)
+
+ val result = t
+ .filter('type_nested.isNotNull)
+ .select('type_nested.flatten()).as('city, 'num, 'state, 'street, 'zip)
+
+ val results = result.toDataSet[Address].collect()
+ val expected = AvroTypesITCase.USER_1.getTypeNested.toString
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAvroToAvro(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = testData(env).toTable(tEnv)
+
+ val result = t.select('*).toDataSet[User].collect()
+ val expected = AvroTypesITCase.USER_1 + "\n" +
+ AvroTypesITCase.USER_2 + "\n" +
+ AvroTypesITCase.USER_3
+ TestBaseUtils.compareResultAsText(result.asJava, expected)
+ }
+
+ private def testData(env: ExecutionEnvironment): DataSet[User] = {
+
+ val data = new mutable.MutableList[User]
+
+ data.+=(AvroTypesITCase.USER_1)
+ data.+=(AvroTypesITCase.USER_2)
+ data.+=(AvroTypesITCase.USER_3)
+
+ env.fromCollection(data)
+ }
+}
+
+object AvroTypesITCase {
+
+ val USER_1: User = User.newBuilder()
+ .setName("Charlie")
+ .setFavoriteColor("blue")
+ .setFavoriteNumber(null)
+ .setTypeBoolTest(false)
+ .setTypeDoubleTest(1.337d)
+ .setTypeNullTest(null)
+ .setTypeLongTest(1337L)
+ .setTypeArrayString(new util.ArrayList[CharSequence])
+ .setTypeArrayBoolean(new util.ArrayList[JBoolean]())
+ .setTypeNullableArray(null)
+ .setTypeEnum(Colors.RED)
+ .setTypeMap(new util.HashMap[CharSequence, JLong])
+ .setTypeFixed(null)
+ .setTypeUnion(null)
+ .setTypeNested(
+ Address.newBuilder
+ .setNum(42)
+ .setStreet("Bakerstreet")
+ .setCity("Berlin")
+ .setState("Berlin")
+ .setZip("12049")
+ .build)
+ .build
+
+ val USER_2: User = User.newBuilder()
+ .setName("Whatever")
+ .setFavoriteNumber(null)
+ .setFavoriteColor("black")
+ .setTypeLongTest(42L)
+ .setTypeDoubleTest(0.0)
+ .setTypeNullTest(null)
+ .setTypeBoolTest(true)
+ .setTypeArrayString(Collections.singletonList("hello"))
+ .setTypeArrayBoolean(Collections.singletonList(true))
+ .setTypeEnum(Colors.GREEN)
+ .setTypeMap(new util.HashMap[CharSequence, JLong])
+ .setTypeFixed(new Fixed16())
+ .setTypeUnion(null)
+ .setTypeNested(null)
+ .build()
+
+ val USER_3: User = User.newBuilder()
+ .setName("Terminator")
+ .setFavoriteNumber(null)
+ .setFavoriteColor("yellow")
+ .setTypeLongTest(1L)
+ .setTypeDoubleTest(0.0)
+ .setTypeNullTest(null)
+ .setTypeBoolTest(false)
+ .setTypeArrayString(Collections.singletonList("world"))
+ .setTypeArrayBoolean(Collections.singletonList(false))
+ .setTypeEnum(Colors.GREEN)
+ .setTypeMap(new util.HashMap[CharSequence, JLong])
+ .setTypeFixed(new Fixed16())
+ .setTypeUnion(null)
+ .setTypeNested(null)
+ .build()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6aced125/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index 43397ae..facdbd4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -334,7 +334,6 @@ class JoinHarnessTest extends HarnessTestBase {
expectedOutput.add(new Watermark(41))
val result = testHarness.getOutput
- println(result)
verify(
expectedOutput,
result,