You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:25 UTC
[02/13] drill git commit: DRILL-4134: Add new allocator
http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 1e96238..36bcacf 100644
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -26,66 +26,71 @@ import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.AssertionUtil;
import org.apache.drill.exec.util.Pointer;
public class TopLevelAllocator implements BufferAllocator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopLevelAllocator.class);
+ private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(TopLevelAllocator.class);
public static final String CHILD_BUFFER_INJECTION_SITE = "child.buffer";
- private static final PooledByteBufAllocatorL ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getInstance());
-
- public static final String TOP_LEVEL_MAX_ALLOC = "drill.memory.top.max";
- public static final String ERROR_ON_MEMORY_LEAK = "drill.memory.debug.error_on_leak";
-
public static long MAXIMUM_DIRECT_MEMORY;
private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled();
private final Map<ChildAllocator, StackTraceElement[]> childrenMap;
- private final PooledByteBufAllocatorL innerAllocator;
- private final AccountorImpl acct;
+ private final PooledByteBufAllocatorL innerAllocator = PooledByteBufAllocatorL.DEFAULT;
+ private final Accountor acct;
private final boolean errorOnLeak;
private final DrillBuf empty;
- private final DrillConfig config;
+
+ private final AtomicInteger idGenerator = new AtomicInteger(0);
private TopLevelAllocator(DrillConfig config, long maximumAllocation, boolean errorOnLeak){
MAXIMUM_DIRECT_MEMORY = maximumAllocation;
- innerAllocator = ALLOCATOR;
- this.config=(config!=null) ? config : DrillConfig.create();
this.errorOnLeak = errorOnLeak;
- this.acct = new AccountorImpl(config, errorOnLeak, null, null, maximumAllocation, 0, true);
+ this.acct = new Accountor(config, errorOnLeak, null, null, maximumAllocation, 0, true);
this.empty = DrillBuf.getEmpty(this, acct);
this.childrenMap = ENABLE_ACCOUNTING ? new IdentityHashMap<ChildAllocator, StackTraceElement[]>() : null;
}
TopLevelAllocator(DrillConfig config) {
- this(config, Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(TOP_LEVEL_MAX_ALLOC)),
- config.getBoolean(ERROR_ON_MEMORY_LEAK)
+ this(config, Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)),
+ config.getBoolean(ExecConstants.ERROR_ON_MEMORY_LEAK)
);
}
@Override
+ public int getId() {
+ return idGenerator.incrementAndGet();
+ }
+
+ @Override
public boolean takeOwnership(DrillBuf buf) {
return buf.transferAccounting(acct);
}
@Override
- public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
+ public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
DrillBuf b = new DrillBuf(this, acct, buf);
out.value = b;
return acct.transferIn(b, b.capacity());
}
+ @Override
public DrillBuf buffer(int min, int max) {
if (min == 0) {
return empty;
}
if(!acct.reserve(min)) {
- throw new OutOfMemoryException(createErrorMsg(this, min));
+ throw new OutOfMemoryRuntimeException(createErrorMsg(this, min));
}
try {
@@ -96,7 +101,7 @@ public class TopLevelAllocator implements BufferAllocator {
} catch (OutOfMemoryError e) {
if ("Direct buffer memory".equals(e.getMessage())) {
acct.release(min);
- throw new OutOfMemoryException(createErrorMsg(this, min), e);
+ throw new OutOfMemoryRuntimeException(createErrorMsg(this, min), e);
} else {
throw e;
}
@@ -124,20 +129,25 @@ public class TopLevelAllocator implements BufferAllocator {
}
@Override
- public BufferAllocator getChildAllocator(LimitConsumer limitConsumer, long initialReservation,
- long maximumReservation,
- boolean applyFragmentLimit) {
+ public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
+ long initialReservation, long maximumReservation, int flags) {
+ return getChildAllocator(allocatorOwner.getFragmentContext(), initialReservation,
+ maximumReservation, (flags & BufferAllocator.F_LIMITING_ROOT) != 0);
+ }
+
+ @Override
+ public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation,
+ long maximumReservation, boolean applyFragmentLimit) {
if(!acct.reserve(initialReservation)){
logger.debug(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
- throw new OutOfMemoryException(
+ throw new OutOfMemoryRuntimeException(
String
.format(
"You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
initialReservation, acct.getCapacity() - acct.getAllocation()));
}
logger.debug("New child allocator with initial reservation {}", initialReservation);
- ChildAllocator allocator = new ChildAllocator(limitConsumer, acct, maximumReservation, initialReservation,
- childrenMap, applyFragmentLimit);
+ ChildAllocator allocator = new ChildAllocator(context, acct, maximumReservation, initialReservation, childrenMap, applyFragmentLimit);
if(ENABLE_ACCOUNTING){
childrenMap.put(allocator, Thread.currentThread().getStackTrace());
}
@@ -146,17 +156,12 @@ public class TopLevelAllocator implements BufferAllocator {
}
@Override
- public void resetLimits() {
- acct.resetFragmentLimits();
- }
-
- @Override
- public void setLimit(long limit){
+ public void setFragmentLimit(long limit){
acct.setFragmentLimit(limit);
}
@Override
- public long getLimit(){
+ public long getFragmentLimit(){
return acct.getFragmentLimit();
}
@@ -190,26 +195,30 @@ public class TopLevelAllocator implements BufferAllocator {
private class ChildAllocator implements BufferAllocator {
private final DrillBuf empty;
- private AccountorImpl childAcct;
+ private Accountor childAcct;
private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>();
private boolean closed = false;
- private LimitConsumer limitConsumer;
+ private FragmentContext fragmentContext;
private Map<ChildAllocator, StackTraceElement[]> thisMap;
- private boolean applyFragmentLimit;
- public ChildAllocator(LimitConsumer limitConsumer,
- AccountorImpl parentAccountor,
+ public ChildAllocator(FragmentContext context,
+ Accountor parentAccountor,
long max,
long pre,
Map<ChildAllocator,
StackTraceElement[]> map,
boolean applyFragmentLimit) {
assert max >= pre;
- this.applyFragmentLimit = applyFragmentLimit;
- this.limitConsumer = limitConsumer;
- childAcct = new AccountorImpl(config, errorOnLeak, limitConsumer, parentAccountor, max, pre, applyFragmentLimit);
+ DrillConfig drillConf = context != null ? context.getConfig() : null;
+ childAcct = new Accountor(drillConf, errorOnLeak, context, parentAccountor, max, pre, applyFragmentLimit);
+ this.fragmentContext=context;
thisMap = map;
- empty = DrillBuf.getEmpty(this, childAcct);
+ this.empty = DrillBuf.getEmpty(this, childAcct);
+ }
+
+ @Override
+ public int getId() {
+ return idGenerator.incrementAndGet();
}
@Override
@@ -218,7 +227,7 @@ public class TopLevelAllocator implements BufferAllocator {
}
@Override
- public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
+ public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
DrillBuf b = new DrillBuf(this, acct, buf);
out.value = b;
return acct.transferIn(b, b.capacity());
@@ -227,11 +236,15 @@ public class TopLevelAllocator implements BufferAllocator {
@Override
public DrillBuf buffer(int size, int max) {
+ if (ENABLE_ACCOUNTING) {
+ injector.injectUnchecked(fragmentContext, CHILD_BUFFER_INJECTION_SITE);
+ }
+
if (size == 0) {
return empty;
}
if(!childAcct.reserve(size)) {
- throw new OutOfMemoryException(createErrorMsg(this, size));
+ throw new OutOfMemoryRuntimeException(createErrorMsg(this, size));
}
try {
@@ -242,13 +255,14 @@ public class TopLevelAllocator implements BufferAllocator {
} catch (OutOfMemoryError e) {
if ("Direct buffer memory".equals(e.getMessage())) {
childAcct.release(size);
- throw new OutOfMemoryException(createErrorMsg(this, size), e);
+ throw new OutOfMemoryRuntimeException(createErrorMsg(this, size), e);
} else {
throw e;
}
}
}
+ @Override
public DrillBuf buffer(int size) {
return buffer(size, size);
}
@@ -259,39 +273,40 @@ public class TopLevelAllocator implements BufferAllocator {
}
@Override
- public BufferAllocator getChildAllocator(LimitConsumer limitConsumer, long initialReservation,
- long maximumReservation,
- boolean applyFragmentLimit) {
+ public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
+ long initialReservation, long maximumReservation, int flags) {
+ return getChildAllocator(allocatorOwner.getFragmentContext(), initialReservation,
+ maximumReservation, (flags & BufferAllocator.F_LIMITING_ROOT) != 0);
+ }
+
+ @Override
+ public BufferAllocator getChildAllocator(FragmentContext context,
+ long initialReservation, long maximumReservation, boolean applyFragmentLimit) {
if (!childAcct.reserve(initialReservation)) {
- throw new OutOfMemoryException(
+ throw new OutOfMemoryRuntimeException(
String
.format(
"You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
initialReservation, childAcct.getAvailable()));
}
logger.debug("New child allocator with initial reservation {}", initialReservation);
- ChildAllocator newChildAllocator = new ChildAllocator(limitConsumer, childAcct, maximumReservation,
- initialReservation, null, applyFragmentLimit);
+ ChildAllocator newChildAllocator = new ChildAllocator(context, childAcct, maximumReservation, initialReservation, null, applyFragmentLimit);
this.children.put(newChildAllocator, Thread.currentThread().getStackTrace());
return newChildAllocator;
}
- public PreAllocator getNewPreAllocator() {
- return new PreAlloc(this, this.childAcct);
- }
-
@Override
- public void resetLimits(){
- childAcct.resetFragmentLimits();
+ public AllocationReservation newReservation() {
+ return new PreAlloc(this, this.childAcct);
}
@Override
- public void setLimit(long limit){
+ public void setFragmentLimit(long limit){
childAcct.setFragmentLimit(limit);
}
@Override
- public long getLimit(){
+ public long getFragmentLimit(){
return childAcct.getFragmentLimit();
}
@@ -312,9 +327,10 @@ public class TopLevelAllocator implements BufferAllocator {
}
+ final FragmentHandle handle = fragmentContext.getHandle();
IllegalStateException e = new IllegalStateException(String.format(
- "Failure while trying to close child allocator: Child level allocators not closed. Identifier: %s. Stack trace: \n %s",
- limitConsumer.getIdentifier(), sb.toString()));
+ "Failure while trying to close child allocator: Child level allocators not closed. Fragment %d:%d. Stack trace: \n %s",
+ handle.getMajorFragmentId(), handle.getMinorFragmentId(), sb.toString()));
if (errorOnLeak) {
throw e;
} else {
@@ -345,15 +361,14 @@ public class TopLevelAllocator implements BufferAllocator {
public DrillBuf getEmpty() {
return empty;
}
-
-
}
- public PreAllocator getNewPreAllocator() {
+ @Override
+ public AllocationReservation newReservation() {
return new PreAlloc(this, this.acct);
}
- public class PreAlloc implements PreAllocator{
+ public class PreAlloc extends AllocationReservation {
int bytes = 0;
final Accountor acct;
final BufferAllocator allocator;
@@ -362,25 +377,28 @@ public class TopLevelAllocator implements BufferAllocator {
this.allocator = allocator;
}
- /**
- *
- */
- public boolean preAllocate(int bytes) {
-
+ @Override
+ protected boolean reserve(int bytes) {
if (!acct.reserve(bytes)) {
return false;
}
+
this.bytes += bytes;
return true;
-
}
-
- public DrillBuf getAllocation() {
+ @Override
+ protected DrillBuf allocate(int bytes) {
+ assert this.bytes == bytes : "allocation size mismatch";
DrillBuf b = new DrillBuf(allocator, acct, innerAllocator.directBuffer(bytes, bytes));
acct.reserved(bytes, b);
return b;
}
+
+ @Override
+ protected void releaseReservation(int nBytes) {
+ acct.release(nBytes);
+ }
}
private static String createErrorMsg(final BufferAllocator allocator, final int size) {