You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:25 UTC

[02/13] drill git commit: DRILL-4134: Add new allocator

http://git-wip-us.apache.org/repos/asf/drill/blob/53dcabeb/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 1e96238..36bcacf 100644
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -26,66 +26,71 @@ import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.AssertionUtil;
 import org.apache.drill.exec.util.Pointer;
 
 public class TopLevelAllocator implements BufferAllocator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopLevelAllocator.class);
+  private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(TopLevelAllocator.class);
   public static final String CHILD_BUFFER_INJECTION_SITE = "child.buffer";
 
-  private static final PooledByteBufAllocatorL ALLOCATOR = new PooledByteBufAllocatorL(DrillMetrics.getInstance());
-
-  public static final String TOP_LEVEL_MAX_ALLOC = "drill.memory.top.max";
-  public static final String ERROR_ON_MEMORY_LEAK = "drill.memory.debug.error_on_leak";
-
   public static long MAXIMUM_DIRECT_MEMORY;
 
   private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled();
   private final Map<ChildAllocator, StackTraceElement[]> childrenMap;
-  private final PooledByteBufAllocatorL innerAllocator;
-  private final AccountorImpl acct;
+  private final PooledByteBufAllocatorL innerAllocator = PooledByteBufAllocatorL.DEFAULT;
+  private final Accountor acct;
   private final boolean errorOnLeak;
   private final DrillBuf empty;
-  private final DrillConfig config;
+
+  private final AtomicInteger idGenerator = new AtomicInteger(0);
 
   private TopLevelAllocator(DrillConfig config, long maximumAllocation, boolean errorOnLeak){
     MAXIMUM_DIRECT_MEMORY = maximumAllocation;
-    innerAllocator = ALLOCATOR;
-    this.config=(config!=null) ? config : DrillConfig.create();
     this.errorOnLeak = errorOnLeak;
-    this.acct = new AccountorImpl(config, errorOnLeak, null, null, maximumAllocation, 0, true);
+    this.acct = new Accountor(config, errorOnLeak, null, null, maximumAllocation, 0, true);
     this.empty = DrillBuf.getEmpty(this, acct);
     this.childrenMap = ENABLE_ACCOUNTING ? new IdentityHashMap<ChildAllocator, StackTraceElement[]>() : null;
   }
 
   TopLevelAllocator(DrillConfig config) {
-    this(config, Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(TOP_LEVEL_MAX_ALLOC)),
-        config.getBoolean(ERROR_ON_MEMORY_LEAK)
+    this(config, Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)),
+        config.getBoolean(ExecConstants.ERROR_ON_MEMORY_LEAK)
         );
   }
 
   @Override
+  public int getId() {
+    return idGenerator.incrementAndGet();
+  }
+
+  @Override
   public boolean takeOwnership(DrillBuf buf) {
     return buf.transferAccounting(acct);
   }
 
   @Override
-  public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
+  public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
     DrillBuf b = new DrillBuf(this, acct, buf);
     out.value = b;
     return acct.transferIn(b, b.capacity());
   }
 
