Posted to user@flink.apache.org by Kevin Kwon <fs...@gmail.com> on 2020/11/02 23:01:01 UTC

I have some interesting results with my test code

Hi guys, I've recently been experimenting with an end-to-end testing
environment with Kafka and Flink (1.11).

I've set up the infrastructure with Docker Compose, consisting of a single
Kafka broker, Flink (1.11), and MinIO for checkpoint storage.

Here's the test scenario

1. Send 1000 messages, with a timestamp manually assigned to each event and
increased by 100 milliseconds per loop iteration (the first and the last
message differ by roughly 100 seconds). The topic I'm writing to has 3
partitions. The code below is the test message producer, using Confluent's
Python SDK.

from time import time

order_producer = get_order_producer()  # helper (defined elsewhere) that builds the Confluent producer
current_timestamp = int(round(time() * 1000))
for i in range(0, 1000):
    order_producer.produce(
        topic="order",
        key={"key": i % 100},
        value={
            "id": 1000,
            "customerId": i % 10,
            "timestamp": current_timestamp + i * 100  # event time, +100 ms per message
        }
    )
    order_producer.flush()  # flushed once per message here; see the side note below


2. Flink performs an SQL query on this stream and publishes the result back
to a Kafka topic that also has 3 partitions. Below is the SQL code.

| SELECT
|   o.id,
|   COUNT(*),
|   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
| FROM
|   order o
| GROUP BY
|   o.id,
|   TUMBLE(o.ts, INTERVAL '5' SECONDS)
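
For reference, here's the back-of-the-envelope arithmetic for what this query
should produce if every event is consumed (just a sanity check in plain Scala,
not part of the job itself):

val numEvents = 1000
val spacingMs = 100L
val windowMs = 5000L

val spanMs = (numEvents - 1) * spacingMs  // 99,900 ms, i.e. roughly 100 seconds of data
val minWindows = spanMs / windowMs + 1    // at least 20 tumbling windows cover the data (21 if it straddles a boundary)
val perFullWindow = windowMs / spacingMs  // 50 events fall into each full 5-second window
// however the windows line up, the per-window counts must sum to exactly 1000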

So I expect the sum of all the counts in the result to equal 1000, but
it seems that a lot of messages are missing (797, as shown below). I can't
figure out why, though. I'm using event time for the environment.

[image: Screenshot 2020-11-02 at 23.35.23.png]

*Below is the configuration code*
Here's the code for the Kafka consumer settings

private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", kafkaBrokers)
  properties.setProperty("group.id", "awesome_order")

  val kafkaConsumer = new FlinkKafkaConsumer[Order](
    "order",
    ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
      classOf[Order],
      kafkaSchemaRegistry
    ),
    properties
  )
  kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
  kafkaConsumer.setStartFromGroupOffsets()
  kafkaConsumer.assignTimestampsAndWatermarks {
    WatermarkStrategy
      .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
      .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
        override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
          order.getTimestamp
        }
      })
  }
  kafkaConsumer
}

Afterwards,
1. I create a temp view from this source data stream
2. perform the SQL query on it
3. convert the result back to an append data stream of the processed model
4. attach that stream to the Kafka sink (a rough sketch of this wiring follows below)
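
Roughly, the wiring looks like the following (a minimal sketch, assuming Flink
1.11's Scala Table API; the view/field names, the checkpoint interval, and the
conversion to ProcessedModel are illustrative rather than my exact code):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  // event time, as mentioned above
env.enableCheckpointing(10000)  // the EXACTLY_ONCE sink only commits on completed checkpoints

val tableEnv = StreamTableEnvironment.create(env)
val orders: DataStream[Order] = env.addSource(initOrderConsumer())

// 1. register the source stream as a temporary view; ".rowtime()" exposes the
//    record timestamps assigned in the consumer as the event-time column "ts"
tableEnv.createTemporaryView("orders", orders, $"id", $"customerId", $"ts".rowtime())

// 2. the tumbling-window aggregation shown above
val result = tableEnv.sqlQuery(
  """
    |SELECT o.id, COUNT(*) AS cnt, TUMBLE_END(o.ts, INTERVAL '5' SECOND) AS windowEnd
    |FROM orders o
    |GROUP BY o.id, TUMBLE(o.ts, INTERVAL '5' SECOND)
  """.stripMargin)

// 3. convert the result back to an append-only stream, 4. attach the Kafka sink
tableEnv.toAppendStream[ProcessedModel](result).addSink(initProcessedModelProducer())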

Here's the code for the Kafka producer settings

private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
  val properties: Properties = new Properties()
  properties.put("bootstrap.servers", kafkaBrokers)
  properties.put("transaction.timeout.ms", "60000")

  val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
    "processed_model",
    ConfluentRegistryAvroSerializationSchema.forSpecific(
      classOf[ProcessedModel],
      "procssed_model-value",
      kafkaSchemaRegistry
    ),
    properties,
    null,
    Semantic.EXACTLY_ONCE,
    5
  )
  kafkaProducer
}
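
One note on this sink configuration (a property of Semantic.EXACTLY_ONCE, not
something specific to my setup): the records are written inside Kafka
transactions and only become committed when a Flink checkpoint completes, so a
consumer that verifies the test output should read with read_committed,
otherwise it may also see records from open or aborted transactions. A minimal
sketch of such verifier properties (the group id is made up):

import java.util.Properties

val verifierProps = new Properties()
verifierProps.setProperty("bootstrap.servers", kafkaBrokers)
verifierProps.setProperty("group.id", "processed_model_verifier")  // illustrative name
verifierProps.setProperty("isolation.level", "read_committed")     // only read committed transactions
verifierProps.setProperty("auto.offset.reset", "earliest")         // scan the topic from the beginning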



*Side Note*
Another interesting part is that if I flush "after" publishing all
events, the processed events don't even seem to arrive at the sink at
all. The source is still populated normally in Flink. It's as if
there is no progress after the messages arrive at the source.

order_producer = get_order_producer()
current_timestamp = int(round(time() * 1000))
for i in range(0, 1000):
    order_producer.produce(
        topic="order",
        key={"key": i % 100},
        value={
            "id": 1000,
            "customerId": i % 10,
            "timestamp": current_timestamp + i * 100
        }
    )
    order_producer.flush()  # if I flush "AFTER" the loop instead, there is no
                            # processed data in the Flink sink; the events themselves
                            # still arrive at the Flink source without any problem

Re: I have some interesting results with my test code

Posted by Jark Wu <im...@gmail.com>.
Great to hear it works!

`setStartFromGroupOffsets` [1] will start reading partitions from the
consumer group's (the group.id setting in the consumer properties) committed
offsets in the Kafka brokers. If no offsets can be found for a partition,
the 'auto.offset.reset' setting from the properties is used, and the
default value of 'auto.offset.reset' is 'latest' [2].

I think that's why `setStartFromGroupOffsets` doesn't consume all the events.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
[2]: https://kafka.apache.org/documentation/#auto.offset.reset
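
In other words, either of the following avoids that "latest" fallback (just a
sketch against the consumer snippet you posted):

// Option A: ignore committed group offsets and always read the topic from the beginning
kafkaConsumer.setStartFromEarliest()

// Option B: keep setStartFromGroupOffsets(), but add this to the consumer
// properties (before constructing the FlinkKafkaConsumer), so that a consumer
// group without committed offsets falls back to the earliest offset instead of "latest"
properties.setProperty("auto.offset.reset", "earliest")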

On Fri, 6 Nov 2020 at 07:04, Kevin Kwon <fs...@gmail.com> wrote:

> Hi Jark, setStartFromEarliest actually worked. It's strange, since my test
> is stateless (a complete teardown of all Docker containers) and the consumer
> creates the topic once it starts consuming it. I was assuming that
> setStartFromGroupOffsets would let the consumer consume from the beginning
> anyway. I'll share the code if I have any further problems, since I can't
> just copy-paste code created inside my company.
>
> Thanks though! I appreciate your help

Re: I have some interesting results with my test code

Posted by Jark Wu <im...@gmail.com>.
Hi Kevin,

Could you share the code of how you register the FlinkKafkaConsumer as a
table?

Regarding your initialization of the FlinkKafkaConsumer, I would recommend
setStartFromEarliest() to guarantee that it consumes all the records in the
partitions.

Regarding the flush(), it seems it is inside the for loop? So it is not
flushing after publishing ALL events?
I'm not experienced with the flush() API; could this method block, so that
the following events can't be published to Kafka?

Best,
Jark


Re: I have some interesting results with my test code

Posted by Robert Metzger <rm...@apache.org>.
Hi Kevin,
thanks a lot for posting this problem.
I'm adding Jark to the thread; he or another committer working on Flink SQL
can maybe provide some insights.


Re: I have some interesting results with my test code

Posted by Kevin Kwon <fs...@gmail.com>.
Looks like the event time that I've specified in the consumer is not being
respected. Does the timestamp assigner actually work in Kafka consumers?

      .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
        override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
          order.getTimestamp
        }
      })

