Posted to dev@samza.apache.org by Talat Uyarer <tu...@paloaltonetworks.com> on 2021/08/11 00:03:31 UTC

Kafka Offset Commit

Hi Samza Community,

This is my first email, so please forgive my lack of knowledge about Samza.
I am running a test job in my environment in local mode. The job is
processing data, but it does not commit offsets on the Kafka side. I am
using the Apache Beam Samza runner.

My pipeline simply reads from Kafka and writes to a GCS bucket. Do you have
any idea where I should look to debug this issue?

This is my job.properties file:

app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=localhost:2181
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
job.config.rewriters=env-config
job.config.rewriter.env-config.class=org.apache.samza.config.EnvironmentConfigRewriter
job.default.system=filereader
systems.filereader.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
job.container.thread.pool.size=300
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory
task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
task.checkpoint.path=/home/checkpoints

Thanks for your help in advance.

Talat

Re: Kafka Offset Commit

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi Bharath,

Thank you for the detailed explanation. My code does not use Samza
directly; we use Beam's Samza runner together with Beam's KafkaIO. Looking
at the internals of Beam's KafkaIO, it uses the Kafka consumer client in
assign mode. So even though Kafka does not assign the partitions, the Beam
code uses Kafka consumer groups and reads data from each partition by
itself. I also checked the Samza runner code, and it looks like it runs
Beam's code. Am I missing something else?

Thanks

On Wed, Aug 11, 2021 at 3:37 PM Bharath Kumara Subramanian <codin.martial@gmail.com> wrote:

> Hi Talat,
>
> This is expected behavior, since Samza uses the low-level Kafka consumer
> instead of the high-level consumer. As a result, offset management is done
> by Samza, not by the offset management that the Kafka consumer provides by
> default.
>
> Additionally, Kafka consumer groups do not apply to Samza either, since
> Samza manages the assignment of Kafka partitions to tasks itself and does
> not rely on Kafka's high-level consumer to assign partitions to different
> consumers.
>
> Hope that answers your question.
>
> Thank you,
> Bharath

Re: Kafka Offset Commit

Posted by Bharath Kumara Subramanian <co...@gmail.com>.
Hi Talat,

This is expected behavior, since Samza uses the low-level Kafka consumer
instead of the high-level consumer. As a result, offset management is done
by Samza, not by the offset management that the Kafka consumer provides by
default.

Additionally, Kafka consumer groups do not apply to Samza either, since
Samza manages the assignment of Kafka partitions to tasks itself and does
not rely on Kafka's high-level consumer to assign partitions to different
consumers.
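
To make the distinction concrete, here is a toy model in Java. The class and method names are hypothetical; this is not Samza or Kafka code. It assumes, as described above, that the framework records the last processed offset per partition in its own checkpoint store and resumes from the next offset on restart, and it simplifies a missing checkpoint to "start at 0" (real Samza falls back to a configurable default instead). The consumer group's committed offsets are never written, which is why tooling that reads consumer-group offsets would show no movement.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names; not Samza or Kafka code) of framework-managed offsets.
public class OffsetTrackingModel {
    // Last processed offset per partition, as recorded by the framework's own
    // checkpoint store (keys loosely mirror Samza's system.stream.partition).
    private final Map<String, Long> frameworkCheckpoints = new HashMap<>();

    // Offsets committed to the Kafka consumer group, i.e. what consumer-group
    // tooling reports. Nothing in this model ever writes to it.
    private final Map<String, Long> consumerGroupOffsets = new HashMap<>();

    // Processes the message at `offset`, then records it in the framework checkpoint.
    public void processAndCheckpoint(String partition, long offset) {
        // ... application logic would run here ...
        frameworkCheckpoints.put(partition, offset);
    }

    public Long checkpointedOffset(String partition) {
        return frameworkCheckpoints.get(partition);
    }

    public Long consumerGroupOffset(String partition) {
        return consumerGroupOffsets.get(partition);
    }

    // On restart, resume after the last checkpointed offset; the consumer group's
    // committed offset is never consulted. Simplification: with no checkpoint,
    // start at 0 (real Samza uses a configurable default instead).
    public long startingOffset(String partition) {
        Long last = frameworkCheckpoints.get(partition);
        return last == null ? 0L : last + 1;
    }

    public static void main(String[] args) {
        OffsetTrackingModel model = new OffsetTrackingModel();
        for (long offset = 0; offset < 3; offset++) {
            model.processAndCheckpoint("kafka.events.0", offset);
        }
        System.out.println("checkpointed: " + model.checkpointedOffset("kafka.events.0"));
        System.out.println("consumer group: " + model.consumerGroupOffset("kafka.events.0"));
        System.out.println("resume from: " + model.startingOffset("kafka.events.0"));
    }
}
```

Running main prints "checkpointed: 2", "consumer group: null" and "resume from: 3": progress is tracked and used for restarts, yet the consumer-group view stays empty.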

Hope that answers your question.

Thank you,
Bharath

On Tue, Aug 10, 2021 at 9:41 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Thank you, Rayman. But my question is about the job's Kafka consumer group:
> when I check it, I don't see any offset movement. I chose to store
> checkpoints on the file system. Do you think that is why my job's consumer
> group offsets are not moving?

Re: Kafka Offset Commit

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Thank you, Rayman. But my question is about the job's Kafka consumer group:
when I check it, I don't see any offset movement. I chose to store
checkpoints on the file system. Do you think that is why my job's consumer
group offsets are not moving?



On Tue, Aug 10, 2021, 9:32 PM rayman preet <ra...@gmail.com> wrote:

> Hi Talat,
>
> Since task.checkpoint.factory in your job.properties is set to
> FileSystemCheckpointManagerFactory and not
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory, the job
> writes its checkpoints to the filesystem (at the location controlled by
> task.checkpoint.path).
>
> https://samza.apache.org/learn/documentation/1.0.0/container/checkpointing.html
>
> has details on the configs needed to enable checkpointing to Kafka for a
> job.
>
> thanks
>
> --
> thanks
> rayman
>

Re: Kafka Offset Commit

Posted by rayman preet <ra...@gmail.com>.
Hi Talat,

Since task.checkpoint.factory in your job.properties is set to
FileSystemCheckpointManagerFactory and not
org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory, the job
writes its checkpoints to the filesystem (at the location controlled by
task.checkpoint.path).

https://samza.apache.org/learn/documentation/1.0.0/container/checkpointing.html

has details on the configs needed to enable checkpointing to Kafka for a
job.
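
A minimal sketch of the change described above, in Java using only java.util.Properties. The helper class and method names are hypothetical, not part of Samza. It reports which checkpoint backend task.checkpoint.factory selects and derives a copy of the config that checkpoints to Kafka, assuming task.checkpoint.system names an already-configured Samza Kafka system (here "filereader", the Kafka system defined in the posted job.properties). Check the checkpointing page linked above for the full set of options in your Samza version.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper (not part of Samza): inspects and rewrites checkpoint config.
public class CheckpointConfigCheck {
    static final String FACTORY_KEY = "task.checkpoint.factory";
    static final String FILE_FACTORY =
        "org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory";
    static final String KAFKA_FACTORY =
        "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory";

    // Describes where checkpoints go, based on task.checkpoint.factory.
    static String describe(Properties props) {
        String factory = props.getProperty(FACTORY_KEY);
        if (factory == null) {
            return "no checkpoint factory configured";
        }
        if (factory.equals(FILE_FACTORY)) {
            return "filesystem: " + props.getProperty("task.checkpoint.path");
        }
        if (factory.equals(KAFKA_FACTORY)) {
            return "kafka system: " + props.getProperty("task.checkpoint.system");
        }
        return "other: " + factory;
    }

    // Returns a copy that checkpoints to the given Samza Kafka system
    // instead of local files; the input Properties are left unchanged.
    static Properties withKafkaCheckpoints(Properties props, String kafkaSystem) {
        Properties copy = new Properties();
        copy.putAll(props);
        copy.setProperty(FACTORY_KEY, KAFKA_FACTORY);
        copy.setProperty("task.checkpoint.system", kafkaSystem);
        // task.checkpoint.path sets the filesystem checkpoint location; drop it here.
        copy.remove("task.checkpoint.path");
        return copy;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
            "systems.filereader.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory\n"
            + FACTORY_KEY + "=" + FILE_FACTORY + "\n"
            + "task.checkpoint.path=/home/checkpoints\n"));
        System.out.println(describe(props));
        System.out.println(describe(withKafkaCheckpoints(props, "filereader")));
    }
}
```

For the posted settings this prints "filesystem: /home/checkpoints" and then "kafka system: filereader". Per Bharath's reply in this thread, switching the factory changes where Samza stores its own checkpoints; it is not expected to make the job's Kafka consumer-group offsets move.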

thanks


On Tue, Aug 10, 2021 at 5:03 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Hi Samza Community,
>
> This is my first email, so please forgive my lack of knowledge about Samza.
> I am running a test job in my environment in local mode. The job is
> processing data, but it does not commit offsets on the Kafka side. I am
> using the Apache Beam Samza runner.
>
> My pipeline simply reads from Kafka and writes to a GCS bucket. Do you have
> any idea where I should look to debug this issue?
>
> This is my job.properties file:
>
> app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> job.coordinator.zk.connect=localhost:2181
> task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
> job.config.rewriters=env-config
> job.config.rewriter.env-config.class=org.apache.samza.config.EnvironmentConfigRewriter
> job.default.system=filereader
> systems.filereader.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> job.container.thread.pool.size=300
> job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory
> task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
> task.checkpoint.path=/home/checkpoints
>
> Thanks for your help in advance.
>
> Talat
>


-- 
thanks
rayman