+  @Override
   public DrillBuf buffer(int min, int max) {
     if (min == 0) {
       return empty;
     }
     if(!acct.reserve(min)) {
-      throw new OutOfMemoryException(createErrorMsg(this, min));
+      throw new OutOfMemoryRuntimeException(createErrorMsg(this, min));
     }
 
     try {
@@ -96,7 +101,7 @@ public class TopLevelAllocator implements BufferAllocator {
     } catch (OutOfMemoryError e) {
       if ("Direct buffer memory".equals(e.getMessage())) {
         acct.release(min);
-        throw new OutOfMemoryException(createErrorMsg(this, min), e);
+        throw new OutOfMemoryRuntimeException(createErrorMsg(this, min), e);
       } else {
         throw e;
       }
@@ -124,20 +129,25 @@ public class TopLevelAllocator implements BufferAllocator {
   }
 
   @Override
-  public BufferAllocator getChildAllocator(LimitConsumer limitConsumer, long initialReservation,
-      long maximumReservation,
-      boolean applyFragmentLimit) {
+  public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
+      long initialReservation, long maximumReservation, int flags) {
+    return getChildAllocator(allocatorOwner.getFragmentContext(), initialReservation,
+        maximumReservation, (flags & BufferAllocator.F_LIMITING_ROOT) != 0);
+  }
+
+    @Override
+  public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation,
+      long maximumReservation, boolean applyFragmentLimit) {
     if(!acct.reserve(initialReservation)){
       logger.debug(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
-      throw new OutOfMemoryException(
+      throw new OutOfMemoryRuntimeException(
           String
               .format(
                   "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
                   initialReservation, acct.getCapacity() - acct.getAllocation()));
     }
     logger.debug("New child allocator with initial reservation {}", initialReservation);
-    ChildAllocator allocator = new ChildAllocator(limitConsumer, acct, maximumReservation, initialReservation,
-        childrenMap, applyFragmentLimit);
+    ChildAllocator allocator = new ChildAllocator(context, acct, maximumReservation, initialReservation, childrenMap, applyFragmentLimit);
     if(ENABLE_ACCOUNTING){
       childrenMap.put(allocator, Thread.currentThread().getStackTrace());
     }
@@ -146,17 +156,12 @@ public class TopLevelAllocator implements BufferAllocator {
   }
 
   @Override
-  public void resetLimits() {
-    acct.resetFragmentLimits();
-  }
-
-  @Override
-  public void setLimit(long limit){
+  public void setFragmentLimit(long limit){
     acct.setFragmentLimit(limit);
   }
 
   @Override
-  public long getLimit(){
+  public long getFragmentLimit(){
     return acct.getFragmentLimit();
   }
 
@@ -190,26 +195,30 @@ public class TopLevelAllocator implements BufferAllocator {
 
   private class ChildAllocator implements BufferAllocator {
     private final DrillBuf empty;
-    private AccountorImpl childAcct;
+    private Accountor childAcct;
     private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>();
     private boolean closed = false;
-    private LimitConsumer limitConsumer;
+    private FragmentContext fragmentContext;
     private Map<ChildAllocator, StackTraceElement[]> thisMap;
-    private boolean applyFragmentLimit;
 
-    public ChildAllocator(LimitConsumer limitConsumer,
-        AccountorImpl parentAccountor,
+    public ChildAllocator(FragmentContext context,
+                          Accountor parentAccountor,
                           long max,
                           long pre,
                           Map<ChildAllocator,
                           StackTraceElement[]> map,
         boolean applyFragmentLimit) {
       assert max >= pre;
-      this.applyFragmentLimit = applyFragmentLimit;
-      this.limitConsumer = limitConsumer;
-      childAcct = new AccountorImpl(config, errorOnLeak, limitConsumer, parentAccountor, max, pre, applyFragmentLimit);
+      DrillConfig drillConf = context != null ? context.getConfig() : null;
+      childAcct = new Accountor(drillConf, errorOnLeak, context, parentAccountor, max, pre, applyFragmentLimit);
+      this.fragmentContext=context;
       thisMap = map;
-      empty = DrillBuf.getEmpty(this, childAcct);
+      this.empty = DrillBuf.getEmpty(this, childAcct);
+    }
+
+    @Override
+    public int getId() {
+      return idGenerator.incrementAndGet();
     }
 
     @Override
@@ -218,7 +227,7 @@ public class TopLevelAllocator implements BufferAllocator {
     }
 
     @Override
-    public boolean takeOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
+    public boolean shareOwnership(DrillBuf buf, Pointer<DrillBuf> out) {
       DrillBuf b = new DrillBuf(this, acct, buf);
       out.value = b;
       return acct.transferIn(b, b.capacity());
@@ -227,11 +236,15 @@ public class TopLevelAllocator implements BufferAllocator {
 
     @Override
     public DrillBuf buffer(int size, int max) {
+      if (ENABLE_ACCOUNTING) {
+        injector.injectUnchecked(fragmentContext, CHILD_BUFFER_INJECTION_SITE);
+      }
+
       if (size == 0) {
         return empty;
       }
       if(!childAcct.reserve(size)) {
-        throw new OutOfMemoryException(createErrorMsg(this, size));
+        throw new OutOfMemoryRuntimeException(createErrorMsg(this, size));
       }
 
       try {
@@ -242,13 +255,14 @@ public class TopLevelAllocator implements BufferAllocator {
       } catch (OutOfMemoryError e) {
         if ("Direct buffer memory".equals(e.getMessage())) {
           childAcct.release(size);
-          throw new OutOfMemoryException(createErrorMsg(this, size), e);
+          throw new OutOfMemoryRuntimeException(createErrorMsg(this, size), e);
         } else {
           throw e;
         }
       }
     }
 
+    @Override
     public DrillBuf buffer(int size) {
       return buffer(size, size);
     }
@@ -259,39 +273,40 @@ public class TopLevelAllocator implements BufferAllocator {
     }
 
     @Override
-    public BufferAllocator getChildAllocator(LimitConsumer limitConsumer, long initialReservation,
-        long maximumReservation,
-        boolean applyFragmentLimit) {
+    public BufferAllocator newChildAllocator(AllocatorOwner allocatorOwner,
+        long initialReservation, long maximumReservation, int flags) {
+      return getChildAllocator(allocatorOwner.getFragmentContext(), initialReservation,
+          maximumReservation, (flags & BufferAllocator.F_LIMITING_ROOT) != 0);
+    }
+
+    @Override
+    public BufferAllocator getChildAllocator(FragmentContext context,
+        long initialReservation, long maximumReservation, boolean applyFragmentLimit) {
       if (!childAcct.reserve(initialReservation)) {
-        throw new OutOfMemoryException(
+        throw new OutOfMemoryRuntimeException(
             String
                 .format(
                     "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
                     initialReservation, childAcct.getAvailable()));
       }
       logger.debug("New child allocator with initial reservation {}", initialReservation);
-      ChildAllocator newChildAllocator = new ChildAllocator(limitConsumer, childAcct, maximumReservation,
-          initialReservation, null, applyFragmentLimit);
+      ChildAllocator newChildAllocator = new ChildAllocator(context, childAcct, maximumReservation, initialReservation, null, applyFragmentLimit);
       this.children.put(newChildAllocator, Thread.currentThread().getStackTrace());
       return newChildAllocator;
     }
 
-    public PreAllocator getNewPreAllocator() {
-      return new PreAlloc(this, this.childAcct);
-    }
-
     @Override
-    public void resetLimits(){
-      childAcct.resetFragmentLimits();
+    public AllocationReservation newReservation() {
+      return new PreAlloc(this, this.childAcct);
     }
 
     @Override
-    public void setLimit(long limit){
+    public void setFragmentLimit(long limit){
       childAcct.setFragmentLimit(limit);
     }
 
     @Override
-    public long getLimit(){
+    public long getFragmentLimit(){
       return childAcct.getFragmentLimit();
     }
 
@@ -312,9 +327,10 @@ public class TopLevelAllocator implements BufferAllocator {
             }
 
 
+            final FragmentHandle handle = fragmentContext.getHandle();
             IllegalStateException e = new IllegalStateException(String.format(
-                        "Failure while trying to close child allocator: Child level allocators not closed. Identifier: %s. Stack trace: \n %s",
-                        limitConsumer.getIdentifier(), sb.toString()));
+                    "Failure while trying to close child allocator: Child level allocators not closed. Fragment %d:%d. Stack trace: \n %s",
+                    handle.getMajorFragmentId(), handle.getMinorFragmentId(), sb.toString()));
             if (errorOnLeak) {
               throw e;
             } else {
@@ -345,15 +361,14 @@ public class TopLevelAllocator implements BufferAllocator {
     public DrillBuf getEmpty() {
       return empty;
     }
-
-
   }
 
-  public PreAllocator getNewPreAllocator() {
+  @Override
+  public AllocationReservation newReservation() {
     return new PreAlloc(this, this.acct);
   }
 
-  public class PreAlloc implements PreAllocator{
+  public class PreAlloc extends AllocationReservation {
     int bytes = 0;
     final Accountor acct;
     final BufferAllocator allocator;
@@ -362,25 +377,28 @@ public class TopLevelAllocator implements BufferAllocator {
       this.allocator = allocator;
     }
 
-    /**
-     *
-     */
-    public boolean preAllocate(int bytes) {
-
+    @Override
+    protected boolean reserve(int bytes) {
       if (!acct.reserve(bytes)) {
         return false;
       }
+
       this.bytes += bytes;
       return true;
-
     }
 
-
-    public DrillBuf getAllocation() {
+    @Override
+    protected DrillBuf allocate(int bytes) {
+      assert this.bytes == bytes : "allocation size mismatch";
       DrillBuf b = new DrillBuf(allocator, acct, innerAllocator.directBuffer(bytes, bytes));
       acct.reserved(bytes, b);
       return b;
     }
+
+    @Override
+    protected void releaseReservation(int nBytes) {
+      acct.release(nBytes);
+    }
   }
 
   private static String createErrorMsg(final BufferAllocator allocator, final int size) {