Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/11/03 16:59:49 UTC

[GitHub] Yitian-Zhang opened a new issue #3103: NullPointerException happened in KafkaSpout running on Heron

URL: https://github.com/apache/incubator-heron/issues/3103
 
 
   When I run a Storm topology that uses KafkaSpout on Heron, the following exception occurs:
   ```
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: 
   Starting instance container_2_ads_2 for topology AdvertisingTopology and topologyId AdvertisingTopologyf7b4acbe-bdbc-4772-aaa4-9dd2f113f405 for component ads with taskId 2 and componentIndex 0 and stmgrId stmgr-2 and stmgrPort 31162 and metricsManagerPort 31067  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: System Config: {heron.streammgr.network.backpressure.lowwatermark.mb=50, heron.streammgr.connection.write.batch.size.mb=1, heron.streammgr.stateful.buffer.size.mb=100, heron.instance.internal.bolt.write.queue.capacity=128, heron.instance.tuning.expected.spout.read.queue.size=512, heron.metricsmgr.network.write.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.reconnect.streammgr.interval.sec=PT5S, heron.instance.tuning.interval.ms=PT0.1S, heron.instance.emit.batch.size.bytes=ByteAmount{32768 bytes}, heron.logging.directory=log-files, heron.check.tmaster.location.interval.sec=120, heron.instance.reconnect.metricsmgr.interval.sec=PT5S, heron.streammgr.client.reconnect.tmaster.max.attempts=30, heron.streammgr.network.backpressure.highwatermark.mb=100, heron.instance.network.read.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.tuning.expected.metrics.write.queue.size=8, heron.instance.internal.spout.write.queue.capacity=128, heron.instance.force.exit.timeout.ms=PT2S, heron.tmaster.network.stats.options.maximum.packet.mb=1, heron.streammgr.xormgr.rotatingmap.nbuckets=3, heron.instance.set.control.tuple.capacity=1024, heron.metricsmgr.network.read.batch.size.bytes=ByteAmount{32768 bytes}, heron.streammgr.client.reconnect.tmaster.interval.sec=10, heron.instance.execute.batch.time.ms=PT0.016S, heron.metrics.export.interval.sec=PT1M, heron.streammgr.connection.read.batch.size.mb=1, heron.streammgr.cache.drain.size.mb=100, heron.tmaster.network.master.options.maximum.packet.mb=16, heron.tmaster.establish.retry.interval.sec=1, heron.metrics.max.exceptions.per.message.count=1024, heron.tmaster.stmgr.state.timeout.sec=60, heron.instance.network.write.batch.size.bytes=ByteAmount{32768 bytes}, heron.logging.err.threshold=3, heron.tmaster.network.controller.options.maximum.packet.mb=1, heron.tmaster.metrics.collector.maximum.exception=256, 
heron.instance.network.write.batch.time.ms=PT0.016S, heron.instance.network.options.socket.send.buffer.size.bytes=ByteAmount{6 MB (6553600 bytes)}, heron.streammgr.mempool.max.message.number=512, heron.logging.maximum.size.mb=100, heron.streammgr.tmaster.heartbeat.interval.sec=10, heron.instance.network.read.batch.time.ms=PT0.016S, heron.tmaster.metrics.network.bindallinterfaces=false, heron.streammgr.network.options.maximum.packet.mb=10, heron.instance.tuning.expected.bolt.write.queue.size=8, heron.metricsmgr.network.options.socket.received.buffer.size.bytes=ByteAmount{8 MB (8738000 bytes)}, heron.logging.maximum.files=5, heron.instance.network.options.socket.received.buffer.size.bytes=ByteAmount{8 MB (8738000 bytes)}, heron.instance.execute.batch.size.bytes=ByteAmount{32768 bytes}, heron.instance.acknowledgement.nbuckets=10, heron.metricsmgr.network.read.batch.time.ms=PT0.016S, heron.metricsmgr.network.options.socket.send.buffer.size.bytes=ByteAmount{6 MB (6553600 bytes)}, heron.metricsmgr.network.options.maximum.packetsize.bytes=ByteAmount{1 MB (1048576 bytes)}, heron.instance.tuning.expected.bolt.read.queue.size=8, heron.logging.flush.interval.sec=10, heron.streammgr.cache.drain.frequency.ms=10, heron.tmaster.establish.retry.times=30, heron.instance.network.options.maximum.packetsize.bytes=ByteAmount{10 MB (10485760 bytes)}, heron.instance.tuning.current.sample.weight=0.8, heron.instance.reconnect.streammgr.times=60, heron.logging.prune.interval.sec=300, heron.instance.reconnect.metricsmgr.times=60, heron.tmaster.metrics.collector.maximum.interval.min=PT3H, heron.tmaster.metrics.collector.purge.interval.sec=PT1M, heron.streammgr.client.reconnect.interval.sec=1, heron.instance.internal.spout.read.queue.capacity=1024, heron.instance.ack.batch.time.ms=PT0.128S, heron.instance.set.data.tuple.size.bytes=ByteAmount{8 MB (8388608 bytes)}, heron.instance.tuning.expected.spout.write.queue.size=8, heron.instance.internal.bolt.read.queue.capacity=128, 
heron.instance.set.data.tuple.capacity=1024, heron.instance.metrics.system.sample.interval.sec=PT10S, heron.streammgr.network.backpressure.threshold=3, heron.instance.emit.batch.time.ms=PT0.016S, heron.metricsmgr.network.write.batch.time.ms=PT0.016S, heron.instance.internal.metrics.write.queue.capacity=128}  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: Connecting to endpoint: /127.0.0.1:31162  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: Connecting to endpoint: /127.0.0.1:31067  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Connected to Stream Manager. Ready to send register request  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: Connected to Metrics Manager. Ready to send register request  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Stop writing due to not yet connected to Stream Manager.  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Stop writing due to not yet connected to Stream Manager.  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: We registered ourselves to the Stream Manager  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Handling assignment message from response  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: We received a new Physical Plan.  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Push to Slave  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: We registered ourselves to the Metrics Manager  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Building configs for component: ads  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Added topology-level configs: {topology.acker.executors=2, topology.workers=3, topology.skip.missing.kryo.registrations=false, topology.enable.message.timeouts=true, topology.serializer.classname=org.apache.storm.serialization.HeronPluggableSerializerDelegate, topology.debug=false, topology.max.spout.pending=100, topology.kryo.factory=org.apache.storm.serialization.DefaultKryoFactory, topology.fall.back.on.java.serialization=false, topology.name=AdvertisingTopology, topology.component.parallelism=1, topology.stmgrs=3, topology.reliability.mode=ATLEAST_ONCE, topology.message.timeout.secs=30}  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.misc.PhysicalPlanHelper: Added component-specific configs: {topology.acker.executors=2, config.zkRoot=/ad-events/6647e83d-6bd8-454e-ad91-d3ec0a012e62, topology.workers=3, topology.skip.missing.kryo.registrations=false, topology.enable.message.timeouts=true, topology.serializer.classname=org.apache.storm.serialization.HeronPluggableSerializerDelegate, topology.debug=false, topology.max.spout.pending=100, topology.kryo.factory=org.apache.storm.serialization.DefaultKryoFactory, topology.fall.back.on.java.serialization=false, topology.name=AdvertisingTopology, topology.component.parallelism=1, config.topics=ad-events, topology.stmgrs=3, topology.reliability.mode=ATLEAST_ONCE, topology.message.timeout.secs=30, config.zkNodeBrokers=/brokers}  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Incarnating ourselves as ads with task id 2  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: Is this topology stateful: false  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: Enable Ack: true  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.spout.SpoutInstance: EnableMessageTimeouts: true  
   [2018-11-01 22:43:49 +0800] [SEVERE] com.twitter.heron.instance.HeronInstance: Exception caught in thread: SlaveThread with id: 12 
   java.lang.NullPointerException
       at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:80)
       at org.apache.storm.topology.IRichSpoutDelegate.open(IRichSpoutDelegate.java:53)
       at com.twitter.heron.instance.spout.SpoutInstance.init(SpoutInstance.java:173)
       at com.twitter.heron.instance.Slave.startInstanceIfNeeded(Slave.java:222)
       at com.twitter.heron.instance.Slave.handleNewAssignment(Slave.java:173)
       at com.twitter.heron.instance.Slave.handleNewPhysicalPlan(Slave.java:349)
       at com.twitter.heron.instance.Slave.access$300(Slave.java:49)
       at com.twitter.heron.instance.Slave$1.run(Slave.java:118)
       at com.twitter.heron.common.basics.WakeableLooper.executeTasksOnWakeup(WakeableLooper.java:160)
       at com.twitter.heron.common.basics.WakeableLooper.runOnce(WakeableLooper.java:89)
       at com.twitter.heron.common.basics.WakeableLooper.loop(WakeableLooper.java:79)
       at com.twitter.heron.instance.Slave.run(Slave.java:180)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
   
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.HeronInstance: Waiting for process exit in PT2S  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Closing the Slave Thread  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.metrics.MetricsCollector: Forcing to gather all metrics and flush out.  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Slave: Shutting down the instance  
   [2018-11-01 22:43:49 +0800] [WARNING] com.twitter.heron.common.basics.SysUtils: Failed to close com.twitter.heron.instance.Slave@4bef4d93 
   java.lang.NullPointerException
       at org.apache.storm.kafka.KafkaSpout.close(KafkaSpout.java:136)
       at org.apache.storm.topology.IRichSpoutDelegate.close(IRichSpoutDelegate.java:58)
       at com.twitter.heron.instance.spout.SpoutInstance.clean(SpoutInstance.java:195)
       at com.twitter.heron.instance.spout.SpoutInstance.shutdown(SpoutInstance.java:204)
       at com.twitter.heron.instance.Slave.close(Slave.java:238)
       at com.twitter.heron.common.basics.SysUtils.closeIgnoringExceptions(SysUtils.java:66)
       at com.twitter.heron.instance.HeronInstance$SlaveExitTask.run(HeronInstance.java:428)
       at com.twitter.heron.instance.HeronInstance$DefaultExceptionHandler.handleException(HeronInstance.java:396)
       at com.twitter.heron.instance.HeronInstance$DefaultExceptionHandler.uncaughtException(HeronInstance.java:360)
       at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1057)
       at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1052)
       at java.lang.Thread.dispatchUncaughtException(Thread.java:1959)
   
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.instance.Gateway: Closing the Gateway thread  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.utils.metrics.MetricsCollector: Forcing to gather all metrics and flush out.  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: Flushing all pending data in MetricsManagerClient  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: Flushing all pending data in StreamManagerClient  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.SocketChannelHelper: Forcing to flush data to socket with best effort.  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: To stop the HeronClient.  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.MetricsManagerClient: MetricsManagerClient exits  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.SocketChannelHelper: Forcing to flush data to socket with best effort.  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.common.network.HeronClient: To stop the HeronClient.  
   [2018-11-01 22:43:49 +0800] [INFO] com.twitter.heron.network.StreamManagerClient: StreamManagerClient exits.  
   [2018-11-01 22:43:49 +0800] [SEVERE] com.twitter.heron.instance.HeronInstance: Instance Process exiting.
   ```
   The topology code that builds the spout is as follows:
   ```
   String zkServerHosts = "MY_ZK_IP:2181";
   ZkHosts hosts = new ZkHosts(zkServerHosts);
   
   SpoutConfig spoutConfig = new SpoutConfig(hosts, kafkaTopic, "/" + kafkaTopic, UUID.randomUUID().toString());
   spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
   KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
   ```
   The NPE is reported at line 80 of the KafkaSpout class, inside the open method:
   ```
   public Object getValueAndReset() {
       List<PartitionManager> pms = KafkaSpout.this.coordinator.getMyManagedPartitions();
       Set<Partition> latestPartitions = new HashSet();
       Iterator var3 = pms.iterator();
   
       PartitionManager pm;
        while(var3.hasNext()) { // line where the NPE is reported
            pm = (PartitionManager)var3.next();
            latestPartitions.add(pm.getPartition());
       }
   
       this.kafkaOffsetMetric.refreshPartitions(latestPartitions);
       var3 = pms.iterator();
   
       while(var3.hasNext()) {
            pm = (PartitionManager)var3.next();      
            this.kafkaOffsetMetric.setOffsetData(pm.getPartition(), 
            pm.getOffsetData());
       }
   
       return this.kafkaOffsetMetric.getValueAndReset();
    }
   ```
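   One possible line of investigation, offered as an unverified assumption rather than a diagnosis: the component-specific configs logged above list config.zkRoot, config.topics and config.zkNodeBrokers, but no ZooKeeper server or port entry. If open looks up such an entry in the config map and uses the result directly, a missing key would produce an NPE. The sketch below is plain Java with no storm-kafka classes; the class and method names are hypothetical, and only the key name `storm.zookeeper.port` is a real Storm config key. Whether KafkaSpout.open performs this lookup at line 80 has not been checked here.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Illustrative stand-in only; this is NOT the storm-kafka implementation.
   final class ConfigLookupSketch {
       // Real Storm config key name. Whether Heron populates it is an assumption to verify.
       static final String ZK_PORT_KEY = "storm.zookeeper.port";

       // Unguarded lookup: Map.get returns null for an absent key,
       // so calling intValue() on the result throws NullPointerException.
       static int readPortUnguarded(Map<String, Object> conf) {
           return ((Number) conf.get(ZK_PORT_KEY)).intValue();
       }

       // Guarded lookup: prefer an explicitly supplied port, then the config map,
       // and fail with a descriptive message instead of a bare NPE.
       static int readPort(Integer explicitPort, Map<String, Object> conf) {
           if (explicitPort != null) {
               return explicitPort;
           }
           Object value = conf.get(ZK_PORT_KEY);
           if (value == null) {
               throw new IllegalStateException("Missing config key: " + ZK_PORT_KEY);
           }
           return ((Number) value).intValue();
       }

       public static void main(String[] args) {
           Map<String, Object> conf = new HashMap<>(); // no ZooKeeper entries, as in the log above
           try {
               readPortUnguarded(conf);
           } catch (NullPointerException e) {
               System.out.println("Unguarded lookup failed with NullPointerException");
           }
           System.out.println("Guarded lookup with explicit port: " + readPort(2181, conf));
       }
   }
   ```

   If this turns out to be the cause, supplying the ZooKeeper servers and port explicitly on the spout configuration, instead of relying on the runtime's config map, would be the analogous change in the topology code. The port 2181 is taken from the zkServerHosts string above.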
   The dependencies in pom.xml are as follows:
   ```
               <dependency>
                   <groupId>org.apache.storm</groupId>
                   <artifactId>storm-kafka</artifactId>
                   <version>${storm.version}</version>
               </dependency>
               <dependency>
                   <groupId>org.apache.kafka</groupId>
                   <artifactId>kafka_${scala.binary.version}</artifactId>
                   <version>${kafka.version}</version>
                   <exclusions>
                       <exclusion>
                           <groupId>org.apache.zookeeper</groupId>
                           <artifactId>zookeeper</artifactId>
                       </exclusion>
                       <exclusion>
                           <groupId>log4j</groupId>
                           <artifactId>log4j</artifactId>
                       </exclusion>
                   </exclusions>
               </dependency>
               <dependency>
                   <groupId>org.apache.kafka</groupId>
                   <artifactId>kafka-clients</artifactId>
                   <version>${kafka.version}</version>
               </dependency>
               <dependency>
                   <groupId>com.twitter.heron</groupId>
                   <artifactId>heron-storm</artifactId>
                   <version>0.17.5</version>
               </dependency>
   ```
   I also tried replacing storm-kafka with heron-kafka and adding heron-api, but the same NPE still occurs.
   I don't know what causes this problem or how to fix it. Any help would be appreciated.
