Posted to user@flink.apache.org by Nishant Gupta <ni...@gmail.com> on 2019/09/18 12:02:26 UTC

Time Window Flink SQL join

Hi Team,

I am running a query for Time Window Join as below

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
'15' MINUTE AND CURRENT_TIMESTAMP;

For a time-windowed join, Flink SQL should automatically clear older records.
Somehow the query does not clear the heap space and fails with an error after
some time.

Can you please let me know what could be going wrong, or whether this is an issue?
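
For reference, the kind of query I understood to be a time-windowed (interval)
join bounds the time attributes of both inputs against each other, roughly like
the sketch below. This is only illustrative: it assumes badips also had an
event-time attribute, here called update_time, which my badips table does not
actually have.

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
sourceKafka.`source.ip` = badips.ip
WHERE sourceKafka.timestamp_received BETWEEN badips.update_time - INTERVAL '15' MINUTE
  AND badips.update_time;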

Environment File chunks
--------------------------------------------------------------------------------------------------------------------------------------------------------------
tables:
  - name: sourceKafka
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"
      topic: test-data-flatten
      properties:
        - key: zookeeper.connect
          value: x.x.x.x:2181
        - key: bootstrap.servers
          value: x.x.x.x:9092
        - key: group.id
          value: testgroup
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': {
               type: 'string'
            },
            'source.port': {
               type: 'string'
            }
          }
        }
      derive-schema: false
    schema:
      - name: ' source.ip '
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR

  - name: sourceKafkaMalicious
    type: sink-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"
      topic: test-data-mal
      properties:
        - key: zookeeper.connect
          value: x.x.x.x:2181
        - key: bootstrap.servers
          value: x.x.x.x:9092
        - key: group.id
          value: testgroupmal
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': {
               type: 'string'
            },
            'source.port': {
               type: 'string'
            }
          }
        }
      derive-schema: false
    schema:
      - name: ' source.ip '
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR

  - name: badips
    type: source-table
    #update-mode: append
    connector:
      type: filesystem
      path: "/home/cyanadmin/ipsum/levels/badips.csv"
    format:
      type: csv
      fields:
        - name: ip
          type: VARCHAR
      comment-prefix: "#"
    schema:
      - name: ip
        type: VARCHAR

execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  restart-strategy:
    type: fallback

configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb
# Properties that describe the cluster to which table programs are submitted.

deployment:
  response-timeout: 5000

--------------------------------------------------------------------------------------------------------------------------------------------------------------
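
One thing I am not sure about in the environment file above:
min-idle-state-retention and max-idle-state-retention are both 0, which I
believe means idle query state is never cleaned up, so a regular
(non-windowed) join would keep both inputs in state indefinitely. A purely
illustrative sketch with non-zero retention (values in milliseconds) would be:

execution:
  min-idle-state-retention: 3600000
  max-idle-state-retention: 7200000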

Re: Time Window Flink SQL join

Posted by Nishant Gupta <ni...@gmail.com>.
Hi Fabian,

I am facing an issue if I run multiple queries like this:
Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
(BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");
Table resultKafkaSafe = tableEnv.sqlQuery( "SELECT K.sourceIp,
K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
(BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp<>B.bad_ip");

(Also, the heap space issue no longer occurs with the temporal tables.)
Error:
-------
Exception in thread "main" org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [] in [InnerJoin(where: (AND(__TEMPORAL_JOIN_CONDITION(k_proctime, bad_ip), <>(sourceIp, bad_ip))), join: (sourceIp, destinationIp, k_proctime, bad_ip))]
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:198)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:149)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:101)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:168)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:277)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$.create(DataStreamTemporalJoinToCoProcessTranslator.scala:117)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalTableJoin.createTranslator(DataStreamTemporalTableJoin.scala:77)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:110)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:98)
    at org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:250)
    at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:431)
    at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:421)
    at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
    at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
    at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
    at flink.StreamingJob.main(StreamingJob.java:140)
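
Reading the exception, it seems the temporal table join only accepts a single
equality predicate on the temporal table's key, so the second query
(K.sourceIp<>B.bad_ip) has no usable join key. One workaround I am considering
for the "safe" stream, sketched below, is to skip SQL for that side and filter
with broadcast state in the DataStream API instead. This is only a sketch:
badIpDescriptor and safeStream are made-up names, it reuses badipStreamM and
inKafkaStreamM from my job, it assumes the bad-IP list is small enough to
broadcast, and it needs the usual imports (MapStateDescriptor, BroadcastStream,
BroadcastProcessFunction, Collector).

// Sketch only: filter the "safe" side with broadcast state instead of a temporal join.
MapStateDescriptor<String, Boolean> badIpDescriptor =
        new MapStateDescriptor<>("badIps", String.class, Boolean.class);

BroadcastStream<String> badIpBroadcast = badipStreamM.broadcast(badIpDescriptor);

DataStream<Tuple2<String, String>> safeStream = inKafkaStreamM
        .connect(badIpBroadcast)
        .process(new BroadcastProcessFunction<Tuple2<String, String>, String, Tuple2<String, String>>() {
            @Override
            public void processElement(Tuple2<String, String> event, ReadOnlyContext ctx,
                    Collector<Tuple2<String, String>> out) throws Exception {
                // Forward the event only if its source IP is not in the broadcast bad-IP set.
                if (!ctx.getBroadcastState(badIpDescriptor).contains(event.f0)) {
                    out.collect(event);
                }
            }

            @Override
            public void processBroadcastElement(String badIp, Context ctx,
                    Collector<Tuple2<String, String>> out) throws Exception {
                // Remember every bad IP that arrives on the broadcast side.
                ctx.getBroadcastState(badIpDescriptor).put(badIp, true);
            }
        });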

On Fri, Sep 20, 2019 at 8:26 PM Nishant Gupta <ni...@gmail.com>
wrote:

> Use case is similar. But not able to check heap space issue, as data size
> is small. Thought of mean while checking with you.
>
>
> Thanks for looking into it. Really appreciate it.
> I have marked the usage of temporal tables in bold red for ease of
> reference.
>
> On Fri, Sep 20, 2019, 8:18 PM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> This looks OK on the first sight.
>> Is it doing what you expect?
>>
>> Fabian
>>
>> Am Fr., 20. Sept. 2019 um 16:29 Uhr schrieb Nishant Gupta <
>> nishantgupta1010@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the information.
>>> I have been reading about it and doing the same as a part of flink job
>>> written in Java
>>>
>>> I am using proctime for both the tables. Can you please verify once the
>>> implementation of temporal tables
>>>
>>>
>>> here is the snippet.
>>> ----------------------------
>>> public class StreamingJob {
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>> ParameterTool params = ParameterTool.fromArgs(args);
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>
>>> Properties kafkaConsumerProperties = new Properties();
>>>
>>> kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
>>> kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
>>> "cg54");
>>> kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>>> "latest");
>>>
>>>
>>> kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>>>
>>> kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>>>
>>> DataStream<String> badipStream = env.addSource(new
>>> FlinkKafkaConsumer<>("badips", new SimpleStringSchema(),
>>> kafkaConsumerProperties));
>>>
>>> DataStream<String> badipStreamM =
>>> badipStream
>>> .map(new MapFunction<String, String>() {
>>>        private static final long serialVersionUID = -686775202L;
>>>        @Override
>>>        public String map(String value) throws Exception {
>>>         try {
>>>         String[] v = value.split("\\t");
>>>     if(v.length > 1) {
>>>     return v[0].toString();
>>>     } else
>>>     return "0.0.0.0";
>>>         } catch (Exception e) {
>>> System.err.println(e);
>>> return "0.0.0.0";
>>>         }
>>>        }
>>>    });
>>>
>>> Table  badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip,
>>> r_proctime.proctime");*
>>>
>>> tableEnv.registerTable("BadIP", badipTable);
>>> TemporalTableFunction badIPTT =
>>> badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
>>> tableEnv.registerFunction("BadIPTT", badIPTT);
>>>
>>>
>>>
>>> DataStream<ObjectNode> inKafkaStream = env
>>> .addSource(new FlinkKafkaConsumer<>("tests", new
>>> JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
>>> DataStream<Tuple2<String,String>> inKafkaStreamM =
>>> inKafkaStream
>>> .rebalance()
>>> .filter(value -> value != null)
>>> .map(new MapFunction<ObjectNode, Tuple2<String,String>>() {
>>>        private static final long serialVersionUID = -6867120202L;
>>>        @Override
>>>        public Tuple2<String,String> map(ObjectNode node) throws
>>> Exception {
>>>         try {
>>>         ObjectNode nodeValue = (ObjectNode) node.get("value");
>>>             return new Tuple2<>(nodeValue.get("source.ip").asText(),
>>> nodeValue.get("destination.ip").asText());
>>>         } catch (Exception e) {
>>> System.err.println(e);
>>> System.out.println(node);
>>> return null;
>>>         }
>>>        }
>>>    });
>>>
>>> Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp,
>>> destinationIp, k_proctime.proctime"*);
>>> tableEnv.registerTable("KafkaSource", kafkaSource);
>>> * Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
>>> K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
>>> (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*
>>>
>>> TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
>>>  Types.STRING(),
>>>  Types.STRING());
>>>
>>> DataStream<Tuple2<String,String>> outStreamMalicious =
>>> tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>>>
>>> Properties kafkaProducerProperties = new Properties();
>>>
>>> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
>>> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
>>> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>>>
>>> ObjectMapper mapper = new ObjectMapper();
>>> DataStream<String> sinkStreamMaliciousData = outStreamMalicious
>>> .map(new MapFunction<Tuple2<String,String>,String>() {
>>> private static final long serialVersionUID = -6347120202L;
>>> @Override
>>> public String map(Tuple2<String,String> tuple) throws Exception {
>>> try {
>>> ObjectNode node = mapper.createObjectNode();
>>> node.put("source.ip", tuple.f0);
>>> node.put("destination.ip", tuple.f1);
>>> return node.toString();
>>> } catch (Exception e) {
>>> System.err.println(e);
>>> System.out.println(tuple);
>>> return null;
>>> }
>>> }
>>> });
>>>
>>>
>>> sinkStreamMaliciousData.addSink(new
>>> FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(),
>>> kafkaProducerProperties));
>>> env.execute("Flink List Matching");
>>> }
>>> -------------------------------------------------------
>>>
>>> On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi Nishant,
>>>>
>>>> You should model the query as a join with a time-versioned table [1].
>>>> The bad-ips table would be the time-time versioned table [2].
>>>> Since it is a time-versioned table, it could even be updated with new
>>>> IPs.
>>>>
>>>> This type of join will only keep the time-versioned table (the bad-ips
>>>> in state) and not the other (high-volume) table.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>>>>
>>>> Am Mi., 18. Sept. 2019 um 14:34 Uhr schrieb Nishant Gupta <
>>>> nishantgupta1010@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for your reply
>>>>> I have a continuous stream of kafka coming and static table of badips.
>>>>> I wanted to segregate records having bad ip.
>>>>>
>>>>> So therefore i was joining it. But with that 60 gb memory getting run
>>>>> out
>>>>>
>>>>> So i used below query.
>>>>> Can u please guide me in this regard
>>>>>
>>>>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The query that you wrote is not a time-windowed join.
>>>>>>
>>>>>> INSERT INTO sourceKafkaMalicious
>>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>>>> sourceKafka.`source.ip`=badips.ip
>>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>>
>>>>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing
>>>>>> time (or event time) attribute of badips.
>>>>>>
>>>>>> What exactly are you trying to achieve with the query?
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <
>>>>>> nishantgupta1010@gmail.com>:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I am running a query for Time Window Join as below
>>>>>>>
>>>>>>> INSERT INTO sourceKafkaMalicious
>>>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>>>>> sourceKafka.`source.ip`=badips.ip
>>>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>>>
>>>>>>> Time windowed join, Flink SQL should automatically clear older
>>>>>>> records, Some how the query does not clear the heapspace and fails
>>>>>>> with error after sometime.
>>>>>>>
>>>>>>> Can you please let me know what could go wrong, or is it a issue
>>>>>>>
>>>>>>> Environment File chunks
>>>>>>>
>>>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>> tables:
>>>>>>>   - name: sourceKafka
>>>>>>>     type: source-table
>>>>>>>     update-mode: append
>>>>>>>     connector:
>>>>>>>       type: kafka
>>>>>>>       version: "universal"
>>>>>>>       topic: test-data-flatten
>>>>>>>       properties:
>>>>>>>         - key: zookeeper.connect
>>>>>>>           value: x.x.x.x:2181
>>>>>>>         - key: bootstrap.servers
>>>>>>>           value: x.x.x.x:9092
>>>>>>>         - key: group.id
>>>>>>>           value: testgroup
>>>>>>>     format:
>>>>>>>       type: json
>>>>>>>       fail-on-missing-field: false
>>>>>>>       json-schema: >
>>>>>>>         {
>>>>>>>           type: 'object',
>>>>>>>           properties: {
>>>>>>>             'source.ip': {
>>>>>>>                type: 'string'
>>>>>>>             },
>>>>>>>             'source.port': {
>>>>>>>                type: 'string'
>>>>>>>             }
>>>>>>>           }
>>>>>>>         }
>>>>>>>       derive-schema: false
>>>>>>>     schema:
>>>>>>>       - name: ' source.ip '
>>>>>>>         type: VARCHAR
>>>>>>>       - name: 'source.port'
>>>>>>>         type: VARCHAR
>>>>>>>
>>>>>>>   - name: sourceKafkaMalicious
>>>>>>>     type: sink-table
>>>>>>>     update-mode: append
>>>>>>>     connector:
>>>>>>>       type: kafka
>>>>>>>       version: "universal"
>>>>>>>       topic: test-data-mal
>>>>>>>       properties:
>>>>>>>         - key: zookeeper.connect
>>>>>>>           value: x.x.x.x:2181
>>>>>>>         - key: bootstrap.servers
>>>>>>>           value: x.x.x.x:9092
>>>>>>>         - key: group.id
>>>>>>>           value: testgroupmal
>>>>>>>     format:
>>>>>>>       type: json
>>>>>>>       fail-on-missing-field: false
>>>>>>>       json-schema: >
>>>>>>>         {
>>>>>>>           type: 'object',
>>>>>>>           properties: {
>>>>>>>             'source.ip': {
>>>>>>>                type: 'string'
>>>>>>>             },
>>>>>>>             'source.port': {
>>>>>>>                type: 'string'
>>>>>>>             }
>>>>>>>           }
>>>>>>>         }
>>>>>>>       derive-schema: false
>>>>>>>     schema:
>>>>>>>       - name: ' source.ip '
>>>>>>>         type: VARCHAR
>>>>>>>       - name: 'source.port'
>>>>>>>         type: VARCHAR
>>>>>>>
>>>>>>>   - name: badips
>>>>>>>     type: source-table
>>>>>>>     #update-mode: append
>>>>>>>     connector:
>>>>>>>       type: filesystem
>>>>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>>>>     format:
>>>>>>>       type: csv
>>>>>>>       fields:
>>>>>>>         - name: ip
>>>>>>>           type: VARCHAR
>>>>>>>       comment-prefix: "#"
>>>>>>>     schema:
>>>>>>>       - name: ip
>>>>>>>         type: VARCHAR
>>>>>>>
>>>>>>> execution:
>>>>>>>   planner: blink
>>>>>>>   type: streaming
>>>>>>>   time-characteristic: event-time
>>>>>>>   periodic-watermarks-interval: 200
>>>>>>>   result-mode: table
>>>>>>>   max-table-result-rows: 1000000
>>>>>>>   parallelism: 3
>>>>>>>   max-parallelism: 128
>>>>>>>   min-idle-state-retention: 0
>>>>>>>   max-idle-state-retention: 0
>>>>>>>   restart-strategy:
>>>>>>>     type: fallback
>>>>>>>
>>>>>>> configuration:
>>>>>>>   table.optimizer.join-reorder-enabled: true
>>>>>>>   table.exec.spill-compression.enabled: true
>>>>>>>   table.exec.spill-compression.block-size: 128kb
>>>>>>>  Properties that describe the cluster to which table programs are
>>>>>>> submitted to.
>>>>>>>
>>>>>>> deployment:
>>>>>>>   response-timeout: 5000
>>>>>>>
>>>>>>>
>>>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>
>>>>>>

Re: Time Window Flink SQL join

Posted by Nishant Gupta <ni...@gmail.com>.
The use case is similar, but I am not able to verify the heap space issue
because the data size is small. I thought I would check with you in the meanwhile.


Thanks for looking into it. Really appreciate it.
I have marked the usage of temporal tables in bold red for ease of
reference.

On Fri, Sep 20, 2019, 8:18 PM Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> This looks OK on the first sight.
> Is it doing what you expect?
>
> Fabian
>
> Am Fr., 20. Sept. 2019 um 16:29 Uhr schrieb Nishant Gupta <
> nishantgupta1010@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for the information.
>> I have been reading about it and doing the same as a part of flink job
>> written in Java
>>
>> I am using proctime for both the tables. Can you please verify once the
>> implementation of temporal tables
>>
>>
>> here is the snippet.
>> ----------------------------
>> public class StreamingJob {
>>
>> public static void main(String[] args) throws Exception {
>>
>> ParameterTool params = ParameterTool.fromArgs(args);
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>
>> Properties kafkaConsumerProperties = new Properties();
>>
>> kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
>> kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
>> "cg54");
>> kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>> "latest");
>>
>>
>> kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>>
>> kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>>
>> DataStream<String> badipStream = env.addSource(new
>> FlinkKafkaConsumer<>("badips", new SimpleStringSchema(),
>> kafkaConsumerProperties));
>>
>> DataStream<String> badipStreamM =
>> badipStream
>> .map(new MapFunction<String, String>() {
>>        private static final long serialVersionUID = -686775202L;
>>        @Override
>>        public String map(String value) throws Exception {
>>         try {
>>         String[] v = value.split("\\t");
>>     if(v.length > 1) {
>>     return v[0].toString();
>>     } else
>>     return "0.0.0.0";
>>         } catch (Exception e) {
>> System.err.println(e);
>> return "0.0.0.0";
>>         }
>>        }
>>    });
>>
>> Table  badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip,
>> r_proctime.proctime");*
>>
>> tableEnv.registerTable("BadIP", badipTable);
>> TemporalTableFunction badIPTT =
>> badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
>> tableEnv.registerFunction("BadIPTT", badIPTT);
>>
>>
>>
>> DataStream<ObjectNode> inKafkaStream = env
>> .addSource(new FlinkKafkaConsumer<>("tests", new
>> JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
>> DataStream<Tuple2<String,String>> inKafkaStreamM =
>> inKafkaStream
>> .rebalance()
>> .filter(value -> value != null)
>> .map(new MapFunction<ObjectNode, Tuple2<String,String>>() {
>>        private static final long serialVersionUID = -6867120202L;
>>        @Override
>>        public Tuple2<String,String> map(ObjectNode node) throws Exception
>> {
>>         try {
>>         ObjectNode nodeValue = (ObjectNode) node.get("value");
>>             return new Tuple2<>(nodeValue.get("source.ip").asText(),
>> nodeValue.get("destination.ip").asText());
>>         } catch (Exception e) {
>> System.err.println(e);
>> System.out.println(node);
>> return null;
>>         }
>>        }
>>    });
>>
>> Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp,
>> destinationIp, k_proctime.proctime"*);
>> tableEnv.registerTable("KafkaSource", kafkaSource);
>> * Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
>> K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
>> (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*
>>
>> TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
>>  Types.STRING(),
>>  Types.STRING());
>>
>> DataStream<Tuple2<String,String>> outStreamMalicious =
>> tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>>
>> Properties kafkaProducerProperties = new Properties();
>>
>> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
>> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
>> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>>
>> ObjectMapper mapper = new ObjectMapper();
>> DataStream<String> sinkStreamMaliciousData = outStreamMalicious
>> .map(new MapFunction<Tuple2<String,String>,String>() {
>> private static final long serialVersionUID = -6347120202L;
>> @Override
>> public String map(Tuple2<String,String> tuple) throws Exception {
>> try {
>> ObjectNode node = mapper.createObjectNode();
>> node.put("source.ip", tuple.f0);
>> node.put("destination.ip", tuple.f1);
>> return node.toString();
>> } catch (Exception e) {
>> System.err.println(e);
>> System.out.println(tuple);
>> return null;
>> }
>> }
>> });
>>
>>
>> sinkStreamMaliciousData.addSink(new
>> FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(),
>> kafkaProducerProperties));
>> env.execute("Flink List Matching");
>> }
>> -------------------------------------------------------
>>
>> On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi Nishant,
>>>
>>> You should model the query as a join with a time-versioned table [1].
>>> The bad-ips table would be the time-time versioned table [2].
>>> Since it is a time-versioned table, it could even be updated with new
>>> IPs.
>>>
>>> This type of join will only keep the time-versioned table (the bad-ips
>>> in state) and not the other (high-volume) table.
>>>
>>> Best,
>>> Fabian
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>>>
>>> Am Mi., 18. Sept. 2019 um 14:34 Uhr schrieb Nishant Gupta <
>>> nishantgupta1010@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for your reply
>>>> I have a continuous stream of kafka coming and static table of badips.
>>>> I wanted to segregate records having bad ip.
>>>>
>>>> So therefore i was joining it. But with that 60 gb memory getting run
>>>> out
>>>>
>>>> So i used below query.
>>>> Can u please guide me in this regard
>>>>
>>>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> The query that you wrote is not a time-windowed join.
>>>>>
>>>>> INSERT INTO sourceKafkaMalicious
>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>>> sourceKafka.`source.ip`=badips.ip
>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>
>>>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing
>>>>> time (or event time) attribute of badips.
>>>>>
>>>>> What exactly are you trying to achieve with the query?
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <
>>>>> nishantgupta1010@gmail.com>:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I am running a query for Time Window Join as below
>>>>>>
>>>>>> INSERT INTO sourceKafkaMalicious
>>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>>>> sourceKafka.`source.ip`=badips.ip
>>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>>
>>>>>> Time windowed join, Flink SQL should automatically clear older
>>>>>> records, Some how the query does not clear the heapspace and fails
>>>>>> with error after sometime.
>>>>>>
>>>>>> Can you please let me know what could go wrong, or is it a issue
>>>>>>
>>>>>> Environment File chunks
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>> tables:
>>>>>>   - name: sourceKafka
>>>>>>     type: source-table
>>>>>>     update-mode: append
>>>>>>     connector:
>>>>>>       type: kafka
>>>>>>       version: "universal"
>>>>>>       topic: test-data-flatten
>>>>>>       properties:
>>>>>>         - key: zookeeper.connect
>>>>>>           value: x.x.x.x:2181
>>>>>>         - key: bootstrap.servers
>>>>>>           value: x.x.x.x:9092
>>>>>>         - key: group.id
>>>>>>           value: testgroup
>>>>>>     format:
>>>>>>       type: json
>>>>>>       fail-on-missing-field: false
>>>>>>       json-schema: >
>>>>>>         {
>>>>>>           type: 'object',
>>>>>>           properties: {
>>>>>>             'source.ip': {
>>>>>>                type: 'string'
>>>>>>             },
>>>>>>             'source.port': {
>>>>>>                type: 'string'
>>>>>>             }
>>>>>>           }
>>>>>>         }
>>>>>>       derive-schema: false
>>>>>>     schema:
>>>>>>       - name: ' source.ip '
>>>>>>         type: VARCHAR
>>>>>>       - name: 'source.port'
>>>>>>         type: VARCHAR
>>>>>>
>>>>>>   - name: sourceKafkaMalicious
>>>>>>     type: sink-table
>>>>>>     update-mode: append
>>>>>>     connector:
>>>>>>       type: kafka
>>>>>>       version: "universal"
>>>>>>       topic: test-data-mal
>>>>>>       properties:
>>>>>>         - key: zookeeper.connect
>>>>>>           value: x.x.x.x:2181
>>>>>>         - key: bootstrap.servers
>>>>>>           value: x.x.x.x:9092
>>>>>>         - key: group.id
>>>>>>           value: testgroupmal
>>>>>>     format:
>>>>>>       type: json
>>>>>>       fail-on-missing-field: false
>>>>>>       json-schema: >
>>>>>>         {
>>>>>>           type: 'object',
>>>>>>           properties: {
>>>>>>             'source.ip': {
>>>>>>                type: 'string'
>>>>>>             },
>>>>>>             'source.port': {
>>>>>>                type: 'string'
>>>>>>             }
>>>>>>           }
>>>>>>         }
>>>>>>       derive-schema: false
>>>>>>     schema:
>>>>>>       - name: ' source.ip '
>>>>>>         type: VARCHAR
>>>>>>       - name: 'source.port'
>>>>>>         type: VARCHAR
>>>>>>
>>>>>>   - name: badips
>>>>>>     type: source-table
>>>>>>     #update-mode: append
>>>>>>     connector:
>>>>>>       type: filesystem
>>>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>>>     format:
>>>>>>       type: csv
>>>>>>       fields:
>>>>>>         - name: ip
>>>>>>           type: VARCHAR
>>>>>>       comment-prefix: "#"
>>>>>>     schema:
>>>>>>       - name: ip
>>>>>>         type: VARCHAR
>>>>>>
>>>>>> execution:
>>>>>>   planner: blink
>>>>>>   type: streaming
>>>>>>   time-characteristic: event-time
>>>>>>   periodic-watermarks-interval: 200
>>>>>>   result-mode: table
>>>>>>   max-table-result-rows: 1000000
>>>>>>   parallelism: 3
>>>>>>   max-parallelism: 128
>>>>>>   min-idle-state-retention: 0
>>>>>>   max-idle-state-retention: 0
>>>>>>   restart-strategy:
>>>>>>     type: fallback
>>>>>>
>>>>>> configuration:
>>>>>>   table.optimizer.join-reorder-enabled: true
>>>>>>   table.exec.spill-compression.enabled: true
>>>>>>   table.exec.spill-compression.block-size: 128kb
>>>>>>  Properties that describe the cluster to which table programs are
>>>>>> submitted to.
>>>>>>
>>>>>> deployment:
>>>>>>   response-timeout: 5000
>>>>>>
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>
>>>>>

Re: Time Window Flink SQL join

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

This looks OK at first sight.
Is it doing what you expect?

Fabian

Am Fr., 20. Sept. 2019 um 16:29 Uhr schrieb Nishant Gupta <
nishantgupta1010@gmail.com>:

> Hi Fabian,
>
> Thanks for the information.
> I have been reading about it and doing the same as a part of flink job
> written in Java
>
> I am using proctime for both the tables. Can you please verify once the
> implementation of temporal tables
>
>
> here is the snippet.
> ----------------------------
> public class StreamingJob {
>
> public static void main(String[] args) throws Exception {
>
> ParameterTool params = ParameterTool.fromArgs(args);
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> Properties kafkaConsumerProperties = new Properties();
>
> kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
> kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
> "cg54");
> kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> "latest");
>
>
> kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>
> kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>
> DataStream<String> badipStream = env.addSource(new
> FlinkKafkaConsumer<>("badips", new SimpleStringSchema(),
> kafkaConsumerProperties));
>
> DataStream<String> badipStreamM =
> badipStream
> .map(new MapFunction<String, String>() {
>        private static final long serialVersionUID = -686775202L;
>        @Override
>        public String map(String value) throws Exception {
>         try {
>         String[] v = value.split("\\t");
>     if(v.length > 1) {
>     return v[0].toString();
>     } else
>     return "0.0.0.0";
>         } catch (Exception e) {
> System.err.println(e);
> return "0.0.0.0";
>         }
>        }
>    });
>
> Table  badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip,
> r_proctime.proctime");*
>
> tableEnv.registerTable("BadIP", badipTable);
> TemporalTableFunction badIPTT =
> badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
> tableEnv.registerFunction("BadIPTT", badIPTT);
>
>
>
> DataStream<ObjectNode> inKafkaStream = env
> .addSource(new FlinkKafkaConsumer<>("tests", new
> JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
> DataStream<Tuple2<String,String>> inKafkaStreamM =
> inKafkaStream
> .rebalance()
> .filter(value -> value != null)
> .map(new MapFunction<ObjectNode, Tuple2<String,String>>() {
>        private static final long serialVersionUID = -6867120202L;
>        @Override
>        public Tuple2<String,String> map(ObjectNode node) throws Exception
> {
>         try {
>         ObjectNode nodeValue = (ObjectNode) node.get("value");
>             return new Tuple2<>(nodeValue.get("source.ip").asText(),
> nodeValue.get("destination.ip").asText());
>         } catch (Exception e) {
> System.err.println(e);
> System.out.println(node);
> return null;
>         }
>        }
>    });
>
> Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp,
> destinationIp, k_proctime.proctime"*);
> tableEnv.registerTable("KafkaSource", kafkaSource);
> * Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
> K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
> (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*
>
> TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
>  Types.STRING(),
>  Types.STRING());
>
> DataStream<Tuple2<String,String>> outStreamMalicious =
> tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>
> Properties kafkaProducerProperties = new Properties();
>
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
> ObjectMapper mapper = new ObjectMapper();
> DataStream<String> sinkStreamMaliciousData = outStreamMalicious
> .map(new MapFunction<Tuple2<String,String>,String>() {
> private static final long serialVersionUID = -6347120202L;
> @Override
> public String map(Tuple2<String,String> tuple) throws Exception {
> try {
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", tuple.f0);
> node.put("destination.ip", tuple.f1);
> return node.toString();
> } catch (Exception e) {
> System.err.println(e);
> System.out.println(tuple);
> return null;
> }
> }
> });
>
>
> sinkStreamMaliciousData.addSink(new
> FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(),
> kafkaProducerProperties));
> env.execute("Flink List Matching");
> }
> -------------------------------------------------------
>
> On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Nishant,
>>
>> You should model the query as a join with a time-versioned table [1].
>> The bad-ips table would be the time-time versioned table [2].
>> Since it is a time-versioned table, it could even be updated with new IPs.
>>
>> This type of join will only keep the time-versioned table (the bad-ips in
>> state) and not the other (high-volume) table.
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>>
>> Am Mi., 18. Sept. 2019 um 14:34 Uhr schrieb Nishant Gupta <
>> nishantgupta1010@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> Thanks for your reply
>>> I have a continuous stream of kafka coming and static table of badips. I
>>> wanted to segregate records having bad ip.
>>>
>>> So therefore i was joining it. But with that 60 gb memory getting run out
>>>
>>> So i used below query.
>>> Can u please guide me in this regard
>>>
>>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> The query that you wrote is not a time-windowed join.
>>>>
>>>> INSERT INTO sourceKafkaMalicious
>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>> sourceKafka.`source.ip`=badips.ip
>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>
>>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing
>>>> time (or event time) attribute of badips.
>>>>
>>>> What exactly are you trying to achieve with the query?
>>>>
>>>> Best, Fabian
>>>>
>>>> Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <
>>>> nishantgupta1010@gmail.com>:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am running a query for Time Window Join as below
>>>>>
>>>>> INSERT INTO sourceKafkaMalicious
>>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>>> sourceKafka.`source.ip`=badips.ip
>>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>>
>>>>> Time windowed join, Flink SQL should automatically clear older
>>>>> records, Some how the query does not clear the heapspace and fails
>>>>> with error after sometime.
>>>>>
>>>>> Can you please let me know what could go wrong, or is it a issue
>>>>>
>>>>> Environment File chunks
>>>>>
>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>> tables:
>>>>>   - name: sourceKafka
>>>>>     type: source-table
>>>>>     update-mode: append
>>>>>     connector:
>>>>>       type: kafka
>>>>>       version: "universal"
>>>>>       topic: test-data-flatten
>>>>>       properties:
>>>>>         - key: zookeeper.connect
>>>>>           value: x.x.x.x:2181
>>>>>         - key: bootstrap.servers
>>>>>           value: x.x.x.x:9092
>>>>>         - key: group.id
>>>>>           value: testgroup
>>>>>     format:
>>>>>       type: json
>>>>>       fail-on-missing-field: false
>>>>>       json-schema: >
>>>>>         {
>>>>>           type: 'object',
>>>>>           properties: {
>>>>>             'source.ip': {
>>>>>                type: 'string'
>>>>>             },
>>>>>             'source.port': {
>>>>>                type: 'string'
>>>>>             }
>>>>>           }
>>>>>         }
>>>>>       derive-schema: false
>>>>>     schema:
>>>>>       - name: ' source.ip '
>>>>>         type: VARCHAR
>>>>>       - name: 'source.port'
>>>>>         type: VARCHAR
>>>>>
>>>>>   - name: sourceKafkaMalicious
>>>>>     type: sink-table
>>>>>     update-mode: append
>>>>>     connector:
>>>>>       type: kafka
>>>>>       version: "universal"
>>>>>       topic: test-data-mal
>>>>>       properties:
>>>>>         - key: zookeeper.connect
>>>>>           value: x.x.x.x:2181
>>>>>         - key: bootstrap.servers
>>>>>           value: x.x.x.x:9092
>>>>>         - key: group.id
>>>>>           value: testgroupmal
>>>>>     format:
>>>>>       type: json
>>>>>       fail-on-missing-field: false
>>>>>       json-schema: >
>>>>>         {
>>>>>           type: 'object',
>>>>>           properties: {
>>>>>             'source.ip': {
>>>>>                type: 'string'
>>>>>             },
>>>>>             'source.port': {
>>>>>                type: 'string'
>>>>>             }
>>>>>           }
>>>>>         }
>>>>>       derive-schema: false
>>>>>     schema:
>>>>>       - name: ' source.ip '
>>>>>         type: VARCHAR
>>>>>       - name: 'source.port'
>>>>>         type: VARCHAR
>>>>>
>>>>>   - name: badips
>>>>>     type: source-table
>>>>>     #update-mode: append
>>>>>     connector:
>>>>>       type: filesystem
>>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>>     format:
>>>>>       type: csv
>>>>>       fields:
>>>>>         - name: ip
>>>>>           type: VARCHAR
>>>>>       comment-prefix: "#"
>>>>>     schema:
>>>>>       - name: ip
>>>>>         type: VARCHAR
>>>>>
>>>>> execution:
>>>>>   planner: blink
>>>>>   type: streaming
>>>>>   time-characteristic: event-time
>>>>>   periodic-watermarks-interval: 200
>>>>>   result-mode: table
>>>>>   max-table-result-rows: 1000000
>>>>>   parallelism: 3
>>>>>   max-parallelism: 128
>>>>>   min-idle-state-retention: 0
>>>>>   max-idle-state-retention: 0
>>>>>   restart-strategy:
>>>>>     type: fallback
>>>>>
>>>>> configuration:
>>>>>   table.optimizer.join-reorder-enabled: true
>>>>>   table.exec.spill-compression.enabled: true
>>>>>   table.exec.spill-compression.block-size: 128kb
>>>>>  Properties that describe the cluster to which table programs are
>>>>> submitted to.
>>>>>
>>>>> deployment:
>>>>>   response-timeout: 5000
>>>>>
>>>>>
>>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>
>>>>

Re: Time Window Flink SQL join

Posted by Nishant Gupta <ni...@gmail.com>.
Hi Fabian,

Thanks for the information.
I have been reading about it and have implemented the same as part of a Flink
job written in Java.

I am using proctime for both tables. Can you please verify the implementation
of the temporal tables?


Here is the snippet:
----------------------------
public class StreamingJob {

    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Properties kafkaConsumerProperties = new Properties();
        kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
        kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cg54");
        kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Bad-IP stream: take the first tab-separated field of each record as the IP.
        DataStream<String> badipStream = env.addSource(
                new FlinkKafkaConsumer<>("badips", new SimpleStringSchema(), kafkaConsumerProperties));

        DataStream<String> badipStreamM = badipStream
                .map(new MapFunction<String, String>() {
                    private static final long serialVersionUID = -686775202L;

                    @Override
                    public String map(String value) throws Exception {
                        try {
                            String[] v = value.split("\\t");
                            if (v.length > 1) {
                                return v[0].toString();
                            } else {
                                return "0.0.0.0";
                            }
                        } catch (Exception e) {
                            System.err.println(e);
                            return "0.0.0.0";
                        }
                    }
                });

        // Register the bad-IP table and expose it as a temporal table function keyed on bad_ip.
        Table badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip, r_proctime.proctime");
        tableEnv.registerTable("BadIP", badipTable);
        TemporalTableFunction badIPTT = badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
        tableEnv.registerFunction("BadIPTT", badIPTT);

        // Main event stream: extract (source.ip, destination.ip) from the JSON records.
        DataStream<ObjectNode> inKafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("tests", new JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
        DataStream<Tuple2<String, String>> inKafkaStreamM = inKafkaStream
                .rebalance()
                .filter(value -> value != null)
                .map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
                    private static final long serialVersionUID = -6867120202L;

                    @Override
                    public Tuple2<String, String> map(ObjectNode node) throws Exception {
                        try {
                            ObjectNode nodeValue = (ObjectNode) node.get("value");
                            return new Tuple2<>(nodeValue.get("source.ip").asText(),
                                    nodeValue.get("destination.ip").asText());
                        } catch (Exception e) {
                            System.err.println(e);
                            System.out.println(node);
                            return null;
                        }
                    }
                });

        Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, "sourceIp, destinationIp, k_proctime.proctime");
        tableEnv.registerTable("KafkaSource", kafkaSource);

        // Temporal table join: keep only the events whose source IP is in the bad-IP table.
        Table resultKafkaMalicious = tableEnv.sqlQuery(
                "SELECT K.sourceIp, K.destinationIp FROM KafkaSource AS K, "
                + "LATERAL TABLE (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");

        TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
                Types.STRING(),
                Types.STRING());

        DataStream<Tuple2<String, String>> outStreamMalicious =
                tableEnv.toAppendStream(resultKafkaMalicious, tupleType);

        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "X.X.X.X:9092");
        kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

        // Serialize the matched tuples back to JSON and write them to Kafka.
        ObjectMapper mapper = new ObjectMapper();
        DataStream<String> sinkStreamMaliciousData = outStreamMalicious
                .map(new MapFunction<Tuple2<String, String>, String>() {
                    private static final long serialVersionUID = -6347120202L;

                    @Override
                    public String map(Tuple2<String, String> tuple) throws Exception {
                        try {
                            ObjectNode node = mapper.createObjectNode();
                            node.put("source.ip", tuple.f0);
                            node.put("destination.ip", tuple.f1);
                            return node.toString();
                        } catch (Exception e) {
                            System.err.println(e);
                            System.out.println(tuple);
                            return null;
                        }
                    }
                });

        sinkStreamMaliciousData.addSink(new FlinkKafkaProducer<>(
                "recon-data-malicious", new SimpleStringSchema(), kafkaProducerProperties));
        env.execute("Flink List Matching");
    }
}
-------------------------------------------------------

On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Nishant,
>
> You should model the query as a join with a time-versioned table [1].
> The bad-ips table would be the time-time versioned table [2].
> Since it is a time-versioned table, it could even be updated with new IPs.
>
> This type of join will only keep the time-versioned table (the bad-ips in
> state) and not the other (high-volume) table.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>
> Am Mi., 18. Sept. 2019 um 14:34 Uhr schrieb Nishant Gupta <
> nishantgupta1010@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for your reply
>> I have a continuous stream of kafka coming and static table of badips. I
>> wanted to segregate records having bad ip.
>>
>> So therefore i was joining it. But with that 60 gb memory getting run out
>>
>> So i used below query.
>> Can u please guide me in this regard
>>
>> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The query that you wrote is not a time-windowed join.
>>>
>>> INSERT INTO sourceKafkaMalicious
>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>> sourceKafka.`source.ip`=badips.ip
>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>
>>> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
>>> (or event time) attribute of badips.
>>>
>>> What exactly are you trying to achieve with the query?
>>>
>>> Best, Fabian
>>>
>>> Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <
>>> nishantgupta1010@gmail.com>:
>>>
>>>> Hi Team,
>>>>
>>>> I am running a query for Time Window Join as below
>>>>
>>>> INSERT INTO sourceKafkaMalicious
>>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>>> sourceKafka.`source.ip`=badips.ip
>>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>>
>>>> Time windowed join, Flink SQL should automatically clear older records, Some
>>>> how the query does not clear the heapspace and fails with error after
>>>> sometime.
>>>>
>>>> Can you please let me know what could go wrong, or is it a issue
>>>>
>>>> Environment File chunks
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>> tables:
>>>>   - name: sourceKafka
>>>>     type: source-table
>>>>     update-mode: append
>>>>     connector:
>>>>       type: kafka
>>>>       version: "universal"
>>>>       topic: test-data-flatten
>>>>       properties:
>>>>         - key: zookeeper.connect
>>>>           value: x.x.x.x:2181
>>>>         - key: bootstrap.servers
>>>>           value: x.x.x.x:9092
>>>>         - key: group.id
>>>>           value: testgroup
>>>>     format:
>>>>       type: json
>>>>       fail-on-missing-field: false
>>>>       json-schema: >
>>>>         {
>>>>           type: 'object',
>>>>           properties: {
>>>>             'source.ip': {
>>>>                type: 'string'
>>>>             },
>>>>             'source.port': {
>>>>                type: 'string'
>>>>             }
>>>>           }
>>>>         }
>>>>       derive-schema: false
>>>>     schema:
>>>>       - name: ' source.ip '
>>>>         type: VARCHAR
>>>>       - name: 'source.port'
>>>>         type: VARCHAR
>>>>
>>>>   - name: sourceKafkaMalicious
>>>>     type: sink-table
>>>>     update-mode: append
>>>>     connector:
>>>>       type: kafka
>>>>       version: "universal"
>>>>       topic: test-data-mal
>>>>       properties:
>>>>         - key: zookeeper.connect
>>>>           value: x.x.x.x:2181
>>>>         - key: bootstrap.servers
>>>>           value: x.x.x.x:9092
>>>>         - key: group.id
>>>>           value: testgroupmal
>>>>     format:
>>>>       type: json
>>>>       fail-on-missing-field: false
>>>>       json-schema: >
>>>>         {
>>>>           type: 'object',
>>>>           properties: {
>>>>             'source.ip': {
>>>>                type: 'string'
>>>>             },
>>>>             'source.port': {
>>>>                type: 'string'
>>>>             }
>>>>           }
>>>>         }
>>>>       derive-schema: false
>>>>     schema:
>>>>       - name: ' source.ip '
>>>>         type: VARCHAR
>>>>       - name: 'source.port'
>>>>         type: VARCHAR
>>>>
>>>>   - name: badips
>>>>     type: source-table
>>>>     #update-mode: append
>>>>     connector:
>>>>       type: filesystem
>>>>       path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>>>     format:
>>>>       type: csv
>>>>       fields:
>>>>         - name: ip
>>>>           type: VARCHAR
>>>>       comment-prefix: "#"
>>>>     schema:
>>>>       - name: ip
>>>>         type: VARCHAR
>>>>
>>>> execution:
>>>>   planner: blink
>>>>   type: streaming
>>>>   time-characteristic: event-time
>>>>   periodic-watermarks-interval: 200
>>>>   result-mode: table
>>>>   max-table-result-rows: 1000000
>>>>   parallelism: 3
>>>>   max-parallelism: 128
>>>>   min-idle-state-retention: 0
>>>>   max-idle-state-retention: 0
>>>>   restart-strategy:
>>>>     type: fallback
>>>>
>>>> configuration:
>>>>   table.optimizer.join-reorder-enabled: true
>>>>   table.exec.spill-compression.enabled: true
>>>>   table.exec.spill-compression.block-size: 128kb
>>>>  Properties that describe the cluster to which table programs are
>>>> submitted to.
>>>>
>>>> deployment:
>>>>   response-timeout: 5000
>>>>
>>>>
>>>> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>
>>>

Re: Time Window Flink SQL join

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Nishant,

You should model the query as a join with a time-versioned table [1].
The bad-ips table would be the time-versioned table [2].
Since it is a time-versioned table, it could even be updated with new IPs.

This type of join will only keep the time-versioned table (the bad-ips) in
state, not the other (high-volume) table.
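
Sketched with your tables, it would look roughly like the query below. This is
only a sketch: it assumes sourceKafka has a processing-time attribute called
proctime and that a temporal table function, here called BadIps, has been
registered over the bad-ips table with ip as its key.

SELECT s.*
FROM sourceKafka AS s,
  LATERAL TABLE (BadIps(s.proctime)) AS b
WHERE s.`source.ip` = b.ip;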

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html

Am Mi., 18. Sept. 2019 um 14:34 Uhr schrieb Nishant Gupta <
nishantgupta1010@gmail.com>:

> Hi Fabian,
>
> Thanks for your reply
> I have a continuous stream of kafka coming and static table of badips. I
> wanted to segregate records having bad ip.
>
> So therefore i was joining it. But with that 60 gb memory getting run out
>
> So i used below query.
> Can u please guide me in this regard
>
> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> The query that you wrote is not a time-windowed join.
>>
>> INSERT INTO sourceKafkaMalicious
>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>> sourceKafka.`source.ip`=badips.ip
>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
>> '15' MINUTE AND CURRENT_TIMESTAMP;
>>
>> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
>> (or event time) attribute of badips.
>>
>> What exactly are you trying to achieve with the query?
>>
>> Best, Fabian
>>
>> Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <
>> nishantgupta1010@gmail.com>:
>>
>>> Hi Team,
>>>
>>> I am running a query for Time Window Join as below
>>>
>>> INSERT INTO sourceKafkaMalicious
>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>> sourceKafka.`source.ip`=badips.ip
>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>
>>> Time windowed join, Flink SQL should automatically clear older records, Some
>>> how the query does not clear the heapspace and fails with error after
>>> sometime.
>>>
>>> Can you please let me know what could go wrong, or is it a issue
>>>
>>> [environment file YAML snipped; same as in the original message above]
>>>
>>

Re: Time Window Flink SQL join

Posted by Nishant Gupta <ni...@gmail.com>.
Hi Fabian,

Thanks for your reply.
I have a continuous Kafka stream and a static table of bad IPs, and I
wanted to segregate the records that have a bad IP.

That is why I was joining them, but with that join the 60 GB of memory
runs out.

So I used the query below.
Could you please guide me on this?

On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> The query that you wrote is not a time-windowed join.
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
> sourceKafka.`source.ip`=badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
> '15' MINUTE AND CURRENT_TIMESTAMP;
>
> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
> (or event time) attribute of badips.
>
> What exactly are you trying to achieve with the query?
>
> Best, Fabian
>
> On Wed, Sep 18, 2019 at 2:02 PM Nishant Gupta <
> nishantgupta1010@gmail.com> wrote:
>
>> Hi Team,
>>
>> I am running a query for Time Window Join as below
>>
>> INSERT INTO sourceKafkaMalicious
>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>> sourceKafka.`source.ip`=badips.ip
>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
>> '15' MINUTE AND CURRENT_TIMESTAMP;
>>
>> For a time-windowed join, Flink SQL should automatically clear older
>> records. Somehow the query does not clear the heap space and fails with
>> an error after some time.
>>
>> Can you please let me know what could go wrong, or is it an issue?
>>
>> [environment file YAML snipped; same as in the original message above]
>>
>

Re: Time Window Flink SQL join

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

The query that you wrote is not a time-windowed join.

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
'15' MINUTE AND CURRENT_TIMESTAMP;

The problem is the use of CURRENT_TIMESTAMP instead of a processing time
(or event time) attribute of badips.
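
For comparison, a time-windowed join needs a predicate that bounds the time
attributes of both inputs, because that bound is what lets Flink drop old
records from state. Purely for illustration (assuming timestamp_received is
declared as a rowtime attribute and badips had an event-time attribute,
hypothetically called update_time), such a join would look like:

SELECT s.*
FROM sourceKafka s, badips b
WHERE s.`source.ip` = b.ip
  AND s.timestamp_received BETWEEN b.update_time - INTERVAL '15' MINUTE
                               AND b.update_time;

With CURRENT_TIMESTAMP in the condition, the query falls back to a regular
streaming join that keeps all rows of both inputs in state, which is why the
heap keeps growing.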

What exactly are you trying to achieve with the query?

Best, Fabian

On Wed, Sep 18, 2019 at 2:02 PM Nishant Gupta <
nishantgupta1010@gmail.com> wrote:

> Hi Team,
>
> I am running a query for Time Window Join as below
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
> sourceKafka.`source.ip`=badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
> '15' MINUTE AND CURRENT_TIMESTAMP;
>
> For a time-windowed join, Flink SQL should automatically clear older
> records. Somehow the query does not clear the heap space and fails with
> an error after some time.
>
> Can you please let me know what could go wrong, or is it an issue?
>
> [environment file YAML snipped; same as in the original message above]
>