You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Mark You (JIRA)" <ji...@apache.org> on 2017/06/12 10:19:00 UTC
[jira] [Updated] (FLINK-6896) Creating a table from a POJO and use
table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mark You updated FLINK-6896:
----------------------------
Attachment: debug.png
Here is the debug information.
> Creating a table from a POJO and use table sink to output fail
> --------------------------------------------------------------
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: Mark You
> Attachments: debug.png
>
>
> The following example fails at the sink. Stepping through in debug mode, it looks like the ArrayIndexOutOfBoundsException is caused by the input type being a POJO type rather than Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List<Content> data = new ArrayList<Content>();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Content> stream = env.fromCollection(data);
> DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
> new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
> /**
> *
> */
> private static final long serialVersionUID = 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, urlKey")
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");
> //table.printSchema();
> TableSink<Row> windowSink = new CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
> *
> */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();
> this.recordTime = recordTime;
> this.urlKey = urlKey;
> }
> public String getUrlKey() {
> return urlKey;
> }
> public void setUrlKey(String urlKey) {
> this.urlKey = urlKey;
> }
> public long getRecordTime() {
> return recordTime;
> }
> public void setRecordTime(long recordTime) {
> this.recordTime = recordTime;
> }
> public long getHttpGetMessageCount() {
> return httpGetMessageCount;
> }
> public void setHttpGetMessageCount(long httpGetMessageCount) {
> this.httpGetMessageCount = httpGetMessageCount;
> }
> public long getHttpPostMessageCount() {
> return httpPostMessageCount;
> }
> public void setHttpPostMessageCount(long httpPostMessageCount) {
> this.httpPostMessageCount = httpPostMessageCount;
> }
> public long getUplink() {
> return uplink;
> }
> public void setUplink(long uplink) {
> this.uplink = uplink;
> }
> public long getDownlink() {
> return downlink;
> }
> public void setDownlink(long downlink) {
> this.downlink = downlink;
> }
> public long getStatusCode() {
> return statusCode;
> }
> public void setStatusCode(long statusCode) {
> this.statusCode = statusCode;
> }
> public long getStatusCodeCount() {
> return statusCodeCount;
> }
> public void setStatusCodeCount(long statusCodeCount) {
> this.statusCodeCount = statusCodeCount;
> }
> }
> private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {
> /**
> *
> */
> private static final long serialVersionUID = 1L;
> @Override
> public long extractTimestamp(Object[] element, long previousElementTimestamp) {
> // TODO Auto-generated method stub
> return (long) element[0];
> }
> @Override
> public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
> return new Watermark(extractedTimestamp);
> }
> }
> }
> {code}
> Exception trace
> {code}
> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 6
> at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateFieldAccess(CodeGenerator.scala:1661)
> at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateInputAccess(CodeGenerator.scala:1599)
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$26.apply(CodeGenerator.scala:875)
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$26.apply(CodeGenerator.scala:874)
> at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:728)
> at scala.collection.immutable.Range.foreach(Range.scala:166)
> at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:727)
> at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:874)
> at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.generatedConversionFunction(DataStreamScan.scala:36)
> at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:36)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:63)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:94)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:119)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:94)
> at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:678)
> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:637)
> at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:214)
> at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
> at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
> at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:66)
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)