Posted to dev@samza.apache.org by "Mhaskar, Tushar" <tm...@paypal.com.INVALID> on 2015/08/26 21:12:32 UTC

key not found exception while starting a job with RocksDB kv store

Hi All,

I am getting the following exception while starting a Samza job:


2015-08-26 12:05:13 VerifiableProperties [INFO] Verifying properties
2015-08-26 12:05:13 VerifiableProperties [INFO] Property auto.offset.reset is overridden to smallest
2015-08-26 12:05:13 VerifiableProperties [INFO] Property client.id is overridden to samza_consumer-samza_parser-1-1440615913700-5
2015-08-26 12:05:13 VerifiableProperties [INFO] Property group.id is overridden to undefined-samza-consumer-group-d005e5a6-7ee8-49ff-a6f7-782a1404402a
2015-08-26 12:05:13 VerifiableProperties [INFO] Property zookeeper.connect is overridden to 10.25.106.183:2181/
2015-08-26 12:05:13 SamzaContainer$ [INFO] Got store consumers: Map(samza-parser -> org.apache.samza.system.kafka.KafkaSystemConsumer@2913f73e)
2015-08-26 12:05:13 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
java.util.NoSuchElementException: key not found: string

        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:58)
        at org.apache.samza.container.SamzaContainer$$anonfun$37$$anonfun$40.apply(SamzaContainer.scala:456)
        at org.apache.samza.container.SamzaContainer$$anonfun$37$$anonfun$40.apply(SamzaContainer.scala:448)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.samza.container.SamzaContainer$$anonfun$37.apply(SamzaContainer.scala:448)
        at org.apache.samza.container.SamzaContainer$$anonfun$37.apply(SamzaContainer.scala:425)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:425)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
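The `NoSuchElementException` comes from Scala's `Map.apply`, which throws when a key is absent. A minimal sketch (illustrative only, not Samza's actual code; the object name and registry contents are made up to mirror the properties file below) of the lookup that fails when a serde named `string` is referenced but never registered:

```scala
// Hypothetical serde registry: only "json" and "metrics" are registered,
// as in the properties file, while the config also refers to "string".
object SerdeLookupDemo {
  def main(args: Array[String]): Unit = {
    val serdeFactories = Map(
      "json"    -> "org.apache.samza.serializers.StringSerdeFactory",
      "metrics" -> "org.apache.samza.serializers.MetricsSnapshotSerdeFactory"
    )
    try {
      // Map.apply throws because "string" was never registered.
      serdeFactories("string")
    } catch {
      case e: NoSuchElementException =>
        println(e.getMessage) // prints "key not found: string"
    }
  }
}
```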


Below is my samza-parser.properties file.


# Job

job.factory.class=org.apache.samza.job.yarn.YarnJobFactory

job.name=samza-parser


# YARN

yarn.package.path=file:///Documents/workspace/hello-samza/target/hello-samza-0.9.1-dist.tar.gz


# Task

task.class=samza.examples.wikipedia.task.Parser

task.inputs=kafka.samza_test

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory

task.checkpoint.system=kafka




# Normally, this would be 3, but we have only one broker.

task.checkpoint.replication.factor=1

task.window.ms=5000




# Metrics

metrics.reporters=snapshot,jmx

metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory

metrics.reporter.snapshot.stream=kafka.metrics

metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory


# Serializers

serializers.registry.json.class=org.apache.samza.serializers.StringSerdeFactory

serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

# Systems

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

systems.kafka.samza.msg.serde=string

systems.kafka.streams.metrics.samza.msg.serde=metrics

systems.kafka.consumer.zookeeper.connect=localhost:2181/

systems.kafka.consumer.auto.offset.reset=smallest

systems.kafka.producer.bootstrap.servers=localhost:9092


# Key-value storage


stores.samza-parser.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory

stores.samza-parser.changelog=kafka.samza-parser-changelog

stores.samza-parser.key.serde=string

stores.samza-parser.msg.serde=integer


# Normally, we'd leave this alone, but we have only one broker.

stores.samza-parser.changelog.replication.factor=1


# Normally, we'd set this much higher, but we want things to look snappy in the demo.

stores.samza-parser.write.batch.size=0

stores.samza-parser.object.cache.size=0


Can someone tell me where I am going wrong?


Regards,
Tushar Mhaskar


Re: key not found exception while starting a job with RocksDB kv store

Posted by "Mhaskar, Tushar" <tm...@paypal.com.INVALID>.
Adding the line below to the properties file made the error go away.

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
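For context: every serde name referenced in the config (`string`, `integer`, `metrics`) needs a matching `serializers.registry.<name>.class` entry, and the original file only registered `json` and `metrics`. A consistent serializers section might look like the sketch below; note that the `integer` entry is my assumption, added because the config sets `stores.samza-parser.msg.serde=integer`, which would presumably trigger the same lookup failure next.

```properties
# Register every serde name used elsewhere in the config.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
```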



Regards,
Tushar Mhaskar
Cell : 213-572-7867
Skype : tmhaskarpp



