You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by srikanth flink <fl...@gmail.com> on 2019/09/26 06:50:34 UTC

Flink SQL update-mode set to retract in env file.

How could I configure environment file for Flink SQL, update-mode: retract?

I have this for append:
properties:
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092
        - key: group.id
          value: reconMultiAttempFail
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'a': {
               type: 'string'
            },
            'b': {
               type: 'string'
            },
            'cnt': {
               type: 'string'
            }
          }
        }
      derive-schema: false

    schema:
      - name: 'a'
        type: VARCHAR
     - name: 'b'
        type: VARCHAR
      - name: 'cnt'
        type: BIGINT

Couldn't find any document for the same.

someone help me with the syntax.

Thanks
Srikanth

Re: Flink SQL update-mode set to retract in env file.

Posted by srikanth flink <fl...@gmail.com>.
Awesome, thanks!

On Thu, Sep 26, 2019 at 5:50 PM Terry Wang <zj...@gmail.com> wrote:

> Hi, Srikanth~
>
> In your code,
> DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable,
> Row.class).map(t -> {});  has converted the resultTable into a DataStream
> that’s unrelated with tableApi,
> And the following code `outStreamAgg.addSink(…)` is just a normall stream
> write to a FlinkKafka sink function.
> Your program code is a mixture of table api and dataStream programing not
> just single Table API.
>
> Best,
> Terry Wang
>
>
>
> 在 2019年9月26日,下午5:47,srikanth flink <fl...@gmail.com> 写道:
>
> Hi Terry Wang,
>
> Thanks for quick reply.
>
> I would like to understand more on your line " If the target table is a
> type of Kafka which implments AppendStreamTableSink, the update-mode will
> be append only".
> If your statement defines retract mode could not be used for Kafka sinks
> as it implements AppendStreamTableSink, but then the below code is working
> for me, dumping data to Kafka:
> DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable,
> Row.class).map(t -> {
> Row r = t.f1;
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", r.getField(0).toString());
> node.put("destination.ip", r.getField(1).toString());
> node.put("cnt", Long.parseLong(r.getField(2).toString()));
> return node.toString();
> });
> Properties kafkaProducerProperties = new Properties();
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "host:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
> outStreamAgg.addSink(new
> FlinkKafkaProducer<String>("reconMultiAttempFail", new SimpleStringSchema(),
> kafkaProducerProperties));
>
> Is it that the above functionality works only with Table API and not with
> SQL?
> Please explain.
>
> Thanks
> Srikanth
>
>
>
> On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zj...@gmail.com> wrote:
>
>> Hi srikanth~
>>
>> The Flink SQL update-mode is inferred from the target table type.
>> For now, there are three StreamTableSink type, `AppendStreamTableSink`
>> `UpsertStreamTableSink` and `RetractStreamTableSink`.
>> If the target table is a type of Kafka which implments
>> AppendStreamTableSink, the update-mode will be append only.
>> So if you want enable retract-mode you may need to insert into one kind
>> of RetractStreamTableSink.
>> Hope it helps you ~
>>
>>
>>
>> Best,
>> Terry Wang
>>
>>
>>
>> 在 2019年9月26日,下午2:50,srikanth flink <fl...@gmail.com> 写道:
>>
>> How could I configure environment file for Flink SQL, update-mode:
>> retract?
>>
>> I have this for append:
>> properties:
>>         - key: zookeeper.connect
>>           value: localhost:2181
>>         - key: bootstrap.servers
>>           value: localhost:9092
>>         - key: group.id
>>           value: reconMultiAttempFail
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'a': {
>>                type: 'string'
>>             },
>>             'b': {
>>                type: 'string'
>>             },
>>             'cnt': {
>>                type: 'string'
>>             }
>>           }
>>         }
>>       derive-schema: false
>>
>>     schema:
>>       - name: 'a'
>>         type: VARCHAR
>>      - name: 'b'
>>         type: VARCHAR
>>       - name: 'cnt'
>>         type: BIGINT
>>
>> Couldn't find any document for the same.
>>
>> someone help me with the syntax.
>>
>> Thanks
>> Srikanth
>>
>>
>>
>

Re: Flink SQL update-mode set to retract in env file.

Posted by Terry Wang <zj...@gmail.com>.
Hi, Srikanth~

In your code, 
DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {});  has converted the resultTable into a DataStream that’s unrelated with tableApi,
And the following code `outStreamAgg.addSink(…)` is just a normall stream write to a FlinkKafka sink function.
Your program code is a mixture of table api and dataStream programing not just single Table API.

Best,
Terry Wang



