Posted to dev@samza.apache.org by Ramesh Bhojan <rb...@gmail.com> on 2016/02/06 00:42:29 UTC

Re: Need Help with Samza - Changelog System

Jason,
Can you please share more information about the exact stack trace and the
job configuration, especially the Kafka producer configuration for the
changelog system, as requested by Yi Pan?

Regards,
Ramesh

On Thu, Feb 4, 2016 at 11:56 AM, Ramesh Bhojan <rb...@gmail.com>
wrote:

> Dear team @ Samza,
> I would really appreciate some help with the following question posted on
> Stack Overflow:
>
>
> http://stackoverflow.com/questions/35168641/is-there-a-configuration-setting-to-allow-large-values-in-my-samza-store
>
> Thanks,
> Ramesh
>

Re: Need Help with Samza - Changelog System

Posted by Jagadish Venkatraman <ja...@gmail.com>.
Another pointer is to run the job with DEBUG level turned on and share the
logs with us.
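In case it helps, turning on DEBUG usually means editing the job's log4j configuration. A minimal sketch (the appender name, pattern, and output target here are illustrative, not taken from the job in question):

```xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  <appender name="console" class="org.apache.log4j.ConsoleAppender">
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d{ISO8601} %-5p [%t] %c - %m%n" />
    </layout>
  </appender>
  <root>
    <!-- DEBUG is verbose; drop back to INFO once the failing component is found -->
    <priority value="DEBUG" />
    <appender-ref ref="console" />
  </root>
</log4j:configuration>
```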

On Fri, Feb 5, 2016 at 5:05 PM, Jagadish Venkatraman <jagadish1989@gmail.com> wrote:

> Hey Jason,
>
> Can you share the entire container log? It will be useful to find out what
> went wrong.
> If this is a non-YARN deployment, it would also be useful if you shared the
> JobRunner logs.
>
> Thanks,
> Jagadish
>
> On Fri, Feb 5, 2016 at 4:33 PM, Jason Erickson <ja...@stormpath.com>
> wrote:
>
>> [...]
>>
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University

Re: Need Help with Samza - Changelog System

Posted by Jagadish Venkatraman <ja...@gmail.com>.
Hey Jason,

Can you share the entire container log? It will be useful to find out what
went wrong.
If this is a non-YARN deployment, it would also be useful if you shared the
JobRunner logs.

Thanks,
Jagadish

On Fri, Feb 5, 2016 at 4:33 PM, Jason Erickson <ja...@stormpath.com> wrote:

> [...]



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University

Re: Need Help with Samza - Changelog System

Posted by Jason Erickson <ja...@stormpath.com>.
Is the Kafka producer configuration different from the Samza configuration
of the Samza task that references the store? If not, here are those
configuration values. The changelog in question is resourceStore-changelog.

# Job
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.name=resource_normalizer

job.coordinator.system=kafka

# Task
task.class=com.foo.blazer.resource.normalizer.samza.ResourceNormalizerSamzaTask
task.inputs=kafka.com.foo.iam.indexing.resource.mutation
task.window.ms=10000
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
task.checkpoint.skip-migration=true

# Serializers
serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
serializers.registry.entity.class=com.foo.blazer.resource.normalizer.serde.ResourceEventEntitySerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.int.class=org.apache.samza.serializers.IntegerSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=byte
systems.kafka.samza.offset.default=oldest
systems.kafka.consumer.zookeeper.connect=${ZK_NODES}/${ZK_ROOT}
systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.producer.bootstrap.servers=${KAFKA_NODES}
systems.kafka.producer.max.request.size=52428800
systems.kafka.streams.metrics.samza.msg.serde=metrics

# Metrics
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
metrics.reporters=snapshot,jmx

# Stores
stores.resourceStore.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.resourceStore.changelog=kafka.resourceStore-changelog
stores.resourceStore.changelog.replication.factor=3

stores.resourceStore.key.serde=string
stores.resourceStore.msg.serde=entity

stores.deletedFlagStore.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.deletedFlagStore.changelog=kafka.deletedFlagStore-changelog
stores.deletedFlagStore.changelog.replication.factor=3

stores.deletedFlagStore.key.serde=string
stores.deletedFlagStore.msg.serde=int



We do not get a stack trace from the Samza task itself; it just never seems
to fully start. However, if we use kafka-console-consumer to examine the
changelog, we get this:

[2016-02-02 22:10:11,252] ERROR Error processing message, terminating
consumer process:  (kafka.tools.ConsoleConsumer$)
kafka.common.MessageSizeTooLargeException: Found a message larger than the
maximum fetch size of this consumer on topic resourceStore-changelog
partition 7 at fetch offset 0. Increase the fetch size, or decrease the
maximum message size the broker will allow.
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at kafka.consumer.OldConsumer.receive(BaseConsumer.scala:79)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:110)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
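For anyone hitting the same MessageSizeTooLargeException: the fetch size has to be raised on every consumer of the topic, including the restore consumer Samza runs at container startup, and the broker (or topic) must allow the larger messages. A sketch of the relevant settings, assuming the 50 MB producer limit from the config above and 0.8/0.9-era property names (verify against your Kafka version):

```properties
# Broker side (server.properties): allow large messages through the broker
message.max.bytes=52428800
replica.fetch.max.bytes=52428800

# Or as a per-topic override instead of a broker-wide setting:
#   bin/kafka-topics.sh --alter --topic resourceStore-changelog \
#     --config max.message.bytes=52428800

# Samza job config: consumer properties under systems.<system>.consumer.*
# are passed through to the Kafka consumer, which is also used to restore
# stores from the changelog on startup
systems.kafka.consumer.fetch.message.max.bytes=52428800
```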


On Fri, Feb 5, 2016 at 3:42 PM, Ramesh Bhojan <rb...@gmail.com>
wrote:

> Jason,
> Can you please share more information about the exact stack trace and the
> job configuration, especially the Kafka producer configuration for the
> changelog system, as requested by Yi Pan?
>
> Regards,
> Ramesh
>
> On Thu, Feb 4, 2016 at 11:56 AM, Ramesh Bhojan <rb...@gmail.com>
> wrote:
>
>> Dear team @ Samza,
>> I would really appreciate some help with the following question posted on
>> Stack Overflow:
>>
>>
>> http://stackoverflow.com/questions/35168641/is-there-a-configuration-setting-to-allow-large-values-in-my-samza-store
>>
>> Thanks,
>> Ramesh
>>
>
>


-- 
Thanks,

Jason Erickson