Posted to dev@samza.apache.org by "Mhaskar, Tushar" <tm...@paypal.com.INVALID> on 2015/08/26 21:12:32 UTC
key not found exception while starting a job with RocksDB kv store
Hi All,
I am getting the following exception while starting a samza job
2015-08-26 12:05:13 VerifiableProperties [INFO] Verifying properties
2015-08-26 12:05:13 VerifiableProperties [INFO] Property auto.offset.reset is overridden to smallest
2015-08-26 12:05:13 VerifiableProperties [INFO] Property client.id is overridden to samza_consumer-samza_parser-1-1440615913700-5
2015-08-26 12:05:13 VerifiableProperties [INFO] Property group.id is overridden to undefined-samza-consumer-group-d005e5a6-7ee8-49ff-a6f7-782a1404402a
2015-08-26 12:05:13 VerifiableProperties [INFO] Property zookeeper.connect is overridden to 10.25.106.183:2181/
2015-08-26 12:05:13 SamzaContainer$ [INFO] Got store consumers: Map(samza-parser -> org.apache.samza.system.kafka.KafkaSystemConsumer@2913f73e)
2015-08-26 12:05:13 SamzaContainer$ [INFO] Got store consumers: Map(samza-parser -> org.apache.samza.system.kafka.KafkaSystemConsumer@2913f73e)
2015-08-26 12:05:13 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
java.util.NoSuchElementException: key not found: string
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.samza.container.SamzaContainer$$anonfun$37$$anonfun$40.apply(SamzaContainer.scala:456)
at org.apache.samza.container.SamzaContainer$$anonfun$37$$anonfun$40.apply(SamzaContainer.scala:448)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.samza.container.SamzaContainer$$anonfun$37.apply(SamzaContainer.scala:448)
at org.apache.samza.container.SamzaContainer$$anonfun$37.apply(SamzaContainer.scala:425)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:425)
at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Below is my samza-parser.properties file.
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=samza-parser
# YARN
yarn.package.path=file:///Documents/workspace/hello-samza/target/hello-samza-0.9.1-dist.tar.gz
# Task
task.class=samza.examples.wikipedia.task.Parser
task.inputs=kafka.samza_test
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
task.window.ms=5000
# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.streams.metrics.samza.msg.serde=metrics
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.producer.bootstrap.servers=localhost:9092
# Key-value storage
stores.samza-parser.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.samza-parser.changelog=kafka.samza-parser-changelog
stores.samza-parser.key.serde=string
stores.samza-parser.msg.serde=integer
# Normally, we'd leave this alone, but we have only one broker.
stores.samza-parser.changelog.replication.factor=1
# Normally, we'd set this much higher, but we want things to look snappy in the demo.
stores.samza-parser.write.batch.size=0
stores.samza-parser.object.cache.size=0
Can someone tell me where I am going wrong?
Regards,
Tushar Mhaskar
Re: key not found exception while starting a job with RocksDB kv store
Posted by "Mhaskar, Tushar" <tm...@paypal.com.INVALID>.
I added the line below to the properties file and the error went away.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
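For context (my reading of the config, not stated in the thread): Samza resolves every serde name referenced in the config against the serializers.registry map, and the job referenced systems.kafka.samza.msg.serde=string and stores.samza-parser.key.serde=string while the registry only defined the names json and metrics, hence "key not found: string". A sketch of a consistent registry; the integer entry is my inference from stores.samza-parser.msg.serde=integer, not something the thread confirms:

```
# Every serde name the config refers to must have a registry entry.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
# The store's msg.serde=integer likewise needs one (assumed fix, not from the thread):
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
```

The original json entry pointed at StringSerdeFactory anyway, so renaming it to string (rather than adding a second line) would also have worked.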
Regards,
Tushar Mhaskar
Cell : 213-572-7867
Skype : tmhaskarpp