Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/07/31 06:48:44 UTC

[GitHub] [incubator-druid] ArtyomyuS opened a new issue #8204: HttpPostEmitter throw Class cast exception when using emitAndReturnBatch

URL: https://github.com/apache/incubator-druid/issues/8204
 
 
   ### Affected Version
   `0.13.0`, `0.14.2`
   
   ### Description
   The following exception is thrown on the broker:
   ```
   2019-07-30T15:55:21,823 WARN [HttpClient-Netty-Worker-9] org.apache.druid.java.util.http.client.NettyHttpClient - [POST http://{instance}:8102/druid/v2/] Exception thrown while processing message, closing channel.
   java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.druid.java.util.emitter.core.Batch
           at org.apache.druid.java.util.emitter.core.HttpPostEmitter.emitAndReturnBatch(HttpPostEmitter.java:262) ~[druid-core-0.14.2-incubating.jar:0.14.2-incubating]
           at org.apache.druid.java.util.emitter.core.HttpPostEmitter.emit(HttpPostEmitter.java:227) ~[druid-core-0.14.2-incubating.jar:0.14.2-incubating]
           at org.apache.druid.java.util.emitter.service.ServiceEmitter.emit(ServiceEmitter.java:67) ~[druid-core-0.14.2-incubating.jar:0.14.2-incubating]
           at org.apache.druid.java.util.emitter.service.ServiceEmitter.emit(ServiceEmitter.java:72) ~[druid-core-0.14.2-incubating.jar:0.14.2-incubating]
           at org.apache.druid.query.DefaultQueryMetrics.emit(DefaultQueryMetrics.java:311) ~[druid-processing-0.14.2-incubating.jar:0.14.2-incubating]
           at org.apache.druid.client.DirectDruidClient$1.handleResponse(DirectDruidClient.java:230) ~[druid-server-0.14.2-incubating.jar:0.14.2-incubating]
           at org.apache.druid.java.util.http.client.NettyHttpClient$1.messageReceived(NettyHttpClient.java:224) [druid-core-0.14.2-incubating.jar:0.14.2-incubating]
           at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived(ReadTimeoutHandler.java:184) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:108) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.10.6.Final.jar:?]
           at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.10.6.Final.jar:?]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
   ```
   
   I was able to reproduce it on both `0.13.0` and `0.14.2`, and I see the same code in `0.15.0`.
   
   I could not work out exactly how this happens on a real cluster, since the stack trace does not reveal much, but I spotted a potential cause at `HttpPostEmitter:308`: when a throwable is thrown, `concurrentBatch` is updated with `compareAndSet(batch, batch.batchNumber)`, which replaces the `Batch` held in the reference with a `Long`.
   
   In `HttpPostEmitter#emitAndReturnBatch` there is a condition that checks whether the batch object is an instance of `Integer` in order to try to recover it. I suppose this should check for `Long` instead, as `Batch.batchNumber` is of type `Long`.
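
The suspected mechanism can be shown in isolation. The following is a minimal sketch, not Druid code: `BatchCastDemo`, its nested `Batch` class, and `emitWithIntegerCheck` are hypothetical stand-ins, and the only assumption carried over from the description above is that the reference holds `Object` values and that the recovery check tests for `Integer`.

```java
import java.util.concurrent.atomic.AtomicReference;

final class BatchCastDemo {
    /** Hypothetical stand-in for Druid's Batch; only the batch number matters here. */
    static final class Batch {
        final long batchNumber;

        Batch(long batchNumber) {
            this.batchNumber = batchNumber;
        }
    }

    /**
     * Mimics the suspected flow: an Integer check guards the recovery path,
     * so a Long stored in the reference falls through to the Batch cast.
     */
    static String emitWithIntegerCheck(AtomicReference<Object> concurrentBatch) {
        Object batchObj = concurrentBatch.get();
        if (batchObj instanceof Integer) {
            return "recovered";
        }
        try {
            Batch batch = (Batch) batchObj;
            return "ok:" + batch.batchNumber;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Batch batch = new Batch(1L);
        AtomicReference<Object> concurrentBatch = new AtomicReference<>(batch);
        // Failure path: the long batch number is autoboxed to a Long, not an Integer.
        concurrentBatch.compareAndSet(batch, batch.batchNumber);
        System.out.println(emitWithIntegerCheck(concurrentBatch));
    }
}
```

Because a `long` autoboxes to `Long`, the `instanceof Integer` branch can never match a stored batch number, which is consistent with the cast failure in the stack trace.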
   
   
   I tried to simulate this flow by forcing `concurrentBatch` to hold a `Long`. Example:
   ```
   public class HttpPostEmitterTest {
   
       private static final ObjectMapper objectMapper = new ObjectMapper() {
           @Override
           public byte[] writeValueAsBytes(Object value) {
               return Ints.toByteArray(((IntEvent) value).index);
           }
       };
   
       private final MockHttpClient httpClient = new MockHttpClient();
   
       @Before
       public void setup() {
           httpClient.setGoHandler(new GoHandler() {
               @Override
               protected ListenableFuture<Response> go(Request request) {
                   return GoHandlers.immediateFuture(EmitterTest.okResponse());
               }
           });
       }
   
   
       @Test(expected = ClassCastException.class)
       @SuppressWarnings("unchecked")
       public void testRecoveryEmitAndReturnBatch() throws InterruptedException, IOException, NoSuchFieldException, IllegalAccessException {
           HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
                   .setFlushMillis(100)
                   .setFlushCount(4)
                   .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
                   .setMaxBatchSize(1024 * 1024)
                   .setBatchQueueSizeLimit(1000)
                   .build();
           final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
           emitter.start();
           // emit first event
           emitter.emitAndReturnBatch(new IntEvent());
           Thread.sleep(1000L);
   
           // Get the concurrentBatch reference via reflection and store a Long in it,
           // as if HttpPostEmitter#onSealExclusive had failed earlier.
           Field concurrentBatch = emitter.getClass().getDeclaredField("concurrentBatch");
           concurrentBatch.setAccessible(true);
           ((AtomicReference<Object>) concurrentBatch.get(emitter)).getAndSet(1L);
           // From here on the emitter has to recover the current batch.
   
           // emit second event
           emitter.emitAndReturnBatch(new IntEvent()); // fails here with ClassCastException
   
           emitter.flush();
           emitter.close();
       }
   
   }
   ``` 
   
   I suppose the fix should be relatively simple: change the following block at `HttpPostEmitter:254` from `Integer` to `Long`:
   ```
   Object batchObj = concurrentBatch.get();
   if (batchObj instanceof Long) {
     tryRecoverCurrentBatch((Long) batchObj);
     continue;
   }
   ```
   and change the method declaration at `HttpPostEmitter:346` to accept a `Long`:
   ```
   private void tryRecoverCurrentBatch(Long failedBatchNumber) {
     ...
   }
   ```
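
To illustrate how the proposed change would behave, here is a self-contained sketch under stated assumptions. `BatchRecoveryDemo`, its `Batch` class, and the body of `tryRecoverCurrentBatch` are hypothetical; in particular, replacing the failed number with a fresh batch numbered one higher is an assumption made for this example and may not match what Druid's real recovery method does.

```java
import java.util.concurrent.atomic.AtomicReference;

final class BatchRecoveryDemo {
    /** Hypothetical stand-in for Druid's Batch. */
    static final class Batch {
        final long batchNumber;

        Batch(long batchNumber) {
            this.batchNumber = batchNumber;
        }
    }

    /**
     * Assumed recovery step: swap the failed batch number for a new Batch.
     * compareAndSet compares references, so the same Long instance that was
     * read from the AtomicReference must be passed in.
     */
    static void tryRecoverCurrentBatch(AtomicReference<Object> concurrentBatch, Long failedBatchNumber) {
        concurrentBatch.compareAndSet(failedBatchNumber, new Batch(failedBatchNumber + 1));
    }

    /** Loops until the reference holds a Batch, recovering from a stored Long. */
    static Batch currentBatch(AtomicReference<Object> concurrentBatch) {
        while (true) {
            Object batchObj = concurrentBatch.get();
            if (batchObj instanceof Long) {
                tryRecoverCurrentBatch(concurrentBatch, (Long) batchObj);
                continue;
            }
            return (Batch) batchObj;
        }
    }

    public static void main(String[] args) {
        // Simulate the state left behind by a failed seal: a Long instead of a Batch.
        AtomicReference<Object> concurrentBatch = new AtomicReference<>(7L);
        Batch recovered = currentBatch(concurrentBatch);
        System.out.println(recovered.batchNumber);
    }
}
```

With the `Long` check in place, the loop takes the recovery branch instead of reaching the cast, so the second iteration sees a `Batch` again.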
   
   I also found that @leventov worked on patch #5386 some time ago to make `onSealExclusive()` more robust against OOM errors. However, no OOM errors appeared in the logs when the class cast exception occurred, so I am not sure the two are related.
   
   
   
