Posted to dev@drill.apache.org by weijie tong <to...@gmail.com> on 2018/06/20 03:35:28 UTC

how to release allocated ByteBuf which steps across two threads

Hi:
   I am facing a tricky problem releasing the BloomFilter's direct
memory in some special cases. I hope someone can give some advice.

   Say one join node sends its BloomFilter to the foreman node
(TestHashJoin.simpleEqualityJoin()). The sending thread is netty's
BitClient, but the BloomFilter's direct memory was allocated by another
thread's allocator (i.e. the HashJoin fragment's allocator). When the
fragment completes quickly, its close logic checks the allocator's
memory accounting. But the async sender thread has not yet sent out the
BloomFilter and released the corresponding direct ByteBuffer; because
the query completed so quickly, the wire has already closed. The
fragment's close logic then throws an exception complaining about a
memory leak.

    So I want to know how to release the allocated direct ByteBuffer in
such a case.
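
In simplified form, the race looks like this (a minimal sketch with
hypothetical stand-ins for Drill's DrillBuf and BufferAllocator, not the
real classes):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CrossThreadReleaseRace {

  // Stand-in for a reference-counted direct buffer such as DrillBuf.
  static class Buf {
    final AtomicInteger refCount = new AtomicInteger(1);
    void retain()  { refCount.incrementAndGet(); }
    void release() { refCount.decrementAndGet(); }
  }

  // Stand-in for the fragment's allocator: closing it verifies that
  // every buffer it handed out has been fully released.
  static class Allocator implements AutoCloseable {
    final Buf buf = new Buf();
    @Override
    public void close() {
      if (buf.refCount.get() != 0) {
        // This is the check that produces the IllegalStateException below.
        throw new IllegalStateException("Allocator closed with outstanding buffers");
      }
    }
  }

  public static void main(String[] args) {
    Allocator allocator = new Allocator();
    Buf bloomFilter = allocator.buf;

    bloomFilter.retain();                     // RPC layer takes a second reference
    new Thread(bloomFilter::release).start(); // async send releases it ... eventually

    bloomFilter.release();                    // fragment releases its own reference
    allocator.close();   // races the sender thread: may still see refCount == 1
  }
}
```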


   The exception is:

[Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010]
at
org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633)
~[classes/:na]
at
org.apache.drill.exec.work.fragment.FragmentExecutor.sendFinalState(FragmentExecutor.java:359)
[classes/:na]
at
org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:214)
[classes/:na]
at
org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:325)
[classes/:na]
at
org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
[classes/:na]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[na:1.8.0_161]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[na:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
Caused by: java.lang.IllegalStateException: Allocator[frag:0:0] closed with
outstanding buffers allocated (1).
Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
(res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
references: 1, life: 505919927431943..0, allocatorManager: [6050, life:
505919927378312..0] holds 2 buffers.
        DrillBuf[10198], udle: [6051 0..16777216]
        DrillBuf[10208], udle: [6051 0..16777216]
  reservations: 0

at org.apache.drill.exec.memory.BaseAllocator.close(BaseAllocator.java:503)
~[classes/:na]
at
org.apache.drill.exec.ops.FragmentContextImpl.suppressingClose(FragmentContextImpl.java:484)
~[classes/:na]
at
org.apache.drill.exec.ops.FragmentContextImpl.close(FragmentContextImpl.java:478)
~[classes/:na]
at
org.apache.drill.exec.work.fragment.FragmentExecutor.closeOutResources(FragmentExecutor.java:382)
[classes/:na]
at
org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:209)
[classes/:na]
... 5 common frames omitted
=====================fragment:0 : 0done!
*************************receive a bloom filter**********
************received a bloom filter
11:00:41.587 [main] ERROR o.a.d.exec.server.BootStrapContext - Error while
closing
java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding
child allocators.
Allocator(ROOT) 0/16777216/55640064/4294967296 (res/actual/peak/limit)
  child allocators: 1
    Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
(res/actual/peak/limit)
      child allocators: 0
      ledgers: 1
        ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
references: 1, life: 505919927431943..0, allocatorManager: [6050, life:
505919927378312..0] holds 2 buffers.
            DrillBuf[10198], udle: [6051 0..16777216]
            DrillBuf[10208], udle: [6051 0..16777216]
      reservations: 0
  ledgers: 0
  reservations: 0

at org.apache.drill.exec.memory.BaseAllocator.close(BaseAllocator.java:496)
~[classes/:na]
at org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:81)
[classes/:na]
at org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:69)
[classes/:na]
at
org.apache.drill.exec.server.BootStrapContext.close(BootStrapContext.java:259)
~[classes/:na]
at org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:81)
[classes/:na]
at org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:69)
[classes/:na]
at org.apache.drill.exec.server.Drillbit.close(Drillbit.java:263)
[classes/:na]
at
org.apache.drill.exec.physical.impl.join.TestHashJoin.simpleEqualityJoin(TestHashJoin.java:147)
[test-classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[na:1.8.0_161]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[na:1.8.0_161]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[na:1.8.0_161]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
[junit-4.12.jar:4.12]
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
[junit-4.12.jar:4.12]
at
mockit.integration.junit4.internal.JUnit4TestRunnerDecorator.executeTestMethod(JUnit4TestRunnerDecorator.java:154)
[jmockit-1.39.jar:1.39]
at
mockit.integration.junit4.internal.JUnit4TestRunnerDecorator.invokeExplosively(JUnit4TestRunnerDecorator.java:70)
[jmockit-1.39.jar:1.39]
at
mockit.integration.junit4.internal.FakeFrameworkMethod.invokeExplosively(FakeFrameworkMethod.java:34)
[jmockit-1.39.jar:1.39]
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
[junit-4.12.jar:4.12]
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
[junit-4.12.jar:4.12]
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
[junit-4.12.jar:4.12]
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
[junit-4.12.jar:4.12]
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
[junit-4.12.jar:4.12]
at
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
[junit-4.12.jar:4.12]
at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
[junit-4.12.jar:4.12]
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
[junit-4.12.jar:4.12]
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
[junit-4.12.jar:4.12]
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
[junit-4.12.jar:4.12]
at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
[junit-4.12.jar:4.12]
at org.junit.runner.JUnitCore.run(JUnitCore.java:137) [junit-4.12.jar:4.12]
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
[junit-rt.jar:na]
at
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
[junit-rt.jar:na]
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
[junit-rt.jar:na]
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
[junit-rt.jar:na]
11:00:41.593 [main] ERROR org.apache.drill.TestReporter - Test Failed (d: 0
B(1 B), h: -360.8 MiB(52.3 MiB), nh: 3.2 MiB(88.4 MiB)):
simpleEqualityJoin(org.apache.drill.exec.physical.impl.join.TestHashJoin)
org.apache.drill.exec.rpc.RpcException:
org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR:
IllegalStateException: Allocator[frag:0:0] closed with outstanding buffers
allocated (1).
Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
(res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
references: 1, life: 505919927431943..0, allocatorManager: [6050, life:
505919927378312..0] holds 2 buffers.
        DrillBuf[10198], udle: [6051 0..16777216]
        DrillBuf[10208], udle: [6051 0..16777216]
  reservations: 0


Fragment 0:0

[Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010]
at
org.apache.drill.exec.rpc.RpcException.mapException(RpcException.java:60)
~[classes/:na]
at
org.apache.drill.exec.client.DrillClient$ListHoldingResultsListener.getResults(DrillClient.java:881)
~[classes/:na]
at org.apache.drill.exec.client.DrillClient.runQuery(DrillClient.java:583)
~[classes/:na]
at
org.apache.drill.exec.physical.impl.join.TestHashJoin.simpleEqualityJoin(TestHashJoin.java:119)
~[test-classes/:na]
org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR:
IllegalStateException: Allocator[frag:0:0] closed with outstanding buffers
allocated (1).
Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
(res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
references: 1, life: 505919927431943..0, allocatorManager: [6050, life:
505919927378312..0] holds 2 buffers.
        DrillBuf[10198], udle: [6051 0..16777216]
        DrillBuf[10208], udle: [6051 0..16777216]
  reservations: 0


Fragment 0:0

[Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010]
at
org.apache.drill.exec.rpc.user.QueryResultHandler.resultArrived(QueryResultHandler.java:123)
~[classes/:na]
at org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:422)
~[classes/:na]
at org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:96)
~[classes/:na]
at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:275)
~[classes/:na]
at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:245)
~[classes/:na]
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
~[netty-codec-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
~[netty-handler-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
~[netty-codec-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312)
~[netty-codec-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286)
~[netty-codec-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
~[netty-common-4.0.48.Final.jar:4.0.48.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]

org.apache.drill.exec.rpc.RpcException:
org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR:
IllegalStateException: Allocator[frag:0:0] closed with outstanding buffers
allocated (1).
Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
(res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
references: 1, life: 505919927431943..0, allocatorManager: [6050, life:
505919927378312..0] holds 2 buffers.
        DrillBuf[10198], udle: [6051 0..16777216]
        DrillBuf[10208], udle: [6051 0..16777216]
  reservations: 0


Fragment 0:0

[Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010]

at org.apache.drill.exec.rpc.RpcException.mapException(RpcException.java:60)
at
org.apache.drill.exec.client.DrillClient$ListHoldingResultsListener.getResults(DrillClient.java:881)
at org.apache.drill.exec.client.DrillClient.runQuery(DrillClient.java:583)
at
org.apache.drill.exec.physical.impl.join.TestHashJoin.simpleEqualityJoin(TestHashJoin.java:119)
Caused by: org.apache.drill.common.exceptions.UserRemoteException: SYSTEM
ERROR: IllegalStateException: Allocator[frag:0:0] closed with outstanding
buffers allocated (1).
Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
(res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
references: 1, life: 505919927431943..0, allocatorManager: [6050, life:
505919927378312..0] holds 2 buffers.
        DrillBuf[10198], udle: [6051 0..16777216]
        DrillBuf[10208], udle: [6051 0..16777216]
  reservations: 0


Fragment 0:0

[Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010]
at
org.apache.drill.exec.rpc.user.QueryResultHandler.resultArrived(QueryResultHandler.java:123)
at org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:422)
at org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:96)
at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:275)
at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:245)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at java.lang.Thread.run(Thread.java:748)

Re: how to release allocated ByteBuf which steps across two threads

Posted by Parth Chandra <pa...@apache.org>.
Glad you found waitForSendComplete(), that is exactly right (I should have
mentioned it in the earlier post)!

I'm not sure I followed your description of how the query proceeds without
the bloom filter having been received, but that can wait till we all get to
see the implementation.

Thanks for working on this feature, it is really awesome.
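
For anyone following along: the idea behind
FragmentContextImpl.waitForSendComplete() is a counter of in-flight sends
that cleanup drains before the allocator closes. A simplified sketch of
that bookkeeping (the shape of the idea, not Drill's actual
SendingAccountor code):

```java
import java.util.concurrent.atomic.AtomicInteger;

class SendAccountant {
  private final AtomicInteger pendingSends = new AtomicInteger();

  // Fragment thread: called just before a buffer is queued on the wire.
  void sendStarted() {
    pendingSends.incrementAndGet();
  }

  // RPC/Netty thread: called when the send completes -- or fails. The
  // failure path is essential, otherwise a dead receiver blocks cleanup.
  synchronized void sendComplete() {
    if (pendingSends.decrementAndGet() == 0) {
      notifyAll();
    }
  }

  // Fragment cleanup: block until all async sends have drained, so the
  // allocator sees refcount zero when it is closed afterwards.
  synchronized void waitForSendComplete() throws InterruptedException {
    while (pendingSends.get() > 0) {
      wait();
    }
  }
}
```

The fragment thread calls sendStarted() before handing a DrillBuf to the
RPC layer, the Netty thread calls sendComplete() from the send-outcome
callback, and the fragment's close path calls waitForSendComplete()
before closing its allocator.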





On Wed, Jun 20, 2018 at 11:06 PM, weijie tong <to...@gmail.com>
wrote:

> Hi Parth:
>
>    Thanks for your suggestion. After digging into what Drill has already
> implemented, specifically ```FragmentContextImpl.sendingAccountor``` and
> ```FragmentContextImpl.waitForSendComplete()```, I found that the
> existing logic is solid and follows the same approach you describe
> above. By following that pattern I have solved the problem. I really
> appreciate your advice, thank you so much!
>
> On Thu, Jun 21, 2018 at 9:13 AM weijie tong <to...@gmail.com>
> wrote:
>
> > I also think this is a general problem whenever the receiver has no
> > chance to send back an ack reply (maybe it died, causing the query to
> > end with an exception). The allocator thread's cleanup method will then
> > complain about a memory leak, because the sender thread's error
> > handling, which releases the ByteBuf, may happen after the allocator
> > thread has already closed.
> >
> >
> > On Thu, Jun 21, 2018 at 8:51 AM weijie tong <to...@gmail.com>
> > wrote:
> >
> >> Hi Parth:
> >>
> >>     Thanks for your reply. Your detailed description explains the
> >> problem clearly. This problem is not the common case: the bloom filter
> >> had not been sent out by the time the query completed. I hit this
> >> exception when I enabled the bloom filter option and ran all the
> >> join-related test cases; it happens at
> >> TestHashJoin.simpleEqualityJoin(). Btw, the BloomFilter is sent
> >> through the data tunnel, not the control one. For a partitioned
> >> broadcast, each HashJoin node sends its bloom filter to the foreman
> >> node. The foreman waits for all the bloom filters it learned about at
> >> plan time, or until a timeout occurs. Once it has received them all,
> >> it aggregates them and broadcasts the result to all the probe-side
> >> scan nodes. The design goal is that the original data flow is never
> >> blocked by the runtime bloom filter flow: the bloom filter sending,
> >> receiving, and applying behaviors are all async, just a helpful
> >> addition to the original execution flow.
> >>
> >>
> >>      First, not all HashJoin scenarios can push the join predicate
> >> down, for example when both sides have a similar number of rows. I
> >> will add a cost-based check at the plan stage to prevent this, so the
> >> exception scenario will happen less often.
> >>
> >>      Second, I think the exception scenario still has to be prevented
> >> to keep the system robust. Your suggestion to add synchronization
> >> between (3), (4) and (6) is good. But the problem is that the
> >> corresponding receiving side has already completed, so it has no
> >> chance to send back an ack. Maybe some special timeout ack is needed
> >> to stand in for the lost one, so that Thread 1 stops waiting when the
> >> send fails. In this test case, the hash join node is in the last
> >> fragment, and it has so little data that the query finishes quickly.
> >>
> >>     As the normal execution flow now works, I will share the dev
> >> branch soon after fixing some trivial things. Suggestions on this
> >> problem are still welcome.
> >>
> >>
> >>
> >> On Thu, Jun 21, 2018 at 6:44 AM Parth Chandra <pa...@apache.org> wrote:
> >>
> >>> Hi Weijie
> >>>
> >>>   It would also help to understand the flow of control that your design
> >>> uses. I've put a screenshot of a query profile here:
> >>>
> >>> https://docs.google.com/document/d/1DgAbGovEWV6rZ4GvioJz5Twe_m5o1ADoBpbqAzqa_aU/edit?usp=sharing
> >>>
> >>>   Looking at the subset under [ Hash Join 06-01 ], can you annotate
> >>> and/or explain how you see the control messages flowing? Also, are you
> >>> using the control channel to send the bloom filter?
> >>>
> >>> Parth
> >>>
> >>> On Wed, Jun 20, 2018 at 3:28 PM, Parth Chandra <pa...@apache.org>
> >>> wrote:
> >>>
> >>> > Hi Weijie,
> >>> >   This is a tricky problem. So let me first summarize how this should
> >>> > be behaving -
> >>> >
> >>> >         Thread 1                     |     Thread 2
> >>> >   -----------------------------------+--------------------------------------------
> >>> > 1)  Allocate DrillBuf                |
> >>> > 2)  Pass memory to RPC layer         |   get reference to DrillBuf (refcount == 2)
> >>> > 3)                                   |   queue up the send (async)
> >>> > 4)                                   |   Send and release DrillBuf (refcount == 1)
> >>> > 5)  Continue to end of query         |
> >>> > 6)  Cleanup (release DrillBuf)       |
> >>> > 7)  Close Allocator (refcount of     |
> >>> >       DrillBuf *must* be zero)       |
> >>> >
> >>> > In your case, steps 3 and 4 are occurring after step 7, which is
> >>> > natural since the RPC send is async, but that is what we have to
> >>> > prevent. The only way to do that is to have some synchronization
> >>> > between steps (3), (4), and (6) such that (6) only happens after (4).
> >>> > With RPC the way to do so is to require an ack.
> >>> >
> >>> >
> >>> >           Thread 1                   |     Thread 2  (Netty)                       |   Thread 3  (foreman)
> >>> >   -------------------------------------+---------------------------------------------+------------------------
> >>> > 1)    Allocate DrillBuf                |                                             |
> >>> > 2)    Pass memory to RPC layer         |   get reference to DrillBuf (refcount == 2) |
> >>> > 3)                                     |   queue up the send (async)                 |
> >>> > 4)                                     |   Send and release DrillBuf (refcount == 1) |
> >>> > 4.1)                                   |                                             |  Recv msg, send back Ack
> >>> >                                        |                                             |    (The RPC layer
> >>> >                                        |                                             |    automatically does this)
> >>> > 4.2)  Check if Ack received            |                                             |
> >>> > 5)    Continue to end of query         |                                             |
> >>> > 6)    Cleanup (release DrillBuf)       |                                             |
> >>> > 7)    Close Allocator (refcount of     |                                             |
> >>> >         DrillBuf *must* be zero)       |                                             |
> >>> >
> >>> > Note that (4.2) does not have to complete before (5), only before (6)
> >>> > for the memory to be released.
> >>> >
> >>> > One question I have is how the query completed without the Bloom Filter
> >>> > reaching its destination. How does the destination fragment know when
> >>> > it has to wait for the Bloom Filter? I suspect this may be more
> >>> > complicated than it appears at first glance.
> >>> >
> >>> > Not sure if this helps narrow it down. If you can share a dev branch
> >>> > we can help take a look.
> >>> >
> >>> >
> >>> >
> >>> > On Tue, Jun 19, 2018 at 8:35 PM, weijie tong <tongweijie178@gmail.com> wrote:
> >>> >
> >>> >> [weijie's original message and the full stack trace were quoted here
> >>> >> in full; snipped -- see the top of this thread]
> >>> >
> >>> >
> >>>
> >>
>
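
Parth's ack-based flow above is straightforward to express in code. Below
is a minimal, self-contained sketch of steps (2)-(7); the SendListener
interface and asyncSend method are invented stand-ins for illustration,
not Drill's API (Drill's real counterpart is an RpcOutcomeListener
registered with the tunnel):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AckedSend {

  interface SendListener {
    void success();            // step (4.2): the ack came back
    void failure(Exception e); // receiver died / wire closed
  }

  // Steps (3)-(4): queue the send on another thread, report the outcome.
  static void asyncSend(byte[] buf, SendListener listener) {
    new Thread(() -> {
      try {
        // ... write buf to the wire and wait for the remote Ack ...
        listener.success();
      } catch (Exception e) {
        listener.failure(e); // must still fire, or the waiter hangs
      }
    }).start();
  }

  public static void main(String[] args) throws InterruptedException {
    byte[] bloomFilter = new byte[1024]; // stands in for the DrillBuf
    CountDownLatch acked = new CountDownLatch(1);

    asyncSend(bloomFilter, new SendListener() {
      @Override public void success()            { acked.countDown(); }
      @Override public void failure(Exception e) { acked.countDown(); }
    });

    // ... step (5): fragment runs to the end of the query ...

    // Step (6) must not start until (4.2) has happened; a timeout keeps a
    // lost ack from wedging cleanup forever.
    acked.await(30, TimeUnit.SECONDS);
    // steps (6)-(7): safe to release the buffer and close the allocator
  }
}
```

Counting down on failure as well as success addresses weijie's concern
about a receiver that dies before it can ack: the waiter is unblocked
either way, and the await timeout covers the case where no outcome ever
arrives.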

Re: how to release allocated ByteBuf which steps across two threads

Posted by weijie tong <to...@gmail.com>.
Hi Parth:

   Thanks for your suggestion. After digging into what Drill has already
implemented, specifically ```FragmentContextImpl.sendingAccountor``` and
```FragmentContextImpl.waitForSendComplete()```, I found that the
existing logic is solid and follows the same approach you describe
above. By following that pattern I have solved the problem. I really
appreciate your advice, thank you so much!
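
Concretely, the fix slots the wait into the fragment's close path. A
sketch of the ordering, reusing the SendAccountant sketch from Parth's
reply above (hypothetical names, not the actual FragmentExecutor code):

```java
// Hypothetical wiring of the accountant into the send and cleanup paths.
class FragmentCleanup {
  private final SendAccountant accountant = new SendAccountant();

  void sendBloomFilter(byte[] buf) {
    accountant.sendStarted();
    // hand buf to the async data tunnel; the tunnel's outcome listener
    // must call accountant.sendComplete() on both success and failure
  }

  void close(AutoCloseable allocator) throws Exception {
    accountant.waitForSendComplete(); // drain in-flight sends first
    allocator.close();                // now the refcount check can pass
  }
}
```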

On Thu, Jun 21, 2018 at 9:13 AM weijie tong <to...@gmail.com> wrote:

> I also think this is a common problem to the case that the receiver has no
> chance to sent out a ack reply, maybe it died, causing the query to
> exception over. Then it will cause the allocator thread’s cleanup method to
> complain about the memory leak as the sender thread’s exception solving
> logic to release the ByteBuf maybe happed behind the allocator thread.
>
>
> On Thu, Jun 21, 2018 at 8:51 AM weijie tong <to...@gmail.com>
> wrote:
>
>> Hi Parth:
>>
>>     Thanks for your reply. Your detailed description explains the problem
>> clearly. This problem is not a common case: the bloom filter had not been
>> sent out when the query completed. I hit this exception when enabling the
>> bloom filter option and running all the join-related test cases; it shows
>> up in TestHashJoin.simpleEqualityJoin(). Btw, the BloomFilter is sent out
>> through the data tunnel, not the control one. For the partitioned
>> broadcast case, each HashJoin node sends its bloom filter to the foreman
>> node. The foreman node waits for all the bloom filters it learned about at
>> the plan stage, until a timeout occurs. Once the foreman node has received
>> all the bloom filters, it aggregates them and then broadcasts the result
>> to all the probe-side scan nodes. The design goal is that the original
>> flow is never blocked by the runtime bloom filter flow. The bloom filter
>> sending, receiving, and applying behaviors are all async, just a helpful
>> addition to the original execution flow.
>>
>>
>>      First, not all HashJoin scenarios allow pushing the join predicate
>> down, for example when both sides have similar row counts. I will add a
>> cost-based check at the plan stage to prevent this, so the exception
>> scenario will occur less often.
>>
>>      Second, I think the exception scenario still has to be prevented to
>> keep the system robust. Your suggestion to add synchronization between
>> (3), (4), and (6) is good. But the problem is that the corresponding
>> receiving side has completed, so it has no chance to send a reply ack.
>> Maybe some special timeout ack is needed to stand in for the lost one, so
>> that Thread 1 stops waiting when the send fails. In this test case, the
>> hash join node is in the last fragment, and it has little data, so it
>> completes its query quickly.
>>
>>     Since the normal execution flow now works, I will share the dev branch
>> soon after fixing some trivial things. I still need suggestions on this
>> problem.
>>
>>
>>
>> On Thu, Jun 21, 2018 at 6:44 AM Parth Chandra <pa...@apache.org> wrote:
>>
>>> Hi Weijie
>>>
>>>   It would also help to understand the flow of control that your design
>>> uses. I've put a screenshot of a query profile here :
>>>
>>> https://docs.google.com/document/d/1DgAbGovEWV6rZ4GvioJz5Twe_m5o1ADoBpbqAzqa_aU/edit?usp=sharing
>>>
>>>   Looking at the subset under [ Hash Join 06-01 ], can you annotate and/or
>>> explain how you see the control messages flowing? Also, are you using the
>>> control channel to send the bloom filter?
>>>
>>> Parth
>>>
>>> On Wed, Jun 20, 2018 at 3:28 PM, Parth Chandra <pa...@apache.org>
>>> wrote:
>>>
>>> > Hi Weijie,
>>> >   This is a tricky problem. So let me first summarize how this should
>>> > be behaving -
>>> >
>>> >         Thread 1                     |     Thread 2
>>> >   -----------------------------------+--------------------------------------------
>>> > 1)  Allocate DrillBuf                |
>>> > 2)  Pass memory to RPC layer         |   get reference to DrillBuf (refcount == 2)
>>> > 3)                                   |   queue up the send (async)
>>> > 4)                                   |   Send and release DrillBuf (refcount == 1)
>>> > 5)  Continue to end of query         |
>>> > 6)  Cleanup (release DrillBuf)       |
>>> > 7)  Close Allocator (refcount of     |
>>> >       DrillBuf *must* be zero)       |
>>> >
>>> > In your case, steps 3 and 4 are occurring after step 7, which is natural
>>> > since the RPC send is async, but that is what we have to prevent. The only
>>> > way to do that is to have some synchronization between steps (3), (4), and
>>> > (6) such that (6) only happens after (4). With RPC the way to do so is to
>>> > require an ack.
>>> >
>>> >
>>> >           Thread 1                     |     Thread 2  (Netty)                        |   Thread 3  (foreman)
>>> >     -----------------------------------+----------------------------------------------+------------------------
>>> > 1)    Allocate DrillBuf                |                                              |
>>> > 2)    Pass memory to RPC layer         |   get reference to DrillBuf (refcount == 2)  |
>>> > 3)                                     |   queue up the send (async)                  |
>>> > 4)                                     |   Send and release DrillBuf (refcount == 1)  |
>>> > 4.1)                                   |                                              |  Recv msg, send back Ack (The RPC layer
>>> >                                        |                                              |    automatically does this)
>>> > 4.2)  Check if Ack received            |                                              |
>>> > 5)    Continue to end of query         |                                              |
>>> > 6)    Cleanup (release DrillBuf)       |                                              |
>>> > 7)    Close Allocator (refcount of     |                                              |
>>> >         DrillBuf *must* be zero)       |                                              |
>>> >
>>> > Note that (4.2) does not have to complete before (5), only before (6) for
>>> > the memory to be released.
>>> >
>>> > One question I have is how the query completed without the Bloom Filter
>>> > reaching its destination. How does the destination fragment know when it
>>> > has to wait for the Bloom Filter? I suspect this may be more
>>> > complicated than it appears at first glance.
>>> >
>>> > Not sure if this helps narrow it down. If you can share a dev branch we
>>> > can help take a look.
>>> >
>>> >
>>> >
>>> > On Tue, Jun 19, 2018 at 8:35 PM, weijie tong <to...@gmail.com>
>>> > wrote:
>>> >
>>> >> Hi:
>>> >>    I ran into a complicated problem releasing the BloomFilter's direct
>>> >> memory in some special cases. I hope someone can give some advice.
>>> >>
>>> >>    Say one join node sends out its BloomFilter to the foreman node
>>> >> (TestHashJoin.simpleEqualityJoin()). The sending thread is netty's
>>> >> BitClient, while the BloomFilter's direct memory is allocated by another
>>> >> thread's allocator (i.e. the HashJoin fragment's allocator). When the
>>> >> fragment completes quickly, its close logic checks the allocator's
>>> >> memory accounting. But the async sender thread has not yet sent out the
>>> >> BloomFilter and released the corresponding direct ByteBuffer; the query
>>> >> completed so quickly that the wire is already closed. The fragment's
>>> >> close logic then throws an exception complaining about a memory leak.
>>> >>
>>> >>     So I want to know how to release the allocated direct ByteBuffer in
>>> >> such a case.
>>> >>
>>> >>
>>> >>    The exception is :
>>> >>
>>> >> [Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010
>>> ]
>>> >> at
>>> >> org.apache.drill.common.exceptions.UserException$Builder.
>>> >> build(UserException.java:633)
>>> >> ~[classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.work.fragment.FragmentExecutor.sendFin
>>> >> alState(FragmentExecutor.java:359)
>>> >> [classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup
>>> >> (FragmentExecutor.java:214)
>>> >> [classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.work.fragment.FragmentExecutor.run(Fra
>>> >> gmentExecutor.java:325)
>>> >> [classes/:na]
>>> >> at
>>> >> org.apache.drill.common.SelfCleaningRunnable.run(SelfCleanin
>>> >> gRunnable.java:38)
>>> >> [classes/:na]
>>> >> at
>>> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>> >> Executor.java:1149)
>>> >> [na:1.8.0_161]
>>> >> at
>>> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>> >> lExecutor.java:624)
>>> >> [na:1.8.0_161]
>>> >> at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
>>> >> Caused by: java.lang.IllegalStateException: Allocator[frag:0:0] closed
>>> >> with
>>> >> outstanding buffers allocated (1).
>>> >> Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
>>> >> (res/actual/peak/limit)
>>> >>   child allocators: 0
>>> >>   ledgers: 1
>>> >>     ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
>>> >> references: 1, life: 505919927431943..0, allocatorManager: [6050,
>>> life:
>>> >> 505919927378312..0] holds 2 buffers.
>>> >>         DrillBuf[10198], udle: [6051 0..16777216]
>>> >>         DrillBuf[10208], udle: [6051 0..16777216]
>>> >>   reservations: 0
>>> >>
>>> >> at org.apache.drill.exec.memory.BaseAllocator.close(BaseAllocat
>>> >> or.java:503)
>>> >> ~[classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.ops.FragmentContextImpl.suppressingClo
>>> >> se(FragmentContextImpl.java:484)
>>> >> ~[classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.ops.FragmentContextImpl.close(Fragment
>>> >> ContextImpl.java:478)
>>> >> ~[classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.work.fragment.FragmentExecutor.closeOu
>>> >> tResources(FragmentExecutor.java:382)
>>> >> [classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup
>>> >> (FragmentExecutor.java:209)
>>> >> [classes/:na]
>>> >> ... 5 common frames omitted
>>> >> =====================fragment:0 : 0done!
>>> >> *************************receive a bloom filter**********
>>> >> ************received a bloom filter
>>> >> 11:00:41.587 [main] ERROR o.a.d.exec.server.BootStrapContext - Error
>>> >> while
>>> >> closing
>>> >> java.lang.IllegalStateException: Allocator[ROOT] closed with
>>> outstanding
>>> >> child allocators.
>>> >> Allocator(ROOT) 0/16777216/55640064/4294967296 (res/actual/peak/limit)
>>> >>   child allocators: 1
>>> >>     Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
>>> >> (res/actual/peak/limit)
>>> >>       child allocators: 0
>>> >>       ledgers: 1
>>> >>         ledger[6268] allocator: frag:0:0), isOwning: true, size:
>>> 16777216,
>>> >> references: 1, life: 505919927431943..0, allocatorManager: [6050,
>>> life:
>>> >> 505919927378312..0] holds 2 buffers.
>>> >>             DrillBuf[10198], udle: [6051 0..16777216]
>>> >>             DrillBuf[10208], udle: [6051 0..16777216]
>>> >>       reservations: 0
>>> >>   ledgers: 0
>>> >>   reservations: 0
>>> >>
>>> >> at org.apache.drill.exec.memory.BaseAllocator.close(BaseAllocat
>>> >> or.java:496)
>>> >> ~[classes/:na]
>>> >> at
>>> org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:81)
>>> >> [classes/:na]
>>> >> at
>>> org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:69)
>>> >> [classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.server.BootStrapContext.close(BootStra
>>> >> pContext.java:259)
>>> >> ~[classes/:na]
>>> >> at
>>> org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:81)
>>> >> [classes/:na]
>>> >> at
>>> org.apache.drill.common.AutoCloseables.close(AutoCloseables.java:69)
>>> >> [classes/:na]
>>> >> at org.apache.drill.exec.server.Drillbit.close(Drillbit.java:263)
>>> >> [classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.physical.impl.join.TestHashJoin.simple
>>> >> EqualityJoin(TestHashJoin.java:147)
>>> >> [test-classes/:na]
>>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> >> ~[na:1.8.0_161]
>>> >> at
>>> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>> >> ssorImpl.java:62)
>>> >> ~[na:1.8.0_161]
>>> >> at
>>> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> >> thodAccessorImpl.java:43)
>>> >> ~[na:1.8.0_161]
>>> >> at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
>>> >> at
>>> >> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(
>>> >> FrameworkMethod.java:50)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.internal.runners.model.ReflectiveCallable.run(Refl
>>> >> ectiveCallable.java:12)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.runners.model.FrameworkMethod.invokeExplosively(Fr
>>> >> ameworkMethod.java:47)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> mockit.integration.junit4.internal.JUnit4TestRunnerDecorator
>>> >> .executeTestMethod(JUnit4TestRunnerDecorator.java:154)
>>> >> [jmockit-1.39.jar:1.39]
>>> >> at
>>> >> mockit.integration.junit4.internal.JUnit4TestRunnerDecorator
>>> >> .invokeExplosively(JUnit4TestRunnerDecorator.java:70)
>>> >> [jmockit-1.39.jar:1.39]
>>> >> at
>>> >> mockit.integration.junit4.internal.FakeFrameworkMethod.invok
>>> >> eExplosively(FakeFrameworkMethod.java:34)
>>> >> [jmockit-1.39.jar:1.39]
>>> >> at
>>> >> org.junit.runners.model.FrameworkMethod.invokeExplosively(Fr
>>> >> ameworkMethod.java)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.internal.runners.statements.InvokeMethod.evaluate(
>>> >> InvokeMethod.java:17)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.internal.runners.statements.RunBefores.evaluate(
>>> >> RunBefores.java:26)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.internal.runners.statements.RunAfters.evaluate(Run
>>> >> Afters.java:27)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.rules.ExpectedException$ExpectedExceptionStatement
>>> >> .evaluate(ExpectedException.java:239)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit
>>> >> 4ClassRunner.java:78)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit
>>> >> 4ClassRunner.java:57)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.internal.runners.statements.RunBefores.evaluate(
>>> >> RunBefores.java:26)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> org.junit.internal.runners.statements.RunAfters.evaluate(Run
>>> >> Afters.java:27)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>> >> [junit-4.12.jar:4.12]
>>> >> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>> >> [junit-4.12.jar:4.12]
>>> >> at
>>> >> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs
>>> >> (JUnit4IdeaTestRunner.java:68)
>>> >> [junit-rt.jar:na]
>>> >> at
>>> >> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.star
>>> >> tRunnerWithArgs(IdeaTestRunner.java:47)
>>> >> [junit-rt.jar:na]
>>> >> at
>>> >> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsA
>>> >> ndStart(JUnitStarter.java:242)
>>> >> [junit-rt.jar:na]
>>> >> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStart
>>> >> er.java:70)
>>> >> [junit-rt.jar:na]
>>> >> 11:00:41.593 [main] ERROR org.apache.drill.TestReporter - Test Failed
>>> (d:
>>> >> 0
>>> >> B(1 B), h: -360.8 MiB(52.3 MiB), nh: 3.2 MiB(88.4 MiB)):
>>> >>
>>> simpleEqualityJoin(org.apache.drill.exec.physical.impl.join.TestHashJoin)
>>> >> org.apache.drill.exec.rpc.RpcException:
>>> >> org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR:
>>> >> IllegalStateException: Allocator[frag:0:0] closed with outstanding
>>> buffers
>>> >> allocated (1).
>>> >> Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
>>> >> (res/actual/peak/limit)
>>> >>   child allocators: 0
>>> >>   ledgers: 1
>>> >>     ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
>>> >> references: 1, life: 505919927431943..0, allocatorManager: [6050,
>>> life:
>>> >> 505919927378312..0] holds 2 buffers.
>>> >>         DrillBuf[10198], udle: [6051 0..16777216]
>>> >>         DrillBuf[10208], udle: [6051 0..16777216]
>>> >>   reservations: 0
>>> >>
>>> >>
>>> >> Fragment 0:0
>>> >>
>>> >> [Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010
>>> ]
>>> >> at
>>> >>
>>> org.apache.drill.exec.rpc.RpcException.mapException(RpcException.java:60)
>>> >> ~[classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.client.DrillClient$ListHoldingResultsL
>>> >> istener.getResults(DrillClient.java:881)
>>> >> ~[classes/:na]
>>> >> at org.apache.drill.exec.client.DrillClient.runQuery(DrillClien
>>> >> t.java:583)
>>> >> ~[classes/:na]
>>> >> at
>>> >> org.apache.drill.exec.physical.impl.join.TestHashJoin.simple
>>> >> EqualityJoin(TestHashJoin.java:119)
>>> >> ~[test-classes/:na]
>>> >> org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR:
>>> >> IllegalStateException: Allocator[frag:0:0] closed with outstanding
>>> buffers
>>> >> allocated (1).
>>> >> Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
>>> >> (res/actual/peak/limit)
>>> >>   child allocators: 0
>>> >>   ledgers: 1
>>> >>     ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
>>> >> references: 1, life: 505919927431943..0, allocatorManager: [6050,
>>> life:
>>> >> 505919927378312..0] holds 2 buffers.
>>> >>         DrillBuf[10198], udle: [6051 0..16777216]
>>> >>         DrillBuf[10208], udle: [6051 0..16777216]
>>> >>   reservations: 0
>>> >>
>>> >>
>>> >> Fragment 0:0
>>> >>
>>> >> [Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010
>>> ]
>>> >> at
>>> >> org.apache.drill.exec.rpc.user.QueryResultHandler.resultArri
>>> >> ved(QueryResultHandler.java:123)
>>> >> ~[classes/:na]
>>> >> at
>>> org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:422)
>>> >> ~[classes/:na]
>>> >> at
>>> org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:96)
>>> >> ~[classes/:na]
>>> >> at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBu
>>> >> s.java:275)
>>> >> ~[classes/:na]
>>> >> at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBu
>>> >> s.java:245)
>>> >> ~[classes/:na]
>>> >> at
>>> >> io.netty.handler.codec.MessageToMessageDecoder.channelRead(M
>>> >> essageToMessageDecoder.java:88)
>>> >> ~[netty-codec-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleSt
>>> >> ateHandler.java:287)
>>> >> ~[netty-handler-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.handler.codec.MessageToMessageDecoder.channelRead(M
>>> >> essageToMessageDecoder.java:102)
>>> >> ~[netty-codec-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(
>>> >> ByteToMessageDecoder.java:312)
>>> >> ~[netty-codec-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.handler.codec.ByteToMessageDecoder.channelRead(Byte
>>> >> ToMessageDecoder.java:286)
>>> >> ~[netty-codec-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.ChannelInboundHandlerAdapter.channelRead(Ch
>>> >> annelInboundHandlerAdapter.java:86)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.DefaultChannelPipeline$HeadContext.
>>> >> channelRead(DefaultChannelPipeline.java:1294)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.DefaultChannelPipeline.fireChannelRead(Defa
>>> >> ultChannelPipeline.java:911)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.
>>> >> read(AbstractNioByteChannel.java:131)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEven
>>> >> tLoop.java:645)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimiz
>>> >> ed(NioEventLoop.java:580)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEve
>>> >> ntLoop.java:497)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>>> >> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>>> >> at
>>> >> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(
>>> >> SingleThreadEventExecutor.java:131)
>>> >> ~[netty-common-4.0.48.Final.jar:4.0.48.Final]
>>> >> at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]
>>> >>
>>> >> org.apache.drill.exec.rpc.RpcException:
>>> >> org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR:
>>> >> IllegalStateException: Allocator[frag:0:0] closed with outstanding
>>> buffers
>>> >> allocated (1).
>>> >> Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
>>> >> (res/actual/peak/limit)
>>> >>   child allocators: 0
>>> >>   ledgers: 1
>>> >>     ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
>>> >> references: 1, life: 505919927431943..0, allocatorManager: [6050,
>>> life:
>>> >> 505919927378312..0] holds 2 buffers.
>>> >>         DrillBuf[10198], udle: [6051 0..16777216]
>>> >>         DrillBuf[10208], udle: [6051 0..16777216]
>>> >>   reservations: 0
>>> >>
>>> >>
>>> >> Fragment 0:0
>>> >>
>>> >> [Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010
>>> ]
>>> >>
>>> >> at org.apache.drill.exec.rpc.RpcException.mapException(RpcExcep
>>> >> tion.java:60)
>>> >> at
>>> >> org.apache.drill.exec.client.DrillClient$ListHoldingResultsL
>>> >> istener.getResults(DrillClient.java:881)
>>> >> at org.apache.drill.exec.client.DrillClient.runQuery(DrillClien
>>> >> t.java:583)
>>> >> at
>>> >> org.apache.drill.exec.physical.impl.join.TestHashJoin.simple
>>> >> EqualityJoin(TestHashJoin.java:119)
>>> >> Caused by: org.apache.drill.common.exceptions.UserRemoteException:
>>> SYSTEM
>>> >> ERROR: IllegalStateException: Allocator[frag:0:0] closed with
>>> outstanding
>>> >> buffers allocated (1).
>>> >> Allocator(frag:0:0) 4000000/16777216/47664448/30357913941
>>> >> (res/actual/peak/limit)
>>> >>   child allocators: 0
>>> >>   ledgers: 1
>>> >>     ledger[6268] allocator: frag:0:0), isOwning: true, size: 16777216,
>>> >> references: 1, life: 505919927431943..0, allocatorManager: [6050,
>>> life:
>>> >> 505919927378312..0] holds 2 buffers.
>>> >>         DrillBuf[10198], udle: [6051 0..16777216]
>>> >>         DrillBuf[10208], udle: [6051 0..16777216]
>>> >>   reservations: 0
>>> >>
>>> >>
>>> >> Fragment 0:0
>>> >>
>>> >> [Error Id: 0042b168-8728-4367-b461-653837c3a276 on 10.15.235.86:31010
>>> ]
>>> >> at
>>> >> org.apache.drill.exec.rpc.user.QueryResultHandler.resultArri
>>> >> ved(QueryResultHandler.java:123)
>>> >> at
>>> org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:422)
>>> >> at
>>> org.apache.drill.exec.rpc.user.UserClient.handle(UserClient.java:96)
>>> >> at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBu
>>> >> s.java:275)
>>> >> at org.apache.drill.exec.rpc.RpcBus$InboundHandler.decode(RpcBu
>>> >> s.java:245)
>>> >> at
>>> >> io.netty.handler.codec.MessageToMessageDecoder.channelRead(M
>>> >> essageToMessageDecoder.java:88)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> at
>>> >> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleSt
>>> >> ateHandler.java:287)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> at
>>> >> io.netty.handler.codec.MessageToMessageDecoder.channelRead(M
>>> >> essageToMessageDecoder.java:102)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> at
>>> >> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(
>>> >> ByteToMessageDecoder.java:312)
>>> >> at
>>> >> io.netty.handler.codec.ByteToMessageDecoder.channelRead(Byte
>>> >> ToMessageDecoder.java:286)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> at
>>> >> io.netty.channel.ChannelInboundHandlerAdapter.channelRead(Ch
>>> >> annelInboundHandlerAdapter.java:86)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRe
>>> >> ad(AbstractChannelHandlerContext.java:335)
>>> >> at
>>> >> io.netty.channel.DefaultChannelPipeline$HeadContext.
>>> >> channelRead(DefaultChannelPipeline.java:1294)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:356)
>>> >> at
>>> >> io.netty.channel.AbstractChannelHandlerContext.invokeChannel
>>> >> Read(AbstractChannelHandlerContext.java:342)
>>> >> at
>>> >> io.netty.channel.DefaultChannelPipeline.fireChannelRead(Defa
>>> >> ultChannelPipeline.java:911)
>>> >> at
>>> >> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.
>>> >> read(AbstractNioByteChannel.java:131)
>>> >> at
>>> >> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEven
>>> >> tLoop.java:645)
>>> >> at
>>> >> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimiz
>>> >> ed(NioEventLoop.java:580)
>>> >> at
>>> >> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEve
>>> >> ntLoop.java:497)
>>> >> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>>> >> at
>>> >> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(
>>> >> SingleThreadEventExecutor.java:131)
>>> >> at java.lang.Thread.run(Thread.java:748)
>>> >>
>>> >
>>> >
>>>
>>

Re: how to release allocated ByteBuf which steps across two threads

Posted by weijie tong <to...@gmail.com>.
I also think this is a common problem whenever the receiver has no chance
to send back an ack reply, for example because it died and the query ended
with an exception. In that case the allocator thread’s cleanup method will
complain about a memory leak, because the sender thread’s exception
handling that releases the ByteBuf may run after the allocator thread has
already closed the allocator.
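
One way to make the release unconditional is a completion listener that
fires on every outcome, success or failure. A minimal sketch of that idea
(the listener shape here is illustrative, not Drill's exact RPC API):

```java
import io.netty.buffer.ByteBuf;

// Hedged sketch: release the send-side buffer reference and unblock the
// fragment's cleanup on every outcome, so a dead receiver (no ack ever
// arriving) cannot leave the refcount pinned past allocator close.
public final class ReleasingSendListener {
  private final ByteBuf buf;
  private final Runnable onComplete; // e.g. the pending-send counter's decrement

  public ReleasingSendListener(ByteBuf buf, Runnable onComplete) {
    this.buf = buf;
    this.onComplete = onComplete;
  }

  public void success() {               // ack received from the foreman
    complete();
  }

  public void failed(Throwable cause) { // send failed, timed out, or channel closed
    complete();
  }

  private void complete() {
    buf.release();    // drop the reference the RPC layer was holding
    onComplete.run(); // let the cleanup's wait return
  }
}
```

With something like this, even a send that fails because the receiver died
still drops its reference before the cleanup finishes waiting.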



Re: how to release allocated ByteBuf which steps across two threads

Posted by weijie tong <to...@gmail.com>.
Hi Parth:

    Thanks for your reply. Your detailed description explains the problem
clearly. This problem is not a common case: the bloom filter had not been
sent out by the time the query completed. I hit this exception after
enabling the bloom filter option and running all the join-related test
cases; it shows up in TestHashJoin.simpleEqualityJoin(). Btw, the
BloomFilter is sent through the data tunnel, not the control one. For a
partitioned broadcast, each HashJoin node sends its bloom filter to the
foreman node. The foreman node waits for all the bloom filters it learned
about at the plan stage to arrive, or until a timeout occurs. Once the
foreman node has received all the bloom filters, it aggregates them and
broadcasts the result to all the probe-side scan nodes (a sketch of this
flow follows below). The design goal is that the original data flow is
never blocked by the runtime bloom filter flow: the bloom filter sending,
receiving, and applying behaviors are all async, just a best-effort aid
to the original execution flow.
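
To make that concrete, here is a minimal sketch of the foreman-side
aggregation flow; BloomFilterAggregator, onBloomFilter and the BitSet
representation are hypothetical names for this mail, not the actual Drill
classes:

    import java.util.BitSet;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch of the foreman-side flow described above.
    class BloomFilterAggregator {
      private final int expectedFilters;  // senders known at plan time
      private final long timeoutMs;       // how long the foreman will wait
      private final BlockingQueue<BitSet> received = new LinkedBlockingQueue<>();

      BloomFilterAggregator(int expectedFilters, long timeoutMs) {
        this.expectedFilters = expectedFilters;
        this.timeoutMs = timeoutMs;
      }

      // Called from the RPC receive path; never blocks the original data flow.
      void onBloomFilter(BitSet filter) {
        received.offer(filter);
      }

      // Collects filters until all expected ones arrive or the timeout
      // expires, ORs them together, and returns the merged filter for
      // broadcast to the probe-side scan nodes.
      BitSet aggregate() throws InterruptedException {
        BitSet merged = new BitSet();
        long deadline = System.currentTimeMillis() + timeoutMs;
        for (int i = 0; i < expectedFilters; i++) {
          BitSet f = received.poll(
              Math.max(deadline - System.currentTimeMillis(), 0L),
              TimeUnit.MILLISECONDS);
          if (f == null) {
            break;  // timeout: proceed with whatever arrived
          }
          merged.or(f);
        }
        return merged;
      }
    }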


     First, not all HashJoin scenarios make it worthwhile to push down the
join predicate, e.g. when both sides have similar row counts. I will add a
cost-based check at the plan stage to prevent pushdown in such cases, so
the exception scenario will happen less often (a rough sketch of the guard
is below).
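
A rough sketch of the kind of guard I mean, with an illustrative threshold
(the ratio and the row-count inputs are made up for this mail, not Drill's
planner API):

    // Hypothetical cost guard: only build a runtime bloom filter when the
    // probe side is much larger than the build side; otherwise the filter
    // rejects little and only adds overhead.
    final class BloomFilterPushdownGuard {
      private static final double MIN_PROBE_TO_BUILD_RATIO = 10.0;  // illustrative

      static boolean worthPushingDown(double buildRows, double probeRows) {
        return probeRows / Math.max(buildRows, 1.0) >= MIN_PROBE_TO_BUILD_RATIO;
      }
    }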

     Second, I think the exception scenario still has to be prevented to
keep the system robust. Your suggestion to add synchronization between
(3), (4) and (6) is good. But the problem is that the corresponding
receiving side has already completed, so it has no chance to send a reply
ack. Maybe some special timeout ack is needed to stand in for the lost
one, so that Thread 1 only waits a bounded time when the send fails (see
the sketch below). In this test case, the hash join node is the last
fragment, and it has so little data that it completes its query quickly.
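
For example, a bounded wait of the kind I mean might look like this (a
sketch only; the wiring of the listener into Drill's RPC layer is assumed,
not shown):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical "timeout ack": the fragment thread waits a bounded time
    // for either a real ack or a send failure, then releases its reference
    // either way.
    class BoundedAckWait {
      private final CountDownLatch done = new CountDownLatch(1);

      // Called from the netty thread on ack, send failure, or channel close.
      void onSendOutcome() {
        done.countDown();
      }

      // Called from the fragment thread before closing the allocator.
      // Returns true if an outcome arrived in time; on timeout the caller
      // still releases its reference so the allocator closes without a
      // leak report.
      boolean awaitOutcome(long timeoutMs) throws InterruptedException {
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
      }
    }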

    As the normal execution flow now works, I will share the dev branch
soon after fixing some trivial things. I still need suggestions on this
problem.



On Thu, Jun 21, 2018 at 6:44 AM Parth Chandra <pa...@apache.org> wrote:

> Hi Weijie
>
>   It would also help to understand the flow of control that your design
> uses. I've put a screenshot of a query profile here :
>
> https://docs.google.com/document/d/1DgAbGovEWV6rZ4GvioJz5Twe_m5o1ADoBpbqAzqa_aU/edit?usp=sharing
>
>   Looking at the subset under [ Hash Join 06-01 ],  can you annotate and/or
> explain how you see the control messages flowing? Also, are you using the
> control channel to send the bloom filter?
>
> Parth
>
> On Wed, Jun 20, 2018 at 3:28 PM, Parth Chandra <pa...@apache.org> wrote:
>
> > Hi Weijie,
> >   This is a tricky problem. So let me first summarize how this should be
> > behaving -
> >
> >         Thread 1                     |     Thread 2
> >   -----------------------------------+------------------------------------------
> > 1)  Allocate DrillBuf                |
> > 2)  Pass memory to RPC layer         |   get reference to DrillBuf (refcount == 2)
> > 3)                                   |   queue up the send (async)
> > 4)                                   |   Send and release DrillBuf (refcount == 1)
> > 5)  Continue to end of query         |
> > 6)  Cleanup (release DrillBuf)       |
> > 7)  Close Allocator (refcount of     |
> >       DrillBuf *must* be zero)       |
> >
> > In your case, steps 3 and 4 are occurring after step 7 which is natural
> > since the RPC send is async, but that is what we have to prevent. The only
> > way to do that is to have some synchronization between steps (3), (4), and
> > (6) such that (6) only happens after (4). With RPC the way to do so is to
> > require an ack.
> >
> >
> >           Thread 1                     |     Thread 2  (Netty)                        |   Thread 3  (foreman)
> >     -----------------------------------+----------------------------------------------+------------------------
> > 1)    Allocate DrillBuf                |                                              |
> > 2)    Pass memory to RPC layer         |   get reference to DrillBuf (refcount == 2)  |
> > 3)                                     |   queue up the send (async)                  |
> > 4)                                     |   Send and release DrillBuf (refcount == 1)  |
> > 4.1)                                   |                                              |  Recv msg, send back Ack (The RPC layer
> >                                        |                                              |    automatically does this)
> > 4.2)  Check if Ack received            |                                              |
> > 5)    Continue to end of query         |                                              |
> > 6)    Cleanup (release DrillBuf)       |                                              |
> > 7)    Close Allocator (refcount of     |                                              |
> >         DrillBuf *must* be zero)       |                                              |
> >
> > Note that (4.2) does not have to complete before (5), only before (6) for
> > the memory to be released.
> >
> > One question I have is how the query completed without the Bloom Filter
> > reaching its destination. How does the destination fragment know when it
> > has to wait for the Bloom Filter? I suspect this may be more
> > complicated than it appears at first glance.
> >
> > Not sure if this helps narrow it down. If you can share a dev branch we
> > can help take a look.
> >
> >
> >
> > On Tue, Jun 19, 2018 at 8:35 PM, weijie tong <to...@gmail.com>
> > wrote:
> >
> >> HI:
> >>    I faced a complicated problem by releasing the BloomFilter's direct
> >> memory at some special cases. Hope someone could give some advices.
> >>
> >>    Say, one join node sends out BloomFilter to the foreman
> >> node(TestHashJoin.simpleEqualityJoin() ) .  The sending thread is netty's
> >> BitClient. The BloomFilter's direct memory is allocated by another thread
> >> allocator (i.e. the HashJoin fragment's allocator).  Once the fragment
> >> completes quickly. Then its corresponding close logic will check the
> >> allocator's memory assignment. But the async sender thread has not sent
> >> out
> >> the BloomFilter to release the corresponding direct ByteBuffer as the
> >> query
> >> has completed quickly , the wire has closed. Then the  corresponding
> >> fragment's close logic will throw exception to complain about the memory
> >> leak.
> >>
> >>     So I want to know how to release the allocated direct ByteBuffer at
> >> such case .
> >>

Re: how to release allocated ByteBuf which steps across two threads

Posted by Parth Chandra <pa...@apache.org>.
Hi Weijie

  It would also help to understand the flow of control that your design
uses. I've put a screenshot of a query profile here:
https://docs.google.com/document/d/1DgAbGovEWV6rZ4GvioJz5Twe_m5o1ADoBpbqAzqa_aU/edit?usp=sharing

  Looking at the subset under [ Hash Join 06-01 ],  can you annotate and/or
explain how you see the control messages flowing? Also, are you using the
control channel to send the bloom filter?

Parth

On Wed, Jun 20, 2018 at 3:28 PM, Parth Chandra <pa...@apache.org> wrote:

> Hi Weijie,
>   This is a tricky problem. So let me first summarize how this should be
> behaving -
>
>         Thread 1                     |     Thread 2
>   -----------------------------------+------------------------------------------
> 1)  Allocate DrillBuf                |
> 2)  Pass memory to RPC layer         |   get reference to DrillBuf (refcount == 2)
> 3)                                   |   queue up the send (async)
> 4)                                   |   Send and release DrillBuf (refcount == 1)
> 5)  Continue to end of query         |
> 6)  Cleanup (release DrillBuf)       |
> 7)  Close Allocator (refcount of     |
>       DrillBuf *must* be zero)       |
>
> In your case, steps 3 and 4 are occurring after step 7 which is natural
> since the RPC send is async, but that is what we have to prevent. The only
> way to do that is to have some synchronization between steps (3), (4), and
> (6) such that (6) only happens after (4). With RPC the way to do so is to
> require an ack.
>
>
>           Thread 1                     |     Thread 2  (Netty)                        |   Thread 3  (foreman)
>     -----------------------------------+----------------------------------------------+------------------------
> 1)    Allocate DrillBuf                |                                              |
> 2)    Pass memory to RPC layer         |   get reference to DrillBuf (refcount == 2)  |
> 3)                                     |   queue up the send (async)                  |
> 4)                                     |   Send and release DrillBuf (refcount == 1)  |
> 4.1)                                   |                                              |  Recv msg, send back Ack (The RPC layer
>                                        |                                              |    automatically does this)
> 4.2)  Check if Ack received            |                                              |
> 5)    Continue to end of query         |                                              |
> 6)    Cleanup (release DrillBuf)       |                                              |
> 7)    Close Allocator (refcount of     |                                              |
>         DrillBuf *must* be zero)       |                                              |
>
> Note that (4.2) does not have to complete before (5), only before (6) for
> the memory to be released.
>
> One question I have is how the query completed without the Bloom Filter
> reaching its destination. How does the destination fragment know when it
> has to wait for the Bloom Filter? I suspect this may be more
> complicated than it appears at first glance.
>
> Not sure if this helps narrow it down. If you can share a dev branch we
> can help take a look.
>
>
>
> On Tue, Jun 19, 2018 at 8:35 PM, weijie tong <to...@gmail.com>
> wrote:
>
>> HI:
>>    I faced a complicated problem by releasing the BloomFilter's direct
>> memory at some special cases. Hope someone could give some advices.
>>
>>    Say, one join node sends out BloomFilter to the foreman
>> node(TestHashJoin.simpleEqualityJoin() ) .  The sending thread is netty's
>> BitClient. The BloomFilter's direct memory is allocated by another thread
>> allocator (i.e. the HashJoin fragment's allocator).  Once the fragment
>> completes quickly. Then its corresponding close logic will check the
>> allocator's memory assignment. But the async sender thread has not sent
>> out
>> the BloomFilter to release the corresponding direct ByteBuffer as the
>> query
>> has completed quickly , the wire has closed. Then the  corresponding
>> fragment's close logic will throw exception to complain about the memory
>> leak.
>>
>>     So I want to know how to release the allocated direct ByteBuffer at
>> such case .
>>

Re: how to release allocated ByteBuf which steps across two threads

Posted by Parth Chandra <pa...@apache.org>.
Hi Weijie,
  This is a tricky problem. So let me first summarize how this should be
behaving -

        Thread 1                      |     Thread 2
--------------------------------------+---------------------------------------------
1)  Allocate DrillBuf                 |
2)  Pass memory to RPC layer          |  get reference to DrillBuf (refcount == 2)
3)                                    |  queue up the send (async)
4)                                    |  Send and release DrillBuf (refcount == 1)
5)  Continue to end of query          |
6)  Cleanup (release DrillBuf)        |
7)  Close Allocator (refcount of      |
      DrillBuf *must* be zero)        |
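
As a concrete illustration of the hand-off above, here is a minimal
sketch using plain Netty reference counting (DrillBuf follows the same
retain/release contract; the sendAsync method and the single-thread
executor standing in for the RPC layer are hypothetical, not Drill
APIs):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RefCountHandOff {

  // Step 2: retain on behalf of the RPC layer before queueing the async
  // send, so the buffer stays valid however the two threads interleave.
  static void sendAsync(ExecutorService rpcThread, ByteBuf buf) {
    buf.retain();                            // refcount 1 -> 2
    rpcThread.submit(() -> {
      try {
        // step 4: write the bytes to the wire (elided)
      } finally {
        buf.release();                       // drop the RPC layer's reference
      }
    });
  }

  public static void main(String[] args) throws Exception {
    ExecutorService rpcThread = Executors.newSingleThreadExecutor();
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(16);  // step 1, refcount == 1

    sendAsync(rpcThread, buf);               // steps 2-4 run asynchronously

    // Steps 6-7: the fragment drops its own reference.  If the RPC thread
    // has not released yet, the buffer is still live at this instant, and
    // closing the owning allocator now reports outstanding buffers: the
    // exact leak complaint in the stack trace above.
    buf.release();

    rpcThread.shutdown();
    rpcThread.awaitTermination(1, TimeUnit.SECONDS);
    System.out.println("final refCnt = " + buf.refCnt());  // 0 once both releases ran
  }
}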

In your case, steps 3 and 4 are occurring after step 7, which is natural
since the RPC send is async, but that is exactly what we have to prevent.
The only way to do that is to add some synchronization between steps (3),
(4), and (6) such that (6) only happens after (4). With RPC, the way to do
so is to require an ack.


      Thread 1                        |  Thread 2 (Netty)                           |  Thread 3 (foreman)
--------------------------------------+---------------------------------------------+------------------------------
1)    Allocate DrillBuf               |                                             |
2)    Pass memory to RPC layer        |  get reference to DrillBuf (refcount == 2)  |
3)                                    |  queue up the send (async)                  |
4)                                    |  Send and release DrillBuf (refcount == 1)  |
4.1)                                  |                                             |  Recv msg, send back Ack
                                      |                                             |  (the RPC layer does this
                                      |                                             |   automatically)
4.2)  Check if Ack received           |                                             |
5)    Continue to end of query        |                                             |
6)    Cleanup (release DrillBuf)      |                                             |
7)    Close Allocator (refcount of    |                                             |
        DrillBuf *must* be zero)      |                                             |

Note that (4.2) does not have to complete before (5), only before (6) for
the memory to be released.
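
To make the (4.2)-before-(6) ordering concrete, one minimal way to
express it is a latch that the ack callback counts down and that cleanup
awaits. This is only a sketch: AckListener is a simplified stand-in for
the RPC layer's outcome callback, not Drill's actual listener API, and
the ack is simulated on a second thread.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AckGatedCleanup {

  // Hypothetical stand-in for the callback the RPC layer fires when the
  // foreman's ack arrives (steps 4.1/4.2).
  interface AckListener {
    void ackReceived();
  }

  public static void main(String[] args) throws Exception {
    ByteBuf bloomFilterBuf = PooledByteBufAllocator.DEFAULT.directBuffer(16);
    CountDownLatch ackLatch = new CountDownLatch(1);
    AckListener onAck = ackLatch::countDown;  // registered with the send in step 2

    // Steps 3-5: the send is queued and the fragment keeps running; step 5
    // never blocks on the ack.  A second thread plays the Netty thread
    // delivering the ack.
    new Thread(onAck::ackReceived).start();

    // Step 6: cleanup waits until the ack proves the send (step 4), and
    // with it the RPC layer's release, has completed.  A timeout keeps a
    // lost ack from hanging fragment shutdown forever.
    if (!ackLatch.await(30, TimeUnit.SECONDS)) {
      System.err.println("no ack; allocator close will report the leak");
    }
    bloomFilterBuf.release();  // refcount -> 0; step 7 can now close cleanly
  }
}

The design point is that the wait lives in cleanup (step 6) rather than on
the query path (step 5), so a slow ack only delays shutdown, never the
query itself.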

One question I have is how the query completed without the Bloom Filter
reaching its destination. How does the destination fragment know that it
has to wait for the Bloom Filter? I suspect this is more complicated than
it appears at first glance.

Not sure if this helps narrow it down. If you can share a dev branch, we
can take a look.



On Tue, Jun 19, 2018 at 8:35 PM, weijie tong <to...@gmail.com>
wrote:

> HI:
>    I faced a complicated problem by releasing the BloomFilter's direct
> memory at some special cases. Hope someone could give some advices.
>
>    Say, one join node sends out BloomFilter to the foreman
> node(TestHashJoin.simpleEqualityJoin() ) .  The sending thread is netty's
> BitClient. The BloomFilter's direct memory is allocated by another thread
> allocator (i.e. the HashJoin fragment's allocator).  Once the fragment
> completes quickly. Then its corresponding close logic will check the
> allocator's memory assignment. But the async sender thread has not sent out
> the BloomFilter to release the corresponding direct ByteBuffer as the query
> has completed quickly , the wire has closed. Then the  corresponding
> fragment's close logic will throw exception to complain about the memory
> leak.
>
>     So I want to know how to release the allocated direct ByteBuffer at
> such case .
>
>
>    The exception is :
>
> [long stack trace snipped; identical to the trace in the original
> message above]
>