> 在 2019年9月26日,下午5:47,srikanth flink <fl...@gmail.com> 写道:
> 
> Hi Terry Wang,
> 
> Thanks for quick reply.
> 
> I would like to understand more on your line " If the target table is a type of Kafka which implments AppendStreamTableSink, the update-mode will be append only". 
> If your statement defines retract mode could not be used for Kafka sinks as it implements AppendStreamTableSink, but then the below code is working for me, dumping data to Kafka:
> DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {
> Row r = t.f1;
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", r.getField(0).toString());
> node.put("destination.ip", r.getField(1).toString());
> node.put("cnt", Long.parseLong(r.getField(2).toString()));
> return node.toString();
> }); 
> Properties kafkaProducerProperties = new Properties();
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
> 
> outStreamAgg.addSink(new FlinkKafkaProducer<String>("reconMultiAttempFail", new SimpleStringSchema(),
> kafkaProducerProperties));
> 
> Is it that the above functionality works only with Table API and not with SQL?
> Please explain.
> 
> Thanks
> Srikanth
> 
> 
> 
> On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zjuwangg@gmail.com <ma...@gmail.com>> wrote:
> Hi srikanth~
> 
> The Flink SQL update-mode is inferred from the target table type.
> For now, there are three StreamTableSink type, `AppendStreamTableSink` `UpsertStreamTableSink` and `RetractStreamTableSink`. 
> If the target table is a type of Kafka which implments AppendStreamTableSink, the update-mode will be append only. 
> So if you want enable retract-mode you may need to insert into one kind of RetractStreamTableSink.
> Hope it helps you ~
> 
> 
> 
> Best,
> Terry Wang
> 
> 
> 
>> 在 2019年9月26日,下午2:50,srikanth flink <flink.devv@gmail.com <ma...@gmail.com>> 写道:
>> 
>> How could I configure environment file for Flink SQL, update-mode: retract?
>> 
>> I have this for append:
>> properties:         
>>         - key: zookeeper.connect
>>           value: localhost:2181
>>         - key: bootstrap.servers
>>           value: localhost:9092
>>         - key: group.id <http://group.id/>
>>           value: reconMultiAttempFail
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'a': {
>>                type: 'string'
>>             },
>>             'b': {
>>                type: 'string'
>>             },
>>             'cnt': {
>>                type: 'string'
>>             }
>>           }
>>         }
>>       derive-schema: false
>> 
>>     schema:
>>       - name: 'a'
>>         type: VARCHAR
>>      - name: 'b'
>>         type: VARCHAR
>>       - name: 'cnt'
>>         type: BIGINT
>> 
>> Couldn't find any document for the same. 
>> 
>> someone help me with the syntax.
>> 
>> Thanks
>> Srikanth
>> 
> 


Re: Flink SQL update-mode set to retract in env file.

Posted by srikanth flink <fl...@gmail.com>.
Hi Terry Wang,

Thanks for quick reply.

I would like to understand more on your line " If the target table is a
type of Kafka which implments AppendStreamTableSink, the update-mode will
be append only".
If your statement defines retract mode could not be used for Kafka sinks as
it implements AppendStreamTableSink, but then the below code is working for
me, dumping data to Kafka:
DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable,
Row.class).map(t -> {
Row r = t.f1;
ObjectNode node = mapper.createObjectNode();
node.put("source.ip", r.getField(0).toString());
node.put("destination.ip", r.getField(1).toString());
node.put("cnt", Long.parseLong(r.getField(2).toString()));
return node.toString();
});
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"host:9092");
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

outStreamAgg.addSink(new FlinkKafkaProducer<String>("reconMultiAttempFail",
new SimpleStringSchema(),
kafkaProducerProperties));

Is it that the above functionality works only with Table API and not with
SQL?
Please explain.

Thanks
Srikanth



On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zj...@gmail.com> wrote:

> Hi srikanth~
>
> The Flink SQL update-mode is inferred from the target table type.
> For now, there are three StreamTableSink type, `AppendStreamTableSink`
> `UpsertStreamTableSink` and `RetractStreamTableSink`.
> If the target table is a type of Kafka which implments
> AppendStreamTableSink, the update-mode will be append only.
> So if you want enable retract-mode you may need to insert into one kind of
> RetractStreamTableSink.
> Hope it helps you ~
>
>
>
> Best,
> Terry Wang
>
>
>
> 在 2019年9月26日,下午2:50,srikanth flink <fl...@gmail.com> 写道:
>
> How could I configure environment file for Flink SQL, update-mode: retract?
>
> I have this for append:
> properties:
>         - key: zookeeper.connect
>           value: localhost:2181
>         - key: bootstrap.servers
>           value: localhost:9092
>         - key: group.id
>           value: reconMultiAttempFail
>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'a': {
>                type: 'string'
>             },
>             'b': {
>                type: 'string'
>             },
>             'cnt': {
>                type: 'string'
>             }
>           }
>         }
>       derive-schema: false
>
>     schema:
>       - name: 'a'
>         type: VARCHAR
>      - name: 'b'
>         type: VARCHAR
>       - name: 'cnt'
>         type: BIGINT
>
> Couldn't find any document for the same.
>
> someone help me with the syntax.
>
> Thanks
> Srikanth
>
>
>

Re: Flink SQL update-mode set to retract in env file.

Posted by Terry Wang <zj...@gmail.com>.
Hi srikanth~

The Flink SQL update-mode is inferred from the target table type.
For now, there are three StreamTableSink type, `AppendStreamTableSink` `UpsertStreamTableSink` and `RetractStreamTableSink`. 
If the target table is a type of Kafka which implments AppendStreamTableSink, the update-mode will be append only. 
So if you want enable retract-mode you may need to insert into one kind of RetractStreamTableSink.
Hope it helps you ~



Best,
Terry Wang



> 在 2019年9月26日,下午2:50,srikanth flink <fl...@gmail.com> 写道:
> 
> How could I configure environment file for Flink SQL, update-mode: retract?
> 
> I have this for append:
> properties:         
>         - key: zookeeper.connect
>           value: localhost:2181
>         - key: bootstrap.servers
>           value: localhost:9092
>         - key: group.id <http://group.id/>
>           value: reconMultiAttempFail
>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'a': {
>                type: 'string'
>             },
>             'b': {
>                type: 'string'
>             },
>             'cnt': {
>                type: 'string'
>             }
>           }
>         }
>       derive-schema: false
> 
>     schema:
>       - name: 'a'
>         type: VARCHAR
>      - name: 'b'
>         type: VARCHAR
>       - name: 'cnt'
>         type: BIGINT
> 
> Couldn't find any document for the same. 
> 
> someone help me with the syntax.
> 
> Thanks
> Srikanth
>