You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Timo Walther <tw...@apache.org> on 2017/10/29 05:37:33 UTC

Re: Help on RowTypeInfo?

Hi,

the translate() method is an internal method. You can use 
"toRetractStream(table, Row.class)" or "toAppendStream(table, 
Row.class)" to convert you table into a stream. Make sure to use the 
correct StreamTableEnvironment for your API: 
org.apache.flink.table.api.java.StreamTableEnvironment

Regards,
Timo

Am 10/29/17 um 5:53 AM schrieb PaulWu:
> Please help how to "translate" table to DataStream in the fellowing code.
>
> StreamTableEnvironment ste =
> StreamTableEnvironment.getTableEnvironment(EXE_ENV);
>          ste.registerDataStreamInternal("abc", stream);
>          Table ts = ste.sql("select * from abc");
>          ts = ts.as("count,word");
>          System.out.println("ts=" + ts.getSchema());
>          ts.printSchema();
>          String[] names = new String[]{"count", "word"};
>          TypeInformation[] types = new TypeInformation[]{Types.STRING,
> Types.STRING};
>
>          RowTypeInfo tpe = Types.ROW(types);
>          DataStream<Row> ds = ste.translate(ts, ste.queryConfig(), true,
> true, tpe);
>          ds.print();
>
> It throws an exception:
> Exception in thread "main" scala.MatchError: Row(f0: String, f1: String) (of
> class org.apache.flink.api.java.typeutils.RowTypeInfo)
> 	at
> org.apache.flink.table.api.StreamTableEnvironment.getConversionMapperWithChanges(StreamTableEnvironment.scala:293)
> 	at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:679)
> 	at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:645)
> 	at
> com.att.ariso.ReadFromKafkaGASFPBTable.main(ReadFromKafkaGASFPBTable.java:127)
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Help on RowTypeInfo?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Paul,

The *.scala.StreamTableEnvironment is for Scala programs, the
*.java.StreamTableEnvironment for Java programs and the third is the common
basis of the Scala and Java environment.
TableEnvironment.getTableEnvironment automatically creates the appropriate
TableEnvironment based on the provided ExecuctionEnvironment.

The Table API and SQL are designed to be unified APIs for batch and
streaming and we aim to support the same queries on batch and streaming
tables.

Best, Fabian




2017-10-29 15:39 GMT+01:00 PaulWu <zw...@gmail.com>:

> Sorry for my rant...fairly new. Felt lost. The one (StreamTableEnvironment)
> from java needs to use the constructor:
> StreamTableEnvironment(StreamExecutionEnvironment execEnv, TableConfig
> config) .
>
> Now it works. Thanks. Still confused...why
>
> BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
>
> works? Even it could have some same compiling error sometimes.
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Help on RowTypeInfo?

Posted by PaulWu <zw...@gmail.com>.
Sorry for my rant...fairly new. Felt lost. The one (StreamTableEnvironment)
from java needs to use the constructor:
StreamTableEnvironment(StreamExecutionEnvironment execEnv, TableConfig
config) .

Now it works. Thanks. Still confused...why 
 
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

works? Even it could have some same compiling error sometimes. 




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Help on RowTypeInfo?

Posted by PaulWu <zw...@gmail.com>.
I felt it is so messy... for the api design: Look at this...
StreamTableEnvironment can from three different packages, which I should
choose? I tried each of them and I just have one problem or another. 

//import org.apache.flink.table.api.scala.StreamTableEnvironment;
//import org.apache.flink.table.api.java.StreamTableEnvironment;
//import org.apache.flink.table.api.StreamTableEnvironment;



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Help on RowTypeInfo?

Posted by PaulWu <zw...@gmail.com>.
Where is this method(from which class/object) you mentioned? I can only find 

(new TableConversions(ts)).toRetractStream(TypeInformation.of(Row.class))

I use flink 1.3.2 java api, and weird the compilation error says this method
is not available although I can see it in the api and my ide tip.  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/