Posted to issues@storm.apache.org by "michel hummel (JIRA)" <ji...@apache.org> on 2018/03/11 14:28:00 UTC

[jira] [Comment Edited] (STORM-2979) WorkerHooks EOFException during run_worker_shutdown_hooks

    [ https://issues.apache.org/jira/browse/STORM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394521#comment-16394521 ] 

michel hummel edited comment on STORM-2979 at 3/11/18 2:27 PM:
---------------------------------------------------------------

The issue seems to be related to the use of a ByteBuffer to store the serialized worker hook.
 The ByteBuffer is deserialized twice:
 * On start
 * On stop

On worker start, the worker reads the byte array out of the ByteBuffer, which leaves the buffer's position at the end of the ByteBuffer.
 On worker stop, the worker tries to read the byte array again, but the position is still at the end of the ByteBuffer, so the deserialization fails with an EOFException.
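
The failure mode can be reproduced outside Storm with a few lines of plain Java. This is a minimal sketch, not Storm code: the toByteArray/javaSerialize/javaDeserialize helpers below only imitate what Utils and worker.clj appear to do (allocate remaining() bytes and consume the buffer), so their names and exact behaviour are assumptions for illustration.

{code:java}
import java.io.*;
import java.nio.ByteBuffer;

public class ByteBufferReuseDemo {

    // Imitates the extraction done before each deserialization:
    // it consumes the buffer, leaving position == limit.
    static byte[] toByteArray(ByteBuffer buffer) {
        byte[] ret = new byte[buffer.remaining()];
        buffer.get(ret, 0, ret.length);
        return ret;
    }

    static byte[] javaSerialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    static Object javaDeserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ByteBuffer hook = ByteBuffer.wrap(javaSerialize("fake worker hook"));

        javaDeserialize(toByteArray(hook)); // "start": works, but leaves the position at the limit
        javaDeserialize(toByteArray(hook)); // "stop": zero bytes remain -> java.io.EOFException
    }
}
{code}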

There are multiple ways to fix this issue:
 * In worker.clj run-worker-start-hooks, reset the ByteBuffer position after the worker hook start.

This is the purpose of my commit: [https://github.com/hummelm/storm/commit/7afeff6d8db4a78250ff8827207e80247b0acd25]

 * Add a method to the Utils class that retrieves a byte array from a ByteBuffer without changing the internal position of the ByteBuffer, and use it in worker.clj (see the sketch after this list).
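
For reference, a minimal sketch of the two options, under the same assumptions as above; resetAfterStartHook and readBytes are hypothetical names, not existing Storm APIs:

{code:java}
import java.nio.ByteBuffer;

public class WorkerHookBufferFixes {

    // Option 1: after the start hook has consumed the buffer, rewind it
    // so that the shutdown hook can read the same bytes again.
    static void resetAfterStartHook(ByteBuffer serializedHook) {
        serializedHook.rewind(); // position back to 0, limit unchanged
    }

    // Option 2: copy the bytes out of a duplicate of the buffer.
    // duplicate() shares the content but has its own position/limit,
    // so the original buffer is left untouched and can be read again later.
    static byte[] readBytes(ByteBuffer buffer) {
        ByteBuffer copy = buffer.duplicate();
        byte[] ret = new byte[copy.remaining()];
        copy.get(ret, 0, ret.length);
        return ret;
    }
}
{code}

With option 2, both run-worker-start-hooks and run-worker-shutdown-hooks would call the non-mutating helper instead of consuming the ByteBuffer directly, so the order of the two reads no longer matters.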

Maybe a Storm developer can advise us on the best way to fix the issue; it is blocking our development and we would be happy to create a pull request.

 

One thing is strange: the ByteBuffer seems to contain more data than expected. The serialized worker hook is about 106 bytes, yet the buffer position ends up above 2800.

Whatever the cause, the proposed fix is working.



> WorkerHooks EOFException during run_worker_shutdown_hooks
> ---------------------------------------------------------
>
>                 Key: STORM-2979
>                 URL: https://issues.apache.org/jira/browse/STORM-2979
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>    Affects Versions: 1.0.6
>         Environment: centos 7
> storm-core 1.0.6
> eclipse Mars2
> java 1.8.0_151
>            Reporter: Robin Perice
>            Priority: Major
>
> Hi,
> I'm trying to use the BaseWorkerHook but an exception is thrown after I kill the topology.
> The issue is exactly the same as: [http://user.storm.apache.narkive.com/uchOrwlH/workerhook-deserialization-problem]
> An extract of my code:
>  
> {code:java}
> // topology
> final TridentTopology topology = new TridentTopology();
> // ... I skip all the topology configuration part
> final StormTopology topo = topology.build();
> // hook
> final BaseWorkerHook hook = new BaseWorkerHook();
> final ByteBuffer serializedHook = ByteBuffer.wrap(Utils.javaSerialize(hook));
> topo.add_to_worker_hooks(serializedHook); // add the serialized hook to the topology
> // submit topology
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology(name,config,topo);
> Utils.sleep(60000);
> // kill topology
> final KillOptions killOptions = new KillOptions();
> killOptions.set_wait_secs(0);
> cluster.killTopologyWithOpts(name, killOptions);
> Utils.sleep(10000);
> cluster.shutdown();
> {code}
>  
> I have the following error:
> {code:java}
> java.lang.RuntimeException: java.io.EOFException
>     at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:254)
>     at org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__5456__5460$fn__5461.invoke(worker.clj:578)
>     at clojure.lang.LazySeq.sval(LazySeq.java:40)
>     at clojure.lang.LazySeq.seq(LazySeq.java:49)
>     at clojure.lang.RT.seq(RT.java:507)
>     at clojure.core$seq__4128.invoke(core.clj:137)
>     at clojure.core$dorun.invoke(core.clj:3009)
>     at clojure.core$doall.invoke(core.clj:3025)
>     at org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:576)
>     at org.apache.storm.daemon.worker$fn__5471$exec_fn__1371__auto__$reify__5473$shutdown_STAR___5493.invoke(worker.clj:693)
>     at org.apache.storm.daemon.worker$fn__5471$exec_fn__1371__auto__$reify$reify__5519.shutdown(worker.clj:706)
>     at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:67)
>     at org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:59)
>     at org.apache.storm.daemon.supervisor.Slot.killContainerForChangedAssignment(Slot.java:311)
>     at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:527)
>     at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:265)
>     at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:741)
> Caused by: java.io.EOFException
>     at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2680)
>     at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3155)
>     at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:864)
>     at java.io.ObjectInputStream.<init>(ObjectInputStream.java:360)
>     at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:245)
>     ... 16 more
> {code}
>  
> Maybe it is related to log4j shutdown hooks (https://issues.apache.org/jira/browse/STORM-2176), so I tried to disable the hook in my src/test/resources/log4j2.xml.
>  
> {code:java}
> <Configuration monitorInterval="60" shutdownHook="disable">
>     <Appenders>
>         <Console name="Console" target="SYSTEM_OUT">
>             <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
>         </Console>
>     </Appenders>
>     <Loggers>
>         <Root level="debug">
>             <AppenderRef ref="Console" />
>         </Root>
>     </Loggers>
> </Configuration>
> {code}
> But it does not change anything.
>  
> Of course, the purpose of my work is to use my own worker hook extending BaseWorkerHook.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)