You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Selina Tech <sw...@gmail.com> on 2015/10/15 07:07:15 UTC

SamzaContainer NullPointerException for read byte[] topic from Kafka

Dear All:
    I tried to consumer kafka topic "cnr-proto" in Java. It got the
 SamzaContainer NullPointerException as below.
    The messages can be shown by command line correctly
"deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
--from-beginning --topic cnr-proto"

    My Key and message of topic "cir-proto" at Kafka are both in byte[]

   run
   deploy/samza/bin/run-job.sh
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
--config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties

    the properties file and java code are list below also.

   Your help is highly appreciated.

Sincerely,
Selina

-------------error in samza-container-0.log-------------

2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process loop.
java.lang.NullPointerException
at samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50)
at
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
at
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132)
at
org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112)
at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
at org.apache.samza.container.RunLoop.process(RunLoop.scala:98)
at org.apache.samza.container.RunLoop.run(RunLoop.scala:69)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555)
at
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down.
2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer
multiplexer.
2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers.
2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for
10.1.10.141:9092
2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to
socket error: java.nio.channels.ClosedByInterruptException
2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from
10.1.10.141:9092

---------mct-aggregation.properties----------
 # Job
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 job.name=mct-aggregation

 task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
 task.checkpoint.system=kafka
 # Normally, this would be 3, but we have only one broker.
 task.checkpoint.replication.factor=1

 # YARN
 yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz

 # Task
# path ./src/main/java/samza/http/demo/task/MctAggregateTask.java
 task.class=samza.http.demo.task.MctAggregateTask
 task.inputs=kafka.cnr-proto

 # Serializers
 serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
 serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory


 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.samza.key.serde=byte
 systems.kafka.samza.msg.serde=byte

# Use the "byte" serializer for messages in the "cnr-proto" topic
systems.kafka.streams.cnr-proto.samza.key.serde=byte
systems.kafka.streams.cnr-proto.samza.msg.serde=byte

 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092

 #stream from begining
 #systems.kafka.consumer.auto.offset.reset=smallest
#http-demo from the oldest
 systems.kafka.cnr-proto.samza.offset.default=oldest
# all stream from the oldest
 systems.kafka.streams.cnr-proto.samza.offset.default=oldest
 systems.kafka.streams.cnr-proto.samza.reset.offset=true

-------------------MctAggregateTask.java----------

public class MctAggregateTask implements StreamTask {

  private static final SystemStream OUTPUT_STREAM = new
SystemStream("kafka", "cnr-proto-tmp");

  @SuppressWarnings("unchecked")
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector
collector, TaskCoordinator coordinator) throws Exception {

    byte[] key = (byte[])envelope.getKey();
    byte[] message = (byte[]) envelope.getMessage();

    logger.info("key="+key.toString()+": message="+message.toString());
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
  }

Re: SamzaContainer NullPointerException for read byte[] topic from Kafka

Posted by Selina Tech <sw...@gmail.com>.
Hi, Yi:

       Thanks for your suggestion. I found the bug. the line of logger.info
got Exception.

Thanks a lot!


Sincerely,
Selina



On Thu, Oct 15, 2015 at 10:03 AM, Yi Pan <ni...@gmail.com> wrote:

