You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:34 UTC
[11/13] drill git commit: DRILL-4134: Allocator Improvements
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java
deleted file mode 100644
index 6ea5670..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestBaseAllocator.java
+++ /dev/null
@@ -1,651 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import io.netty.buffer.DrillBuf;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.testing.ExecutionControls;
-import org.apache.drill.exec.util.Pointer;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-
-public class TestBaseAllocator {
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class);
-
- private final static int MAX_ALLOCATION = 8 * 1024;
-
-/*
- // ---------------------------------------- DEBUG -----------------------------------
-
- @After
- public void checkBuffers() {
- final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
- if (bufferCount != 0) {
- UnsafeDirectLittleEndian.logBuffers(logger);
- UnsafeDirectLittleEndian.releaseBuffers();
- }
-
- assertEquals(0, bufferCount);
- }
-
-// @AfterClass
-// public static void dumpBuffers() {
-// UnsafeDirectLittleEndian.logBuffers(logger);
-// }
-
- // ---------------------------------------- DEBUG ------------------------------------
-*/
-
- // Concoct ExecutionControls that won't try to inject anything.
- @Mock private static final OptionManager optionManager = Mockito.mock(OptionManager.class);
- static {
- Mockito.when(optionManager.getOption(Matchers.anyString()))
- .thenReturn(null);
- }
-
- @Mock private static final ExecutionControls executionControls = new ExecutionControls(optionManager, null);
-
- private final static class NamedOwner implements AllocatorOwner {
- private final String name;
-
- public NamedOwner(final String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- @Override
- public ExecutionControls getExecutionControls() {
- return executionControls;
- }
-
- @Override
- public FragmentContext getFragmentContext() {
- return null;
- }
- }
-
- @Test
- public void test_privateMax() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("noLimits");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_LOCAL_MAX, 0, MAX_ALLOCATION, 0)) {
- final DrillBuf drillBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
- assertNotNull("allocation failed", drillBuf1);
-
- try(final BufferAllocator childAllocator =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- final DrillBuf drillBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
- assertNotNull("allocation failed", drillBuf2);
- drillBuf2.release();
- }
-
- drillBuf1.release();
- }
- }
-
- @Test(expected=IllegalStateException.class)
- public void testRootAllocator_closeWithOutstanding() throws Exception {
- try {
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- final DrillBuf drillBuf = rootAllocator.buffer(512);
- assertNotNull("allocation failed", drillBuf);
- }
- } finally {
- /*
- * We expect there to be one unreleased underlying buffer because we're closing
- * without releasing it.
- */
-/*
- // ------------------------------- DEBUG ---------------------------------
- final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
- UnsafeDirectLittleEndian.releaseBuffers();
- assertEquals(1, bufferCount);
- // ------------------------------- DEBUG ---------------------------------
-*/
- }
- }
-
- @Test
- public void testRootAllocator_getEmpty() throws Exception {
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- final DrillBuf drillBuf = rootAllocator.buffer(0);
- assertNotNull("allocation failed", drillBuf);
- assertEquals("capacity was non-zero", 0, drillBuf.capacity());
- drillBuf.release();
- }
- }
-
- @Ignore // TODO(DRILL-2740)
- @Test(expected = IllegalStateException.class)
- public void testAllocator_unreleasedEmpty() throws Exception {
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- @SuppressWarnings("unused")
- final DrillBuf drillBuf = rootAllocator.buffer(0);
- }
- }
-
- @Test
- public void testAllocator_transferOwnership() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("changeOwnership");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- final BufferAllocator childAllocator1 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- final BufferAllocator childAllocator2 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-
- final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
- rootAllocator.verify();
- final boolean allocationFit = childAllocator2.takeOwnership(drillBuf1);
- rootAllocator.verify();
- assertTrue(allocationFit);
-
- childAllocator1.close();
- rootAllocator.verify();
-
- drillBuf1.release();
- childAllocator2.close();
- }
- }
-
- @Test
- public void testAllocator_shareOwnership() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("shareOwnership");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- final BufferAllocator childAllocator1 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- final BufferAllocator childAllocator2 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4);
- rootAllocator.verify();
- final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
- boolean allocationFit;
-
- allocationFit = childAllocator2.shareOwnership(drillBuf1, pDrillBuf);
- assertTrue(allocationFit);
- rootAllocator.verify();
- final DrillBuf drillBuf2 = pDrillBuf.value;
- assertNotNull(drillBuf2);
- assertNotEquals(drillBuf2, drillBuf1);
-
- drillBuf1.release();
- rootAllocator.verify();
- childAllocator1.close();
- rootAllocator.verify();
-
- final BufferAllocator childAllocator3 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- allocationFit = childAllocator3.shareOwnership(drillBuf2, pDrillBuf);
- assertTrue(allocationFit);
- final DrillBuf drillBuf3 = pDrillBuf.value;
- assertNotNull(drillBuf3);
- assertNotEquals(drillBuf3, drillBuf1);
- assertNotEquals(drillBuf3, drillBuf2);
- rootAllocator.verify();
-
- drillBuf2.release();
- rootAllocator.verify();
- childAllocator2.close();
- rootAllocator.verify();
-
- drillBuf3.release();
- rootAllocator.verify();
- childAllocator3.close();
- }
- }
-
- @Test
- public void testRootAllocator_createChildAndUse() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("createChildAndUse");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- try(final BufferAllocator childAllocator =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- final DrillBuf drillBuf = childAllocator.buffer(512);
- assertNotNull("allocation failed", drillBuf);
- drillBuf.release();
- }
- }
- }
-
- @Test(expected=IllegalStateException.class)
- public void testRootAllocator_createChildDontClose() throws Exception {
- try {
- final AllocatorOwner allocatorOwner = new NamedOwner("createChildDontClose");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- final BufferAllocator childAllocator =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- final DrillBuf drillBuf = childAllocator.buffer(512);
- assertNotNull("allocation failed", drillBuf);
- }
- } finally {
- /*
- * We expect one underlying buffer because we closed a child allocator without
- * releasing the buffer allocated from it.
- */
-/*
- // ------------------------------- DEBUG ---------------------------------
- final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
- UnsafeDirectLittleEndian.releaseBuffers();
- assertEquals(1, bufferCount);
- // ------------------------------- DEBUG ---------------------------------
-*/
- }
- }
-
- private static void allocateAndFree(final BufferAllocator allocator) {
- final DrillBuf drillBuf = allocator.buffer(512);
- assertNotNull("allocation failed", drillBuf);
- drillBuf.release();
-
- final DrillBuf drillBuf2 = allocator.buffer(MAX_ALLOCATION);
- assertNotNull("allocation failed", drillBuf2);
- drillBuf2.release();
-
- final int nBufs = 8;
- final DrillBuf[] drillBufs = new DrillBuf[nBufs];
- for(int i = 0; i < drillBufs.length; ++i) {
- DrillBuf drillBufi = allocator.buffer(MAX_ALLOCATION / nBufs);
- assertNotNull("allocation failed", drillBufi);
- drillBufs[i] = drillBufi;
- }
- for(DrillBuf drillBufi : drillBufs) {
- drillBufi.release();
- }
- }
-
- @Test
- public void testAllocator_manyAllocations() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("manyAllocations");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- try(final BufferAllocator childAllocator =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- allocateAndFree(childAllocator);
- }
- }
- }
-
- @Test
- public void testAllocator_overAllocate() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("overAllocate");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- try(final BufferAllocator childAllocator =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- allocateAndFree(childAllocator);
-
- try {
- childAllocator.buffer(MAX_ALLOCATION + 1);
- fail("allocated memory beyond max allowed");
- } catch(OutOfMemoryRuntimeException e) {
- // expected
- }
- }
- }
- }
-
- @Test
- public void testAllocator_overAllocateParent() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("overAllocateParent");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- try(final BufferAllocator childAllocator =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- final DrillBuf drillBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
- assertNotNull("allocation failed", drillBuf1);
- final DrillBuf drillBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
- assertNotNull("allocation failed", drillBuf2);
-
- try {
- childAllocator.buffer(MAX_ALLOCATION / 4);
- fail("allocated memory beyond max allowed");
- } catch(OutOfMemoryRuntimeException e) {
- // expected
- }
-
- drillBuf1.release();
- drillBuf2.release();
- }
- }
- }
-
- private static void testAllocator_sliceUpBufferAndRelease(
- final RootAllocator rootAllocator, final BufferAllocator bufferAllocator) {
- final DrillBuf drillBuf1 = bufferAllocator.buffer(MAX_ALLOCATION / 2);
- rootAllocator.verify();
-
- final DrillBuf drillBuf2 = drillBuf1.slice(16, drillBuf1.capacity() - 32);
- rootAllocator.verify();
- final DrillBuf drillBuf3 = drillBuf2.slice(16, drillBuf2.capacity() - 32);
- rootAllocator.verify();
- @SuppressWarnings("unused")
- final DrillBuf drillBuf4 = drillBuf3.slice(16, drillBuf3.capacity() - 32);
- rootAllocator.verify();
-
- drillBuf3.release(); // since they share refcounts, one is enough to release them all
- rootAllocator.verify();
- }
-
- @Test
- public void testAllocator_createSlices() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("createSlices");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator);
-
- try(final BufferAllocator childAllocator =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
- }
- rootAllocator.verify();
-
- testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator);
-
- try(final BufferAllocator childAllocator =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- try(final BufferAllocator childAllocator2 =
- childAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- final DrillBuf drillBuf1 = childAllocator2.buffer(MAX_ALLOCATION / 8);
- @SuppressWarnings("unused")
- final DrillBuf drillBuf2 = drillBuf1.slice(MAX_ALLOCATION / 16, MAX_ALLOCATION / 16);
- testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
- drillBuf1.release();
- rootAllocator.verify();
- }
- rootAllocator.verify();
-
- testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator);
- }
- rootAllocator.verify();
- }
- }
-
- @Test
- public void testAllocator_sliceRanges() throws Exception {
-// final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- // Populate a buffer with byte values corresponding to their indices.
- final DrillBuf drillBuf = rootAllocator.buffer(256, 256 + 256);
- assertEquals(256, drillBuf.capacity());
- assertEquals(256 + 256, drillBuf.maxCapacity());
- assertEquals(0, drillBuf.readerIndex());
- assertEquals(0, drillBuf.readableBytes());
- assertEquals(0, drillBuf.writerIndex());
- assertEquals(256, drillBuf.writableBytes());
-
- final DrillBuf slice3 = (DrillBuf) drillBuf.slice();
- assertEquals(0, slice3.readerIndex());
- assertEquals(0, slice3.readableBytes());
- assertEquals(0, slice3.writerIndex());
-// assertEquals(256, slice3.capacity());
-// assertEquals(256, slice3.writableBytes());
-
- for(int i = 0; i < 256; ++i) {
- drillBuf.writeByte(i);
- }
- assertEquals(0, drillBuf.readerIndex());
- assertEquals(256, drillBuf.readableBytes());
- assertEquals(256, drillBuf.writerIndex());
- assertEquals(0, drillBuf.writableBytes());
-
- final DrillBuf slice1 = (DrillBuf) drillBuf.slice();
- assertEquals(0, slice1.readerIndex());
- assertEquals(256, slice1.readableBytes());
- for(int i = 0; i < 10; ++i) {
- assertEquals(i, slice1.readByte());
- }
- assertEquals(256 - 10, slice1.readableBytes());
- for(int i = 0; i < 256; ++i) {
- assertEquals((byte) i, slice1.getByte(i));
- }
-
- final DrillBuf slice2 = (DrillBuf) drillBuf.slice(25, 25);
- assertEquals(0, slice2.readerIndex());
- assertEquals(25, slice2.readableBytes());
- for(int i = 25; i < 50; ++i) {
- assertEquals(i, slice2.readByte());
- }
-
-/*
- for(int i = 256; i > 0; --i) {
- slice3.writeByte(i - 1);
- }
- for(int i = 0; i < 256; ++i) {
- assertEquals(255 - i, slice1.getByte(i));
- }
-*/
-
- drillBuf.release(); // all the derived buffers share this fate
- }
- }
-
- @Test
- public void testAllocator_slicesOfSlices() throws Exception {
-// final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- // Populate a buffer with byte values corresponding to their indices.
- final DrillBuf drillBuf = rootAllocator.buffer(256, 256 + 256);
- for(int i = 0; i < 256; ++i) {
- drillBuf.writeByte(i);
- }
-
- // Slice it up.
- final DrillBuf slice0 = drillBuf.slice(0, drillBuf.capacity());
- for(int i = 0; i < 256; ++i) {
- assertEquals((byte) i, drillBuf.getByte(i));
- }
-
- final DrillBuf slice10 = slice0.slice(10, drillBuf.capacity() - 10);
- for(int i = 10; i < 256; ++i) {
- assertEquals((byte) i, slice10.getByte(i - 10));
- }
-
- final DrillBuf slice20 = slice10.slice(10, drillBuf.capacity() - 20);
- for(int i = 20; i < 256; ++i) {
- assertEquals((byte) i, slice20.getByte(i - 20));
- }
-
- final DrillBuf slice30 = slice20.slice(10, drillBuf.capacity() - 30);
- for(int i = 30; i < 256; ++i) {
- assertEquals((byte) i, slice30.getByte(i - 30));
- }
-
- drillBuf.release();
- }
- }
-
- @Test
- public void testAllocator_transferSliced() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("transferSliced");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- final BufferAllocator childAllocator1 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- final BufferAllocator childAllocator2 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-
- final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
- final DrillBuf drillBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);
-
- final DrillBuf drillBuf1s = drillBuf1.slice(0, drillBuf1.capacity() / 2);
- final DrillBuf drillBuf2s = drillBuf2.slice(0, drillBuf2.capacity() / 2);
-
- rootAllocator.verify();
-
- childAllocator1.takeOwnership(drillBuf2s);
- rootAllocator.verify();
- childAllocator2.takeOwnership(drillBuf1s);
- rootAllocator.verify();
-
- drillBuf1s.release(); // releases drillBuf1
- drillBuf2s.release(); // releases drillBuf2
-
- childAllocator1.close();
- childAllocator2.close();
- }
- }
-
- @Test
- public void testAllocator_shareSliced() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("transferSliced");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- final BufferAllocator childAllocator1 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- final BufferAllocator childAllocator2 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-
- final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
- final DrillBuf drillBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);
-
- final DrillBuf drillBuf1s = drillBuf1.slice(0, drillBuf1.capacity() / 2);
- final DrillBuf drillBuf2s = drillBuf2.slice(0, drillBuf2.capacity() / 2);
-
- rootAllocator.verify();
-
- final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
- childAllocator1.shareOwnership(drillBuf2s, pDrillBuf);
- final DrillBuf drillBuf2s1 = pDrillBuf.value;
- childAllocator2.shareOwnership(drillBuf1s, pDrillBuf);
- final DrillBuf drillBuf1s2 = pDrillBuf.value;
- rootAllocator.verify();
-
- drillBuf1s.release(); // releases drillBuf1
- drillBuf2s.release(); // releases drillBuf2
- rootAllocator.verify();
-
- drillBuf2s1.release(); // releases the shared drillBuf2 slice
- drillBuf1s2.release(); // releases the shared drillBuf1 slice
-
- childAllocator1.close();
- childAllocator2.close();
- }
- }
-
- @Test
- public void testAllocator_transferShared() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("transferShared");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- final BufferAllocator childAllocator1 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- final BufferAllocator childAllocator2 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- final BufferAllocator childAllocator3 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
-
- final DrillBuf drillBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
-
- final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
- boolean allocationFit;
-
- allocationFit = childAllocator2.shareOwnership(drillBuf1, pDrillBuf);
- assertTrue(allocationFit);
- rootAllocator.verify();
- final DrillBuf drillBuf2 = pDrillBuf.value;
- assertNotNull(drillBuf2);
- assertNotEquals(drillBuf2, drillBuf1);
-
- allocationFit = childAllocator3.takeOwnership(drillBuf1);
- assertTrue(allocationFit);
- rootAllocator.verify();
-
- // Since childAllocator3 now has childAllocator1's buffer, 1, can close
- childAllocator1.close();
- rootAllocator.verify();
-
- drillBuf2.release();
- childAllocator2.close();
- rootAllocator.verify();
-
- final BufferAllocator childAllocator4 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0);
- allocationFit = childAllocator4.takeOwnership(drillBuf1);
- assertTrue(allocationFit);
- rootAllocator.verify();
-
- childAllocator3.close();
- rootAllocator.verify();
-
- drillBuf1.release();
- childAllocator4.close();
- rootAllocator.verify();
- }
- }
-
- @Test
- public void testAllocator_unclaimedReservation() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("unclaimedReservation");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- try(final BufferAllocator childAllocator1 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- try(final AllocationReservation reservation = childAllocator1.newReservation()) {
- assertTrue(reservation.add(64));
- }
- rootAllocator.verify();
- }
- }
- }
-
- @Test
- public void testAllocator_claimedReservation() throws Exception {
- final AllocatorOwner allocatorOwner = new NamedOwner("claimedReservation");
- try(final RootAllocator rootAllocator =
- new RootAllocator(RootAllocator.POLICY_PER_FRAGMENT, 0, MAX_ALLOCATION, 0)) {
- try(final BufferAllocator childAllocator1 =
- rootAllocator.newChildAllocator(allocatorOwner, 0, MAX_ALLOCATION, 0)) {
- try(final AllocationReservation reservation = childAllocator1.newReservation()) {
- assertTrue(reservation.add(32));
- assertTrue(reservation.add(32));
-
- final DrillBuf drillBuf = reservation.buffer();
- assertEquals(64, drillBuf.capacity());
- rootAllocator.verify();
-
- drillBuf.release();
- rootAllocator.verify();
- }
- rootAllocator.verify();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 94aa84e..7207bf2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.control.Controller;
@@ -112,7 +113,7 @@ public class TestOptiqPlans extends ExecTest {
workBus,
new LocalPStoreProvider(config));
final QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(),
- bitContext);
+ bitContext, QueryId.getDefaultInstance());
final PhysicalPlanReader reader = bitContext.getPlanReader();
final LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
final PhysicalPlan pp = new BasicOptimizer(qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index 0a9b470..96f2b33 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -24,9 +24,6 @@ import io.netty.buffer.DrillBuf;
import java.nio.charset.Charset;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.SchemaPath;
@@ -47,9 +44,9 @@ import org.apache.drill.exec.expr.holders.RepeatedVarBinaryHolder;
import org.apache.drill.exec.expr.holders.UInt1Holder;
import org.apache.drill.exec.expr.holders.UInt4Holder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.BitVector;
@@ -67,6 +64,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
public class TestValueVector extends ExecTest {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestValueVector.class);
@@ -156,11 +156,11 @@ public class TestValueVector extends ExecTest {
final int expectedOffsetSize = 10;
try {
vector.allocateNew(expectedAllocationInBytes, 10);
- assertEquals(expectedOffsetSize, vector.getValueCapacity());
- assertEquals(expectedAllocationInBytes, vector.getBuffer().capacity());
+ assertTrue(expectedOffsetSize <= vector.getValueCapacity());
+ assertTrue(expectedAllocationInBytes <= vector.getBuffer().capacity());
vector.reAlloc();
- assertEquals(expectedOffsetSize * 2, vector.getValueCapacity());
- assertEquals(expectedAllocationInBytes * 2, vector.getBuffer().capacity());
+ assertTrue(expectedOffsetSize * 2 <= vector.getValueCapacity());
+ assertTrue(expectedAllocationInBytes * 2 <= vector.getBuffer().capacity());
} finally {
vector.close();
}
@@ -666,8 +666,11 @@ the interface to load has changed
for (int i = 0; i < valueVectors.length; i++) {
final ValueVector vv = valueVectors[i];
final int vvCapacity = vv.getValueCapacity();
- assertEquals(String.format("Incorrect value capacity for %s [%d]", vv.getField(), vvCapacity),
- initialCapacity, vvCapacity);
+
+ // this can't be equality because Nullables will be allocated using power of two sized buffers (thus need 1025
+ // spots in one vector > power of two is 2048, available capacity will be 2048 => 2047)
+ assertTrue(String.format("Incorrect value capacity for %s [%d]", vv.getField(), vvCapacity),
+ initialCapacity <= vvCapacity);
}
} finally {
AutoCloseables.close(valueVectors);
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 107f978..73ed65e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -18,12 +18,18 @@
package org.apache.drill.exec.server;
import static org.junit.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.NonStrictExpectations;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
@@ -35,22 +41,21 @@ import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.apache.drill.exec.rpc.data.AckSender;
import org.apache.drill.exec.rpc.data.DataConnectionManager;
-import org.apache.drill.exec.rpc.data.DataResponseHandler;
import org.apache.drill.exec.rpc.data.DataServer;
import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
@@ -60,33 +65,59 @@ import org.junit.Test;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
-import mockit.Injectable;
-import mockit.NonStrictExpectations;
-
public class TestBitRpc extends ExecTest {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class);
@Test
- public void testConnectionBackpressure(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus, @Injectable final FragmentManager fman, @Injectable final FragmentContext fcon) throws Exception {
+ public void testConnectionBackpressure(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
DrillConfig config1 = DrillConfig.create();
final BootStrapContext c = new BootStrapContext(config1, ClassPathScanner.fromPrescan(config1));
DrillConfig config2 = DrillConfig.create();
BootStrapContext c2 = new BootStrapContext(config2, ClassPathScanner.fromPrescan(config2));
+ final FragmentContext fcon = new MockUp<FragmentContext>(){
+ BufferAllocator getAllocator(){
+ return c.getAllocator();
+ }
+ }.getMockInstance();
+
+ final FragmentManager fman = new MockUp<FragmentManager>(){
+ int v = 0;
+
+ @Mock
+ boolean handle(IncomingDataBatch batch) throws FragmentSetupException, IOException {
+ try {
+ v++;
+ if (v % 10 == 0) {
+ System.out.println("sleeping.");
+ Thread.sleep(3000);
+ }
+ } catch (InterruptedException e) {
+
+ }
+ RawFragmentBatch rfb = batch.newRawFragmentBatch(c.getAllocator());
+ rfb.sendOk();
+ rfb.release();
+
+ return true;
+ }
+
+ public FragmentContext getFragmentContext(){
+ return fcon;
+ }
+
+ }.getMockInstance();
+
+
new NonStrictExpectations() {{
workBus.getFragmentManagerIfExists((FragmentHandle) any); result = fman;
workBus.getFragmentManager( (FragmentHandle) any); result = fman;
- fman.getFragmentContext(); result = fcon;
- fcon.getAllocator(); result = c.getAllocator();
}};
int port = 1234;
- DataResponseHandler drp = new BitComTestHandler();
- DataServer server = new DataServer(c, workBus, drp);
+ DataServer server = new DataServer(c, c.getAllocator(), workBus, null);
port = server.bind(port, true);
DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
@@ -154,31 +185,4 @@ public class TestBitRpc extends ExecTest {
}
}
- private class BitComTestHandler implements DataResponseHandler {
-
- int v = 0;
-
- @Override
- public void informOutOfMemory() {
- }
-
- @Override
- public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch, DrillBuf data, AckSender sender)
- throws FragmentSetupException, IOException {
- // System.out.println("Received.");
- try {
- v++;
- if (v % 10 == 0) {
- System.out.println("sleeping.");
- Thread.sleep(3000);
- }
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- sender.sendOk();
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
index c911f79..44cc3a7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
@@ -17,20 +17,21 @@
*/
package org.apache.drill.exec.testing;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.util.Pointer;
import org.junit.Test;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class TestCountDownLatchInjection extends BaseTestQuery {
private static final UserSession session = UserSession.Builder.newBuilder()
@@ -132,7 +133,7 @@ public class TestCountDownLatchInjection extends BaseTestQuery {
ControlsInjectionUtil.setControls(session, controls);
- final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+ final QueryContext queryContext = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
final DummyClass dummyClass = new DummyClass(queryContext, trigger, threads);
(new ThreadCreator(dummyClass, trigger, threads, countingDownTime)).start();
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
index 40620c2..84a7320 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
@@ -17,12 +17,18 @@
*/
package org.apache.drill.exec.testing;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.Drillbit;
@@ -30,11 +36,6 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.junit.Test;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
public class TestExceptionInjection extends BaseTestQuery {
private static final String NO_THROW_FAIL = "Didn't throw expected exception";
@@ -135,7 +136,7 @@ public class TestExceptionInjection extends BaseTestQuery {
+ "}]}";
ControlsInjectionUtil.setControls(session, jsonString);
- final QueryContext context = new QueryContext(session, bits[0].getContext());
+ final QueryContext context = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
// test that the exception gets thrown
final DummyClass dummyClass = new DummyClass(context);
@@ -156,7 +157,7 @@ public class TestExceptionInjection extends BaseTestQuery {
.build();
ControlsInjectionUtil.setControls(session, controls);
- final QueryContext context = new QueryContext(session, bits[0].getContext());
+ final QueryContext context = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
// test that the expected exception (checked) gets thrown
final DummyClass dummyClass = new DummyClass(context);
@@ -185,7 +186,7 @@ public class TestExceptionInjection extends BaseTestQuery {
.build();
ControlsInjectionUtil.setControls(session, controls);
- final QueryContext context = new QueryContext(session, bits[0].getContext());
+ final QueryContext context = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
final DummyClass dummyClass = new DummyClass(context);
@@ -246,7 +247,7 @@ public class TestExceptionInjection extends BaseTestQuery {
ControlsInjectionUtil.setControls(session, controls);
{
- final QueryContext queryContext1 = new QueryContext(session, drillbitContext1);
+ final QueryContext queryContext1 = new QueryContext(session, drillbitContext1, QueryId.getDefaultInstance());
final DummyClass class1 = new DummyClass(queryContext1);
// these shouldn't throw
@@ -268,7 +269,7 @@ public class TestExceptionInjection extends BaseTestQuery {
}
}
{
- final QueryContext queryContext2 = new QueryContext(session, drillbitContext2);
+ final QueryContext queryContext2 = new QueryContext(session, drillbitContext2, QueryId.getDefaultInstance());
final DummyClass class2 = new DummyClass(queryContext2);
// these shouldn't throw
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index f07f676..54f851a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -17,6 +17,11 @@
*/
package org.apache.drill.exec.testing;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.config.DrillConfig;
@@ -24,6 +29,7 @@ import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
import org.apache.drill.exec.rpc.user.UserSession;
@@ -34,11 +40,6 @@ import org.apache.drill.exec.util.Pointer;
import org.junit.Test;
import org.slf4j.Logger;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class TestPauseInjection extends BaseTestQuery {
private static final UserSession session = UserSession.Builder.newBuilder()
@@ -126,7 +127,7 @@ public class TestPauseInjection extends BaseTestQuery {
ControlsInjectionUtil.setControls(session, controls);
- final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+ final QueryContext queryContext = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
(new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
@@ -181,7 +182,7 @@ public class TestPauseInjection extends BaseTestQuery {
final long expectedDuration = 1000L;
final ExtendedLatch trigger = new ExtendedLatch(1);
final Pointer<Exception> ex = new Pointer<>();
- final QueryContext queryContext = new QueryContext(session, drillbitContext1);
+ final QueryContext queryContext = new QueryContext(session, drillbitContext1, QueryId.getDefaultInstance());
(new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
// test that the pause happens
@@ -199,7 +200,7 @@ public class TestPauseInjection extends BaseTestQuery {
{
final ExtendedLatch trigger = new ExtendedLatch(1);
- final QueryContext queryContext = new QueryContext(session, drillbitContext2);
+ final QueryContext queryContext = new QueryContext(session, drillbitContext2, QueryId.getDefaultInstance());
// if the resume did not happen, the test would hang
final DummyClass dummyClass = new DummyClass(queryContext, trigger);
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
index 7ab2da2..d2f2590 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
@@ -18,11 +18,13 @@
package org.apache.drill.exec.testing;
import static org.junit.Assert.fail;
+import io.netty.buffer.DrillBuf;
-import com.google.common.base.Charsets;
-import com.google.common.io.Resources;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Properties;
-import io.netty.buffer.DrillBuf;
+import javax.inject.Inject;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.common.config.DrillConfig;
@@ -42,16 +44,12 @@ import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.test.DrillTest;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import javax.inject.Inject;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.Properties;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
/*
* TODO(DRILL-3170)
@@ -138,7 +136,7 @@ public class TestResourceLeak extends DrillTest {
@Override
public void eval() {
- buf.getAllocator().buffer(1);
+ buf.retain();
out.value = in.value;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java
index 60a2268..223f4ed 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestPromotableWriter.java
@@ -20,8 +20,6 @@ package org.apache.drill.exec.vector.complex.writer;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.TestOutputMutator;
import org.apache.drill.exec.util.BatchPrinter;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/pom.xml
----------------------------------------------------------------------
diff --git a/exec/memory/base/pom.xml b/exec/memory/base/pom.xml
index adec763..686a12b 100644
--- a/exec/memory/base/pom.xml
+++ b/exec/memory/base/pom.xml
@@ -28,7 +28,17 @@
<version>3.0.1</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.carrotsearch</groupId>
+ <artifactId>hppc</artifactId>
+ <version>0.5.2</version>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index d244b26..138495c 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -27,23 +27,16 @@ import java.nio.ByteOrder;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.drill.common.HistoricalLog;
-import org.apache.drill.exec.memory.Accountor;
+import org.apache.drill.exec.memory.AllocatorManager.BufferLedger;
import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
+import org.apache.drill.exec.memory.BoundsChecking;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.BufferManager;
-import org.apache.drill.exec.memory.BufferLedger;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.util.AssertionUtil;
-import org.apache.drill.exec.util.Pointer;
-import org.slf4j.Logger;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
@@ -51,343 +44,46 @@ import com.google.common.base.Preconditions;
public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
- private static final boolean BOUNDS_CHECKING_ENABLED = AssertionUtil.BOUNDS_CHECKING_ENABLED;
- private static final boolean DEBUG = BaseAllocator.isDebug();
- private static final AtomicInteger idGenerator = new AtomicInteger(0);
+ private static final AtomicLong idGenerator = new AtomicLong(0);
- private final ByteBuf byteBuf;
+ private final long id = idGenerator.incrementAndGet();
+ private final AtomicInteger refCnt;
+ private final UnsafeDirectLittleEndian byteBuf;
private final long addr;
private final int offset;
- private final int flags;
- private final AtomicInteger rootRefCnt;
- private volatile BufferAllocator allocator;
-
- // TODO - cleanup
- // The code is partly shared and partly copy-pasted between
- // these three types. They should be unified under one interface
- // to share code and to remove the hacky code here to use only
- // one of these types at a time and use null checks to find out
- // which.
- private final boolean oldWorld; // Indicates that we're operating with TopLevelAllocator.
- private final boolean rootBuffer;
- private volatile Accountor acct;
- private BufferManager bufManager;
- @Deprecated private OperatorContext operatorContext;
- @Deprecated private FragmentContext fragmentContext;
-
- private volatile BufferLedger bufferLedger;
- private volatile int length; // TODO this just seems to duplicate .capacity()
-
- // members used purely for debugging
- // TODO once we have a reduced number of constructors, move these to DEBUG clauses in them
- private final int id = idGenerator.incrementAndGet();
- private final HistoricalLog historicalLog = DEBUG ? new HistoricalLog(4, "DrillBuf[%d]", id) : null;
- private final static IdentityHashMap<UnsafeDirectLittleEndian, Collection<DrillBuf>> unwrappedMap =
- DEBUG ? new IdentityHashMap<UnsafeDirectLittleEndian, Collection<DrillBuf>>() : null;
-
- // TODO(cwestin) javadoc
- private void unwrappedPut() {
- final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) byteBuf;
- synchronized(unwrappedMap) {
- Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
- if (drillBufs == null) {
- drillBufs = new LinkedList<DrillBuf>();
- unwrappedMap.put(udle, drillBufs);
- }
-
- drillBufs.add(this);
- }
- }
-
- // TODO(cwestin) javadoc
- public static Collection<DrillBuf> unwrappedGet(final UnsafeDirectLittleEndian udle) {
- synchronized(unwrappedMap) {
- final Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
- if (drillBufs == null) {
- return Collections.emptyList();
- }
- return new LinkedList<DrillBuf>(drillBufs);
- }
- }
-
- // TODO(cwestin) javadoc
- private static boolean unwrappedRemove(final DrillBuf drillBuf) {
- final ByteBuf byteBuf = drillBuf.unwrap();
- if (!(byteBuf instanceof UnsafeDirectLittleEndian)) {
- return false;
- }
-
- final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) byteBuf;
- synchronized(unwrappedMap) {
- Collection<DrillBuf> drillBufs = unwrappedMap.get(udle);
- if (drillBufs == null) {
- return false;
- }
- final Object object = drillBufs.remove(drillBuf);
- if (drillBufs.isEmpty()) {
- unwrappedMap.remove(udle);
- }
- return object != null;
- }
- }
-
- public DrillBuf(BufferAllocator allocator, Accountor a, UnsafeDirectLittleEndian b) {
- super(b.maxCapacity());
- this.byteBuf = b;
- this.addr = b.memoryAddress();
- this.acct = a;
- this.length = b.capacity();
- this.offset = 0;
- this.rootBuffer = true;
- this.allocator = allocator;
-
- // members from the new world order
- flags = 0;
- rootRefCnt = null;
- oldWorld = true;
- }
-
- // TODO(cwestin) javadoc
- public DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator,
- final UnsafeDirectLittleEndian byteBuf) {
+ private final BufferLedger ledger;
+ private final BufferManager bufManager;
+ private final ByteBufAllocator alloc;
+ private final boolean isEmpty;
+ private volatile int length;
+
+ private final HistoricalLog historicalLog = BaseAllocator.DEBUG ?
+ new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "DrillBuf[%d]", id) : null;
+
+ public DrillBuf(
+ final AtomicInteger refCnt,
+ final BufferLedger ledger,
+ final UnsafeDirectLittleEndian byteBuf,
+ final BufferManager manager,
+ final ByteBufAllocator alloc,
+ final int offset,
+ final int length,
+ boolean isEmpty) {
super(byteBuf.maxCapacity());
+ this.refCnt = refCnt;
this.byteBuf = byteBuf;
- byteBuf.retain(1);
- this.bufferLedger = bufferLedger;
- addr = byteBuf.memoryAddress();
- allocator = bufferAllocator;
- length = byteBuf.capacity();
- offset = 0;
- flags = 0;
- rootRefCnt = new AtomicInteger(1);
- oldWorld = false;
-
- // members from the old world order
- rootBuffer = false;
- acct = null;
-
- if (DEBUG) {
- unwrappedPut();
- historicalLog.recordEvent(
- "DrillBuf(BufferLedger, BufferAllocator[%d], UnsafeDirectLittleEndian[identityHashCode == "
- + "%d](%s)) => rootRefCnt identityHashCode == %d",
- bufferAllocator.getId(), System.identityHashCode(byteBuf), byteBuf.toString(),
- System.identityHashCode(rootRefCnt));
- }
- }
-
- private DrillBuf(BufferAllocator allocator, Accountor a) {
- super(0);
- this.byteBuf = new EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
- this.allocator = allocator;
- this.acct = a;
- this.length = 0;
- this.addr = 0;
- this.rootBuffer = false;
- this.offset = 0;
-
- // members from the new world order
- flags = 0;
- rootRefCnt = null;
- oldWorld = true;
- }
-
- private DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator) {
- super(0);
- this.bufferLedger = bufferLedger;
- allocator = bufferAllocator;
-
- byteBuf = new EmptyByteBuf(bufferLedger.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
- length = 0;
- addr = 0;
- flags = 0;
- rootRefCnt = new AtomicInteger(1);
- offset = 0;
-
- // members from the old world order
- rootBuffer = false;
- acct = null;
- oldWorld = false;
-
- if (DEBUG) {
- // We don't put the empty buffers in the unwrappedMap.
- historicalLog.recordEvent(
- "DrillBuf(BufferLedger, BufferAllocator[%d]) => rootRefCnt identityHashCode == %d",
- bufferAllocator.getId(), System.identityHashCode(rootRefCnt));
- }
- }
-
- /**
- * Special constructor used for RPC ownership transfer. Takes a snapshot slice of the current buf
- * but points directly to the underlying UnsafeLittleEndian buffer. Does this by calling unwrap()
- * twice on the provided DrillBuf and expecting an UnsafeDirectLittleEndian buffer. This operation
- * includes taking a new reference count on the underlying buffer and maintaining returning with a
- * current reference count for itself (masking the underlying reference count).
- * @param allocator
- * @param a Allocator used when users try to receive allocator from buffer.
- * @param b Accountor used for accounting purposes.
- */
- public DrillBuf(BufferAllocator allocator, Accountor a, DrillBuf b) {
- this(allocator, a, getUnderlying(b), b, 0, b.length, true);
- assert b.unwrap().unwrap() instanceof UnsafeDirectLittleEndian;
- b.unwrap().unwrap().retain();
- }
-
- private DrillBuf(DrillBuf buffer, int index, int length) {
- this(buffer.allocator, null, buffer, buffer, index, length, false);
- }
-
- private static ByteBuf getUnderlying(DrillBuf b){
- ByteBuf underlying = b.unwrap().unwrap();
- return underlying.slice((int) (b.memoryAddress() - underlying.memoryAddress()), b.length);
- }
-
- private DrillBuf(BufferAllocator allocator, Accountor a, ByteBuf replacement, DrillBuf buffer, int index, int length, boolean root) {
- super(length);
- if (index < 0 || index > buffer.capacity() - length) {
- throw new IndexOutOfBoundsException(buffer.toString() + ".slice(" + index + ", " + length + ')');
- }
-
- this.length = length;
- writerIndex(length);
-
- this.byteBuf = replacement;
- this.addr = buffer.memoryAddress() + index;
- this.offset = index;
- this.acct = a;
+ this.isEmpty = isEmpty;
+ this.bufManager = manager;
+ this.alloc = alloc;
+ this.addr = byteBuf.memoryAddress() + offset;
+ this.ledger = ledger;
this.length = length;
- this.rootBuffer = root;
- this.allocator = allocator;
+ this.offset = offset;
- // members from the new world order
- flags = 0;
- rootRefCnt = null;
- oldWorld = true;
- }
-
- /**
- * Indicate a shared refcount, as per http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5
- */
- private final static int F_DERIVED = 0x0002;
-
- // TODO(cwestin) javadoc
- /**
- * Used for sharing.
- *
- * @param bufferLedger
- * @param bufferAllocator
- * @param originalBuf
- * @param index
- * @param length
- * @param flags
- */
- public DrillBuf(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator,
- final DrillBuf originalBuf, final int index, final int length, final int flags) {
- this(bufferAllocator, bufferLedger, getUnderlyingUdle(originalBuf),
- originalBuf, index + originalBuf.offset, length, flags);
- }
-
- /**
- * Unwraps a DrillBuf until the underlying UnsafeDirectLittleEndian buffer is
- * found.
- *
- * @param originalBuf the original DrillBuf
- * @return the underlying UnsafeDirectLittleEndian ByteBuf
- */
- private static ByteBuf getUnderlyingUdle(final DrillBuf originalBuf) {
- int count = 1;
- ByteBuf unwrapped = originalBuf.unwrap();
- while(!(unwrapped instanceof UnsafeDirectLittleEndian)
- && (!(unwrapped instanceof EmptyByteBuf))) {
- unwrapped = unwrapped.unwrap();
- ++count;
- }
-
- if (DEBUG) {
- if (count > 1) {
- throw new IllegalStateException("UnsafeDirectLittleEndian is wrapped more than one level");
- }
+ if (BaseAllocator.DEBUG) {
+ historicalLog.recordEvent("create()");
}
- return unwrapped;
- }
-
- // TODO(cwestin) javadoc
- /*
- * TODO the replacement argument becomes an UnsafeDirectLittleEndian;
- * buffer argument may go away if it is determined to be unnecessary after all
- * the deprecated stuff is removed (I suspect only the replacement argument is
- * necessary then).
- */
- private DrillBuf(BufferAllocator allocator, BufferLedger bufferLedger,
- ByteBuf replacement, DrillBuf buffer, int index, int length, int flags) {
- super(replacement.maxCapacity());
-
- // members from the old world order
- rootBuffer = false;
- acct = null;
- oldWorld = false;
-
- if (index < 0 || index > (replacement.maxCapacity() - length)) {
- throw new IndexOutOfBoundsException(replacement.toString() + ".slice(" + index + ", " + length + ')');
- }
-
- this.flags = flags;
-
- this.length = length; // capacity()
- writerIndex(length);
-
- byteBuf = replacement;
- if ((flags & F_DERIVED) == 0) {
- replacement.retain(1);
- }
-
- addr = replacement.memoryAddress() + index;
- offset = index;
- this.bufferLedger = bufferLedger;
- if (!(buffer instanceof DrillBuf)) {
- throw new IllegalArgumentException("DrillBuf slicing can only be performed on other DrillBufs");
- }
-
- if ((flags & F_DERIVED) != 0) {
- final DrillBuf rootBuf = (DrillBuf) buffer;
- rootRefCnt = rootBuf.rootRefCnt;
- } else {
- rootRefCnt = new AtomicInteger(1);
- }
-
- this.allocator = allocator;
-
- if (DEBUG) {
- unwrappedPut();
- historicalLog.recordEvent(
- "DrillBuf(BufferAllocator[%d], BufferLedger, ByteBuf[identityHashCode == "
- + "%d](%s), DrillBuf[%d], index = %d, length = %d, flags = 0x%08x)"
- + " => rootRefCnt identityHashCode == %d",
- allocator.getId(), System.identityHashCode(replacement), replacement.toString(),
- buffer.id, index, length, flags, System.identityHashCode(rootRefCnt));
- }
- }
-
- @Deprecated
- public void setOperatorContext(OperatorContext c) {
- this.operatorContext = c;
- }
-
- @Deprecated
- public void setFragmentContext(FragmentContext c) {
- this.fragmentContext = c;
- }
-
- // TODO(DRILL-3331)
- public void setBufferManager(BufferManager bufManager) {
- Preconditions.checkState(this.bufManager == null,
- "the BufferManager for a buffer can only be set once");
- this.bufManager = bufManager;
- }
-
- public BufferAllocator getAllocator() {
- return allocator;
}
public DrillBuf reallocIfNeeded(final int size) {
@@ -397,11 +93,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
return this;
}
- if (operatorContext != null) {
- return operatorContext.replace(this, size);
- } else if(fragmentContext != null) {
- return fragmentContext.replace(this, size);
- } else if (bufManager != null) {
+ if (bufManager != null) {
return bufManager.replace(this, size);
} else {
throw new UnsupportedOperationException("Realloc is only available in the context of an operator's UDFs");
@@ -410,15 +102,11 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public int refCnt() {
- if (oldWorld) {
- if(rootBuffer){
- return (int) this.rootRefCnt.get();
- }else{
- return byteBuf.refCnt();
- }
+ if (isEmpty) {
+ return 1;
+ } else {
+ return refCnt.get();
}
-
- return rootRefCnt.get();
}
private long addr(int index) {
@@ -431,7 +119,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
}
if (index < 0 || index > capacity() - fieldLength) {
- if (DEBUG) {
+ if (BaseAllocator.DEBUG) {
historicalLog.logHistory(logger);
}
throw new IndexOutOfBoundsException(String.format(
@@ -449,70 +137,106 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
* @param end The exclusive endpoint of the bytes to be read.
*/
public void checkBytes(int start, int end) {
- if (BOUNDS_CHECKING_ENABLED) {
+ if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
checkIndexD(start, end - start);
}
}
private void chk(int index, int width) {
- if (BOUNDS_CHECKING_ENABLED) {
+ if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
checkIndexD(index, width);
}
}
private void ensure(int width) {
- if (BOUNDS_CHECKING_ENABLED) {
+ if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
ensureWritable(width);
}
}
/**
- * Used by allocators to transfer ownership from one allocator to another.
+ * Create a new DrillBuf that is associated with an alternative allocator for the purposes of memory ownership and
+ * accounting. This has no impact on the reference counting for the current DrillBuf except in the situation where the
+ * passed in Allocator is the same as the current buffer.
+ *
+ * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
+ * reference count of 1 (in the case that this is the first time this memory is being associated with the new
+ * allocator) or the current value of the reference count + 1 for the other AllocatorManager/BufferLedger combination
+ * in the case that the provided allocator already had an association to this underlying memory.
*
- * @param newLedger the new ledger the buffer should use going forward
- * @param newAllocator the new allocator
- * @return whether or not the buffer fits the receiving allocator's allocation limit
+ * @param allocator
+ * The target allocator to create an association with.
+ * @return A new DrillBuf which shares the same underlying memory as this DrillBuf.
*/
- public boolean transferTo(final BufferAllocator newAllocator, final BufferLedger newLedger) {
- final Pointer<BufferLedger> pNewLedger = new Pointer<>(newLedger);
- final boolean fitsAllocation = bufferLedger.transferTo(newAllocator, pNewLedger, this);
- allocator = newAllocator;
- bufferLedger = pNewLedger.value;
- return fitsAllocation;
+ public DrillBuf retain(BufferAllocator allocator) {
+
+ if (isEmpty) {
+ return this;
+ }
+
+ if (BaseAllocator.DEBUG) {
+ historicalLog.recordEvent("retain(%s)", allocator.getName());
+ }
+ BufferLedger otherLedger = this.ledger.getLedgerForAllocator(allocator);
+ return otherLedger.newDrillBuf(offset, length, null, true);
}
/**
- * DrillBuf's implementation of sharing buffer functionality, to be accessed from
- * {@link BufferAllocator#shareOwnership(DrillBuf, Pointer)}. See that function
- * for more information.
+ * Transfer the memory accounting ownership of this DrillBuf to another allocator. This will generate a new DrillBuf
+ * that carries an association with the underlying memory of this DrillBuf. If this DrillBuf is connected to the
+ * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the taret allocator. If
+ * this DrillBuf does not currently own the memory underlying it (and is only associated with it), this does not
+ * transfer any ownership to the newly created DrillBuf.
+ *
+ * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
+ * reference count of 1 (in the case that this is the first time this memory is being associated with the new
+ * allocator) or the current value of the reference count for the other AllocatorManager/BufferLedger combination in
+ * the case that the provided allocator already had an association to this underlying memory.
*
- * @param otherLedger the ledger belonging to the other allocator to share with
- * @param otherAllocator the other allocator to be shared with
- * @param index the starting index (for slicing capability)
- * @param length the length (for slicing capability)
- * @return the new DrillBuf (wrapper)
+ * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible
+ * due to the fact that the original owning allocator may have allocated this memory out of a local reservation
+ * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done
+ * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
+ * to an actual overlimit==true condition. This is simply conservative behavior which means we may return overlimit
+ * slightly sooner than is necessary.
+ *
+ * @param target
+ * The allocator to transfer ownership to.
+ * @return A new transfer result with the impact of the transfer (whether it was overlimit) as well as the newly
+ * created DrillBuf.
*/
- public DrillBuf shareWith(final BufferLedger otherLedger, final BufferAllocator otherAllocator,
- final int index, final int length) {
- return shareWith(otherLedger, otherAllocator, index, length, 0);
- }
+ public TransferResult transferOwnership(BufferAllocator target) {
- // TODO(cwestin) javadoc
- private DrillBuf shareWith(final BufferLedger otherLedger, final BufferAllocator otherAllocator,
- final int index, final int length, final int flags) {
- final Pointer<DrillBuf> pDrillBuf = new Pointer<>();
- bufferLedger = bufferLedger.shareWith(pDrillBuf, otherLedger, otherAllocator, this, index, length, flags);
- return pDrillBuf.value;
+ if (isEmpty) {
+ return new TransferResult(true, this);
+ }
+
+ final BufferLedger otherLedger = this.ledger.getLedgerForAllocator(target);
+ final DrillBuf newBuf = otherLedger.newDrillBuf(offset, length, null, true);
+ final boolean allocationFit = this.ledger.transferBalance(otherLedger);
+ return new TransferResult(allocationFit, newBuf);
}
- public boolean transferAccounting(Accountor target) {
- if (rootBuffer) {
- boolean outcome = acct.transferTo(target, this, length);
- acct = target;
- return outcome;
- } else {
- throw new UnsupportedOperationException();
+ /**
+ * The outcome of a Transfer.
+ */
+ public class TransferResult {
+
+ /**
+ * Whether this transfer fit within the target allocator's capacity.
+ */
+ public final boolean allocationFit;
+
+ /**
+ * The newly created buffer associated with the target allocator.
+ */
+ public final DrillBuf buffer;
+
+ private TransferResult(boolean allocationFit, DrillBuf buffer) {
+ this.allocationFit = allocationFit;
+ this.buffer = buffer;
}
+
}
@Override
@@ -525,40 +249,28 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
*/
@Override
public synchronized boolean release(int decrement) {
- Preconditions.checkArgument(decrement > 0,
- "release(%d) argument is not positive", decrement);
- if (DEBUG) {
- historicalLog.recordEvent("release(%d)", decrement);
+ if (isEmpty) {
+ return false;
}
- if (oldWorld) {
- if(rootBuffer){
- final long newRefCnt = this.rootRefCnt.addAndGet(-decrement);
- Preconditions.checkArgument(newRefCnt > -1, "Buffer has negative reference count.");
- if (newRefCnt == 0) {
- byteBuf.release(decrement);
- acct.release(this, length);
- return true;
- }else{
- return false;
- }
- }else{
- return byteBuf.release(decrement);
- }
+ if (decrement < 1) {
+ throw new IllegalStateException(String.format("release(%d) argument is not positive. Buffer Info: %s",
+ decrement, toVerboseString()));
}
- final int refCnt = rootRefCnt.addAndGet(-decrement);
- Preconditions.checkState(refCnt >= 0, "DrillBuf[%d] refCnt has gone negative", id);
- if (refCnt == 0) {
- bufferLedger.release(this);
+ final int refCnt = this.refCnt.addAndGet(-decrement);
- if (DEBUG) {
- unwrappedRemove(this);
- }
+ if (BaseAllocator.DEBUG) {
+ historicalLog.recordEvent("release(%d). original value: %d", decrement, refCnt + decrement);
+ }
- // release the underlying buffer
- byteBuf.release(1);
+ if (refCnt < 0) {
+ throw new IllegalStateException(
+ String.format("DrillBuf[%d] refCnt has gone negative. Buffer Info: %s", id, toVerboseString()));
+ }
+ if (refCnt == 0) {
+ ledger.release();
return true;
}
@@ -571,37 +283,16 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public synchronized ByteBuf capacity(int newCapacity) {
- if (oldWorld) {
- if (rootBuffer) {
- if (newCapacity == length) {
- return this;
- } else if (newCapacity < length) {
- byteBuf.capacity(newCapacity);
- int diff = length - byteBuf.capacity();
- acct.releasePartial(this, diff);
- this.length = length - diff;
- return this;
- } else {
- throw new UnsupportedOperationException("Accounting byte buf doesn't support increasing allocations.");
- }
- } else {
- throw new UnsupportedOperationException("Non root bufs doen't support changing allocations.");
- }
- }
-
- if ((flags & F_DERIVED) != 0) {
- throw new UnsupportedOperationException("Derived buffers don't support resizing.");
- }
+ public synchronized DrillBuf capacity(int newCapacity) {
if (newCapacity == length) {
return this;
}
+ Preconditions.checkArgument(newCapacity >= 0);
+
if (newCapacity < length) {
- byteBuf.capacity(newCapacity);
- final int diff = length - byteBuf.capacity();
- length -= diff;
+ length = newCapacity;
return this;
}
@@ -673,10 +364,9 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public DrillBuf slice(int index, int length) {
- if (oldWorld) {
- DrillBuf buf = new DrillBuf(this, index, length);
- buf.writerIndex = length;
- return buf;
+
+ if (isEmpty) {
+ return this;
}
/*
@@ -684,17 +374,13 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
* see http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, which explains
* that derived buffers share their reference count with their parent
*/
- final DrillBuf buf = shareWith(bufferLedger, allocator, index, length, F_DERIVED);
- buf.writerIndex(length);
- return buf;
+ final DrillBuf newBuf = ledger.newDrillBuf(offset + index, length);
+ newBuf.writerIndex(length);
+ return newBuf;
}
@Override
public DrillBuf duplicate() {
- if (oldWorld) {
- return new DrillBuf(this, 0, length);
- }
-
return slice(0, length);
}
@@ -766,8 +452,8 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public String toString(int index, int length, Charset charset) {
final String basics =
- String.format("{DrillBuf[%d], udle identityHashCode == %d, rootRefCnt identityHashCode == %d}",
- id, System.identityHashCode(byteBuf), System.identityHashCode(rootRefCnt));
+ String.format("{DrillBuf[%d], udle identityHashCode == %d, identityHashCode == %d}",
+ id, System.identityHashCode(byteBuf), System.identityHashCode(refCnt));
if (length == 0) {
return basics;
@@ -799,20 +485,16 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@Override
public ByteBuf retain(int increment) {
Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment);
- if (DEBUG) {
- historicalLog.recordEvent("retain(%d)", increment);
- }
- if (oldWorld) {
- if(rootBuffer){
- this.rootRefCnt.addAndGet(increment);
- }else{
- byteBuf.retain(increment);
- }
+ if (isEmpty) {
return this;
}
- rootRefCnt.addAndGet(increment);
+ if (BaseAllocator.DEBUG) {
+ historicalLog.recordEvent("retain(%d)", increment);
+ }
+
+ refCnt.addAndGet(increment);
return this;
}
@@ -1109,65 +791,42 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
return PlatformDependent.getByte(addr(index));
}
- public static DrillBuf getEmpty(BufferAllocator allocator, Accountor a) {
- return new DrillBuf(allocator, a);
- }
-
- public static DrillBuf getEmpty(final BufferLedger bufferLedger, final BufferAllocator bufferAllocator) {
- return new DrillBuf(bufferLedger, bufferAllocator);
+ @Override
+ public void close() {
+ release();
}
/**
- * Find out if this is a "root buffer." This is obsolete terminology
- * based on the original implementation of DrillBuf, which would layer
- * DrillBufs on top of other DrillBufs when slicing (or duplicating).
- * The buffer at the bottom of the layer was the "root buffer." However,
- * the current implementation flattens such references to always make
- * DrillBufs that are wrap a single buffer underneath, and slices and
- * their original source have a shared fate as per
- * http://netty.io/wiki/reference-counted-objects.html#wiki-h3-5, so
- * this concept isn't really meaningful anymore. But there are callers
- * that want to know a buffer's original size, and whether or not it
- * is "primal" in some sense. Perhaps this just needs a new name that
- * indicates that the buffer was an "original" and not a slice.
+ * Returns the possible memory consumed by this DrillBuf in the worse case scenario. (not shared, connected to larger
+ * underlying buffer of allocated memory)
*
- * @return whether or not the buffer is an original
+ * @return Size in bytes.
*/
- @Deprecated
- public boolean isRootBuffer() {
- if (oldWorld) {
- return rootBuffer;
- }
-
- return (flags & F_DERIVED) == 0;
- }
-
- @Override
- public void close() {
- release();
+ public int getPossibleMemoryConsumed() {
+ return ledger.getSize();
}
/**
- * Indicates whether this DrillBuf and the supplied one have a "shared fate."
- * Having a "shared fate" indicates that the two DrillBufs share a reference
- * count, and will both be released at the same time if either of them is
- * released.
- * @param otherBuf the other buffer to check against
- * @return true if the two buffers have a shared fate, false otherwise
+ * Return that is Accounted for by this buffer (and its potentially shared siblings within the context of the
+ * associated allocator).
+ *
+ * @return Size in bytes.
*/
- public boolean hasSharedFate(final DrillBuf otherBuf) {
- return rootRefCnt == otherBuf.rootRefCnt;
+ public int getActualMemoryConsumed() {
+ return ledger.getAccountedSize();
}
private final static int LOG_BYTES_PER_ROW = 10;
+
/**
- * Log this buffer's byte contents in the form of a hex dump.
- *
- * @param logger where to log to
- * @param start the starting byte index
- * @param length how many bytes to log
+ * Return the buffer's byte contents in the form of a hex dump.
+ * @param start
+ * the starting byte index
+ * @param length
+ * how many bytes to log
+ * @return A hex dump in a String.
*/
- public void logBytes(final Logger logger, final int start, final int length) {
+ public String toHexString(final int start, final int length) {
final int roundedStart = (start / LOG_BYTES_PER_ROW) * LOG_BYTES_PER_ROW;
final StringBuilder sb = new StringBuilder("buffer byte dump\n");
@@ -1184,7 +843,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
}
sb.append('\n');
}
- logger.trace(sb.toString());
+ return sb.toString();
}
/**
@@ -1192,27 +851,28 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
*
* @return integer id
*/
- public int getId() {
+ public long getId() {
return id;
}
- /**
- * Log this buffer's history.
- *
- * @param logger the logger to use
- */
- public void logHistory(final Logger logger) {
- if (historicalLog == null) {
- logger.warn("DrillBuf[{}] historicalLog not available", id);
- } else {
- historicalLog.logHistory(logger);
+
+ public String toVerboseString() {
+ if (isEmpty) {
+ return toString();
}
+
+ StringBuilder sb = new StringBuilder();
+ ledger.print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
+ return sb.toString();
}
- public void logHistoryForUdle(final Logger logger, final UnsafeDirectLittleEndian udle) {
- final Collection<DrillBuf> drillBufs = unwrappedGet(udle);
- for(final DrillBuf drillBuf : drillBufs) {
- drillBuf.logHistory(logger);
+ public void print(StringBuilder sb, int indent, Verbosity verbosity) {
+ BaseAllocator.indent(sb, indent).append(toString());
+
+ if (BaseAllocator.DEBUG && !isEmpty && verbosity.includeHistoricalLog) {
+ sb.append("\n");
+ historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/main/java/io/netty/buffer/ExpandableByteBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/ExpandableByteBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/ExpandableByteBuf.java
new file mode 100644
index 0000000..7788552
--- /dev/null
+++ b/exec/memory/base/src/main/java/io/netty/buffer/ExpandableByteBuf.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+
+/**
+ * Allows us to decorate DrillBuf to make it expandable so that we can use them in the context of the Netty framework
+ * (thus supporting RPC level memory accounting).
+ */
+public class ExpandableByteBuf extends MutableWrappedByteBuf {
+
+ private final BufferAllocator allocator;
+
+ public ExpandableByteBuf(ByteBuf buffer, BufferAllocator allocator) {
+ super(buffer);
+ this.allocator = allocator;
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ return new ExpandableByteBuf(buffer.copy(index, length), allocator);
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ if (newCapacity > capacity()) {
+ ByteBuf newBuf = allocator.buffer(newCapacity);
+ newBuf.writeBytes(buffer, 0, buffer.capacity());
+ newBuf.readerIndex(buffer.readerIndex());
+ newBuf.writerIndex(buffer.writerIndex());
+ buffer.release();
+ buffer = newBuf;
+ return newBuf;
+ } else {
+ return super.capacity(newCapacity);
+ }
+ }
+
+}