You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:31 UTC

[08/13] drill git commit: DRILL-4134: Allocator Improvements

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
new file mode 100644
index 0000000..b312301
--- /dev/null
+++ b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.common.DrillAutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
+import org.junit.Test;
+
+
+public class TestEndianess {
+
+  @Test
+  public void testLittleEndian() {
+    final BufferAllocator a = new RootAllocator(DrillConfig.getMaxDirectMemory());
+    final ByteBuf b = a.buffer(4);
+    b.setInt(0, 35);
+    assertEquals(b.getByte(0), 35);
+    assertEquals(b.getByte(1), 0);
+    assertEquals(b.getByte(2), 0);
+    assertEquals(b.getByte(3), 0);
+    b.release();
+    DrillAutoCloseables.closeNoChecked(a);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/pom.xml
----------------------------------------------------------------------
diff --git a/exec/memory/impl/pom.xml b/exec/memory/impl/pom.xml
deleted file mode 100644
index 94b9052..0000000
--- a/exec/memory/impl/pom.xml
+++ /dev/null
@@ -1,68 +0,0 @@
-<?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-  license agreements. See the NOTICE file distributed with this work for additional 
-  information regarding copyright ownership. The ASF licenses this file to 
-  You under the Apache License, Version 2.0 (the "License"); you may not use 
-  this file except in compliance with the License. You may obtain a copy of 
-  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-  by applicable law or agreed to in writing, software distributed under the 
-  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-  OF ANY KIND, either express or implied. See the License for the specific 
-  language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>memory-parent</artifactId>
-    <groupId>org.apache.drill.memory</groupId>
-    <version>1.5.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>drill-memory-impl</artifactId>
-  <name>exec/memory/impl</name>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.drill</groupId>
-      <artifactId>drill-protocol</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.drill</groupId>
-      <artifactId>drill-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.drill.memory</groupId>
-      <artifactId>drill-memory-base</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.drill</groupId>
-      <artifactId>drill-common</artifactId>
-      <version>${project.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-    
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <version>2.5.0</version>
-    </dependency>
-    <dependency>
-      <groupId>com.codahale.metrics</groupId>
-      <artifactId>metrics-core</artifactId>
-      <version>3.0.1</version>
-    </dependency>
-
-
-  </dependencies>
-
-
-  <build>
-  </build>
-
-
-
-</project>

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AccountorImpl.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AccountorImpl.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AccountorImpl.java
deleted file mode 100644
index 0ac93e4..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AccountorImpl.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.util.AssertionUtil;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-
-public class AccountorImpl implements Accountor {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AccountorImpl.class);
-
-  private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled();
-
-  public static final String ENABLE_FRAGMENT_MEMORY_LIMIT = "drill.exec.memory.enable_frag_limit";
-  public static final String FRAGMENT_MEM_OVERCOMMIT_FACTOR = "drill.exec.memory.frag_mem_overcommit_factor";
-
-  private final AtomicRemainder remainder;
-  private final long total;
-  private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap();
-  private AccountorImpl parent;
-
-  private final boolean errorOnLeak;
-  // some operators are not subject to the fragment limit. They set applyFragmentLimit to false
-
-  private final boolean enableFragmentLimit;
-  private final double  fragmentMemOvercommitFactor;
-
-  private final boolean  DEFAULT_ENABLE_FRAGMENT_LIMIT=false;
-  private final double   DEFAULT_FRAGMENT_MEM_OVERCOMMIT_FACTOR=1.5;
-
-  private final boolean applyFragmentLimit;
-
-  private final LimitConsumer limitConsumer;
-  long fragmentLimit;
-
-  private long peakMemoryAllocation = 0;
-
-  // The top level Allocator has an accountor that keeps track of all the LimitConsumers currently executing.
-  // This enables the top level accountor to calculate a new fragment limit whenever necessary.
-  private final List<LimitConsumer> limitConsumers;
-
-  public AccountorImpl(DrillConfig config, boolean errorOnLeak, LimitConsumer context, AccountorImpl parent, long max,
-      long preAllocated, boolean applyFragLimit) {
-    // TODO: fix preallocation stuff
-    this.errorOnLeak = errorOnLeak;
-    AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
-    this.parent = parent;
-
-    boolean enableFragmentLimit;
-    double  fragmentMemOvercommitFactor;
-
-    try {
-      enableFragmentLimit = config.getBoolean(ENABLE_FRAGMENT_MEMORY_LIMIT);
-      fragmentMemOvercommitFactor = config.getDouble(FRAGMENT_MEM_OVERCOMMIT_FACTOR);
-    }catch(Exception e){
-      enableFragmentLimit = DEFAULT_ENABLE_FRAGMENT_LIMIT;
-      fragmentMemOvercommitFactor = DEFAULT_FRAGMENT_MEM_OVERCOMMIT_FACTOR;
-    }
-    this.enableFragmentLimit = enableFragmentLimit;
-    this.fragmentMemOvercommitFactor = fragmentMemOvercommitFactor;
-
-
-    this.applyFragmentLimit=applyFragLimit;
-
-    this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, preAllocated, applyFragmentLimit);
-    this.total = max;
-    this.limitConsumer = context;
-    this.fragmentLimit=this.total; // Allow as much as possible to start with;
-    if (ENABLE_ACCOUNTING) {
-      buffers = Maps.newConcurrentMap();
-    } else {
-      buffers = null;
-    }
-    this.limitConsumers = new ArrayList<LimitConsumer>();
-    if(parent!=null && parent.parent==null){ // Only add the fragment context to the fragment level accountor
-      synchronized(this) {
-        addLimitConsumer(this.limitConsumer);
-      }
-    }
-  }
-
-  public boolean transferTo(Accountor target, DrillBuf buf, long size) {
-    return transfer(target, buf, size, true);
-  }
-
-  public boolean transferIn(DrillBuf buf, long size) {
-    return transfer(this, buf, size, false);
-  }
-
-  private boolean transfer(Accountor target, DrillBuf buf, long size, boolean release) {
-    boolean withinLimit = target.forceAdditionalReservation(size);
-    if(release){
-      release(buf, size);
-    }
-
-    if (ENABLE_ACCOUNTING) {
-      if (target instanceof AccountorImpl) {
-        ((AccountorImpl) target).buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread()
-            .getStackTrace()));
-      }
-    }
-    return withinLimit;
-  }
-
-  public long getAvailable() {
-    if (parent != null) {
-      return Math.min(parent.getAvailable(), getCapacity() - getAllocation());
-    }
-    return getCapacity() - getAllocation();
-  }
-
-  public long getCapacity() {
-    return fragmentLimit;
-  }
-
-  public long getAllocation() {
-    return remainder.getUsed();
-  }
-
-  public long getPeakMemoryAllocation() {
-    return peakMemoryAllocation;
-  }
-
-  public boolean reserve(long size) {
-    boolean status = remainder.get(size, this.applyFragmentLimit);
-    peakMemoryAllocation = Math.max(peakMemoryAllocation, getAllocation());
-    return status;
-  }
-
-  public boolean forceAdditionalReservation(long size) {
-    if (size > 0) {
-      boolean status = remainder.forceGet(size);
-      peakMemoryAllocation = Math.max(peakMemoryAllocation, getAllocation());
-      return status;
-    } else {
-      return true;
-    }
-  }
-
-  public void reserved(long expected, DrillBuf buf) {
-    // make sure to take away the additional memory that happened due to rounding.
-
-    long additional = buf.capacity() - expected;
-    if (additional > 0) {
-      remainder.forceGet(additional);
-    }
-
-    if (ENABLE_ACCOUNTING) {
-      buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread().getStackTrace()));
-    }
-
-    peakMemoryAllocation = Math.max(peakMemoryAllocation, getAllocation());
-  }
-
-
-  public void releasePartial(DrillBuf buf, long size) {
-    remainder.returnAllocation(size);
-    if (ENABLE_ACCOUNTING) {
-      if (buf != null) {
-        DebugStackTrace dst = buffers.get(buf);
-        if (dst == null) {
-          throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf);
-        }
-        dst.size -= size;
-        if (dst.size < 0) {
-          throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf);
-        }
-      }
-    }
-  }
-
-  void release(long size) {
-    remainder.returnAllocation(size);
-  }
-
-  public void release(DrillBuf buf, long size) {
-    remainder.returnAllocation(size);
-    if (ENABLE_ACCOUNTING) {
-      if (buf != null && buffers.remove(buf) == null) {
-        throw new IllegalStateException("Releasing a buffer that has already been released. Buffer: " + buf);
-      }
-    }
-  }
-
-  private void addLimitConsumer(LimitConsumer c) {
-    if (c == null) {
-      return;
-    }
-
-    if (parent != null){
-      parent.addLimitConsumer(c);
-    }else {
-      if(logger.isTraceEnabled()) {
-        String fragStr = c == null ? "[Null Context]" : c.getIdentifier();
-        fragStr+=" (Object Id: "+System.identityHashCode(c)+")";
-        StackTraceElement[] ste = (new Throwable()).getStackTrace();
-        StringBuffer sb = new StringBuffer();
-        for (StackTraceElement s : ste) {
-          sb.append(s.toString());
-          sb.append("\n");
-        }
-
-        logger.trace("Fragment " + fragStr + " added to root accountor.\n"+sb.toString());
-      }
-      synchronized(this) {
-        limitConsumers.add(c);
-      }
-    }
-  }
-
-  private void removeLimitConsumer(LimitConsumer c) {
-    if (c == null) {
-      return;
-    }
-
-    if (parent != null){
-      if (parent.parent==null){
-        // only fragment level allocators will have the fragment context saved
-        parent.removeLimitConsumer(c);
-      }
-    }else{
-      if(logger.isDebugEnabled()) {
-        String fragStr = c == null ? "[Null Context]" : c.getIdentifier();
-        fragStr += " (Object Id: " + System.identityHashCode(c) + ")";
-        logger.trace("Fragment " + fragStr + " removed from root accountor");
-      }
-      synchronized(this) {
-        limitConsumers.remove(c);
-      }
-    }
-  }
-
-  public long resetFragmentLimits(){
-    // returns the new capacity
-    if(!this.enableFragmentLimit){
-      return getCapacity();
-    }
-
-    if(parent!=null){
-      parent.resetFragmentLimits();
-    }else {
-      //Get remaining memory available per fragment and distribute it EQUALLY among all the fragments.
-      //Fragments get the memory limit added to the amount already allocated.
-      //This favours fragments that are already running which will get a limit greater than newly started fragments.
-      //If the already running fragments end quickly, their limits will be assigned back to the remaining fragments
-      //quickly. If they are long running, then we want to favour them with larger limits anyway.
-      synchronized (this) {
-        int nFragments = limitConsumers.size();
-        long allocatedMemory=0;
-        for (LimitConsumer fragment : limitConsumers) {
-          allocatedMemory += fragment.getAllocated();
-        }
-        if(logger.isTraceEnabled()) {
-          logger.trace("Resetting Fragment Memory Limit: total Available memory== "+total
-            +" Total Allocated Memory :"+allocatedMemory
-            +" Number of fragments: "+nFragments
-            + " fragmentMemOvercommitFactor: "+fragmentMemOvercommitFactor
-            + " Root fragment limit: "+this.fragmentLimit + "(Root obj: "+System.identityHashCode(this)+")"
-          );
-        }
-        if(nFragments>0) {
-          long rem = (total - allocatedMemory) / nFragments;
-          for (LimitConsumer fragment : limitConsumers) {
-            fragment.setLimit((long) (rem * fragmentMemOvercommitFactor));
-          }
-        }
-        if(logger.isTraceEnabled() && false){
-          StringBuffer sb= new StringBuffer();
-          sb.append("[root](0:0)");
-          sb.append("Allocated memory: ");
-          sb.append(this.getAllocation());
-          sb.append(" Fragment Limit: ");
-          sb.append(this.getFragmentLimit());
-          logger.trace(sb.toString());
-          for (LimitConsumer fragment : limitConsumers) {
-            sb= new StringBuffer();
-            sb.append('[');
-            sb.append(fragment.getIdentifier());
-            sb.append(']');
-            sb.append("Allocated memory: ");
-            sb.append(fragment.getAllocated());
-            sb.append(" Fragment Limit: ");
-            sb.append(fragment.getLimit());
-            logger.trace(sb.toString());
-          }
-          logger.trace("Resetting Complete");
-        }
-      }
-    }
-    return getCapacity();
-  }
-
-  public void close() {
-    // remove the fragment context and reset fragment limits whenever an allocator closes
-    if (parent != null && parent.parent == null && limitConsumer != null) {
-      logger.debug("Fragment " + limitConsumer.getIdentifier() + "  accountor being closed");
-      removeLimitConsumer(limitConsumer);
-    }
-    resetFragmentLimits();
-
-    if (ENABLE_ACCOUNTING && !buffers.isEmpty()) {
-      StringBuffer sb = new StringBuffer();
-      sb.append("Attempted to close accountor with ");
-      sb.append(buffers.size());
-      sb.append(" buffer(s) still allocated for ");
-      sb.append(limitConsumer.getIdentifier());
-      sb.append(".\n");
-
-      Multimap<DebugStackTrace, DebugStackTrace> multi = LinkedListMultimap.create();
-      for (DebugStackTrace t : buffers.values()) {
-        multi.put(t, t);
-      }
-
-      for (DebugStackTrace entry : multi.keySet()) {
-        Collection<DebugStackTrace> allocs = multi.get(entry);
-
-        sb.append("\n\n\tTotal ");
-        sb.append(allocs.size());
-        sb.append(" allocation(s) of byte size(s): ");
-        for (DebugStackTrace alloc : allocs) {
-          sb.append(alloc.size);
-          sb.append(", ");
-        }
-
-        sb.append("at stack location:\n");
-        entry.addToString(sb);
-      }
-      if (!buffers.isEmpty()) {
-        IllegalStateException e = new IllegalStateException(sb.toString());
-        if (errorOnLeak) {
-          throw e;
-        } else {
-          logger.warn("Memory leaked.", e);
-        }
-      }
-    }
-
-    remainder.close();
-
-  }
-
-  public void setFragmentLimit(long add) {
-    // We ADD the limit to the current allocation. If none has been allocated, this
-    // sets a new limit. If memory has already been allocated, the fragment gets its
-    // limit based on the allocation, though this might still result in reducing the
-    // limit.
-
-    if (parent != null && parent.parent==null) { // This is a fragment level accountor
-      this.fragmentLimit=getAllocation()+add;
-      this.remainder.setLimit(this.fragmentLimit);
-      logger.trace("Fragment " + limitConsumer.getIdentifier() + " memory limit set to " + this.fragmentLimit);
-    }
-  }
-
-  public long getFragmentLimit(){
-    return this.fragmentLimit;
-  }
-
-  public class DebugStackTrace {
-
-    private StackTraceElement[] elements;
-    private long size;
-
-    public DebugStackTrace(long size, StackTraceElement[] elements) {
-      super();
-      this.elements = elements;
-      this.size = size;
-    }
-
-    public void addToString(StringBuffer sb) {
-      for (int i = 3; i < elements.length; i++) {
-        sb.append("\t\t");
-        sb.append(elements[i]);
-        sb.append("\n");
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + Arrays.hashCode(elements);
-//      result = prime * result + (int) (size ^ (size >>> 32));
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null) {
-        return false;
-      }
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-      DebugStackTrace other = (DebugStackTrace) obj;
-      if (!Arrays.equals(elements, other.elements)) {
-        return false;
-      }
-      // weird equal where size doesn't matter for multimap purposes.
-//      if (size != other.size)
-//        return false;
-      return true;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
deleted file mode 100644
index 4f1a1bd..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-/**
- * Implicitly specifies an allocation policy by providing a factory method to
- * create an enforcement agent.
- *
- * <p>Allocation policies are meant to be global, and may not work properly if
- * different allocators are given different policies. These are designed to
- * be supplied to the root-most allocator only, and then shared with descendant
- * (child) allocators.</p>
- */
-public interface AllocationPolicy {
-  /**
-   * Create an allocation policy enforcement agent. Each newly created allocator should
-   * call this in order to obtain its own agent.
-   *
-   * @return the newly instantiated agent; if an agent's implementation is stateless,
-   *   this may return a sharable singleton
-   */
-  AllocationPolicyAgent newAgent();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
deleted file mode 100644
index ad51ee6..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-/**
- * Per-allocator enforcement agent for allocation policies; created by
- * {@link AllocationPolicy#newAgent()}.
- */
-public interface AllocationPolicyAgent extends AutoCloseable {
-  /**
-   * Checks to see if creating a new allocator using the given specifications
-   * is allowed; should throw an exception if not.
-   *
-   * @param parentAllocator the parent allocator
-   * @param initReservation initial reservation the allocator should have
-   * @param maxAllocation the maximum allocation the allocator will allow
-   * @param flags the allocation option flags
-   * @throws OutOfMemoryException if the new allocator shouldn't be created
-   */
-  void checkNewAllocator(BufferAllocator parentAllocator,
-      long initReservation, long maxAllocation, int flags);
-
-  /**
-   * Get the currently applicable memory limit for the provided allocator.
-   * The interpretation of this value varies with the allocation policy in
-   * use, and each policy should describe what to expect.
-   *
-   * @param bufferAllocator the allocator
-   * @return the memory limit
-   */
-  long getMemoryLimit(BufferAllocator bufferAllocator);
-
-  /**
-   * Initialize the agent for a newly created allocator. Should be called from
-   * the allocator's constructor to initialize the agent for the allocator.
-   *
-   * @param bufferAllocator the newly created allocator.
-   */
-  void initializeAllocator(BufferAllocator bufferAllocator);
-
-  /**
-   * Indicate if any available memory owned by this allocator should
-   * be released to its parent. Allocators may use this to limit the
-   * amount of unused memory they retain for future requests; agents may
-   * request that memory be returned if there is currently a high demand
-   * for memory that other allocators could use if this allocator
-   * doesn't need it.
-   *
-   * @param bufferAllocator the allocator
-   * @return true if available memory owned by this allocator should be given
-   *   back to its parent
-   */
-  boolean shouldReleaseToParent(BufferAllocator bufferAllocator);
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
deleted file mode 100644
index 1803572..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import com.google.common.base.Preconditions;
-
-import io.netty.buffer.DrillBuf;
-
-/**
- * Supports cumulative allocation reservation. Clients may increase the size of
- * the reservation repeatedly until they call for an allocation of the current
- * total size. The reservation can only be used once, and will throw an exception
- * if it is used more than once.
- *
- * <p>For the purposes of airtight memory accounting, the reservation must be close()d
- * whether it is used or not.
- */
-public abstract class AllocationReservation implements AutoCloseable {
-  private int nBytes = 0;
-  private boolean used = false;
-  private boolean closed = false;
-
-  /**
-   * Constructor. Prevent construction except by derived classes.
-   *
-   * <p>The expectation is that the derived class will be a non-static inner
-   * class in an allocator.
-   */
-  protected AllocationReservation() {
-  }
-
-  /**
-   * Add to the current reservation.
-   *
-   * <p>Adding may fail if the allocator is not allowed to consume any more space.
-   *
-   * @param nBytes the number of bytes to add
-   * @return true if the addition is possible, false otherwise
-   * @throws IllegalStateException if called after buffer() is used to allocate the reservation
-   */
-  public boolean add(final int nBytes) {
-    Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes);
-    Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed");
-    Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used");
-
-    if (!reserve(nBytes)) {
-      return false;
-    }
-
-    this.nBytes += nBytes;
-    return true;
-  }
-
-  /**
-   * Requests a reservation of additional space.
-   *
-   * <p>The implementation of the allocator's inner class provides this.
-   *
-   * @param nBytes the amount to reserve
-   * @return true if the reservation can be satisfied, false otherwise
-   */
-  protected abstract boolean reserve(int nBytes);
-
-  /**
-   * Allocate a buffer whose size is the total of all the add()s made.
-   *
-   * <p>The allocation request can still fail, even if the amount of space
-   * requested is available, if the allocation cannot be made contiguously.
-   *
-   * @return the buffer, or null, if the request cannot be satisfied
-   * @throws IllegalStateException if called more than once
-   */
-  public DrillBuf buffer() {
-    Preconditions.checkState(!closed, "Attempt to allocate after closed");
-    Preconditions.checkState(!used, "Attempt to allocate more than once");
-
-    final DrillBuf drillBuf = allocate(nBytes);
-    used = true;
-    return drillBuf;
-  }
-
-  /**
-   * Allocate a buffer of the requested size.
-   *
-   * <p>The implementation of the allocator's inner class provides this.
-   *
-   * @param nBytes the size of the buffer requested
-   * @return the buffer, or null, if the request cannot be satisfied
-   */
-  protected abstract DrillBuf allocate(int nBytes);
-
-  @Override
-  public void close() {
-    if (closed) {
-      return;
-    }
-    if (!used) {
-      releaseReservation(nBytes);
-    }
-
-    closed = true;
-  }
-
-  /**
-   * Return the reservation back to the allocator without having used it.
-   *
-   * @param nBytes the size of the reservation
-   */
-  protected abstract void releaseReservation(int nBytes);
-
-  /**
-   * Get the current size of the reservation (the sum of all the add()s).
-   *
-   * @return size of the current reservation
-   */
-  public int getSize() {
-    return nBytes;
-  }
-
-  /**
-   * Return whether or not the reservation has been used.
-   *
-   * @return whether or not the reservation has been used
-   */
-  public boolean isUsed() {
-    return used;
-  }
-
-  /**
-   * Return whether or not the reservation has been closed.
-   *
-   * @return whether or not the reservation has been closed
-   */
-  public boolean isClosed() {
-    return closed;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
deleted file mode 100644
index 8bf2a99..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-/**
- * Exception thrown when a closed BufferAllocator is used. Note
- * this is an unchecked exception.
- *
- * <p>The constructor's {@code message} is the string associated with the cause.
- */
-@SuppressWarnings("serial")
-public class AllocatorClosedException extends RuntimeException {
-  public AllocatorClosedException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
deleted file mode 100644
index f2d3df9..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-/**
- * This interface provides a means for allocator owners to inject services
- * required by allocators, as well as to identify themselves for debugging purposes.
- * Identification is done by overriding the implementation of
- * {@link Object#toString()}.
- */
-public interface AllocatorOwner {
-  /**
-   * Get the current ExecutionControls from the allocator's owner.
-   *
-   * @return the current execution controls; may return null if this isn't
-   *   possible
-   */
-  ExecutionControls getExecutionControls();
-
-  @Deprecated // Only for TopLevelAllocator and its friends.
-  FragmentContext getFragmentContext();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
deleted file mode 100644
index 00d8c4f..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-/**
- * JMX bean interface for global allocator statistics.
- */
-// TODO use Stats infrastructure instead of JMX beans
-public interface AllocatorsStatsMXBean {
-  /**
-   * Get the maximum amount of direct memory that can be used.
-   *
-   * <p>This is determined by what is available, or by the drillbit
-   * configuration, if it specifies a value.</p>
-   *
-   * @return the maximum amount of direct memory that can be used
-   */
-  public long getMaxDirectMemory();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
deleted file mode 100644
index 0f6b8b0..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Tracks how much of a memory budget is still available, split into a private (preallocated) pool and a
- * shared pool; a non-null parent remainder is also charged for whatever is taken from the shared pool.
- * <p>TODO: Fix this so that preallocation can never be released back to general pool until allocator is closed.
- */
-public class AtomicRemainder {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AtomicRemainder.class);
-
-  private final AtomicRemainder parent; // may be null
-  private final AtomicLong availableShared; // starts at max - pre
-  private final AtomicLong availablePrivate; // starts at pre
-  private final long initTotal;
-  private final long initShared;
-  private final long initPrivate;
-  private long limit;       // An Allocator can set a variable limit less than or equal to the initTotal
-  private boolean hasLimit; // True for Atomic Remainders associated with a Fragment. May be true for Operator Level allocators some day.
-  private boolean closed = false;
-  private final boolean errorOnLeak; // when true close() throws on a leak, otherwise it only logs a warning
-  private final boolean applyFragmentLimit;
-
-  public AtomicRemainder(boolean errorOnLeak, AtomicRemainder parent, long max, long pre, boolean applyFragLimit) {
-    this.errorOnLeak = errorOnLeak;
-    this.parent = parent;
-    this.availableShared = new AtomicLong(max - pre);
-    this.availablePrivate = new AtomicLong(pre);
-    this.initTotal = max;
-    this.initShared = max - pre;
-    this.initPrivate = pre;
-    this.limit = max; // no effective limit until setLimit() lowers it
-    this.hasLimit=false;
-    this.applyFragmentLimit=applyFragLimit; // If this is an operator that is exempt from the fragment limit, set this to false.
-//    logger.info("new AtomicRemainder. a.s. {} a.p. {} hashcode {}", availableShared, availablePrivate, hashCode(), new Exception());
-  }
-
-  public long getRemainder() {
-    return availableShared.get() + availablePrivate.get(); // two separate reads, not one atomic snapshot
-  }
-
-  public long getUsed() {
-    return initTotal - getRemainder(); // initial total minus what is still available
-  }
-
-  /**
-   * Allow an allocator to constrain the remainder to a particular limit that is lower than the initTotal.
-   * If limit is not lower than initTotal, then the function will do nothing and the hasLimit flag will not be set.
-   * @param limit new remainder limit
-   */
-  public void setLimit(long limit) {
-    if(limit<initTotal){
-      this.hasLimit=true;
-      this.limit=limit;
-    }
-
-  }
-  /**
-   * Unconditionally account for memory. This is used when an actual allocation happened to be larger than requested, or when
-   * a buffer has its ownership passed to another allocator.<br>
-   * This memory has already been used up so it must be accurately accounted for in future allocations.
-   *
-   * @param size extra allocated memory that needs to be accounted for
-   */
-  public boolean forceGet(long size) {
-    if (get(size, this.applyFragmentLimit)) {
-      return true; // the normal get() path accepted the request
-    } else {
-      availableShared.addAndGet(-size); // charge the shared pool without any bounds check
-      if (parent != null) {
-        parent.forceGet(size);
-      }
-      return false; // accounted for, but only by bypassing the normal get() checks
-    }
-  }
-
-  public boolean get(long size, boolean applyFragmentLimitForChild) { // true iff size was reserved
-    if (availablePrivate.get() < 1) {
-      // if there is no preallocated memory, we can operate normally.
-
-      // if there is a parent allocator, check it before allocating.
-      if (parent != null && !parent.get(size, this.applyFragmentLimit)) {
-        return false;
-      }
-
-      // If we need to allocate memory beyond the allowed Fragment Limit
-      if(applyFragmentLimitForChild && this.applyFragmentLimit && this.hasLimit && (getUsed()+size > this.limit)){
-        if (parent != null) {
-          parent.returnAllocation(size); // undo the parent reservation taken above
-        }
-        StackTraceElement[] ste = (new Throwable()).getStackTrace(); // current call stack, appended to the warning below
-        StringBuilder sb = new StringBuilder();
-        for (StackTraceElement s : ste) {
-          sb.append(s.toString());
-          sb.append("\n");
-        }
-        logger.warn("No more memory. Fragment limit ({} bytes) reached. Trying to allocate {} bytes. {} bytes " +
-          "already allocated.\n{}", limit, size, getUsed(), sb.toString());
-        return false;
-      }
-
-      // attempt to get shared memory, if fails, return false.
-      long outcome = availableShared.addAndGet(-size);
-//      assert outcome <= initShared;
-      if (outcome < 0) { // overdrew the shared pool: undo locally and refund the parent
-        availableShared.addAndGet(size);
-        if (parent != null) {
-          parent.returnAllocation(size);
-        }
-        return false;
-      } else {
-//        if (DEBUG)
-//          logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
-        return true;
-      }
-
-    } else {
-      // if there is preallocated memory, use that first.
-      long unaccount = availablePrivate.addAndGet(-size);
-      if (unaccount >= 0) {
-//        if (DEBUG)
-//          logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
-        return true;
-      } else {
-
-        long additionalSpaceNeeded = -unaccount; // the part of the request the private pool could not cover
-        // if there is a parent allocator, check it before allocating.
-        if (parent != null && !parent.get(additionalSpaceNeeded, this.applyFragmentLimit)) {
-          // parent allocation failed, return space to private pool.
-          availablePrivate.getAndAdd(size);
-          return false;
-        }
-
-        // we got space from parent pool. lets make sure we have space locally available.
-        long account = availableShared.addAndGet(-additionalSpaceNeeded);
-        if (account >= 0) {
-          // we were successful, move private back to zero (since we allocated using shared).
-          availablePrivate.addAndGet(additionalSpaceNeeded);
-//          if (DEBUG)
-//            logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
-          return true;
-        } else {
-          // we failed to get space from available shared. Return allocations to initial state.
-          availablePrivate.addAndGet(size);
-          availableShared.addAndGet(additionalSpaceNeeded);
-          if (parent != null) {
-            parent.returnAllocation(additionalSpaceNeeded);
-          }
-          return false;
-        }
-      }
-
-    }
-
-  }
-
-  /**
-   * Return the memory accounting to the allocation pool. Make sure to first maintain hold of the preallocated memory.
-   *
-   * @param size amount of memory returned
-   */
-  public void returnAllocation(long size) {
-    long privateSize = availablePrivate.get();
-    long privateChange = Math.min(size, initPrivate - privateSize); // refill the private pool first, up to its initial size
-    long sharedChange = size - privateChange;
-    availablePrivate.addAndGet(privateChange);
-    availableShared.addAndGet(sharedChange);
-//    if (DEBUG)
-//      logger.info("Return allocation {}, a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
-    if (parent != null) {
-      parent.returnAllocation(sharedChange); // only the part credited to the shared pool is passed up
-    }
-  }
-
-  public void close() {
-    if (closed) {
-      logger.warn("Tried to close remainder, but it has already been closed", new Exception());
-      return;
-    }
-    if (availablePrivate.get() != initPrivate || availableShared.get() != initShared) {
-      IllegalStateException e = new IllegalStateException( // a pool differs from its initial value: report it
-          String
-              .format(ERROR, initPrivate, availablePrivate.get(), initPrivate - availablePrivate.get(), initShared, availableShared.get(), initShared - availableShared.get()));
-      if (errorOnLeak) {
-        throw e; // NOTE(review): exits before the parent refund below and before closed is set
-      } else {
-        logger.warn("Memory leaked during query.", e);
-      }
-    }
-    if (parent != null) {
-      parent.returnAllocation(initPrivate); // hand the preallocated amount back to the parent
-    }
-    closed = true;
-  }
-
-  static final String ERROR = "Failure while closing accountor.  Expected private and shared pools to be set to initial values.  However, one or more were not.  Stats are\n\tzone\tinit\tallocated\tdelta \n\tprivate\t%d\t%d\t%d \n\tshared\t%d\t%d\t%d.";
-}