Posted to user@beam.apache.org by Eleanore Jin <el...@gmail.com> on 2020/08/03 20:22:48 UTC
Beam 2.23.0 KafkaIO with EOS does not work
Hi all,
I am running Beam version 2.23.0 with Flink runner version 1.8.2.
The pipeline reads from a topic named ieos4 (4 partitions) and publishes to a topic named oeos8 (8 partitions).
There is no transformation in between.
I am running inside the IDE in Flink local mode (so the Flink job manager and task manager run in the same JVM).
From the consumer:

return KafkaIO.<String, Message>read()
    .withBootstrapServers(kafkaSettings.getBootstrapServers())
    .withTopic(topic)
    .withKeyDeserializer(MessageKeyDeserializer.class)
    .withValueDeserializerAndCoder(getDeserializer(encoding), MESSAGE_CODER)
    .withConsumerConfigUpdates(consumerConfig)
    .commitOffsetsInFinalize()
    .withReadCommitted()
    .withoutMetadata();
From the producer:

streamWithKeys.apply("writeToTopic-" + dto.getName(), KafkaIO.<String, Message>write()
    .withBootstrapServers(kafkaSettings.getBootstrapServers())
    .withKeySerializer(MessageKeySerializer.class)
    .withValueSerializer(getSerializer(dto.getEncoding()))
    .withTopic(topicName)
    .withEOS(8, UUID.randomUUID().toString())
    .withProducerConfigUpdates(getKafkaProducerProperties(stepMetricsPrefix)));
Parallelism and max parallelism are both set to 4.
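One thing worth noting about the write transform above: the second argument of withEOS is a sink group id, and as I understand the KafkaIO javadoc it should stay the same across job restarts so the sink can resume or abort transactions left over from a previous run; generating it with UUID.randomUUID() gives a fresh id every run. A minimal sketch of the alternative, reusing the names from the snippet above (the literal id "eos-sink-group" is my own placeholder, not from this thread):

```java
// Same write transform, but with a fixed sinkGroupId instead of a fresh
// UUID per run, so the id is stable across job restarts.
streamWithKeys.apply("writeToTopic-" + dto.getName(), KafkaIO.<String, Message>write()
    .withBootstrapServers(kafkaSettings.getBootstrapServers())
    .withKeySerializer(MessageKeySerializer.class)
    .withValueSerializer(getSerializer(dto.getEncoding()))
    .withTopic(topicName)
    .withEOS(8, "eos-sink-group")  // stable id, assumed unique to this pipeline
    .withProducerConfigUpdates(getKafkaProducerProperties(stepMetricsPrefix)));
```

This sketch depends on the helper classes defined elsewhere in the author's project, so it is illustrative only.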
I have Kafka running locally on my laptop.
The behaviour I observe: I published 100 messages to the input topic but received only about half of them from the output topic. I have repeated this experiment a few times.
Also I see from the log:
[image: image.png]
Any suggestions on what is wrong?
Thanks a lot!
Eleanore
Re: Beam 2.23.0 KafkaIO with EOS does not work
Posted by Eleanore Jin <el...@gmail.com>.
I did more experiments and noticed that if I change the FsStateBackend to take asynchronous snapshots, then it works fine: with EOS, I get all the messages from the output topic. I wonder why this is the case; any suggestions?
public class FsBackendFactory implements FlinkStateBackendFactory {
  @Override
  public StateBackend createStateBackend(FlinkPipelineOptions options) {
    // The second constructor argument enables asynchronous snapshots.
    return new FsStateBackend("file:///tmp/checkpoints", true);
  }
}
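For completeness, a factory like the one above only takes effect once it is registered on the Flink runner's pipeline options. A minimal sketch of how I wire it up, assuming the Beam Flink runner's FlinkPipelineOptions setter (check the exact option name against your Beam version):

```java
// Build the options and point the runner at the custom state backend
// factory so checkpoints use the asynchronous FsStateBackend above.
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setStateBackendFactory(FsBackendFactory.class);
```

This fragment requires the Beam Flink runner on the classpath and is a sketch, not a complete program.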
Thanks a lot!
Eleanore
On Tue, Aug 4, 2020 at 10:58 AM Eleanore Jin <el...@gmail.com> wrote:
> [quoted messages trimmed; the full text appears in the posts below]
Re: Beam 2.23.0 KafkaIO with EOS does not work
Posted by Eleanore Jin <el...@gmail.com>.
I did more experiments; please see the table below. From these observations, it looks like with these settings the shard count cannot be more than 3. I wonder why this is the case. Any help is really appreciated!
Thanks
Eleanore
Beam version | Input topic partitions | Output topic partitions | Parallelism | Max parallelism | Shards | Flink mode        | Works fine
2.23.0       | 4                      | 8                       | 1           | 1               | 1      | Local/IDE         | Yes
2.23.0       | 4                      | 8                       | 4           | 4               | 1      | Local/IDE         | Yes
2.23.0       | 4                      | 8                       | 4           | 4               | 1      | Local/IDE         | Yes
2.23.0       | 4                      | 8                       | 4           | 4               | 2      | Local/IDE         | Yes
2.23.0       | 4                      | 8                       | 4           | 4               | 3      | Local/IDE         | Yes
2.23.0       | 4                      | 8                       | 4           | 4               | 4      | Local/IDE         | No
2.23.0       | 4                      | 8                       | 8           | 8               | 6      | Local/IDE         | No
2.23.0       | 4                      | 8                       | 8           | 8               | 4      | Local/IDE         | No
2.23.0       | 4                      | 8                       | 8           | 8               | 3      | Local/IDE         | Yes
2.23.0       | 4                      | 8                       | 4           | 8               | 3      | Local/IDE         | Yes
2.23.0       | 8                      | 8                       | 4           | 4               | 4      | Local/IDE         | No
2.23.0       | 8                      | 8                       | 8           | 8               | 6      | Local/IDE         | No
2.23.0       | 4                      | 8                       | 4           | 4               | 4      | Standalone docker | No
2.23.0       | 4                      | 8                       | 4           | 4               | 3      | Standalone docker | Yes
2.16.0       | 4                      | 8                       | 4           | 4               | 3      | Local/IDE         | Yes
2.16.0       | 4                      | 8                       | 4           | 4               | 4      | Local/IDE         | No
On Mon, Aug 3, 2020 at 1:22 PM Eleanore Jin <el...@gmail.com> wrote:
> [quoted original message trimmed; see the first post above]