Posted to user@flink.apache.org by madan <ma...@gmail.com> on 2017/12/13 15:16:18 UTC
ClassCastException when using RowTypeInfo
Hi,
Below is the sample code I am trying:
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

TypeInformation[] types = new TypeInformation[] {
    BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
String[] fieldNames = new String[] {"id", "name", "salary", "department"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
env.registerType(RowTypeInfo.class);

env.addSource(new EmployeeSourceFunction(), "samplesource", rowTypeInfo)
    .keyBy("department").sum("salary").addSink(new PrintSinkFunction<>());

public class EmployeeSourceFunction implements SourceFunction<Row> {
    private boolean continueRead = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        while (continueRead) {
            for (int i = 0; i < 3 && continueRead; i++) {
                Row row = new Row(4);
                row.setField(0, Integer.valueOf(i));
                row.setField(1, String.valueOf("user" + i));
                row.setField(2, 1000 * i);
                row.setField(3, "DEV");
                ctx.collect(row);
            }
            continueRead = false;
        }
    }

    @Override
    public void cancel() {
        continueRead = false;
    }
}
I am getting the following exception:
java.lang.ClassCastException: org.apache.flink.api.java.typeutils.RowTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.TupleTypeInfo
    at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:167)
I checked FieldAccessorFactory.java:167:
if (typeInfo.isTupleType()) {
    TupleTypeInfo tupleTypeInfo = (TupleTypeInfo) typeInfo;
RowTypeInfo returns 'true' for isTupleType(), but it cannot be cast to TupleTypeInfo.
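To make the failure mode concrete, here is a minimal stand-alone sketch of the pattern described above. All class and method names in it (TypeInfo, TupleLikeInfo, RowLikeInfo, CastSketch) are hypothetical stand-ins invented for illustration; they are not Flink's classes, and the guarded variant is only one possible way to avoid the cast, not necessarily how Flink handles it.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
abstract class TypeInfo {
    abstract boolean isTupleType();
}

class TupleLikeInfo extends TypeInfo {
    @Override
    boolean isTupleType() {
        return true;
    }
}

// Reports itself as a tuple type but is not a TupleLikeInfo,
// mirroring the RowTypeInfo / TupleTypeInfo situation described above.
class RowLikeInfo extends TypeInfo {
    @Override
    boolean isTupleType() {
        return true;
    }
}

public class CastSketch {
    // Mirrors the failing pattern: trusts the flag, then casts unconditionally.
    static String unsafeAccessor(TypeInfo info) {
        if (info.isTupleType()) {
            TupleLikeInfo tuple = (TupleLikeInfo) info; // throws for RowLikeInfo
            return "tuple accessor";
        }
        return "other accessor";
    }

    // One possible guard: check the concrete class before casting.
    static String guardedAccessor(TypeInfo info) {
        if (info instanceof TupleLikeInfo) {
            return "tuple accessor";
        }
        return "other accessor";
    }

    public static void main(String[] args) {
        System.out.println(guardedAccessor(new RowLikeInfo()));
        try {
            unsafeAccessor(new RowLikeInfo());
        } catch (ClassCastException e) {
            System.out.println("ClassCastException from the unconditional cast");
        }
    }
}
```

The point of the sketch is only that a boolean flag such as isTupleType() says nothing about the concrete class, so a cast guarded by the flag alone can fail at runtime.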
Can someone please tell me whether I have misconfigured something, or
whether this is a bug in the code?
--
Thank you,
Madan.
Re: ClassCastException when using RowTypeInfo
Posted by Timo Walther <tw...@apache.org>.
Hi Madan,
This is definitely a bug. The Row type was mostly added for the
Table & SQL API and has not been tested with expression keys. In general,
though, I would use a tuple in your case, as tuples are more efficient.
`registerType` is only necessary for generic types serialized with Kryo.
I opened https://issues.apache.org/jira/browse/FLINK-8255. If you would
like to fix it, I can assign it to you.
Thanks.
Regards,
Timo