Posted to users@apex.apache.org by Fernanda de Camargo <na...@ime.usp.br> on 2019/06/18 02:25:00 UTC

Using StreamEndpoint for SQL queries

Hello,

I would like to run some queries and data transformations using SQL.
To do this, I am trying to use StreamEndpoint from the SQL API to run a
simple query, as shown below:

@Override
public void populateDAG(DAG dag, Configuration conf) {
    SQLExecEnvironment env = SQLExecEnvironment.getEnvironment();
    Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of(
        "RowTime", Date.class,
        "id", Integer.class,
        "Product", String.class,
        "units", Integer.class);

    LineByLineFileInputOperator input = dag.addOperator("input",
        new LineByLineFileInputOperator());
    env.registerTable(conf.get("sqlSchemaInputName"),
        new StreamEndpoint(input.output, fieldMapping));

    env.executeSQL(dag, conf.get("sql"));
}

The query in the properties file is SELECT STREAM * FROM table1, and
"sqlSchemaInputName" is set to "table1". However, when I run the program,
I get this error:
"java.lang.RuntimeException: Unexpected tuple received. Received class:
class java.lang.String. Expected class: class java.lang.Class".

I have also tried replacing LineByLineFileInputOperator with custom
classes whose output port is of type Object, and I got this error:
"java.lang.RuntimeException: Unexpected tuple received. Received class:
class java.lang.Class. Expected class: class java.lang.Class".

I don't get it... The documentation says: "StreamEndpoint: This allows us
to connect existing operator output or input ports to the SQL query as a
data source or sink respectively." So it should work for different sources
as long as I pass the output port as the StreamEndpoint argument, right?

I do not intend to use the Kafka or File endpoints; I need to use
StreamEndpoint for my application.

Any idea why this is not working?

Thanks in advance!