Posted to users@apex.apache.org by Fernanda de Camargo <na...@ime.usp.br> on 2019/06/18 02:25:00 UTC
Using StreamEndpoint for SQL queries
Hello,
I would like to run some queries and data transformations using SQL.
To start, I am trying to use StreamEndpoint from the SQL API for a simple
query, as shown below:
@Override
public void populateDAG(DAG dag, Configuration conf) {
    SQLExecEnvironment env = SQLExecEnvironment.getEnvironment();

    // Column names and Java types of the SQL table
    Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of(
        "RowTime", Date.class,
        "id", Integer.class,
        "Product", String.class,
        "units", Integer.class);

    // Reads the input file and emits each line on its output port
    LineByLineFileInputOperator input = dag.addOperator("input", new LineByLineFileInputOperator());

    // Register the operator's output port as the table named by "sqlSchemaInputName"
    env.registerTable(conf.get("sqlSchemaInputName"), new StreamEndpoint(input.output, fieldMapping));

    // Build the query from the "sql" property into the DAG
    env.executeSQL(dag, conf.get("sql"));
}
The query in the properties file is SELECT STREAM * FROM table1, and
"sqlSchemaInputName" is set to "table1". However, when I run the program,
I get this error:
"java.lang.RuntimeException: Unexpected tuple received. Received class: class java.lang.String. Expected class: class java.lang.Class".
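The first message says a java.lang.String tuple arrived, and LineByLineFileInputOperator emits each line as a String. If, as an unconfirmed assumption, the SQL endpoint expects typed tuples whose fields follow fieldMapping, the lines would need converting before they reach it. The plain-Java sketch below shows that conversion for a single line; the OrderRow class, its parse method and the comma-separated line layout are all hypothetical, and no Apex API is used.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical tuple class; field names and types copy the fieldMapping
// (RowTime -> Date, id -> Integer, Product -> String, units -> Integer).
public class OrderRow {
    public Date RowTime;
    public Integer id;
    public String Product;
    public Integer units;

    // Assumed line layout (not from the post): "yyyy-MM-dd HH:mm:ss,<id>,<Product>,<units>"
    public static OrderRow parse(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields, got " + parts.length + ": " + line);
        }
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        OrderRow row = new OrderRow();
        try {
            row.RowTime = fmt.parse(parts[0].trim());
        } catch (ParseException e) {
            throw new IllegalArgumentException("Bad RowTime: " + parts[0], e);
        }
        row.id = Integer.valueOf(parts[1].trim());
        row.Product = parts[2].trim();
        row.units = Integer.valueOf(parts[3].trim());
        return row;
    }

    public static void main(String[] args) {
        OrderRow row = parse("2019-06-18 02:25:00,7,paint,5");
        // The result is an OrderRow instance rather than the raw String line.
        System.out.println(row.getClass().getSimpleName() + " id=" + row.id
                + " Product=" + row.Product + " units=" + row.units);
    }
}
```

In a DAG, a conversion like this would presumably live in an intermediate operator between the file reader and the port passed to StreamEndpoint; how the endpoint is told which tuple class to expect is not covered by this sketch.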
I also tried replacing LineByLineFileInputOperator with custom operator
classes whose output port is of type Object, and got this error:
"java.lang.RuntimeException: Unexpected tuple received. Received class: class java.lang.Class. Expected class: class java.lang.Class".
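The second message reports java.lang.Class as the received type. In plain Java, that is the runtime class of an object that is itself a class literal (for example OrderRow.class) rather than an instance of that class. Whether the custom operators here emitted a class literal is not known from the post; the sketch only illustrates the difference, using a hypothetical OrderRow type and a hypothetical describe helper that formats the class the way the error message does.

```java
public class TupleClassDemo {
    // Hypothetical tuple type standing in for whatever a custom operator emits.
    public static class OrderRow {
        public Integer id = 1;
    }

    // Returns the tuple's runtime class via Class.toString(), e.g. "class java.lang.String".
    public static String describe(Object tuple) {
        return tuple.getClass().toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("a line of text")); // class java.lang.String
        System.out.println(describe(OrderRow.class));   // class java.lang.Class
        System.out.println(describe(new OrderRow()));   // class TupleClassDemo$OrderRow
    }
}
```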
I don't understand this. The documentation says: "StreamEndpoint: This allows us
to connect existing operator output or input ports to the SQL query as a
data source or sink respectively." So it should work with any source, as long
as I pass the operator's output port to StreamEndpoint, right?
I don't intend to use the Kafka or File endpoints; my application needs
StreamEndpoint.
Does anyone know why this isn't working?
Thanks in advance!