You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Anton Solovev (JIRA)" <ji...@apache.org> on 2017/01/20 15:14:26 UTC
[jira] [Assigned] (FLINK-5592) Wrong number of RowSerializers with
nested Rows in Collection mode
[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anton Solovev reassigned FLINK-5592:
------------------------------------
Assignee: (was: Anton Solovev)
> Wrong number of RowSerializers with nested Rows in Collection mode
> ------------------------------------------------------------------
>
> Key: FLINK-5592
> URL: https://issues.apache.org/jira/browse/FLINK-5592
> Project: Flink
> Issue Type: Bug
> Reporter: Anton Solovev
>
> {code}
> @Test
> def testNestedRowTypes(): Unit = {
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> tEnv.registerTableSource("rows", new MockSource)
> val table: Table = tEnv.scan("rows")
> val nestedTable: Table = tEnv.scan("rows").select('person)
> val collect: Seq[Row] = nestedTable.collect()
> print(collect)
> }
> class MockSource extends BatchTableSource[Row] {
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.DataSet
> override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
> val data = List(
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
> Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
> execEnv.fromCollection(data.asJava, getReturnType)
> }
> override def getReturnType: TypeInformation[Row] = {
> new RowTypeInfo(
> Array[TypeInformation[_]](
> new RowTypeInfo(
> Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
> Array("name", "age"))),
> Array("person")
> )
> }
> }
> {code}
> throws {{java.lang.RuntimeException: Row arity of from does not match serializers}}
> stacktrace
> {code}
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
> at org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
> at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
> at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
> at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
> at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
> at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
> at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
> at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
> at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)