Posted to user@flink.apache.org by Steve Bistline <sr...@gmail.com> on 2018/11/30 07:23:59 UTC

Flink SQL

Hi,

I have a silly question about Flink SQL that I cannot seem to find a clear
answer to. Given the following code, will the "result" of the SQL SELECT
statement be returned, and the data then written to S3, only if the
statement returns data that matches the criteria?

Does "nothing" happen otherwise (i.e., when nothing matches the SQL
statement)?

tableEnv.registerDataStream("SENSORS", dataset,
    "t_deviceID, t_timeStamp, t_sKey, t_sValue");

// TEMPERATURE
Table result = tableEnv.sql(
    "SELECT 'AlertTEMEPERATURE ', t_sKey, t_deviceID, t_sValue "
    + "FROM SENSORS "
    + "WHERE t_sKey = 'TEMPERATURE' AND t_sValue > " + TEMPERATURE_THRESHOLD);
tableEnv.toAppendStream(result, Row.class).print();
// Write to S3 bucket
DataStream<Row> dsRow = tableEnv.toAppendStream(result, Row.class);
String fileNameTemp = sdf.format(new Date());
dsRow.writeAsText("s3://csv-ai/flink-alerts/" + fileNameTemp + "TEMPERATURE.txt");

Re: Flink SQL

Posted by Dominik Wosiński <wo...@gmail.com>.
Hey,

I'm not exactly sure what you mean by "nothing", but the general concept is
this: the data is fed into a dynamic table, and the result of the query is
another dynamic table. So, if the query returns an empty table, no data
will indeed be written to S3. I'm not sure whether this is what you were
asking about.
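
To make the filtering behaviour concrete, here is a minimal plain-Java
sketch. It is only an analogy and does not use the Flink API: the class
AlertFilterSketch, the Reading type, the device IDs and the threshold
values are all hypothetical and exist purely for illustration. The idea
is that rows flow in, only rows that satisfy the WHERE clause flow out,
and when none match there is simply nothing to hand to the sink.

```java
import java.util.ArrayList;
import java.util.List;

public class AlertFilterSketch {

    // Hypothetical stand-in for one row of the SENSORS table.
    static final class Reading {
        final String deviceId;
        final String key;
        final double value;

        Reading(String deviceId, String key, double value) {
            this.deviceId = deviceId;
            this.key = key;
            this.value = value;
        }
    }

    // Mirrors: WHERE t_sKey = 'TEMPERATURE' AND t_sValue > threshold.
    // Only matching rows are added to the result; others are dropped.
    static List<String> alerts(List<Reading> input, double threshold) {
        List<String> out = new ArrayList<>();
        for (Reading r : input) {
            if ("TEMPERATURE".equals(r.key) && r.value > threshold) {
                out.add("AlertTEMPERATURE," + r.key + "," + r.deviceId + "," + r.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> readings = new ArrayList<>();
        readings.add(new Reading("dev-1", "TEMPERATURE", 71.5));
        readings.add(new Reading("dev-2", "HUMIDITY", 90.0));
        readings.add(new Reading("dev-3", "TEMPERATURE", 99.2));

        // Threshold 80.0: only dev-3 satisfies both conditions.
        System.out.println(alerts(readings, 80.0));
        // Threshold 120.0: no row matches, so the result is empty.
        System.out.println(alerts(readings, 120.0));
    }
}
```

This sketch says nothing about whether the sink itself still creates an
empty file or directory when no rows arrive; that depends on the sink and
is a separate question.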

Best Regards,
Dom.


