Posted to user@flink.apache.org by kant kodali <ka...@gmail.com> on 2020/01/18 09:39:43 UTC

some basic questions

Hi All,

1) The documentation says full outer join is supported; however, the code
below just exits with value 1 and no error message.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Properties;

public class Test {

    public static void main(String... args) throws Exception {

        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment bsTableEnv =
                StreamTableEnvironment.create(env, bsSettings);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
                java.util.regex.Pattern.compile("test-topic1"),
                new SimpleStringSchema(),
                properties);
        FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
                java.util.regex.Pattern.compile("test-topic2"),
                new SimpleStringSchema(),
                properties);

        DataStream<String> stream1 = env.addSource(consumer1);
        DataStream<String> stream2 = env.addSource(consumer2);

        bsTableEnv.registerDataStream("sample1", stream1);
        bsTableEnv.registerDataStream("sample2", stream2);

        Table result = bsTableEnv.sqlQuery(
                "SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0");
        result.printSchema();

        bsTableEnv.toAppendStream(result, Row.class).print();
        bsTableEnv.execute("sample job");
    }
}


2) If I am using the Blink planner, should I use TableEnvironment or
StreamTableEnvironment?

3) Why does the current stable Flink documentation (1.9) recommend the old
planner? Is there a rough timeline for when we would be able to use the Blink
planner in production? Perhaps 1.10 or 1.11?

Thanks!

Re: some basic questions

Posted by godfrey he <go...@gmail.com>.
hi kant,
A "FULL OUTER JOIN" job generates retract messages, so toRetractStream is
required to guarantee correctness.
I think it's better to use StreamExecutionEnvironment.execute, because you
have converted the Table to a DataStream.
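
To illustrate why an append-only stream cannot carry this result, here is a
toy model in plain Java (no Flink dependency) of how a streaming full outer
join changes its output over time. The class RetractJoinSketch and its methods
are hypothetical names made up for this sketch, and the logic is a
simplification, not Flink's actual join operator. Each emitted change carries
a flag in the same spirit as the Boolean in a retract stream: true adds a row,
false retracts a previously emitted one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: an equality join on the row value itself (like f0 = f0).
public class RetractJoinSketch {
    private final List<String> left = new ArrayList<>();    // left rows seen so far
    private final List<String> right = new ArrayList<>();   // right rows seen so far
    private final List<String> changes = new ArrayList<>(); // emitted change log

    public void onLeft(String v) {
        int matches = Collections.frequency(right, v);
        if (matches == 0) {
            // No partner yet: emit the left row padded with null.
            emit(true, v, null);
        } else {
            // If no left row with this value existed before, the matching
            // right rows were emitted null-padded and must be retracted.
            boolean rightWasPadded = !left.contains(v);
            for (int i = 0; i < matches; i++) {
                if (rightWasPadded) {
                    emit(false, null, v);
                }
                emit(true, v, v);
            }
        }
        left.add(v);
    }

    public void onRight(String v) {
        int matches = Collections.frequency(left, v);
        if (matches == 0) {
            emit(true, null, v);
        } else {
            boolean leftWasPadded = !right.contains(v);
            for (int i = 0; i < matches; i++) {
                if (leftWasPadded) {
                    emit(false, v, null);
                }
                emit(true, v, v);
            }
        }
        right.add(v);
    }

    private void emit(boolean add, String l, String r) {
        changes.add("(" + add + ", " + l + ", " + r + ")");
    }

    public List<String> changes() {
        return changes;
    }

    public static void main(String[] args) {
        RetractJoinSketch join = new RetractJoinSketch();
        join.onLeft("a");   // prints later as (true, a, null)
        join.onRight("a");  // prints later as (false, a, null) then (true, a, a)
        join.changes().forEach(System.out::println);
    }
}
```

The second line of output is the retraction: the null-padded row emitted
first is no longer part of the result once a match arrives, and an append-only
stream has no way to say so. In the posted program, the corresponding change
is calling bsTableEnv.toRetractStream(result, Row.class) instead of
toAppendStream; in the Flink 1.9 Java API this yields a
DataStream<Tuple2<Boolean, Row>>.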

kant kodali <ka...@gmail.com> wrote on Sun, Jan 19, 2020 at 11:59 AM:

> Hi Godfrey,
>
> When I just clicked the run button in my IDE, it didn't really show me any
> errors, so I ran the job from the command line (flink run <jar>), which shows
> me what the error is. It tells me I need to change to toRetractStream(), and
> both StreamExecutionEnvironment.execute and StreamTableEnvironment.execute
> seem to work fine, although I am not sure which one is the correct usage.
>
> Thanks!

Re: some basic questions

Posted by kant kodali <ka...@gmail.com>.
Hi Godfrey,

When I just clicked the run button in my IDE, it didn't really show me any
errors, so I ran the job from the command line (flink run <jar>), which shows
me what the error is. It tells me I need to change to toRetractStream(), and
both StreamExecutionEnvironment.execute and StreamTableEnvironment.execute
seem to work fine, although I am not sure which one is the correct usage.

Thanks!

On Sat, Jan 18, 2020 at 6:52 PM kant kodali <ka...@gmail.com> wrote:

> Hi Godfrey,
>
> Thanks a lot for your response. I just tried it with env.execute("simple
> job") but I still get the same error message.
>
> Kant

Re: some basic questions

Posted by kant kodali <ka...@gmail.com>.
Hi Godfrey,

Thanks a lot for your response. I just tried it with env.execute("simple
job") but I still get the same error message.

Kant

On Sat, Jan 18, 2020 at 6:26 PM godfrey he <go...@gmail.com> wrote:

> hi kant,
>
> > 1) The Documentation says full outer join is supported however the below
> > code just exits with value 1. No error message.
> If you have converted the Table to a DataStream, please execute it
> with StreamExecutionEnvironment (call env.execute("simple job")).

Re: some basic questions

Posted by godfrey he <go...@gmail.com>.
hi kant,

> 1) The Documentation says full outer join is supported however the below
> code just exits with value 1. No error message.
If you have converted the Table to a DataStream, please execute it
with StreamExecutionEnvironment (call env.execute("simple job")).

> 2) If I am using a blink planner should I use TableEnvironment or
> StreamTableEnvironment ?
For a streaming job, both environments can be used. The differences are:
  TableEnvironment optimizes multiple queries into one DAG when
executing, while StreamTableEnvironment optimizes each query
independently.
  StreamTableEnvironment supports conversion from/to DataStream,
while TableEnvironment does not.
  StreamTableEnvironment supports registering TableFunction
and AggregateFunction, while TableEnvironment does not support that yet.

For a batch job, TableEnvironment is the only choice, because DataStream
does not support batch jobs yet.

> 3) Why flink current stable documentation(1.9) recommends (old planner)?
> any rough timeline on when we would be able to use blink planner in
> production? perhaps 1.10 or 1.11?
1.9 is the Blink planner's first version, and it is unstable. In 1.10, the
Blink planner is more stable, and we are switching to the Blink planner as the
default step by step [0].

[0]
http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCAELO930%2B3RJ5m4hGQ7fbS-CS%3DcfJe5ENcRmZ%3DT_hey-uL6c27g%40mail.gmail.com%3E
