Posted to dev@samza.apache.org by Selina Tech <sw...@gmail.com> on 2015/10/15 07:07:15 UTC
SamzaContainer NullPointerException for read byte[] topic from Kafka
Dear All:
I tried to consume the Kafka topic "cnr-proto" from a Samza task written in
Java, and the SamzaContainer threw the NullPointerException shown below.
The messages are displayed correctly by the command-line consumer:
"deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
--from-beginning --topic cnr-proto"
Both the key and the message of topic "cnr-proto" in Kafka are byte[].
I ran:
deploy/samza/bin/run-job.sh \
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
--config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties
The properties file and Java code are listed below.
Your help is highly appreciated.
Sincerely,
Selina
-------------error in samza-container-0.log-------------
2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process loop.
java.lang.NullPointerException
at samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50)
at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132)
at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112)
at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
at org.apache.samza.container.RunLoop.process(RunLoop.scala:98)
at org.apache.samza.container.RunLoop.run(RunLoop.scala:69)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555)
at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down.
2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer multiplexer.
2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers.
2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for 10.1.10.141:9092
2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.1.10.141:9092
---------mct-aggregation.properties----------
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=mct-aggregation
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
# YARN
yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz
# Task
# path ./src/main/java/samza/http/demo/task/MctAggregateTask.java
task.class=samza.http.demo.task.MctAggregateTask
task.inputs=kafka.cnr-proto
# Serializers
serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.key.serde=byte
systems.kafka.samza.msg.serde=byte
# Use the "byte" serializer for messages in the "cnr-proto" topic
systems.kafka.streams.cnr-proto.samza.key.serde=byte
systems.kafka.streams.cnr-proto.samza.msg.serde=byte
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
# stream from the beginning
#systems.kafka.consumer.auto.offset.reset=smallest
# all streams from the oldest offset
systems.kafka.samza.offset.default=oldest
# cnr-proto from the oldest offset
systems.kafka.streams.cnr-proto.samza.offset.default=oldest
systems.kafka.streams.cnr-proto.samza.reset.offset=true
-------------------MctAggregateTask.java----------
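import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MctAggregateTask implements StreamTask {
  private static final Logger logger = LoggerFactory.getLogger(MctAggregateTask.class);
  // Output topic that receives a copy of every input message
  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "cnr-proto-tmp");

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) throws Exception {
    // With the "byte" serde, key and message arrive as byte[] (or null)
    byte[] key = (byte[]) envelope.getKey();
    byte[] message = (byte[]) envelope.getMessage();
    logger.info("key=" + key.toString() + ": message=" + message.toString());
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
  }
}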
Re: SamzaContainer NullPointerException for read byte[] topic from Kafka
Posted by Selina Tech <sw...@gmail.com>.
Hi, Yi:
Thanks for your suggestion. I found the bug: the logger.info line
threw the exception.
Thanks a lot!
Sincerely,
Selina
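Since the logger.info line was the one that failed, the likely cause is that envelope.getKey() (or getMessage()) returned null, for example because the record was produced without a key, as kafka-console-producer does by default. Separately, byte[].toString() prints only the array type and an identity hash (something like [B@1b6d3586), not the contents. The sketch below is a hypothetical helper, ByteLogging.describe, which is not part of Samza; it renders a key or message null-safely as hex. Inside process() the log line would then read logger.info("key=" + ByteLogging.describe(key) + ": message=" + ByteLogging.describe(message)).

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: null-safe rendering of Kafka byte[] keys/messages for log lines. */
public final class ByteLogging {
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    private ByteLogging() {}

    /** Returns "null" for a missing key/message, otherwise the bytes as lowercase hex. */
    public static String describe(byte[] bytes) {
        if (bytes == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            // Mask after shifting so negative bytes do not sign-extend into the index
            sb.append(HEX[(b >> 4) & 0x0f]).append(HEX[b & 0x0f]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] key = null; // e.g. a record produced without a key
        byte[] message = "hi".getBytes(StandardCharsets.UTF_8);
        // key.toString() would throw NullPointerException here; describe() does not
        System.out.println("key=" + describe(key) + ": message=" + describe(message));
        // prints key=null: message=6869
    }
}
```

Hex keeps binary payloads such as protobuf readable in a log. java.util.Arrays.toString(byte[]), which also returns "null" for a null array, is a simpler alternative if signed decimal values are acceptable.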
On Thu, Oct 15, 2015 at 10:03 AM, Yi Pan <ni...@gmail.com> wrote:
> Hi, Selina,
>
> Your stack trace shows that the exception was thrown at line 50 of your
> task code. Could you point out which line that is?
>
> It would be helpful if you could log the message you receive in process()
> and compare it with the message you read with the Kafka console
> consumer.
>
> Thanks!
>
> -Yi
Re: SamzaContainer NullPointerException for read byte[] topic from Kafka
Posted by Yi Pan <ni...@gmail.com>.
Hi, Selina,
Your stack trace shows that the exception was thrown at line 50 of your
task code. Could you point out which line that is?
It would be helpful if you could log the message you receive in process()
and compare it with the message you read with the Kafka console
consumer.
Thanks!
-Yi
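One way to follow this suggestion is to log each key and message as its byte count plus a UTF-8 decoding, which is roughly what the console consumer shows when its default formatter writes the raw value bytes to the terminal. The class and method names below (EnvelopeSummary, summarize) are hypothetical, and for binary payloads such as protobuf the decoded text will contain replacement characters, so the byte count is the more reliable thing to compare.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical diagnostic: summarize a Kafka key/message for side-by-side comparison. */
public final class EnvelopeSummary {
    private EnvelopeSummary() {}

    /**
     * Describes a byte[] without risking a NullPointerException: reports null explicitly,
     * otherwise its length and a UTF-8 decoding (malformed bytes become U+FFFD).
     */
    public static String summarize(String label, byte[] bytes) {
        if (bytes == null) {
            return label + "=null";
        }
        String text = new String(bytes, StandardCharsets.UTF_8);
        return label + "[" + bytes.length + " bytes]=" + text;
    }

    public static void main(String[] args) {
        byte[] key = null;
        byte[] message = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(summarize("key", key));         // key=null
        System.out.println(summarize("message", message)); // message[5 bytes]=hello
    }
}
```

Inside process() the same call could be applied to (byte[]) envelope.getKey() and (byte[]) envelope.getMessage() before anything dereferences them.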