Posted to user@beam.apache.org by Eleanore Jin <el...@gmail.com> on 2020/08/03 20:22:48 UTC

Beam 2.23.0 KafkaIO with EOS does not work

Hi all,

I am running Beam 2.23.0 with the Flink runner (Flink 1.8.2).

The pipeline reads from topic ieos4 (4 partitions) and publishes to topic
oeos8 (8 partitions), with no transformation in between.

I am running it inside the IDE with Flink in local mode (so the Flink job
manager and task manager both run in the same JVM).

On the consumer side:

return KafkaIO.<String, Message>read()
  .withBootstrapServers(kafkaSettings.getBootstrapServers())
  .withTopic(topic)
  .withKeyDeserializer(MessageKeyDeserializer.class)
  .withValueDeserializerAndCoder(getDeserializer(encoding), MESSAGE_CODER)
  .withConsumerConfigUpdates(consumerConfig)
  .commitOffsetsInFinalize()  // commit offsets to Kafka when a checkpoint is finalized
  .withReadCommitted()        // consumer isolation.level = read_committed
  .withoutMetadata();
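For reference, `withReadCommitted()` sets the Kafka consumer property `isolation.level` to `read_committed`, so the reader only sees records from committed transactions. A minimal sketch, assuming a separate plain Kafka consumer is used to count what lands in the output topic (the broker address and group id are hypothetical placeholders), of the raw properties such a consumer would need so that records from aborted transactions are not counted:

```java
import java.util.Properties;

public class ReadCommittedConfig {
    // Properties for a plain Kafka consumer that checks the output topic.
    // Broker address and group id are hypothetical placeholders.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical local broker
        props.setProperty("group.id", "oeos8-verifier");          // hypothetical group id
        // Same setting withReadCommitted() applies: skip records from
        // open or aborted transactions.
        props.setProperty("isolation.level", "read_committed");
        // Read the output topic from the beginning.
        props.setProperty("auto.offset.reset", "earliest");
        // Raw bytes are enough for counting records.
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the default `read_uncommitted`, a checking consumer would also count records from aborted transactions, so the two isolation levels can report different totals for the same topic.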


On the producer side:

streamWithKeys.apply("writeToTopic-" + dto.getName(), KafkaIO.<String, Message>write()
  .withBootstrapServers(kafkaSettings.getBootstrapServers())
  .withKeySerializer(MessageKeySerializer.class)
  .withValueSerializer(getSerializer(dto.getEncoding()))
  .withTopic(topicName)
  .withEOS(8, UUID.randomUUID().toString())  // withEOS(numShards, sinkGroupId)
  .withProducerConfigUpdates(getKafkaProducerProperties(stepMetricsPrefix)));

Parallelism and max parallelism are both set to 4.


Kafka is running locally on my laptop.

What I observe: I published 100 messages to the input topic and received
only half of them from the output topic. I have repeated this experiment a
few times.


I also see the following in the log:

[image: image.png]

Any suggestions as to what is wrong?
Thanks a lot!
Eleanore

Re: Beam 2.23.0 KafkaIO with EOS does not work

Posted by Eleanore Jin <el...@gmail.com>.
I ran more experiments and noticed that if I configure the FsStateBackend to
use asynchronous snapshots, everything works fine: with EOS, I get all the
messages from the output topic. I wonder why this is the case. Any suggestions?

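import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkStateBackendFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

public class FsBackendFactory implements FlinkStateBackendFactory {
  @Override
  public StateBackend createStateBackend(FlinkPipelineOptions options) {
    // Second argument enables asynchronous snapshots.
    return new FsStateBackend("file:///tmp/checkpoints", true);
  }
}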

Thanks a lot!

Eleanore


On Tue, Aug 4, 2020 at 10:58 AM Eleanore Jin <el...@gmail.com> wrote:

> I ran more experiments; please see the table below. From these
> observations, it looks like with this setup the number of shards cannot be
> more than 3. I just wonder why this is the case. Any help is really appreciated!
>
> Thanks
> Eleanore

Re: Beam 2.23.0 KafkaIO with EOS does not work

Posted by Eleanore Jin <el...@gmail.com>.
I ran more experiments; please see the table below. From these observations,
it looks like with this setup the number of shards cannot be more than 3. I
just wonder why this is the case. Any help is really appreciated!

Thanks
Eleanore

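Beam version | Input partitions | Output partitions | Parallelism | Max parallelism | Shards | Flink mode        | Works fine
-------------+------------------+-------------------+-------------+-----------------+--------+-------------------+-----------
2.23.0       | 4                | 8                 | 1           | 1               | 1      | Local/IDE         | Yes
2.23.0       | 4                | 8                 | 4           | 4               | 1      | Local/IDE         | Yes
2.23.0       | 4                | 8                 | 4           | 4               | 1      | Local/IDE         | Yes
2.23.0       | 4                | 8                 | 4           | 4               | 2      | Local/IDE         | Yes
2.23.0       | 4                | 8                 | 4           | 4               | 3      | Local/IDE         | Yes
2.23.0       | 4                | 8                 | 4           | 4               | 4      | Local/IDE         | No
2.23.0       | 4                | 8                 | 8           | 8               | 6      | Local/IDE         | No
2.23.0       | 4                | 8                 | 8           | 8               | 4      | Local/IDE         | No
2.23.0       | 4                | 8                 | 8           | 8               | 3      | Local/IDE         | Yes
2.23.0       | 4                | 8                 | 4           | 8               | 3      | Local/IDE         | Yes
2.23.0       | 8                | 8                 | 4           | 4               | 4      | Local/IDE         | No
2.23.0       | 8                | 8                 | 8           | 8               | 6      | Local/IDE         | No
2.23.0       | 4                | 8                 | 4           | 4               | 4      | Standalone docker | No
2.23.0       | 4                | 8                 | 4           | 4               | 3      | Standalone docker | Yes
2.16.0       | 4                | 8                 | 4           | 4               | 3      | Local/IDE         | Yes
2.16.0       | 4                | 8                 | 4           | 4               | 4      | Local/IDE         | No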
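The "no more than 3 shards" observation can be checked mechanically against the reported runs. A minimal sketch that encodes the 16 rows of the table above, assuming the Shards column is the `numShards` argument of `withEOS()` (output partitions are always 8 and are omitted), and tests for each threshold t whether "works fine" holds exactly when shards <= t:

```java
import java.util.Arrays;
import java.util.List;

public class ShardObservation {
    // One row of the reported table (output partitions, always 8, omitted).
    static final class Run {
        final String beam;
        final int inputPartitions, parallelism, maxParallelism, shards;
        final String flinkMode;
        final boolean works;

        Run(String beam, int inputPartitions, int parallelism, int maxParallelism,
            int shards, String flinkMode, boolean works) {
            this.beam = beam;
            this.inputPartitions = inputPartitions;
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
            this.shards = shards;
            this.flinkMode = flinkMode;
            this.works = works;
        }
    }

    // The 16 runs, in the order they appear in the table.
    static final List<Run> RUNS = Arrays.asList(
        new Run("2.23.0", 4, 1, 1, 1, "Local/IDE", true),
        new Run("2.23.0", 4, 4, 4, 1, "Local/IDE", true),
        new Run("2.23.0", 4, 4, 4, 1, "Local/IDE", true),
        new Run("2.23.0", 4, 4, 4, 2, "Local/IDE", true),
        new Run("2.23.0", 4, 4, 4, 3, "Local/IDE", true),
        new Run("2.23.0", 4, 4, 4, 4, "Local/IDE", false),
        new Run("2.23.0", 4, 8, 8, 6, "Local/IDE", false),
        new Run("2.23.0", 4, 8, 8, 4, "Local/IDE", false),
        new Run("2.23.0", 4, 8, 8, 3, "Local/IDE", true),
        new Run("2.23.0", 4, 4, 8, 3, "Local/IDE", true),
        new Run("2.23.0", 8, 4, 4, 4, "Local/IDE", false),
        new Run("2.23.0", 8, 8, 8, 6, "Local/IDE", false),
        new Run("2.23.0", 4, 4, 4, 4, "Standalone docker", false),
        new Run("2.23.0", 4, 4, 4, 3, "Standalone docker", true),
        new Run("2.16.0", 4, 4, 4, 3, "Local/IDE", true),
        new Run("2.16.0", 4, 4, 4, 4, "Local/IDE", false));

    // True if every run satisfies: works fine <=> shards <= threshold.
    static boolean worksIffShardsAtMost(int threshold) {
        for (Run r : RUNS) {
            if (r.works != (r.shards <= threshold)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Try every threshold from 1 to 8 shards.
        for (int t = 1; t <= 8; t++) {
            System.out.println("works <=> shards <= " + t + ": " + worksIffShardsAtMost(t));
        }
    }
}
```

Only t = 3 separates the runs cleanly, across both Beam versions, both Flink modes, and parallelism 1, 4 and 8. This is consistent with the observation but says nothing about its cause.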
