You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:26 UTC
[03/13] drill git commit: DRILL-4134: Add new allocator
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
new file mode 100644
index 0000000..bb8f0ee
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -0,0 +1,1654 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import io.netty.buffer.UnsafeDirectLittleEndian;
+
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.common.DrillAutoCloseables;
+import org.apache.drill.common.HistoricalLog;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.drill.exec.util.Pointer;
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+// TODO(cwestin) javadoc
+// TODO(cwestin) add allocator implementation options tried
+public abstract class BaseAllocator implements BufferAllocator {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
+ // NOTE(review): injector is not referenced in the part of this file shown here; it may be used further down.
+ private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(BaseAllocator.class);
+
+ // Source of the unique per-allocator ids assigned to the id field below.
+ private static final AtomicInteger idGenerator = new AtomicInteger(0);
+ // One lock shared by every BaseAllocator instance (it is static); the accounting
+ // methods such as reserve(), releaseBytes() and releaseBuffer() synchronize on it.
+ private static final Object ALLOCATOR_LOCK = new Object();
+
+ // NOTE(review): presumably the name of a ControlsInjector injection site; not used in the visible code.
+ public static final String CHILD_BUFFER_INJECTION_SITE = "child.buffer";
+
+ // Turns on the debugging bookkeeping (allocatedBuffers, childAllocators, reservations,
+ // historicalLog): true when AssertionUtil.isAssertionsEnabled() returns true, or when the
+ // system property named by ExecConstants.DEBUG_ALLOCATOR is set to "true".
+ static final boolean DEBUG = AssertionUtil.isAssertionsEnabled()
+ || Boolean.getBoolean(ExecConstants.DEBUG_ALLOCATOR);
+ // The pooled allocator that all the ledger classes in this file report as their underlying allocator.
+ private static final PooledByteBufAllocatorL INNER_ALLOCATOR = PooledByteBufAllocatorL.DEFAULT;
+
+ // Accounting fields; the methods in this file update them while holding ALLOCATOR_LOCK.
+ private long allocated; // the amount of memory this allocator has given out to its clients (including children)
+ private long owned; // the amount of memory this allocator has obtained from its parent (a root allocator starts out owning maxAllocation)
+ private long peakAllocated; // the most memory this allocator has given out during its lifetime
+ private long bufferAllocation; // the amount of memory used just for directly allocated buffers, not children
+
+ private boolean isClosed = false; // the allocator has been closed
+
+ private final long maxAllocation; // the maximum amount of memory this allocator will give out
+ private final long chunkSize; // size of secondary chunks to request from the parent: initReservation, or maxAllocation / 8 when that is 0
+ private final AllocationPolicyAgent policyAgent; // this allocator's agent, obtained from allocationPolicy.newAgent()
+ private final BaseAllocator parentAllocator; // null for a root allocator
+ private final AllocatorOwner allocatorOwner;
+ protected final int id = idGenerator.incrementAndGet(); // unique ID assigned to each allocator
+ private final DrillBuf empty; // this allocator's empty buffer, which uses an EmptyLedger
+ private final AllocationPolicy allocationPolicy;
+ // NOTE(review): presumably the ledger attached to buffers allocated directly by this allocator; confirm.
+ private final InnerBufferLedger bufferLedger = new InnerBufferLedger();
+
+ // members used purely for debugging; the collections and historicalLog are non-null only when DEBUG
+ private int getsFromParent; // number of times reserve() obtained more memory from the parent
+ private int putsToParent; // number of times releaseBytes() returned memory to the parent
+ private final IdentityHashMap<UnsafeDirectLittleEndian, BufferLedger> allocatedBuffers;
+ private final IdentityHashMap<BaseAllocator, Object> childAllocators;
+ private final IdentityHashMap<Reservation, Object> reservations;
+ // NOTE(review): preallocSpace is not referenced in the visible part of this file; possibly unused.
+ private long preallocSpace;
+
+ private final HistoricalLog historicalLog;
+
+ private static BaseAllocator getBaseAllocator(final BufferAllocator bufferAllocator) {
+ if (!(bufferAllocator instanceof BaseAllocator)) {
+ throw new IllegalArgumentException("expected a BaseAllocator instance, but got a "
+ + bufferAllocator.getClass().getName());
+ }
+ return (BaseAllocator) bufferAllocator;
+ }
+
+ // TODO move allocation policy and agent to outside of allocator
+ /**
+ * Policy behind {@link BaseAllocator#POLICY_PER_FRAGMENT}. Every agent it hands out shares the
+ * same {@link Globals} instance, so all of them see one root count and one current limit.
+ */
+ private static class PerFragmentAllocationPolicy implements AllocationPolicy {
+   /** State shared by all the agents of a single PerFragmentAllocationPolicy. */
+   static class Globals {
+     // current per-root limit; stays 0 until an agent's checkNewAllocator() first initializes it
+     private long maxBufferAllocation;
+     // number of limiting-root allocators that have been initialized and not yet closed
+     private final AtomicInteger limitingRoots = new AtomicInteger();
+   }
+
+   private final Globals globals = new Globals();
+
+   @Override
+   public AllocationPolicyAgent newAgent() {
+     final AllocationPolicyAgent agent = new PerFragmentAllocationPolicyAgent(this.globals);
+     return agent;
+   }
+ }
+
+ /**
+ * AllocationPolicy that allows each fragment running on a drillbit to share an
+ * equal amount of direct memory, regardless of whether or not those fragments
+ * belong to the same query. The drillbit's maximum direct memory is divided evenly
+ * among the allocators that were created with F_LIMITING_ROOT.
+ */
+ public static final AllocationPolicy POLICY_PER_FRAGMENT = new PerFragmentAllocationPolicy();
+
+ /**
+ * String name of {@link #POLICY_PER_FRAGMENT} policy.
+ */
+ public static final String POLICY_PER_FRAGMENT_NAME = "per-fragment";
+
+ /**
+ * Agent for {@link BaseAllocator#POLICY_PER_FRAGMENT}. Each agent remembers whether its allocator
+ * was created with {@code F_LIMITING_ROOT}. All agents of the same policy share one
+ * {@link PerFragmentAllocationPolicy.Globals}, which holds the number of active limiting roots
+ * and the current per-root limit (RootAllocator.getMaxDirect() divided by that count).
+ *
+ * <p>Every read and write of {@code globals.maxBufferAllocation}, and every change to
+ * {@code globals.limitingRoots}, happens while holding the monitor of {@code globals}.
+ * Using that one lock everywhere lets concurrent close() and checkNewAllocator() calls
+ * exclude each other.</p>
+ */
+ private static class PerFragmentAllocationPolicyAgent implements AllocationPolicyAgent {
+   private final PerFragmentAllocationPolicy.Globals globals;
+   private boolean limitingRoot; // this is a limiting root; see F_LIMITING_ROOT
+
+   PerFragmentAllocationPolicyAgent(PerFragmentAllocationPolicy.Globals globals) {
+     this.globals = globals;
+   }
+
+   /**
+    * If this agent's allocator is a limiting root, removes it from the root count and
+    * recomputes the per-root limit. Calling this more than once has no further effect.
+    */
+   @Override
+   public void close() {
+     if (!limitingRoot) {
+       return;
+     }
+     // clear the flag first so that a repeated close() can't decrement the root count twice
+     limitingRoot = false;
+
+     synchronized(globals) {
+       // now there's one fewer active root
+       final int rootCount = globals.limitingRoots.decrementAndGet();
+
+       /*
+        * If the rootCount went to zero, we don't need to worry about setting the
+        * maxBufferAllocation, because there aren't any allocators to reference it;
+        * the next allocator to get created will set it appropriately.
+        */
+       if (rootCount != 0) {
+         globals.maxBufferAllocation = RootAllocator.getMaxDirect() / rootCount;
+       }
+     }
+   }
+
+   /**
+    * Computes the per-root limit that will apply once this allocator exists. Rejects the new
+    * allocator if its initial reservation is larger than that limit.
+    *
+    * @throws OutOfMemoryRuntimeException if initReservation exceeds the current limit
+    */
+   @Override
+   public void checkNewAllocator(BufferAllocator parentAllocator,
+       long initReservation, long maxAllocation, int flags) {
+     limitingRoot = (flags & F_LIMITING_ROOT) != 0;
+
+     // protect maxBufferAllocation with the shared globals lock, as every other method here does
+     synchronized(globals) {
+       // initialize maxBufferAllocation the very first time we call this
+       if (globals.maxBufferAllocation == 0) {
+         globals.maxBufferAllocation = RootAllocator.getMaxDirect();
+       }
+
+       if (limitingRoot) {
+         // figure out the new current per-allocator limit, counting the allocator being created
+         globals.maxBufferAllocation = RootAllocator.getMaxDirect() / (globals.limitingRoots.get() + 1);
+       }
+
+       if ((initReservation > 0) && (initReservation > globals.maxBufferAllocation)) {
+         throw new OutOfMemoryRuntimeException(
+             String.format("can't fulfill initReservation request at this time "
+                 + "(initReservation = %d > maxBufferAllocation = %d)",
+                 initReservation, globals.maxBufferAllocation));
+       }
+     }
+   }
+
+   @Override
+   public long getMemoryLimit(BufferAllocator bufferAllocator) {
+     synchronized(globals) {
+       return globals.maxBufferAllocation;
+     }
+   }
+
+   @Override
+   public void initializeAllocator(final BufferAllocator bufferAllocator) {
+     // called for its check: throws IllegalArgumentException if this isn't a BaseAllocator
+     getBaseAllocator(bufferAllocator);
+
+     if (limitingRoot) {
+       synchronized(globals) {
+         globals.limitingRoots.incrementAndGet();
+       }
+     }
+   }
+
+   @Override
+   public boolean shouldReleaseToParent(final BufferAllocator bufferAllocator) {
+     final BaseAllocator baseAllocator = getBaseAllocator(bufferAllocator);
+
+     // Read under the lock: an unsynchronized read of a non-volatile long may observe a torn value.
+     final long limit;
+     synchronized(globals) {
+       limit = globals.maxBufferAllocation;
+     }
+     return baseAllocator.owned + baseAllocator.chunkSize > limit;
+   }
+ }
+
+ /**
+ * Policy behind {@link BaseAllocator#POLICY_LOCAL_MAX}. Its agent holds no state, so one
+ * shared instance is handed to every allocator.
+ */
+ private static class LocalMaxAllocationPolicy implements AllocationPolicy {
+   // stateless, so a single instance can serve all allocators
+   private static final AllocationPolicyAgent AGENT = new LocalMaxAllocationPolicyAgent();
+
+   @Override
+   public AllocationPolicyAgent newAgent() {
+     return LocalMaxAllocationPolicy.AGENT;
+   }
+ }
+
+ /**
+ * AllocationPolicy that imposes no limits on how much direct memory fragments
+ * may allocate. LOCAL_MAX refers to the only limit that is enforced, which is
+ * the maxAllocation specified at allocators' creation.
+ *
+ * <p>This policy ignores the value of {@link BufferAllocator#F_LIMITING_ROOT}.</p>
+ */
+ public static final AllocationPolicy POLICY_LOCAL_MAX = new LocalMaxAllocationPolicy();
+
+ /**
+ * String name of {@link #POLICY_LOCAL_MAX} allocation policy.
+ */
+ public static final String POLICY_LOCAL_MAX_NAME = "local-max";
+
+ /**
+ * Agent for {@link BaseAllocator#POLICY_LOCAL_MAX}. It keeps no state:
+ * <ul>
+ * <li>it never rejects a new allocator;</li>
+ * <li>it reports each allocator's own maxAllocation as that allocator's memory limit;</li>
+ * <li>it always allows releasing space back to the parent.</li>
+ * </ul>
+ */
+ private static class LocalMaxAllocationPolicyAgent implements AllocationPolicyAgent {
+   @Override
+   public void close() {
+     // nothing to release: this agent holds no state
+   }
+
+   @Override
+   public void checkNewAllocator(BufferAllocator parentAllocator,
+       long initReservation, long maxAllocation, int flags) {
+     // there are no shared limits, so any allocator may be created
+   }
+
+   @Override
+   public long getMemoryLimit(BufferAllocator bufferAllocator) {
+     // Use the shared helper so a non-BaseAllocator argument fails with a descriptive
+     // IllegalArgumentException, like the other agent methods that call getBaseAllocator().
+     return getBaseAllocator(bufferAllocator).maxAllocation;
+   }
+
+   @Override
+   public void initializeAllocator(BufferAllocator bufferAllocator) {
+     // nothing to track
+   }
+
+   @Override
+   public boolean shouldReleaseToParent(BufferAllocator bufferAllocator) {
+     // since there are no shared limits, release space whenever we can
+     return true;
+   }
+ }
+
+ // TODO(DRILL-2698) POLICY_PER_QUERY
+
+ /**
+ * Creates an allocator.
+ *
+ * @param parentAllocator the allocator to obtain memory from; null for an allocator without a parent
+ * @param allocatorOwner the owner of this allocator; must be non-null
+ * @param allocationPolicy the policy whose agent vets this allocator's creation and decides
+ *   when memory is handed back to the parent
+ * @param initReservation bytes to reserve from the parent up front; must be non-negative,
+ *   <= maxAllocation, and 0 when there is no parent
+ * @param maxAllocation the maximum amount of memory this allocator will give out; non-negative
+ * @param flags allocator flags passed to the policy agent (for example F_LIMITING_ROOT)
+ * @throws IllegalArgumentException if an argument violates the constraints above
+ * @throws IllegalStateException if initReservation > 0 but there is no parent allocator
+ * @throws OutOfMemoryRuntimeException if the policy rejects the allocator or the parent
+ *   can't supply the initial reservation
+ */
+ protected BaseAllocator(
+     final BaseAllocator parentAllocator,
+     final AllocatorOwner allocatorOwner,
+     final AllocationPolicy allocationPolicy,
+     final long initReservation,
+     final long maxAllocation,
+     final int flags) throws OutOfMemoryRuntimeException {
+   Preconditions.checkArgument(allocatorOwner != null, "allocatorOwner must be non-null");
+   Preconditions.checkArgument(initReservation >= 0,
+       "the initial reservation size must be non-negative");
+   Preconditions.checkArgument(maxAllocation >= 0,
+       "the maximum allocation limit must be non-negative");
+   Preconditions.checkArgument(initReservation <= maxAllocation,
+       "the initial reservation size must be <= the maximum allocation");
+
+   if ((initReservation > 0) && (parentAllocator == null)) {
+     throw new IllegalStateException(
+         "can't reserve memory without a parent allocator");
+   }
+
+   // check to see if we can create this new allocator (the check throws if it's not ok)
+   final AllocationPolicyAgent policyAgent = allocationPolicy.newAgent();
+   policyAgent.checkNewAllocator(parentAllocator, initReservation, maxAllocation, flags);
+
+   if ((initReservation > 0) && !parentAllocator.reserve(this, initReservation, 0)) {
+     throw new OutOfMemoryRuntimeException(
+         "can't fulfill initial reservation of size " + initReservation + " (unavailable from parent)");
+   }
+
+   /*
+    * Figure out how much more to ask for from our parent if we're out.
+    * Secondary allocations from the parent will be done in integral multiples of
+    * chunkSize. We also use this to determine when to hand back memory to our
+    * parent in order to create hysteresis. This reduces contention on the parent's
+    * lock.
+    */
+   if (initReservation == 0) {
+     // Never let chunkSize be 0 (maxAllocation < 8): reserve() divides by it.
+     chunkSize = Math.max(1, maxAllocation / 8);
+   } else {
+     chunkSize = initReservation;
+   }
+
+   this.parentAllocator = parentAllocator;
+   this.allocatorOwner = allocatorOwner;
+   this.allocationPolicy = allocationPolicy;
+   this.policyAgent = policyAgent;
+   this.maxAllocation = maxAllocation;
+
+   // the root allocator owns all of its memory; anything else just owns its initial reservation
+   owned = parentAllocator == null ? maxAllocation : initReservation;
+   empty = DrillBuf.getEmpty(new EmptyLedger(), this);
+
+   if (DEBUG) {
+     allocatedBuffers = new IdentityHashMap<>();
+     childAllocators = new IdentityHashMap<>();
+     reservations = new IdentityHashMap<>();
+     historicalLog = new HistoricalLog(4, "allocator[%d]", id);
+
+     historicalLog.recordEvent("created by \"%s\", owned = %d", allocatorOwner.toString(), owned);
+   } else {
+     allocatedBuffers = null;
+     childAllocators = null;
+     reservations = null;
+     historicalLog = null;
+   }
+
+   // now that we're not in danger of throwing an exception, we can take this step
+   policyAgent.initializeAllocator(this);
+ }
+
+ /**
+ * Allocators without a parent must provide an implementation of this so
+ * that they may reserve additional space even though they don't have a
+ * parent they can fall back to.
+ *
+ * <p>Prior to calling this, BaseAllocator has verified that this won't violate
+ * the maxAllocation for this allocator.</p>
+ *
+ * @param nBytes the amount of space to reserve
+ * @param ignoreMax ignore the maximum allocation limit;
+ * see {@link ChildLedger#reserve(long, boolean)}.
+ * @return true if the request can be met, false otherwise
+ */
+ protected boolean canIncreaseOwned(final long nBytes, final int flags) {
+ if (parentAllocator == null) {
+ return false;
+ }
+
+ return parentAllocator.reserve(this, nBytes, flags);
+ }
+
+ /** reserve() flag: satisfy the request even if it takes this allocator past its maxAllocation. */
+ private static final int RESERVE_F_IGNORE_MAX = 0x0001;
+
+ /**
+ * Reserve space from this allocator, either for a child allocator or for this allocator itself.
+ *
+ * <p>If this allocator doesn't own enough memory to cover the request, it first obtains more
+ * via canIncreaseOwned(). It asks in integral multiples of chunkSize when that stays within
+ * maxAllocation, and otherwise for exactly the shortfall.</p>
+ *
+ * @param childAllocator child allocator making the request, or null
+ *   if this is not for a child (currently not used by this method)
+ * @param nBytes how much to reserve; must be non-negative
+ * @param flags zero or more RESERVE_F_* flags or'ed together
+ * @return true if the reservation can be satisfied, false otherwise
+ * @throws AllocatorClosedException if this allocator has been closed
+ */
+ private boolean reserve(final BaseAllocator childAllocator,
+     final long nBytes, final int flags) {
+   Preconditions.checkArgument(nBytes >= 0,
+       "the number of bytes to reserve must be non-negative");
+
+   // we can always fulfill an empty request
+   if (nBytes == 0) {
+     return true;
+   }
+
+   final boolean ignoreMax = (flags & RESERVE_F_IGNORE_MAX) != 0;
+
+   synchronized(ALLOCATOR_LOCK) {
+     if (isClosed) {
+       throw new AllocatorClosedException(String.format("Attempt to use closed allocator[%d]", id));
+     }
+
+     final long ownAtLeast = allocated + nBytes;
+     // Are we allowed to hand out this much?
+     if (!ignoreMax && (ownAtLeast > maxAllocation)) {
+       return false;
+     }
+
+     // do we need more from our parent first?
+     if (ownAtLeast > owned) {
+       /*
+        * Allocate space in integral multiples of chunkSize, as long as it doesn't exceed
+        * the maxAllocation.
+        */
+       final long needAdditional = ownAtLeast - owned;
+       // a zero chunk size (maxAllocation < 8) would otherwise cause an ArithmeticException
+       final long chunk = Math.max(1, chunkSize);
+       final long getInChunks = (1 + ((needAdditional - 1) / chunk)) * chunk;
+       final long getFromParent;
+       if (getInChunks + owned <= maxAllocation) {
+         getFromParent = getInChunks;
+       } else {
+         getFromParent = needAdditional;
+       }
+       if (!canIncreaseOwned(getFromParent, flags)) {
+         return false;
+       }
+       owned += getFromParent;
+
+       if (DEBUG) {
+         ++getsFromParent;
+         // log the amount actually added to owned, which may be more than needAdditional
+         historicalLog.recordEvent("increased owned by %d, now owned == %d", getFromParent, owned);
+       }
+     }
+
+     if (DEBUG) {
+       if (owned < ownAtLeast) {
+         throw new IllegalStateException("don't own enough memory to satisfy request");
+       }
+       if (allocated > owned) {
+         throw new IllegalStateException(
+             String.format("more memory allocated (%d) than owned (%d)", allocated, owned));
+       }
+
+       historicalLog.recordEvent("allocator[%d] allocated increased by nBytes == %d to %d",
+           id, nBytes, allocated + nBytes);
+     }
+
+     allocated += nBytes;
+
+     if (allocated > peakAllocated) {
+       peakAllocated = allocated;
+     }
+
+     return true;
+   }
+ }
+
+ /**
+ * Returns nBytes of previously reserved space to this allocator. If the allocator now owns
+ * more than its maxAllocation and the policy agent agrees, the excess is handed back to the
+ * parent. Memory that is still allocated is never handed back.
+ *
+ * @param nBytes the number of bytes being released; must be non-negative
+ */
+ private void releaseBytes(final long nBytes) {
+   Preconditions.checkArgument(nBytes >= 0,
+       "the number of bytes being released must be non-negative");
+
+   synchronized(ALLOCATOR_LOCK) {
+     allocated -= nBytes;
+
+     if (DEBUG) {
+       historicalLog.recordEvent("allocator[%d] released nBytes == %d, allocated now %d",
+           id, nBytes, allocated);
+     }
+
+     /*
+      * Return space to our parent if our allocation is over the currently allowed amount.
+      * allocated can itself still exceed maxAllocation (reservations made with
+      * RESERVE_F_IGNORE_MAX). Only the part of owned above both limits is returned, so owned
+      * never drops below allocated.
+      */
+     final boolean releaseToParent = (parentAllocator != null)
+         && (owned > maxAllocation) && policyAgent.shouldReleaseToParent(this);
+     if (releaseToParent) {
+       final long canFree = owned - Math.max(allocated, maxAllocation);
+       if (canFree > 0) {
+         parentAllocator.releaseBytes(canFree);
+         owned -= canFree;
+
+         if (DEBUG) {
+           ++putsToParent;
+           historicalLog.recordEvent("returned %d to parent, now owned == %d", canFree, owned);
+         }
+       }
+     }
+   }
+ }
+
+ /**
+ * Releases the accounting for a buffer that this allocator handed out. The amount released
+ * is the max capacity of the buffer's unwrapped ByteBuf. In DEBUG mode, the buffer is also
+ * removed from allocatedBuffers.
+ *
+ * @param drillBuf the buffer being released; must be non-null
+ * @throws IllegalStateException in DEBUG mode, if the buffer wasn't tracked by this allocator
+ */
+ private void releaseBuffer(final DrillBuf drillBuf) {
+   Preconditions.checkArgument(drillBuf != null,
+       "the DrillBuf being released can't be null");
+
+   final ByteBuf underlying = drillBuf.unwrap();
+   final int capacity = underlying.maxCapacity();
+
+   synchronized(ALLOCATOR_LOCK) {
+     bufferAllocation -= capacity;
+     releaseBytes(capacity);
+
+     if (!DEBUG) {
+       return;
+     }
+
+     // make sure the buffer came from this allocator
+     final boolean wasTracked = allocatedBuffers.remove(underlying) != null;
+     if (!wasTracked) {
+       historicalLog.logHistory(logger);
+       drillBuf.logHistory(logger);
+       throw new IllegalStateException("Released buffer did not belong to this allocator");
+     }
+   }
+ }
+
+ /**
+ * Notification that a child allocator has closed. In DEBUG mode, this removes the child
+ * from childAllocators and re-verifies this allocator. Otherwise it only validates the argument.
+ *
+ * @param childAllocator the child that closed; must be non-null
+ * @throws IllegalStateException in DEBUG mode, if the child wasn't registered with this allocator
+ */
+ private void childClosed(final BaseAllocator childAllocator) {
+   Preconditions.checkArgument(childAllocator != null, "child allocator can't be null");
+
+   // Children are only tracked in DEBUG mode, so there is nothing else to do otherwise.
+   if (!DEBUG) {
+     return;
+   }
+
+   synchronized(ALLOCATOR_LOCK) {
+     final boolean wasRegistered = childAllocators.remove(childAllocator) != null;
+     if (!wasRegistered) {
+       childAllocator.historicalLog.logHistory(logger);
+       throw new IllegalStateException("Child allocator[" + childAllocator.id
+           + "] not found in parent allocator[" + id + "]'s childAllocators");
+     }
+
+     try {
+       verifyAllocator();
+     } catch(Exception e) {
+       // The closed child's history may help explain a verification failure.
+       logger.debug("allocator[" + id + "]: exception while closing the following child");
+       childAllocator.historicalLog.logHistory(logger);
+
+       // Continue with the verification exception throwing.
+       throw e;
+     }
+   }
+ }
+
+ /**
+ * TODO(DRILL-2740) We use this to bypass the regular accounting for the
+ * empty DrillBuf, because it is treated specially at this time. Once that
+ * is remedied, this should be able to go away.
+ *
+ * <p>The empty buffer has the following properties:</p>
+ * <ul>
+ * <li>releasing it is a no-op (checked in DEBUG mode);</li>
+ * <li>it can only be "shared" with its own allocator, to support slicing;</li>
+ * <li>its ownership can never be transferred.</li>
+ * </ul>
+ */
+ private class EmptyLedger implements BufferLedger {
+   @Override
+   public PooledByteBufAllocatorL getUnderlyingAllocator() {
+     return BaseAllocator.INNER_ALLOCATOR;
+   }
+
+   @Override
+   public void release(final DrillBuf drillBuf) {
+     if (DEBUG && (drillBuf != BaseAllocator.this.empty)) {
+       throw new IllegalStateException("The empty buffer's ledger is being used to release something else");
+     }
+   }
+
+   @Override
+   public BufferLedger shareWith(Pointer<DrillBuf> pDrillBuf,
+       BufferLedger otherLedger, BufferAllocator otherAllocator, DrillBuf drillBuf,
+       int index, int length, int drillBufFlags) {
+     // As a special case, we allow sharing with the same allocator so that slicing works.
+     final boolean sameAllocator = otherAllocator == BaseAllocator.this;
+     if (!sameAllocator) {
+       throw new UnsupportedOperationException("The empty buffer can't be shared");
+     }
+
+     pDrillBuf.value = drillBuf;
+     return otherLedger;
+   }
+
+   @Override
+   public boolean transferTo(BufferAllocator newAlloc,
+       Pointer<BufferLedger> pNewLedger, DrillBuf drillBuf) {
+     throw new UnsupportedOperationException("The empty buffer's ownership can't be changed");
+   }
+ }
+
+ /**
+ * Ledger for buffers that are not shared. This allocator's bufferLedger field is an
+ * instance of it; NOTE(review): presumably it is attached to buffers allocated directly
+ * by this allocator — confirm against the allocation path.
+ *
+ * <p>Releasing a buffer gives its space back through releaseBuffer(). Sharing a buffer
+ * converts it to a SharedBufferLedger.</p>
+ */
+ private class InnerBufferLedger implements BufferLedger {
+ @Override
+ public PooledByteBufAllocatorL getUnderlyingAllocator() {
+ return INNER_ALLOCATOR;
+ }
+
+ @Override
+ public void release(final DrillBuf drillBuf) {
+ releaseBuffer(drillBuf);
+ }
+
+ /**
+ * Starts sharing drillBuf (owned by this allocator) with otherAllocator. Creates a
+ * SharedBufferLedger owned by this allocator, then wraps a new DrillBuf for otherAllocator
+ * (index, length and drillBufFlags are passed to the DrillBuf constructor). The new buffer
+ * is registered in the shared ledger under otherAllocator. The new buffer is stored in
+ * pDrillBuf.value, and the shared ledger is returned. otherLedger is not used here.
+ *
+ * @throws AllocatorClosedException if otherAllocator has been closed
+ * @throws IllegalStateException in DEBUG mode, if drillBuf doesn't belong to this allocator
+ * or isn't tracked with this ledger
+ */
+ @Override
+ public BufferLedger shareWith(final Pointer<DrillBuf> pDrillBuf,
+ final BufferLedger otherLedger, final BufferAllocator otherAllocator,
+ final DrillBuf drillBuf, final int index, final int length, final int drillBufFlags) {
+ final BaseAllocator baseAllocator = (BaseAllocator) otherAllocator;
+ synchronized(ALLOCATOR_LOCK) {
+ if (baseAllocator.isClosed) {
+ throw new AllocatorClosedException(
+ String.format("Attempt to use closed allocator[%d]", baseAllocator.id));
+ }
+
+ /*
+ * If this is called, then the buffer isn't yet shared, and should
+ * become so.
+ */
+ final SharedBufferLedger sharedLedger = new SharedBufferLedger(drillBuf, BaseAllocator.this);
+
+ // Create the new wrapping DrillBuf.
+ final DrillBuf newBuf =
+ new DrillBuf(sharedLedger, otherAllocator, drillBuf, index, length, drillBufFlags);
+ sharedLedger.addMapping(newBuf, baseAllocator);
+
+ if (DEBUG) {
+ final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) drillBuf.unwrap();
+ historicalLog.recordEvent("InnerBufferLedger(allocator[%d]).shareWith(..., "
+ + "otherAllocator[%d], drillBuf[%d]{UnsafeDirectLittleEndian[identityHashCode == %d]}, ...)",
+ BaseAllocator.this.id, baseAllocator.id, drillBuf.getId(),
+ System.identityHashCode(udle));
+
+ // The buffer being shared must belong to this allocator.
+ final BaseAllocator drillBufAllocator = (BaseAllocator) drillBuf.getAllocator();
+ if (BaseAllocator.this != drillBufAllocator) {
+ historicalLog.logHistory(logger);
+ drillBuf.logHistory(logger);
+ throw new IllegalStateException(String.format(
+ "DrillBuf's allocator([%d]) doesn't match this(allocator[%d])",
+ drillBufAllocator.id, BaseAllocator.this.id));
+ }
+
+ // Replace the ledger for the existing buffer.
+ // NOTE(review): the put happens before the checks below, so on failure the map is left
+ // holding sharedLedger (or a newly inserted entry) for udle.
+ final BufferLedger thisLedger = allocatedBuffers.put(udle, sharedLedger);
+
+ // If we throw any of these exceptions, we need to clean up newBuf.
+ if (thisLedger == null) {
+ newBuf.release();
+ historicalLog.logHistory(logger);
+ drillBuf.logHistory(logger);
+ throw new IllegalStateException("Buffer to be shared is unknown to the source allocator");
+ }
+ if (thisLedger != this) {
+ newBuf.release();
+ historicalLog.logHistory(logger);
+ drillBuf.logHistory(logger);
+ throw new IllegalStateException("Buffer's ledger was not the one it should be");
+ }
+ }
+
+ pDrillBuf.value = newBuf;
+ return sharedLedger;
+ }
+ }
+
+ /**
+ * Moves ownership of drillBuf to newAlloc, using pNewLedger.value as the buffer's new ledger.
+ * After validating the arguments and checking that newAlloc isn't closed, this delegates
+ * to the static BaseAllocator.transferTo() and returns its result.
+ *
+ * @throws IllegalArgumentException if an argument is null, newAlloc is this allocator, or
+ * newAlloc isn't a BaseAllocator
+ * @throws AllocatorClosedException if newAlloc has been closed
+ */
+ @Override
+ public boolean transferTo(final BufferAllocator newAlloc,
+ final Pointer<BufferLedger> pNewLedger, final DrillBuf drillBuf) {
+ Preconditions.checkArgument(newAlloc != null, "New allocator cannot be null");
+ Preconditions.checkArgument(newAlloc != BaseAllocator.this,
+ "New allocator is same as current");
+ Preconditions.checkArgument(newAlloc instanceof BaseAllocator,
+ "New allocator isn't a BaseAllocator");
+ Preconditions.checkArgument(pNewLedger.value != null, "Candidate new ledger can't be null");
+ Preconditions.checkArgument(drillBuf != null, "DrillBuf can't be null");
+
+ final BaseAllocator newAllocator = (BaseAllocator) newAlloc;
+ synchronized(ALLOCATOR_LOCK) {
+ if (newAllocator.isClosed) {
+ throw new AllocatorClosedException(
+ String.format("Attempt to use closed allocator[%d]", newAllocator.id));
+ }
+
+ return BaseAllocator.transferTo(newAllocator, pNewLedger.value, drillBuf);
+ }
+ }
+ }
+
+ /**
+ * Transfer ownership of a buffer from one allocator to another.
+ *
+ * <p>The buffer's space (the max capacity of its underlying UnsafeDirectLittleEndian) is first
+ * charged to the new allocator via reserveForBuf(). It is then released from the buffer's
+ * current allocator via releaseBuffer(). This method acquires ALLOCATOR_LOCK itself; Java
+ * monitors are reentrant, so callers that already hold the lock may call it too.</p>
+ *
+ * @param newAllocator the new allocator
+ * @param newLedger the new ledger to use (which could be shared)
+ * @param drillBuf the buffer
+ * @return true if, after the transfer, the new owner's allocated memory does not exceed its
+ *   maximum allocation limit
+ */
+ private static boolean transferTo(final BaseAllocator newAllocator,
+     final BufferLedger newLedger, final DrillBuf drillBuf) {
+   final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) drillBuf.unwrap();
+   final int udleMaxCapacity = udle.maxCapacity();
+
+   synchronized(ALLOCATOR_LOCK) {
+     // Account for the space and track the buffer.
+     newAllocator.reserveForBuf(udleMaxCapacity);
+
+     if (DEBUG) {
+       final Object object = newAllocator.allocatedBuffers.put(udle, newLedger);
+       if (object != null) {
+         newAllocator.historicalLog.logHistory(logger);
+         drillBuf.logHistory(logger);
+         throw new IllegalStateException("Buffer unexpectedly found in new allocator");
+       }
+     }
+
+     // Remove from the old allocator.
+     final BaseAllocator oldAllocator = (BaseAllocator) drillBuf.getAllocator();
+     oldAllocator.releaseBuffer(drillBuf);
+
+     if (DEBUG) {
+       final Object object = oldAllocator.allocatedBuffers.get(udle);
+       if (object != null) {
+         oldAllocator.historicalLog.logHistory(logger);
+         drillBuf.logHistory(logger);
+         throw new IllegalStateException("Buffer was not removed from old allocator");
+       }
+
+       oldAllocator.historicalLog.recordEvent("BaseAllocator.transferTo(otherAllocator[%d], ..., "
+           + "drillBuf[%d]{UnsafeDirectLittleEndian[identityHashCode == %d]}) oldAllocator[%d]",
+           newAllocator.id, drillBuf.getId(), System.identityHashCode(drillBuf.unwrap()),
+           oldAllocator.id);
+       newAllocator.historicalLog.recordEvent("BaseAllocator.transferTo(otherAllocator[%d], ..., "
+           + "drillBuf[%d]{UnsafeDirectLittleEndian[identityHashCode == %d]}) oldAllocator[%d]",
+           newAllocator.id, drillBuf.getId(), System.identityHashCode(drillBuf.unwrap()),
+           oldAllocator.id);
+     }
+
+     // Reaching the limit exactly is still within it, matching reserve(), which only refuses
+     // requests that would make allocated exceed maxAllocation.
+     return newAllocator.allocated <= newAllocator.maxAllocation;
+   }
+ }
+
+ private static class SharedBufferLedger implements BufferLedger {
+ // The allocator currently charged for the shared buffer's memory. Set in the constructor and
+ // reassigned by release() when ownership moves to another allocator in the sharing set.
+ private volatile BaseAllocator owningAllocator;
+ // Every DrillBuf that uses this ledger, mapped to the allocator it was created for. The
+ // visible methods that touch it are synchronized on (or synchronize on) this object.
+ private final IdentityHashMap<DrillBuf, BaseAllocator> bufferMap = new IdentityHashMap<>();
+
+ // Debugging event history; non-null only when DEBUG.
+ private final HistoricalLog historicalLog;
+
+ /**
+ * Creates a shared ledger for drillBuf, which is registered under (and owned by) baseAllocator.
+ * In DEBUG mode, a history log is created and the ledger's invariants are checked.
+ *
+ * @param drillBuf the buffer that is becoming shared
+ * @param baseAllocator the allocator that currently owns the buffer
+ */
+ public SharedBufferLedger(final DrillBuf drillBuf, final BaseAllocator baseAllocator) {
+   historicalLog = DEBUG
+       ? new HistoricalLog(4,
+           "SharedBufferLedger for DrillBuf[%d]{UnsafeDirectLittleEndian[identityHashCode == %d]}",
+           drillBuf.getId(), System.identityHashCode(drillBuf.unwrap()))
+       : null;
+
+   addMapping(drillBuf, baseAllocator);
+   owningAllocator = baseAllocator;
+
+   if (!DEBUG) {
+     return;
+   }
+   checkBufferMap();
+ }
+
+ /**
+ * Records that drillBuf uses this ledger on behalf of baseAllocator. The event is logged
+ * in DEBUG mode.
+ */
+ private synchronized void addMapping(final DrillBuf drillBuf, final BaseAllocator baseAllocator) {
+   bufferMap.put(drillBuf, baseAllocator);
+
+   if (!DEBUG) {
+     return;
+   }
+   historicalLog.recordEvent("addMapping(DrillBuf[%d], allocator[%d])", drillBuf.getId(), baseAllocator.id);
+ }
+
+ /** Writes the history of every DrillBuf in bufferMap to the given logger. */
+ private synchronized void logBufferHistories(final Logger logger) {
+   for (final DrillBuf sharingBuf : bufferMap.keySet()) {
+     sharingBuf.logHistory(logger);
+   }
+ }
+
+ /**
+ * Consistency check of the sharing set. It is meant for DEBUG mode only, because it reads
+ * allocatedBuffers, which is null otherwise. It checks that:
+ * <ul>
+ * <li>the owning allocator tracks the shared buffer in its allocatedBuffers;</li>
+ * <li>no other allocator in bufferMap tracks it;</li>
+ * <li>a non-empty bufferMap includes the owning allocator.</li>
+ * </ul>
+ *
+ * @throws IllegalStateException if any of these invariants is violated
+ */
+ private synchronized void checkBufferMap() {
+   boolean foundOwner = false;
+   final Set<Map.Entry<DrillBuf, BaseAllocator>> bufsToCheck = bufferMap.entrySet();
+   for(final Map.Entry<DrillBuf, BaseAllocator> mapEntry : bufsToCheck) {
+     final DrillBuf drillBuf = mapEntry.getKey();
+     final BaseAllocator bufferAllocator = mapEntry.getValue();
+
+     final Object object = bufferAllocator.allocatedBuffers.get(drillBuf.unwrap());
+     if (bufferAllocator == owningAllocator) {
+       foundOwner = true;
+       if (object == null) {
+         historicalLog.logHistory(logger);
+         logBufferHistories(logger);
+         throw new IllegalStateException(
+             String.format("Shared buffer DrillBuf[%d] not found in owning allocator[%d]",
+                 drillBuf.getId(), bufferAllocator.id));
+       }
+     } else if (object != null) {
+       // Only the owner may account for the shared buffer.
+       historicalLog.logHistory(logger);
+       logBufferHistories(logger);
+       throw new IllegalStateException(
+           String.format("Shared buffer DrillBuf[%d] unexpectedly found in non-owning allocator[%d]",
+               drillBuf.getId(), bufferAllocator.id));
+     }
+   }
+
+   if (!foundOwner && !bufferMap.isEmpty()) {
+     historicalLog.logHistory(logger);
+     logBufferHistories(logger);
+     owningAllocator.historicalLog.logHistory(logger);
+     throw new IllegalStateException(
+         String.format("Did not find owning allocator[%d] in bufferMap", owningAllocator.id));
+   }
+ }
+
+ /** Returns the same shared pooled allocator that the other ledgers in this file report. */
+ @Override
+ public PooledByteBufAllocatorL getUnderlyingAllocator() {
+   return BaseAllocator.INNER_ALLOCATOR;
+ }
+
+ /**
+ * Releases one DrillBuf that uses this shared ledger.
+ *
+ * <p>The buffer is removed from bufferMap, along with every other buffer for which
+ * otherBuf.hasSharedFate(drillBuf) is true. If no remaining buffer belongs to the releasing
+ * buffer's allocator, and that allocator is the current owner, ownership has to move:</p>
+ * <ul>
+ * <li>if bufferMap is now empty, the space is released back to the owner;</li>
+ * <li>otherwise ownership is transferred to another allocator still in bufferMap, which
+ * becomes the new owningAllocator.</li>
+ * </ul>
+ *
+ * @param drillBuf the buffer being released; must be non-null
+ * @throws IllegalStateException in DEBUG mode, if drillBuf isn't in bufferMap or the ledger
+ * ends up in an inconsistent state
+ */
+ @Override
+ public void release(final DrillBuf drillBuf) {
+ Preconditions.checkArgument(drillBuf != null, "drillBuf can't be null");
+
+ /*
+ * This is the only method on the shared ledger that can be entered without
+ * having first come through an outside method on BaseAllocator (such
+ * as takeOwnership() or shareOwnership()), all of which get the allocatorLock.
+ * Operations in the below require the allocatorLock. We also need to synchronize
+ * on this object to protect the bufferMap. In order to avoid a deadlock with other
+ * methods, we have to get the allocatorLock first, as will be done in all the
+ * other cases.
+ */
+ synchronized(ALLOCATOR_LOCK) {
+ synchronized(this) {
+ final Object bufferObject = bufferMap.remove(drillBuf);
+ if (DEBUG) {
+ if (bufferObject == null) {
+ historicalLog.logHistory(logger, String.format("release(DrillBuf[%d])", drillBuf.getId()));
+ drillBuf.logHistory(logger);
+ throw new IllegalStateException("Buffer not found in SharedBufferLedger's buffer map");
+ }
+ }
+
+ /*
+ * If there are other buffers in the bufferMap that share this buffer's fate,
+ * remove them, since they are also now invalid. As we examine buffers, take note
+ * of any others that don't share this one's fate, but which belong to the same
+ * allocator; if we find any such, then we can avoid transferring ownership at this
+ * time.
+ */
+ final BaseAllocator bufferAllocator = (BaseAllocator) drillBuf.getAllocator();
+ final List<DrillBuf> sameAllocatorSurvivors = new LinkedList<>();
+ if (!bufferMap.isEmpty()) {
+ /*
+ * We're going to be modifying bufferMap (if we find any other related buffers);
+ * in order to avoid getting a ConcurrentModificationException, we can't do it
+ * on the same iteration we use to examine the buffers, so we use an intermediate
+ * list to figure out which ones we have to remove.
+ */
+ final Set<Map.Entry<DrillBuf, BaseAllocator>> bufsToCheck = bufferMap.entrySet();
+ final List<DrillBuf> sharedFateBuffers = new LinkedList<>();
+ for(final Map.Entry<DrillBuf, BaseAllocator> mapEntry : bufsToCheck) {
+ final DrillBuf otherBuf = mapEntry.getKey();
+ if (otherBuf.hasSharedFate(drillBuf)) {
+ sharedFateBuffers.add(otherBuf);
+ } else {
+ final BaseAllocator otherAllocator = mapEntry.getValue();
+ if (otherAllocator == bufferAllocator) {
+ sameAllocatorSurvivors.add(otherBuf);
+ }
+ }
+ }
+
+ final int nSharedFate = sharedFateBuffers.size();
+ if (nSharedFate > 0) {
+ // NOTE(review): sharedIds is filled in here but not otherwise used.
+ final int[] sharedIds = new int[nSharedFate];
+ int index = 0;
+ for(final DrillBuf bufToRemove : sharedFateBuffers) {
+ sharedIds[index++] = bufToRemove.getId();
+ bufferMap.remove(bufToRemove);
+ }
+
+ if (DEBUG) {
+ final StringBuilder sb = new StringBuilder();
+ for(final DrillBuf bufToRemove : sharedFateBuffers) {
+ sb.append(String.format("DrillBuf[%d], ", bufToRemove.getId()));
+ }
+ sb.setLength(sb.length() - 2); // Chop off the trailing comma and space.
+ historicalLog.recordEvent("removed shared fate buffers " + sb.toString());
+ }
+ }
+ }
+
+ if (sameAllocatorSurvivors.isEmpty()) {
+ /*
+ * If that was the owning allocator, then we need to transfer ownership to
+ * another allocator (any one) that is part of the sharing set.
+ *
+ * If there is no other allocator left to transfer to, the buffer's space is
+ * released back to the owning allocator. If the releasing allocator isn't the
+ * owner, no further action is taken here.
+ */
+ if (bufferAllocator == owningAllocator) {
+ if (bufferMap.isEmpty()) {
+ /*
+ * There are no other allocators available to transfer to, so
+ * release the space to the owner.
+ */
+ bufferAllocator.releaseBuffer(drillBuf);
+ } else {
+ // Pick another allocator, and transfer ownership to that.
+ final Collection<BaseAllocator> allocators = bufferMap.values();
+ final Iterator<BaseAllocator> allocatorIter = allocators.iterator();
+ if (!allocatorIter.hasNext()) {
+ historicalLog.logHistory(logger);
+ throw new IllegalStateException("Shared ledger buffer map is non-empty, but not iterable");
+ }
+ final BaseAllocator nextAllocator = allocatorIter.next();
+ // NOTE(review): the boolean result (whether the new owner stayed within its limit) is ignored.
+ BaseAllocator.transferTo(nextAllocator, this, drillBuf);
+ owningAllocator = nextAllocator;
+
+ if (DEBUG) {
+ if (owningAllocator == bufferAllocator) {
+ historicalLog.logHistory(logger);
+ owningAllocator.historicalLog.logHistory(logger);
+ drillBuf.logHistory(logger);
+ throw new IllegalStateException("Shared buffer release transfer to same owner");
+ }
+
+ // The old owner must no longer track the buffer, and the new owner must.
+ final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) drillBuf.unwrap();
+ final Object oldObject = bufferAllocator.allocatedBuffers.get(udle);
+ if (oldObject != null) {
+ historicalLog.logHistory(logger);
+ bufferAllocator.historicalLog.logHistory(logger);
+ owningAllocator.historicalLog.logHistory(logger);
+ drillBuf.logHistory(logger);
+
+ throw new IllegalStateException("Inconsistent shared buffer release state (old owner)");
+ }
+
+ final Object newObject = owningAllocator.allocatedBuffers.get(udle);
+ if (newObject == null) {
+ historicalLog.logHistory(logger);
+ bufferAllocator.historicalLog.logHistory(logger);
+ owningAllocator.historicalLog.logHistory(logger);
+ drillBuf.logHistory(logger);
+
+ throw new IllegalStateException("Inconsistent shared buffer release state (new owner)");
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // NOTE(review): this check runs after ALLOCATOR_LOCK has been released, although it reads
+ // other allocators' allocatedBuffers.
+ if (DEBUG) {
+ checkBufferMap();
+ }
+ }
+
+ /**
+ * Add another sharer to this already-shared buffer: wraps {@code drillBuf}
+ * in a new DrillBuf bound to this ledger and to {@code otherAllocator}, and
+ * records the new buffer in this ledger's buffer map.
+ *
+ * <p>Note that {@code otherLedger} is not used here; the new DrillBuf is
+ * constructed with {@code this} as its ledger.</p>
+ *
+ * @param pDrillBuf receives the new DrillBuf in {@code pDrillBuf.value}
+ * @return this ledger
+ * @throws AllocatorClosedException if {@code otherAllocator} is closed
+ * @throws IllegalStateException (DEBUG only) if the buffer is no longer
+ * found in the owning allocator's allocated buffers
+ */
+ @Override
+ public BufferLedger shareWith(final Pointer<DrillBuf> pDrillBuf,
+ final BufferLedger otherLedger, final BufferAllocator otherAllocator,
+ final DrillBuf drillBuf, final int index, final int length, final int drillBufFlags) {
+ final BaseAllocator baseAllocator = (BaseAllocator) otherAllocator;
+ // Checked before ALLOCATOR_LOCK is taken, so this is not atomic with the share below.
+ if (baseAllocator.isClosed) {
+ throw new AllocatorClosedException(
+ String.format("Attempt to use closed allocator[%d]", baseAllocator.id));
+ }
+
+ synchronized(ALLOCATOR_LOCK) {
+ /*
+ * This buffer is already shared, but we want to add more sharers.
+ *
+ * Create the new wrapper.
+ */
+ final DrillBuf newBuf = new DrillBuf(this, otherAllocator, drillBuf, index, length, drillBufFlags);
+ if (DEBUG) {
+ final UnsafeDirectLittleEndian udle = (UnsafeDirectLittleEndian) drillBuf.unwrap();
+ baseAllocator.historicalLog.recordEvent("SharedBufferLedger.shareWith(..., otherAllocator[%d], "
+ + "drillBuf[%d]{UnsafeDirectLittleEndian[identityHashCode == %d]}, ...)",
+ baseAllocator.id, drillBuf.getId(), System.identityHashCode(udle));
+
+ // Make sure the current ownership is still correct.
+ final Object object = owningAllocator.allocatedBuffers.get(udle); // Read while holding ALLOCATOR_LOCK (enclosing synchronized block).
+ if (object == null) {
+ // Undo the wrapper created above before reporting the inconsistency.
+ newBuf.release();
+ historicalLog.logHistory(logger);
+ owningAllocator.historicalLog.logHistory(logger);
+ drillBuf.logHistory(logger);
+ throw new IllegalStateException("Buffer not found in owning allocator");
+ }
+ }
+
+ // Track the new sharer, and hand the new buffer back to the caller.
+ addMapping(newBuf, baseAllocator);
+ pDrillBuf.value = newBuf;
+
+ if (DEBUG) {
+ checkBufferMap();
+ }
+
+ return this;
+ }
+ }
+ }
+
+ /**
+ * Move one sharer of this shared buffer to a different allocator. The buffer
+ * map entry for {@code drillBuf} is repointed to {@code newAlloc}. Real
+ * ownership (accounting) only moves if the previous allocator for this
+ * buffer was the owning allocator.
+ *
+ * @param newAlloc the allocator to transfer to; must be a BaseAllocator
+ * @param pNewLedger a non-null candidate ledger; on return it always holds this ledger
+ * @param drillBuf the buffer being transferred
+ * @return the result of BaseAllocator.transferTo() when ownership moved;
+ * otherwise whether the buffer's max capacity would have fit strictly
+ * below the new allocator's maxAllocation
+ * @throws AllocatorClosedException if the new allocator is closed
+ * @throws IllegalStateException if {@code drillBuf} was not in this ledger's buffer map
+ */
+ @Override
+ public boolean transferTo(final BufferAllocator newAlloc,
+ final Pointer<BufferLedger> pNewLedger, final DrillBuf drillBuf) {
+ Preconditions.checkArgument(newAlloc != null, "New allocator cannot be null");
+ Preconditions.checkArgument(newAlloc instanceof BaseAllocator,
+ "New allocator isn't a BaseAllocator");
+ Preconditions.checkArgument(pNewLedger.value != null, "Candidate new ledger can't be null");
+ Preconditions.checkArgument(drillBuf != null, "DrillBuf can't be null");
+
+ final BaseAllocator newAllocator = (BaseAllocator) newAlloc;
+ if (newAllocator.isClosed) {
+ throw new AllocatorClosedException(String.format(
+ "Attempt to use closed allocator[%d]", newAllocator.id));
+ }
+
+ // This doesn't need the ALLOCATOR_LOCK, because it will already be held.
+ // NOTE(review): not enforced here; presumably reached via DrillBuf.transferTo()
+ // from BaseAllocator.takeOwnership(), which holds ALLOCATOR_LOCK -- confirm for other callers.
+ synchronized(this) {
+ try {
+ // Modify the buffer mapping to reflect the virtual transfer.
+ // (If there was no previous entry, the put below has still inserted one
+ // before the exception is thrown.)
+ final BaseAllocator oldAllocator = bufferMap.put(drillBuf, newAllocator);
+ if (oldAllocator == null) {
+ final BaseAllocator bufAllocator = (BaseAllocator) drillBuf.getAllocator();
+ historicalLog.logHistory(logger);
+ bufAllocator.historicalLog.logHistory(logger);
+ drillBuf.logHistory(logger);
+ throw new IllegalStateException("No previous entry in SharedBufferLedger for drillBuf");
+ }
+
+ // Whatever happens, this is the new ledger.
+ pNewLedger.value = this;
+
+ /*
+ * If the oldAllocator was the owner, then transfer ownership to the new allocator.
+ */
+ if (oldAllocator == owningAllocator) {
+ owningAllocator = newAllocator;
+ return BaseAllocator.transferTo(newAllocator, this, drillBuf);
+ }
+
+ // Even though we didn't do a real transfer, tell if it would have fit the limit.
+ // Note the strict '<': landing exactly on maxAllocation reports false.
+ final int udleMaxCapacity = drillBuf.unwrap().maxCapacity();
+ return newAllocator.allocated + udleMaxCapacity < newAllocator.maxAllocation;
+ } finally {
+ // Runs on every exit path, including the exception above.
+ if (DEBUG) {
+ checkBufferMap();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Allocate a buffer using {@code size} as both the minimum and the maximum
+ * requested size; see {@link #buffer(int, int)}.
+ *
+ * @param size the requested size
+ * @return the new buffer
+ */
+ @Override
+ public DrillBuf buffer(final int size) {
+ final int requested = size;
+ return buffer(requested, requested);
+ }
+
+ private static String createErrorMsg(final BufferAllocator allocator, final int size) {
+ return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d",
+ size, allocator.getAllocatedMemory());
+ }
+
+ @Override
+ public DrillBuf buffer(final int minSize, final int maxSize) {
+ Preconditions.checkArgument(minSize >= 0,
+ "the minimimum requested size must be non-negative");
+ Preconditions.checkArgument(maxSize >= 0,
+ "the maximum requested size must be non-negative");
+ Preconditions.checkArgument(minSize <= maxSize,
+ "the minimum requested size must be <= the maximum requested size");
+
+ if (DEBUG) {
+ injector.injectUnchecked(allocatorOwner.getExecutionControls(), CHILD_BUFFER_INJECTION_SITE);
+ }
+
+ // we can always return an empty buffer
+ if (minSize == 0) {
+ return getEmpty();
+ }
+
+ synchronized(ALLOCATOR_LOCK) {
+ // Don't allow the allocation if it will take us over the limit.
+ final long allocatedWas = allocated;
+ if (!reserve(null, maxSize, 0)) {
+ throw new OutOfMemoryRuntimeException(createErrorMsg(this, minSize));
+ }
+
+ final long reserved = allocated - allocatedWas;
+ assert reserved == maxSize;
+
+ final UnsafeDirectLittleEndian buffer = INNER_ALLOCATOR.directBuffer(minSize, maxSize);
+ final int actualSize = buffer.maxCapacity();
+ if (actualSize > maxSize) {
+ final int extraSize = actualSize - maxSize;
+ reserve(null, extraSize, RESERVE_F_IGNORE_MAX);
+ }
+
+ final DrillBuf wrapped = new DrillBuf(bufferLedger, this, buffer);
+ buffer.release(); // Should have been retained by the DrillBuf constructor.
+ assert buffer.refCnt() == 1 : "buffer was not retained by DrillBuf";
+ assert allocated <= owned : "allocated more memory than owned";
+
+ bufferAllocation += maxSize;
+ if (allocated > peakAllocated) {
+ peakAllocated = allocated;
+ }
+
+ if (allocatedBuffers != null) {
+ allocatedBuffers.put(buffer, bufferLedger);
+ }
+
+ return wrapped;
+ }
+ }
+
+ /**
+ * Expose the shared pooled allocator that backs every BaseAllocator.
+ *
+ * @return the inner (pooled) allocator
+ */
+ @Override
+ public ByteBufAllocator getUnderlyingAllocator() {
+ final ByteBufAllocator inner = INNER_ALLOCATOR;
+ return inner;
+ }
+
+ /**
+ * Create a child of this allocator that shares this allocator's allocation
+ * policy. In DEBUG mode the child is also remembered in childAllocators and
+ * the creation is recorded in this allocator's history.
+ *
+ * @param allocatorOwner the owner of the new child
+ * @param initReservation the initial reservation for the child
+ * @param maxAllocation the child's maximum allocation
+ * @param flags BaseAllocator.F_* flags for the child
+ * @return the new child allocator
+ */
+ @Override
+ public BufferAllocator newChildAllocator(final AllocatorOwner allocatorOwner,
+ final long initReservation, final long maxAllocation, final int flags) {
+ final BaseAllocator child;
+ synchronized(ALLOCATOR_LOCK) {
+ child = new ChildAllocator(this, allocatorOwner, allocationPolicy,
+ initReservation, maxAllocation, flags);
+
+ if (DEBUG) {
+ childAllocators.put(child, child);
+ historicalLog.recordEvent("allocator[%d] created new child allocator[%d]",
+ id, child.id);
+ }
+ }
+ return child;
+ }
+
+ /**
+ * Legacy entry point for creating a child allocator. The child is owned by
+ * this allocator's owner; {@code fragmentContext} is not used.
+ *
+ * @param fragmentContext ignored
+ * @param initialReservation the child's initial reservation
+ * @param maximumAllocation the child's maximum allocation
+ * @param applyFragmentLimit if true, the child is created with F_LIMITING_ROOT
+ * @return the new child allocator
+ */
+ @Override
+ public BufferAllocator getChildAllocator(FragmentContext fragmentContext,
+ final long initialReservation, final long maximumAllocation,
+ final boolean applyFragmentLimit) {
+ final int childFlags;
+ if (applyFragmentLimit) {
+ childFlags = F_LIMITING_ROOT;
+ } else {
+ childFlags = 0;
+ }
+ return newChildAllocator(allocatorOwner, initialReservation, maximumAllocation, childFlags);
+ }
+
+ /**
+ * Reserve space for a DrillBuf for an ownership transfer.
+ *
+ * <p>The reservation is made with RESERVE_F_IGNORE_MAX. Outside of DEBUG
+ * mode a failed reservation is silently ignored.</p>
+ *
+ * @param maxCapacity the number of bytes to reserve (presumably the buffer's
+ * max capacity -- confirm at call sites)
+ * @throws IllegalStateException in DEBUG mode, if the reservation fails
+ */
+ private void reserveForBuf(final int maxCapacity) {
+ final boolean reserved = reserve(null, maxCapacity, RESERVE_F_IGNORE_MAX);
+ if (DEBUG && !reserved) {
+ throw new IllegalStateException("reserveForBuf() failed");
+ }
+ }
+
+ /**
+ * Make this allocator the owner of {@code drillBuf}.
+ *
+ * @param drillBuf the buffer to take over
+ * @return true if this allocator already owns the buffer; otherwise the
+ * result of the buffer's transferTo(), performed under ALLOCATOR_LOCK
+ */
+ @Override
+ public boolean takeOwnership(final DrillBuf drillBuf) {
+ final boolean alreadyMine = drillBuf.getAllocator() == this;
+ if (alreadyMine) {
+ return true;
+ }
+
+ synchronized(ALLOCATOR_LOCK) {
+ return drillBuf.transferTo(this, bufferLedger);
+ }
+ }
+
+ /**
+ * Share {@code drillBuf} (its full capacity, starting at index 0) with this
+ * allocator.
+ *
+ * @param drillBuf the buffer to share
+ * @param bufOut receives the new shared DrillBuf in {@code bufOut.value}
+ * @return whether this allocator's allocation is still below maxAllocation
+ */
+ @Override
+ public boolean shareOwnership(final DrillBuf drillBuf, final Pointer<DrillBuf> bufOut) {
+ synchronized(ALLOCATOR_LOCK) {
+ final DrillBuf shared = drillBuf.shareWith(bufferLedger, this, 0, drillBuf.capacity());
+ bufOut.value = shared;
+ final boolean withinLimit = allocated < maxAllocation;
+ return withinLimit;
+ }
+ }
+
+ /**
+ * Not supported; always throws.
+ *
+ * <p>It's not clear why we'd allow anyone to set their own limit; this also
+ * doesn't make sense when the limits are constantly shifting, nor for other
+ * allocation policies.</p>
+ *
+ * @param fragmentLimit ignored
+ * @throws UnsupportedOperationException always
+ * @deprecated setting a fragment limit directly is not supported
+ */
+ @Deprecated
+ @Override
+ public void setFragmentLimit(long fragmentLimit) {
+ final String message = "unimplemented:BaseAllocator.setFragmentLimit()";
+ throw new UnsupportedOperationException(message);
+ }
+
+ /**
+ * Get the fragment limit. This was originally meant to be the maximum amount
+ * of memory the currently running fragment (which owns this allocator or
+ * its ancestor) may use. Note that the value may vary up and down over time
+ * as fragments come and go on the node.
+ *
+ * <p>This is deprecated because the concept is not entirely stable. This
+ * only makes sense for one particular memory allocation policy, which is the
+ * one that sets limits on what fragments on a node may use by dividing up all
+ * the memory evenly between all the fragments (see {@link #POLICY_PER_FRAGMENT}).
+ * Other allocation policies, such as the one that limits memory on a
+ * per-query-per-node basis, wouldn't have a value for this. But we need to have
+ * something until we figure out what to replace this with because it is used by
+ * some operators (such as ExternalSortBatch) to determine how much memory they
+ * can use before they have to spill to disk.</p>
+ *
+ * @return the fragment limit, as reported by this allocator's policy agent
+ * @deprecated only meaningful for the per-fragment allocation policy
+ */
+ @Deprecated
+ @Override
+ public long getFragmentLimit() {
+ return policyAgent.getMemoryLimit(this);
+ }
+
+ /**
+ * Close this allocator. Repeated calls after a successful close return
+ * immediately.
+ *
+ * <p>In DEBUG mode the allocator's accounting is verified, and the close fails
+ * if there are outstanding child allocators, buffers, or reservations. In all
+ * modes the close fails if any bytes are still counted as allocated or
+ * preallocated. On success the empty buffer is released, the policy agent is
+ * closed, and the owned bytes are returned to the parent. Because isClosed
+ * is only set at the very end, a close that throws leaves the allocator open.</p>
+ *
+ * @throws IllegalStateException if outstanding memory or (in DEBUG) children,
+ * buffers, or reservations remain
+ */
+ @Override
+ public void close() {
+ /*
+ * Some owners may close more than once because of complex cleanup and shutdown
+ * procedures.
+ *
+ * NOTE(review): isClosed is read before ALLOCATOR_LOCK is taken and not
+ * re-checked inside it; confirm that close() is never called concurrently.
+ */
+ if (isClosed) {
+ return;
+ }
+
+ synchronized(ALLOCATOR_LOCK) {
+ if (DEBUG) {
+ verifyAllocator();
+
+ // are there outstanding child allocators?
+ if (!childAllocators.isEmpty()) {
+ // A child that is already closed but still listed here points at a
+ // missed childClosed() notification; call those out separately.
+ for(final BaseAllocator childAllocator : childAllocators.keySet()) {
+ if (childAllocator.isClosed) {
+ logger.debug(String.format(
+ "Closed child allocator[%d] on parent allocator[%d]'s child list",
+ childAllocator.id, id));
+ }
+ }
+
+ historicalLog.logHistory(logger);
+ logChildren();
+
+ throw new IllegalStateException(
+ String.format("Allocator[%d] closed with outstanding child allocators", id));
+ }
+
+ // are there outstanding buffers?
+ final int allocatedCount = allocatedBuffers.size();
+ if (allocatedCount > 0) {
+ historicalLog.logHistory(logger);
+ logBuffers();
+
+ throw new IllegalStateException(
+ String.format("Allocator[%d] closed with outstanding buffers allocated (%d)",
+ id, allocatedCount));
+ }
+
+ // are there outstanding (unclosed) reservations?
+ if (reservations.size() != 0) {
+ historicalLog.logHistory(logger);
+ logReservations(ReservationsLog.ALL);
+
+ throw new IllegalStateException(
+ String.format("Allocator closed with outstanding reservations (%d)", reservations.size()));
+ }
+
+ /* TODO(DRILL-2740)
+ // We should be the only client holding a reference to empty now.
+ final int emptyRefCnt = empty.refCnt();
+ if (emptyRefCnt != 1) {
+ final String msg = "empty buffer refCnt() == " + emptyRefCnt + " (!= 1)";
+ final StringWriter stringWriter = new StringWriter();
+ stringWriter.write(msg);
+ stringWriter.write('\n');
+ empty.writeState(stringWriter);
+ logger.debug(stringWriter.toString());
+ throw new IllegalStateException(msg);
+ }
+ */
+ }
+
+ // Is there unaccounted-for outstanding allocation?
+ if (allocated > 0) {
+ if (DEBUG) {
+ historicalLog.logHistory(logger);
+ }
+ throw new IllegalStateException(
+ String.format("Unaccounted for outstanding allocation (%d)", allocated));
+ }
+
+ // Any unclaimed reservations?
+ if (preallocSpace > 0) {
+ if (DEBUG) {
+ historicalLog.logHistory(logger);
+ }
+ throw new IllegalStateException(
+ String.format("Unclaimed preallocation space (%d)", preallocSpace));
+ }
+
+ /*
+ * Let go of the empty buffer by dropping all of its outstanding references
+ * at once; the refCnt() check avoids releasing a buffer that has already
+ * been fully released.
+ */
+ final int emptyCount = empty.refCnt();
+ if (emptyCount > 0) {
+ empty.release(emptyCount);
+ }
+
+ DrillAutoCloseables.closeNoChecked(policyAgent);
+
+ // Inform our parent allocator that we've closed, returning the bytes we owned.
+ if (parentAllocator != null) {
+ parentAllocator.releaseBytes(owned);
+ owned = 0;
+ parentAllocator.childClosed(this);
+ }
+
+ if (DEBUG) {
+ historicalLog.recordEvent("closed");
+ logger.debug(String.format(
+ "closed allocator[%d]; getsFromParent == %d, putsToParent == %d",
+ id, getsFromParent, putsToParent));
+ }
+
+ isClosed = true;
+ }
+ }
+
+ /**
+ * Write the event history of every tracked child allocator to the debug log,
+ * bracketed by BEGIN/END markers. Only useful in DEBUG mode, since that is the
+ * only mode in which newChildAllocator() records children.
+ */
+ private void logChildren() {
+ logger.debug(String.format("allocator[%d] open child allocators BEGIN", id));
+ for(final BaseAllocator child : childAllocators.keySet()) {
+ child.historicalLog.logHistory(logger);
+ }
+ logger.debug(String.format("allocator[%d] open child allocators END", id));
+ }
+
+ private void logBuffers() {
+ final StringBuilder sb = new StringBuilder();
+ final Set<UnsafeDirectLittleEndian> udleSet = allocatedBuffers.keySet();
+
+ sb.append("allocator[");
+ sb.append(Integer.toString(id));
+ sb.append("], ");
+ sb.append(Integer.toString(udleSet.size()));
+ sb.append(" allocated buffers\n");
+
+ for(final UnsafeDirectLittleEndian udle : udleSet) {
+ sb.append(udle.toString());
+ sb.append("[identityHashCode == ");
+ sb.append(Integer.toString(System.identityHashCode(udle)));
+ sb.append("]\n");
+
+ final Collection<DrillBuf> drillBufs = DrillBuf.unwrappedGet(udle);
+ for(DrillBuf drillBuf : drillBufs) {
+ drillBuf.logHistory(logger);
+ }
+ }
+
+ logger.debug(sb.toString());
+ }
+
+ private enum ReservationsLog {
+ ALL,
+ UNUSED,
+ }
+
+ /**
+ * Log the history of this allocator's reservations at debug level, bracketed
+ * by BEGIN/END markers, one reservation per line. Only useful in DEBUG mode:
+ * reservations are only registered (and only keep a history) in that mode.
+ *
+ * @param reservationsLog which reservations to include: all of them, or only
+ * those not yet used
+ */
+ private void logReservations(final ReservationsLog reservationsLog) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(String.format("allocator[%d] reservations BEGIN", id));
+ sb.append('\n');
+
+ // Named so it doesn't shadow the 'reservations' field.
+ final Set<Reservation> reservationSet = reservations.keySet();
+ for(final Reservation reservation : reservationSet) {
+ if ((reservationsLog == ReservationsLog.ALL)
+ || ((reservationsLog == ReservationsLog.UNUSED) && (!reservation.isUsed()))) {
+ reservation.writeHistoryToBuilder(sb);
+ // Keep each reservation's history separate from the next entry / END marker.
+ sb.append('\n');
+ }
+ }
+
+ sb.append(String.format("allocator[%d] reservations END", id));
+
+ logger.debug(sb.toString());
+ }
+
+ /** @return this allocator's id */
+ @Override
+ public int getId() {
+ return id;
+ }
+
+ /** @return the number of bytes currently counted as allocated by this allocator */
+ @Override
+ public long getAllocatedMemory() {
+ final long current = allocated;
+ return current;
+ }
+
+ /** @return the highest allocated value recorded for this allocator */
+ @Override
+ public long getPeakMemoryAllocation() {
+ final long peak = peakAllocated;
+ return peak;
+ }
+
+ /**
+ * Get the shared zero-capacity buffer. Each call takes one more reference on it.
+ *
+ * @return the empty buffer
+ */
+ @Override
+ public DrillBuf getEmpty() {
+ // TODO(DRILL-2740) update allocatedBuffers
+ empty.retain(1);
+ return empty;
+ }
+
+ private class Reservation extends AllocationReservation {
+ private final HistoricalLog historicalLog;
+
+ public Reservation() {
+ if (DEBUG) {
+ historicalLog = new HistoricalLog("Reservation[allocator[%d], %d]", id, System.identityHashCode(this));
+ historicalLog.recordEvent("created");
+ synchronized(ALLOCATOR_LOCK) {
+ reservations.put(this, this);
+ }
+ } else {
+ historicalLog = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ if (DEBUG) {
+ if (!isClosed()) {
+ final Object object;
+ synchronized(ALLOCATOR_LOCK) {
+ object = reservations.remove(this);
+ }
+ if (object == null) {
+ final StringBuilder sb = new StringBuilder();
+ writeHistoryToBuilder(sb);
+
+ logger.debug(sb.toString());
+ throw new IllegalStateException(
+ String.format("Didn't find closing reservation[%d]", System.identityHashCode(this)));
+ }
+
+ historicalLog.recordEvent("closed");
+ }
+ }
+
+ super.close();
+ }
+
+ @Override
+ protected boolean reserve(int nBytes) {
+ final boolean reserved;
+ synchronized(ALLOCATOR_LOCK) {
+ reserved = BaseAllocator.this.reserve(null, nBytes, 0);
+ if (reserved) {
+ preallocSpace += nBytes;
+ }
+ }
+
+ if (DEBUG) {
+ historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(reserved));
+ }
+
+ return reserved;
+ }
+
+ @Override
+ protected DrillBuf allocate(int nBytes) {
+ /*
+ * The reservation already added the requested bytes to the
+ * allocators owned and allocated bytes via reserve(). This
+ * ensures that they can't go away. But when we ask for the buffer
+ * here, that will add to the allocated bytes as well, so we need to
+ * return the same number back to avoid double-counting them.
+ */
+ synchronized(ALLOCATOR_LOCK) {
+ BaseAllocator.this.allocated -= nBytes;
+ final DrillBuf drillBuf = BaseAllocator.this.buffer(nBytes);
+ preallocSpace -= nBytes;
+
+ if (DEBUG) {
+ historicalLog.recordEvent("allocate() => %s",
+ drillBuf == null ? "null" : String.format("DrillBuf[%d]", drillBuf.getId()));
+ }
+
+ return drillBuf;
+ }
+ }
+
+ @Override
+ protected void releaseReservation(int nBytes) {
+ synchronized(ALLOCATOR_LOCK) {
+ releaseBytes(nBytes);
+ preallocSpace -= nBytes;
+ }
+
+ if (DEBUG) {
+ historicalLog.recordEvent("releaseReservation(%d)", nBytes);
+ }
+ }
+
+ private String getState() {
+ return String.format("size == %d, isUsed == %s", getSize(), Boolean.toString(isUsed()));
+ }
+
+ private void writeToBuilder(final StringBuilder sb) {
+ sb.append(String.format("reservation[%d]: ", System.identityHashCode(this)));
+ sb.append(getState());
+ }
+
+ /**
+ * Only works for DEBUG
+ *
+ * @param sb builder to write to
+ */
+ private void writeHistoryToBuilder(final StringBuilder sb) {
+ historicalLog.buildHistory(sb, getState());
+ }
+ }
+
+ /**
+ * Start a new reservation against this allocator.
+ *
+ * @return the reservation (registered for leak tracking in DEBUG mode)
+ */
+ @Override
+ public AllocationReservation newReservation() {
+ final Reservation reservation = new Reservation();
+ return reservation;
+ }
+
+ /**
+ * Check this allocator's accounting, starting with an empty set of
+ * already-seen buffers. Most checks only run in DEBUG mode.
+ *
+ * @throws IllegalStateException when any problems are found
+ */
+ protected void verifyAllocator() {
+ verifyAllocator(new IdentityHashMap<UnsafeDirectLittleEndian, BaseAllocator>());
+ }
+
+ /**
+ * Verifies the accounting state of the allocator. Only works for DEBUG.
+ *
+ * <p>This overload is used for recursive calls, allowing for checking that DrillBufs are unique
+ * across all allocators that are checked.</p>
+ *
+ * <p>Outside of DEBUG only the local checks run (allocated <= owned, and the
+ * empty buffer still has zero capacity). In DEBUG mode descendants are verified
+ * recursively, and allocated must equal buffer space + unused reservations +
+ * the space owned by direct children.</p>
+ *
+ * @param buffersSeen a map of buffers that have already been seen when walking a tree of allocators
+ * @throws IllegalStateException when any problems are found
+ */
+ protected void verifyAllocator(
+ final IdentityHashMap<UnsafeDirectLittleEndian, BaseAllocator> buffersSeen) {
+ synchronized(ALLOCATOR_LOCK) {
+ // verify purely local accounting
+ if (allocated > owned) {
+ historicalLog.logHistory(logger);
+ throw new IllegalStateException("Allocator (id = " + id + ") has allocated more than it owns");
+ }
+
+ // the empty buffer should still be empty
+ final long emptyCapacity = empty.maxCapacity();
+ if (emptyCapacity != 0) {
+ throw new IllegalStateException("empty buffer maxCapacity() == " + emptyCapacity + " (!= 0)");
+ }
+
+ // The remaining tests can only be performed if we're in debug mode.
+ if (!DEBUG) {
+ return;
+ }
+
+ // verify my direct descendants
+ final Set<BaseAllocator> childSet = childAllocators.keySet();
+ for(final BaseAllocator childAllocator : childSet) {
+ childAllocator.verifyAllocator(buffersSeen);
+ }
+
+ /*
+ * Verify my relationships with my descendants.
+ *
+ * The sum of direct child allocators' owned memory must be <= my allocated memory;
+ * my allocated memory also includes DrillBuf's directly allocated by me.
+ */
+ long childTotal = 0;
+ for(final BaseAllocator childAllocator : childSet) {
+ childTotal += childAllocator.owned;
+ }
+ if (childTotal > allocated) {
+ historicalLog.logHistory(logger);
+ logger.debug("allocator[" + id + "] child event logs BEGIN");
+ for(final BaseAllocator childAllocator : childSet) {
+ childAllocator.historicalLog.logHistory(logger);
+ }
+ logger.debug("allocator[" + id + "] child event logs END");
+ throw new IllegalStateException(
+ "Child allocators own more memory (" + childTotal + ") than their parent (id = "
+ + id + " ) has allocated (" + allocated + ')');
+ }
+
+ // Furthermore, the amount I've allocated should be that plus buffers I've allocated.
+ long bufferTotal = 0;
+ final Set<UnsafeDirectLittleEndian> udleSet = allocatedBuffers.keySet();
+ for(final UnsafeDirectLittleEndian udle : udleSet) {
+ /*
+ * Even when shared, DrillBufs are rewrapped, so we should never see the same
+ * instance twice.
+ */
+ final BaseAllocator otherOwner = buffersSeen.get(udle);
+ if (otherOwner != null) {
+ throw new IllegalStateException("This allocator's drillBuf already owned by another allocator");
+ }
+ buffersSeen.put(udle, this);
+
+ bufferTotal += udle.maxCapacity();
+ }
+
+ // Preallocated space has to be accounted for
+ final Set<Reservation> reservationSet = reservations.keySet();
+ long reservedTotal = 0;
+ for(final Reservation reservation : reservationSet) {
+ if (!reservation.isUsed()) {
+ reservedTotal += reservation.getSize();
+ }
+ }
+ if (reservedTotal != preallocSpace) {
+ logReservations(ReservationsLog.UNUSED);
+
+ throw new IllegalStateException(
+ String.format("This allocator's reservedTotal(%d) doesn't match preallocSpace (%d)",
+ reservedTotal, preallocSpace));
+ }
+
+ if (bufferTotal + reservedTotal + childTotal != allocated) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("allocator[");
+ sb.append(Integer.toString(id));
+ sb.append("]\nallocated: ");
+ sb.append(Long.toString(allocated));
+ sb.append(" allocated - (bufferTotal + reservedTotal + childTotal): ");
+ sb.append(Long.toString(allocated - (bufferTotal + reservedTotal + childTotal)));
+ sb.append('\n');
+
+ if (bufferTotal != 0) {
+ sb.append("buffer total: ");
+ sb.append(Long.toString(bufferTotal));
+ sb.append('\n');
+ dumpBuffers(sb, udleSet);
+ }
+
+ if (childTotal != 0) {
+ sb.append("child total: ");
+ sb.append(Long.toString(childTotal));
+ sb.append('\n');
+
+ for(final BaseAllocator childAllocator : childSet) {
+ sb.append("child allocator[");
+ sb.append(Integer.toString(childAllocator.id));
+ sb.append("] owned ");
+ sb.append(Long.toString(childAllocator.owned));
+ sb.append('\n');
+ }
+ }
+
+ if (reservedTotal != 0) {
+ // Previously String.format("reserved total : ", reservedTotal), which had no
+ // conversion specifier and so silently dropped the value.
+ sb.append("reserved total: ");
+ sb.append(Long.toString(reservedTotal));
+ sb.append('\n');
+ for(final Reservation reservation : reservationSet) {
+ reservation.writeToBuilder(sb);
+ sb.append('\n');
+ }
+ }
+
+ logger.debug(sb.toString());
+ throw new IllegalStateException(String.format(
+ "allocator[%d]: buffer space (%d) + prealloc space (%d) + child space (%d) != allocated (%d)",
+ id, bufferTotal, reservedTotal, childTotal, allocated));
+ }
+ }
+ }
+
+ /**
+ * Append one line per underlying buffer (identity hash and max capacity)
+ * to {@code sb}.
+ *
+ * @param sb the builder to append to
+ * @param udleSet the underlying buffers to describe
+ */
+ private void dumpBuffers(final StringBuilder sb, final Set<UnsafeDirectLittleEndian> udleSet) {
+ for(final UnsafeDirectLittleEndian udle : udleSet) {
+ // Same "identityHashCode" label used by the other buffer diagnostics in this class.
+ sb.append("UnsafeDirectLittleEndian[identityHashCode == ");
+ sb.append(Integer.toString(System.identityHashCode(udle)));
+ sb.append("] size ");
+ sb.append(Integer.toString(udle.maxCapacity()));
+ sb.append('\n');
+ }
+ }
+
+ /**
+ * @return whether the allocator's DEBUG-only tracking and verification are enabled
+ */
+ public static boolean isDebug() {
+ final boolean debugEnabled = DEBUG;
+ return debugEnabled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/BufferLedger.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/BufferLedger.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/BufferLedger.java
new file mode 100644
index 0000000..d649940
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/BufferLedger.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import org.apache.drill.exec.util.Pointer;
+
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
+
+/**
+ * BufferLedger is an interface meant to facilitate the private
+ * exchange of information between a DrillBuf and its owning
+ * allocator. To that end, a number of DrillBuf constructors
+ * and methods take a BufferLedger as an argument, yet there
+ * are no public implementations of BufferLedger; they all
+ * come from inner classes implemented by allocators, ensuring
+ * that allocators can give DrillBufs the access they need when
+ * they are created or asked to perform complex operations such
+ * as ownership sharing or transfers.
+ */
+public interface BufferLedger {
+ /**
+ * Get the underlying pooled allocator used by this ledger's
+ * allocator.
+ *
+ * <p>This is usually used to create the shared singleton
+ * empty buffer. Don't use it to create random buffers, because
+ * they won't be tracked, and we won't be able to find leaks.</p>
+ *
+ * @return the underlying pooled allocator
+ */
+ PooledByteBufAllocatorL getUnderlyingAllocator();
+
+ /**
+ * Return a buffer's memory to the allocator.
+ *
+ * @param drillBuf the DrillBuf that was freed
+ */
+ void release(DrillBuf drillBuf);
+
+ /**
+ * Share ownership of a buffer with another allocator. As far as reader
+ * and writer index positions go, this acts like a new slice that is owned
+ * by the target allocator, but which has its own lifetime (i.e., it doesn't
+ * share the fate of the original buffer, unlike real slices).
+ *
+ * @param pDrillBuf returns the new DrillBuf that is shared, in {@code pDrillBuf.value}
+ * @param otherLedger the ledger the new DrillBuf should use
+ * @param otherAllocator the new allocator-owner
+ * @param drillBuf the original DrillBuf to be shared
+ * @param index the starting index to be shared (as for slicing)
+ * @param length the length to be shared (as for slicing)
+ * @param drillBufFlags private flags passed through from the allocator
+ * (this call originates with a call to BufferAllocator.shareOwnership()).
+ * @return the ledger the calling DrillBuf must use from this point forward;
+ * this may not match its original ledger, as allocators provide multiple
+ * implementations of ledgers to cope with sharing and slicing
+ */
+ BufferLedger shareWith(Pointer<DrillBuf> pDrillBuf,
+ BufferLedger otherLedger, BufferAllocator otherAllocator,
+ DrillBuf drillBuf, int index, int length, int drillBufFlags);
+
+ /**
+ * Transfer the ownership of a buffer to another allocator. This doesn't change
+ * any of the buffer's reader or writer positions or size, just which allocator
+ * owns it. The reference count stays the same.
+ *
+ * @param newAlloc the new allocator (the one to transfer to)
+ * @param pNewLedger a {@code Pointer<BufferLedger>} initialized with a candidate
+ * ledger; this may be used, or it may not, depending on the sharing state of the
+ * buffer. The caller is required to use whatever ledger is in
+ * {@code pNewLedger.value} on return
+ * @param drillBuf the buffer to transfer
+ * @return true if the transfer kept the target allocator within its maximum
+ * allocation limit; false if the allocator now owns more memory than its
+ * creation-time maximum
+ */
+ boolean transferTo(BufferAllocator newAlloc,
+ Pointer<BufferLedger> pNewLedger, DrillBuf drillBuf);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/ChainedAllocatorOwner.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/ChainedAllocatorOwner.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/ChainedAllocatorOwner.java
new file mode 100644
index 0000000..37475fd
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/ChainedAllocatorOwner.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+/**
+ * {@link AllocatorOwner} that links an allocator's direct owner to the object
+ * that owns (or created) that owner. This helps with identification when the
+ * parent is a more useful handle for debugging than the direct owner.
+ *
+ * <p>{@link #getExecutionControls()} and {@link #getFragmentContext()} both
+ * delegate to the child owner; the parent owner only appears in
+ * {@link #toString()}.</p>
+ */
+public class ChainedAllocatorOwner implements AllocatorOwner {
+ /** The direct owner of the allocator; queries are forwarded to it. */
+ private final AllocatorOwner childOwner;
+ /** The object that owns or created {@link #childOwner}; used for display only. */
+ private final AllocatorOwner parentOwner;
+
+ /**
+ * Constructor.
+ *
+ * @param childOwner the owner of the allocator
+ * @param parentOwner the object that owns or created the childOwner
+ */
+ public ChainedAllocatorOwner(final AllocatorOwner childOwner, final AllocatorOwner parentOwner) {
+ this.parentOwner = parentOwner;
+ this.childOwner = childOwner;
+ }
+
+ @Override
+ public FragmentContext getFragmentContext() {
+ return childOwner.getFragmentContext();
+ }
+
+ @Override
+ public ExecutionControls getExecutionControls() {
+ return childOwner.getExecutionControls();
+ }
+
+ /** @return "{childOwner}(owned by {parentOwner})" */
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append(childOwner)
+ .append("(owned by ")
+ .append(parentOwner)
+ .append(')')
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
new file mode 100644
index 0000000..8636d26
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/ChildAllocator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+/**
+ * Child allocator class. Only slightly different from the {@link RootAllocator},
+ * in that these can't be created directly, but must be obtained from
+ * {@link BufferAllocator#newChildAllocator(AllocatorOwner, long, long, int)}.
+ *
+ * <p>Child allocators can only be created by the root, or other children, so
+ * this class is package private.</p>
+ */
+class ChildAllocator extends BaseAllocator {
+  /**
+   * Constructor. All arguments are passed unchanged to the {@link BaseAllocator} constructor.
+   *
+   * @param parentAllocator parent allocator -- the one creating this child
+   * @param allocatorOwner a handle to the object making the request
+   * @param allocationPolicy the allocation policy to use; the policy for all
+   *   allocators must match for each invocation of a drillbit
+   * @param initReservation initial amount of space to reserve (obtained from the parent)
+   * @param maxAllocation maximum amount of space that can be obtained from this allocator;
+   *   note this includes direct allocations (via {@link BufferAllocator#buffer(int, int)}
+   *   et al) and requests from descendant allocators. Depending on the allocation policy in
+   *   force, even less memory may be available
+   * @param flags one or more of the {@code BaseAllocator.F_*} flags
+   */
+  ChildAllocator(final BaseAllocator parentAllocator, final AllocatorOwner allocatorOwner,
+      final AllocationPolicy allocationPolicy, final long initReservation,
+      final long maxAllocation, final int flags) {
+    super(parentAllocator, allocatorOwner, allocationPolicy, initReservation, maxAllocation, flags);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/RootAllocator.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
new file mode 100644
index 0000000..63b9987
--- /dev/null
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/RootAllocator.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorL;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The root allocator for using direct memory inside a Drillbit. Supports creating a
+ * tree of descendant child allocators.
+ *
+ * <p>Each instance makes a best-effort attempt to register an {@code AllocatorsStatsMXBean}
+ * with the platform MBean server under the name
+ * {@code org.apache.drill.exec.memory:Allocators=<id>}; a failure is logged, not thrown.</p>
+ */
+public class RootAllocator extends BaseAllocator {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootAllocator.class);
+
+  // TODO these statics, and others in BaseAllocator, may be a problem for multiple in-process Drillbits
+  private static final PooledByteBufAllocatorL innerAllocator = PooledByteBufAllocatorL.DEFAULT;
+  // Overwritten by every RootAllocator constructor (see the super() call below), so with several
+  // RootAllocators in one JVM this reflects whichever one was constructed last.
+  private static long maxDirect;
+
+  /**
+   * Determine the allocation policy from the {@link ExecConstants#ALLOCATION_POLICY} system
+   * property, falling back to {@code BaseAllocator.POLICY_LOCAL_MAX_NAME} when it is unset.
+   *
+   * @return the allocation policy named by the system property
+   * @throws IllegalArgumentException if the property names an unrecognized policy
+   */
+  public static AllocationPolicy getAllocationPolicy() {
+    final String policyName = System.getProperty(ExecConstants.ALLOCATION_POLICY,
+        BaseAllocator.POLICY_LOCAL_MAX_NAME); // TODO try with PER_FRAGMENT_NAME
+
+    switch (policyName) {
+      case POLICY_PER_FRAGMENT_NAME:
+        return POLICY_PER_FRAGMENT;
+      case POLICY_LOCAL_MAX_NAME:
+        return POLICY_LOCAL_MAX;
+      default:
+        throw new IllegalArgumentException("Unrecognized allocation policy name \"" + policyName + "\"");
+    }
+  }
+
+  /**
+   * Create a root allocator from a {@link DrillConfig}. The policy comes from
+   * {@link #getAllocationPolicy()}, there is no initial reservation and no flags, and the limit
+   * is the smaller of {@code DrillConfig.getMaxDirectMemory()} and the configured
+   * {@code ExecConstants.TOP_LEVEL_MAX_ALLOC}.
+   *
+   * @param drillConfig the configuration supplying {@code TOP_LEVEL_MAX_ALLOC}
+   */
+  public RootAllocator(final DrillConfig drillConfig) {
+    this(getAllocationPolicy(), 0, Math.min(
+        DrillConfig.getMaxDirectMemory(), drillConfig.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)), 0);
+  }
+
+  /**
+   * @return the {@code maxReservation} passed to the most recently constructed RootAllocator,
+   *   or 0 if none has been constructed yet
+   */
+  public static long getMaxDirect() {
+    return maxDirect;
+  }
+
+  /**
+   * Provide statistics via JMX for each RootAllocator.
+   *
+   * <p>Static because it only reads static state. The MBean server keeps every registered MBean
+   * referenced, and this class never unregisters it. A non-static inner class would therefore
+   * pin the enclosing RootAllocator (and its allocator tree) in memory.</p>
+   */
+  private static class AllocatorsStats implements AllocatorsStatsMXBean {
+    @Override
+    public long getMaxDirectMemory() {
+      return maxDirect;
+    }
+  }
+
+  /**
+   * Owner used for root allocators: there are no execution controls and no fragment
+   * context, so both accessors return {@code null}.
+   */
+  private static class RootAllocatorOwner implements AllocatorOwner {
+    @Override
+    public ExecutionControls getExecutionControls() {
+      return null;
+    }
+
+    @Override
+    public FragmentContext getFragmentContext() {
+      return null;
+    }
+  }
+
+  /**
+   * Create a root allocator with explicit settings. Used by {@link #RootAllocator(DrillConfig)}
+   * and exposed publicly for tests.
+   *
+   * @param allocationPolicy the allocation policy to use
+   * @param initAllocation initial amount of space to reserve
+   * @param maxReservation maximum amount of memory this allocator can hand out; also becomes
+   *   the value reported by {@link #getMaxDirect()}
+   * @param flags BaseAllocator F_* flags; must not include {@code F_LIMITING_ROOT}
+   *   (checked by an assertion)
+   */
+  @VisibleForTesting
+  public RootAllocator(final AllocationPolicy allocationPolicy,
+      final long initAllocation, final long maxReservation, final int flags) {
+    // maxDirect is assigned inside the super() argument list because no statement may precede super().
+    super(null, new RootAllocatorOwner(), allocationPolicy, initAllocation, maxDirect = maxReservation, flags);
+    assert (flags & F_LIMITING_ROOT) == 0 : "the RootAllocator shouldn't be a limiting root";
+
+    // Best-effort JMX registration: a failure here must not prevent the allocator from being used.
+    try {
+      final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      final ObjectName objectName = new ObjectName("org.apache.drill.exec.memory:Allocators=" + id);
+      final AllocatorsStats mbean = new AllocatorsStats();
+      mbs.registerMBean(mbean, objectName);
+    } catch (Exception e) {
+      logger.warn("Exception setting up AllocatorsStatsMBean", e);
+    }
+  }
+
+  /**
+   * @return the shared {@code PooledByteBufAllocatorL.DEFAULT} instance, used by every RootAllocator
+   */
+  @Override
+  public ByteBufAllocator getUnderlyingAllocator() {
+    return innerAllocator;
+  }
+
+  @Override
+  protected boolean canIncreaseOwned(final long nBytes, final int flags) {
+    // the end total has already been checked against maxAllocation, so we can just return true
+    return true;
+  }
+
+  /**
+   * Verify the accounting state of the allocation system.
+   */
+  @VisibleForTesting
+  public void verify() {
+    verifyAllocator();
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
index a413e4a..078fc21 100644
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.memory;
import org.apache.drill.common.config.DrillConfig;
-import com.google.common.annotations.VisibleForTesting;
-
public class RootAllocatorFactory {
/**
* Constructor to prevent instantiation of this static utility class.
@@ -34,34 +32,12 @@ public class RootAllocatorFactory {
* @return a new root allocator
*/
public static BufferAllocator newRoot(final DrillConfig drillConfig) {
-/* TODO(DRILL-1942)
+/* TODO(cwestin)
if (BaseAllocator.DEBUG) {
- return new TopLevelAllocator(drillConfig);
+ return new RootAllocator(drillConfig);
}
*/
-
- return new TopLevelAllocator(drillConfig);
- }
-
- /**
- * Factory method for testing, where a DrillConfig may not be available.
- *
- * @param allocationPolicy the allocation policy
- * @param initAllocation initial allocation from parent
- * @param maxReservation maximum amount of memory this can hand out
- * @param flags one of BufferAllocator's F_* flags
- * @return a new root allocator
- */
-/* TODO(DRILL-1942)
- @VisibleForTesting
- public static BufferAllocator newRoot(
- final AllocationPolicy allocationPolicy,
- final long initAllocation, final long maxReservation, final int flags) {
- if (BaseAllocator.DEBUG) {
- return new RootAllocatorDebug(allocationPolicy, initAllocation, maxReservation, flags);
- }
-
- return new RootAllocator(allocationPolicy, initAllocation, maxReservation, flags);
+ return new RootAllocator(drillConfig);
+ // TODO(cwestin) return new TopLevelAllocator(drillConfig);
}
-*/
}