> Hi, Selina,
>
> Your stack trace showed that the exception was thrown at line 50 in your
> task code. Could you point out which line is it?
>
> It would be helpful if you can add some log info regarding to the message
> you receive in the process() vs the message you read from Kafka console
> consumer.
>
> Thanks!
>
> -Yi
>
> On Wed, Oct 14, 2015 at 10:07 PM, Selina Tech <sw...@gmail.com>
> wrote:
>
> > Dear All:
> >     I tried to consumer kafka topic "cnr-proto" in Java. It got the
> >  SamzaContainer NullPointerException as below.
> >     The messages can be shown by command line correctly
> > "deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
> > --from-beginning --topic cnr-proto"
> >
> >     My Key and message of topic "cir-proto" at Kafka are both in byte[]
> >
> >    run
> >    deploy/samza/bin/run-job.sh
> >
> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> > --config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties
> >
> >     the properties file and java code are list below also.
> >
> >    Your help is highly appreciated.
> >
> > Sincerely,
> > Selina
> >
> > -------------error in samza-container-0.log-------------
> >
> > 2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process
> > loop.
> > java.lang.NullPointerException
> > at
> samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50)
> > at
> >
> >
> org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
> > at
> >
> >
> org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> > at
> org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132)
> > at
> >
> >
> org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112)
> > at
> org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> > at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> > at org.apache.samza.container.RunLoop.process(RunLoop.scala:98)
> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:69)
> > at
> org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555)
> > at
> >
> >
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
> > at
> org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
> > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down.
> > 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer
> > multiplexer.
> > 2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers.
> > 2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for
> > 10.1.10.141:9092
> > 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to
> > socket error: java.nio.channels.ClosedByInterruptException
> > 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from
> > 10.1.10.141:9092
> >
> > ---------mct-aggregation.properties----------
> >  # Job
> >  job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >  job.name=mct-aggregation
> >
> >
> >
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >  task.checkpoint.system=kafka
> >  # Normally, this would be 3, but we have only one broker.
> >  task.checkpoint.replication.factor=1
> >
> >  # YARN
> >
> >
> yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz
> >
> >  # Task
> > # path ./src/main/java/samza/http/demo/task/MctAggregateTask.java
> >  task.class=samza.http.demo.task.MctAggregateTask
> >  task.inputs=kafka.cnr-proto
> >
> >  # Serializers
> >
> >
> serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
> >
> >
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >
> >
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> >
> >
> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
> >
> >
> >  # Kafka System
> >
> >
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >  systems.kafka.samza.key.serde=byte
> >  systems.kafka.samza.msg.serde=byte
> >
> > # Use the "byte" serializer for messages in the "cnr-proto" topic
> > systems.kafka.streams.cnr-proto.samza.key.serde=byte
> > systems.kafka.streams.cnr-proto.samza.msg.serde=byte
> >
> >  systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >  systems.kafka.producer.bootstrap.servers=localhost:9092
> >
> >  #stream from begining
> >  #systems.kafka.consumer.auto.offset.reset=smallest
> > #http-demo from the oldest
> >  systems.kafka.cnr-proto.samza.offset.default=oldest
> > # all stream from the oldest
> >  systems.kafka.streams.cnr-proto.samza.offset.default=oldest
> >  systems.kafka.streams.cnr-proto.samza.reset.offset=true
> >
> > -------------------MctAggregateTask.java----------
> >
> > public class MctAggregateTask implements StreamTask {
> >
> >   private static final SystemStream OUTPUT_STREAM = new
> > SystemStream("kafka", "cnr-proto-tmp");
> >
> >   @SuppressWarnings("unchecked")
> >   @Override
> >   public void process(IncomingMessageEnvelope envelope, MessageCollector
> > collector, TaskCoordinator coordinator) throws Exception {
> >
> >     byte[] key = (byte[])envelope.getKey();
> >     byte[] message = (byte[]) envelope.getMessage();
> >
> >     logger.info("key="+key.toString()+": message="+message.toString());
> >     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
> >   }
> >
>

Re: SamzaContainer NullPointerException for read byte[] topic from Kafka

Posted by Yi Pan <ni...@gmail.com>.
Hi, Selina,

Your stack trace showed that the exception was thrown at line 50 in your
task code. Could you point out which line is it?

It would be helpful if you can add some log info regarding to the message
you receive in the process() vs the message you read from Kafka console
consumer.

Thanks!

-Yi

On Wed, Oct 14, 2015 at 10:07 PM, Selina Tech <sw...@gmail.com> wrote:

> Dear All:
>     I tried to consumer kafka topic "cnr-proto" in Java. It got the
>  SamzaContainer NullPointerException as below.
>     The messages can be shown by command line correctly
> "deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
> --from-beginning --topic cnr-proto"
>
>     My Key and message of topic "cir-proto" at Kafka are both in byte[]
>
>    run
>    deploy/samza/bin/run-job.sh
> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> --config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties
>
>     the properties file and java code are list below also.
>
>    Your help is highly appreciated.
>
> Sincerely,
> Selina
>
> -------------error in samza-container-0.log-------------
>
> 2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process
> loop.
> java.lang.NullPointerException
> at samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50)
> at
>
> org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
> at
>
> org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132)
> at
>
> org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112)
> at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> at org.apache.samza.container.RunLoop.process(RunLoop.scala:98)
> at org.apache.samza.container.RunLoop.run(RunLoop.scala:69)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555)
> at
>
> org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down.
> 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer
> multiplexer.
> 2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers.
> 2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for
> 10.1.10.141:9092
> 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to
> socket error: java.nio.channels.ClosedByInterruptException
> 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from
> 10.1.10.141:9092
>
> ---------mct-aggregation.properties----------
>  # Job
>  job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>  job.name=mct-aggregation
>
>
>  task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>  task.checkpoint.system=kafka
>  # Normally, this would be 3, but we have only one broker.
>  task.checkpoint.replication.factor=1
>
>  # YARN
>
>  yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz
>
>  # Task
> # path ./src/main/java/samza/http/demo/task/MctAggregateTask.java
>  task.class=samza.http.demo.task.MctAggregateTask
>  task.inputs=kafka.cnr-proto
>
>  # Serializers
>
>  serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
>
>  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
>  serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>
>  serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>
>
>  # Kafka System
>
>  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>  systems.kafka.samza.key.serde=byte
>  systems.kafka.samza.msg.serde=byte
>
> # Use the "byte" serializer for messages in the "cnr-proto" topic
> systems.kafka.streams.cnr-proto.samza.key.serde=byte
> systems.kafka.streams.cnr-proto.samza.msg.serde=byte
>
>  systems.kafka.consumer.zookeeper.connect=localhost:2181/
>  systems.kafka.producer.bootstrap.servers=localhost:9092
>
>  #stream from begining
>  #systems.kafka.consumer.auto.offset.reset=smallest
> #http-demo from the oldest
>  systems.kafka.cnr-proto.samza.offset.default=oldest
> # all stream from the oldest
>  systems.kafka.streams.cnr-proto.samza.offset.default=oldest
>  systems.kafka.streams.cnr-proto.samza.reset.offset=true
>
> -------------------MctAggregateTask.java----------
>
> public class MctAggregateTask implements StreamTask {
>
>   private static final SystemStream OUTPUT_STREAM = new
> SystemStream("kafka", "cnr-proto-tmp");
>
>   @SuppressWarnings("unchecked")
>   @Override
>   public void process(IncomingMessageEnvelope envelope, MessageCollector
> collector, TaskCoordinator coordinator) throws Exception {
>
>     byte[] key = (byte[])envelope.getKey();
>     byte[] message = (byte[]) envelope.getMessage();
>
>     logger.info("key="+key.toString()+": message="+message.toString());
>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>   }
>