You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:31 UTC
[08/13] drill git commit: DRILL-4134: Allocator Improvements
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
new file mode 100644
index 0000000..b312301
--- /dev/null
+++ b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.common.DrillAutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
+import org.junit.Test;
+
+
+public class TestEndianess {
+
+ @Test
+ public void testLittleEndian() {
+ final BufferAllocator a = new RootAllocator(DrillConfig.getMaxDirectMemory());
+ final ByteBuf b = a.buffer(4);
+ b.setInt(0, 35);
+ assertEquals(b.getByte(0), 35);
+ assertEquals(b.getByte(1), 0);
+ assertEquals(b.getByte(2), 0);
+ assertEquals(b.getByte(3), 0);
+ b.release();
+ DrillAutoCloseables.closeNoChecked(a);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/pom.xml
----------------------------------------------------------------------
diff --git a/exec/memory/impl/pom.xml b/exec/memory/impl/pom.xml
deleted file mode 100644
index 94b9052..0000000
--- a/exec/memory/impl/pom.xml
+++ /dev/null
@@ -1,68 +0,0 @@
-<?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- You under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>memory-parent</artifactId>
- <groupId>org.apache.drill.memory</groupId>
- <version>1.5.0-SNAPSHOT</version>
- </parent>
- <artifactId>drill-memory-impl</artifactId>
- <name>exec/memory/impl</name>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.drill</groupId>
- <artifactId>drill-protocol</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.drill</groupId>
- <artifactId>drill-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.drill.memory</groupId>
- <artifactId>drill-memory-base</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.drill</groupId>
- <artifactId>drill-common</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>2.5.0</version>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>3.0.1</version>
- </dependency>
-
-
- </dependencies>
-
-
- <build>
- </build>
-
-
-
-</project>
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AccountorImpl.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AccountorImpl.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AccountorImpl.java
deleted file mode 100644
index 0ac93e4..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AccountorImpl.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.util.AssertionUtil;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-
-public class AccountorImpl implements Accountor {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AccountorImpl.class);
-
- private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled();
-
- public static final String ENABLE_FRAGMENT_MEMORY_LIMIT = "drill.exec.memory.enable_frag_limit";
- public static final String FRAGMENT_MEM_OVERCOMMIT_FACTOR = "drill.exec.memory.frag_mem_overcommit_factor";
-
- private final AtomicRemainder remainder;
- private final long total;
- private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap();
- private AccountorImpl parent;
-
- private final boolean errorOnLeak;
- // some operators are no subject to the fragment limit. They set the applyFragmentLimit to false
-
- private final boolean enableFragmentLimit;
- private final double fragmentMemOvercommitFactor;
-
- private final boolean DEFAULT_ENABLE_FRAGMENT_LIMIT=false;
- private final double DEFAULT_FRAGMENT_MEM_OVERCOMMIT_FACTOR=1.5;
-
- private final boolean applyFragmentLimit;
-
- private final LimitConsumer limitConsumer;
- long fragmentLimit;
-
- private long peakMemoryAllocation = 0;
-
- // The top level Allocator has an accountor that keeps track of all the LimitConsumers currently executing.
- // This enables the top level accountor to calculate a new fragment limit whenever necessary.
- private final List<LimitConsumer> limitConsumers;
-
- public AccountorImpl(DrillConfig config, boolean errorOnLeak, LimitConsumer context, AccountorImpl parent, long max,
- long preAllocated, boolean applyFragLimit) {
- // TODO: fix preallocation stuff
- this.errorOnLeak = errorOnLeak;
- AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
- this.parent = parent;
-
- boolean enableFragmentLimit;
- double fragmentMemOvercommitFactor;
-
- try {
- enableFragmentLimit = config.getBoolean(ENABLE_FRAGMENT_MEMORY_LIMIT);
- fragmentMemOvercommitFactor = config.getDouble(FRAGMENT_MEM_OVERCOMMIT_FACTOR);
- }catch(Exception e){
- enableFragmentLimit = DEFAULT_ENABLE_FRAGMENT_LIMIT;
- fragmentMemOvercommitFactor = DEFAULT_FRAGMENT_MEM_OVERCOMMIT_FACTOR;
- }
- this.enableFragmentLimit = enableFragmentLimit;
- this.fragmentMemOvercommitFactor = fragmentMemOvercommitFactor;
-
-
- this.applyFragmentLimit=applyFragLimit;
-
- this.remainder = new AtomicRemainder(errorOnLeak, parentRemainder, max, preAllocated, applyFragmentLimit);
- this.total = max;
- this.limitConsumer = context;
- this.fragmentLimit=this.total; // Allow as much as possible to start with;
- if (ENABLE_ACCOUNTING) {
- buffers = Maps.newConcurrentMap();
- } else {
- buffers = null;
- }
- this.limitConsumers = new ArrayList<LimitConsumer>();
- if(parent!=null && parent.parent==null){ // Only add the fragment context to the fragment level accountor
- synchronized(this) {
- addLimitConsumer(this.limitConsumer);
- }
- }
- }
-
- public boolean transferTo(Accountor target, DrillBuf buf, long size) {
- return transfer(target, buf, size, true);
- }
-
- public boolean transferIn(DrillBuf buf, long size) {
- return transfer(this, buf, size, false);
- }
-
- private boolean transfer(Accountor target, DrillBuf buf, long size, boolean release) {
- boolean withinLimit = target.forceAdditionalReservation(size);
- if(release){
- release(buf, size);
- }
-
- if (ENABLE_ACCOUNTING) {
- if (target instanceof AccountorImpl) {
- ((AccountorImpl) target).buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread()
- .getStackTrace()));
- }
- }
- return withinLimit;
- }
-
- public long getAvailable() {
- if (parent != null) {
- return Math.min(parent.getAvailable(), getCapacity() - getAllocation());
- }
- return getCapacity() - getAllocation();
- }
-
- public long getCapacity() {
- return fragmentLimit;
- }
-
- public long getAllocation() {
- return remainder.getUsed();
- }
-
- public long getPeakMemoryAllocation() {
- return peakMemoryAllocation;
- }
-
- public boolean reserve(long size) {
- boolean status = remainder.get(size, this.applyFragmentLimit);
- peakMemoryAllocation = Math.max(peakMemoryAllocation, getAllocation());
- return status;
- }
-
- public boolean forceAdditionalReservation(long size) {
- if (size > 0) {
- boolean status = remainder.forceGet(size);
- peakMemoryAllocation = Math.max(peakMemoryAllocation, getAllocation());
- return status;
- } else {
- return true;
- }
- }
-
- public void reserved(long expected, DrillBuf buf) {
- // make sure to take away the additional memory that happened due to rounding.
-
- long additional = buf.capacity() - expected;
- if (additional > 0) {
- remainder.forceGet(additional);
- }
-
- if (ENABLE_ACCOUNTING) {
- buffers.put(buf, new DebugStackTrace(buf.capacity(), Thread.currentThread().getStackTrace()));
- }
-
- peakMemoryAllocation = Math.max(peakMemoryAllocation, getAllocation());
- }
-
-
- public void releasePartial(DrillBuf buf, long size) {
- remainder.returnAllocation(size);
- if (ENABLE_ACCOUNTING) {
- if (buf != null) {
- DebugStackTrace dst = buffers.get(buf);
- if (dst == null) {
- throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf);
- }
- dst.size -= size;
- if (dst.size < 0) {
- throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf);
- }
- }
- }
- }
-
- void release(long size) {
- remainder.returnAllocation(size);
- }
-
- public void release(DrillBuf buf, long size) {
- remainder.returnAllocation(size);
- if (ENABLE_ACCOUNTING) {
- if (buf != null && buffers.remove(buf) == null) {
- throw new IllegalStateException("Releasing a buffer that has already been released. Buffer: " + buf);
- }
- }
- }
-
- private void addLimitConsumer(LimitConsumer c) {
- if (c == null) {
- return;
- }
-
- if (parent != null){
- parent.addLimitConsumer(c);
- }else {
- if(logger.isTraceEnabled()) {
- String fragStr = c == null ? "[Null Context]" : c.getIdentifier();
- fragStr+=" (Object Id: "+System.identityHashCode(c)+")";
- StackTraceElement[] ste = (new Throwable()).getStackTrace();
- StringBuffer sb = new StringBuffer();
- for (StackTraceElement s : ste) {
- sb.append(s.toString());
- sb.append("\n");
- }
-
- logger.trace("Fragment " + fragStr + " added to root accountor.\n"+sb.toString());
- }
- synchronized(this) {
- limitConsumers.add(c);
- }
- }
- }
-
- private void removeLimitConsumer(LimitConsumer c) {
- if (c == null) {
- return;
- }
-
- if (parent != null){
- if (parent.parent==null){
- // only fragment level allocators will have the fragment context saved
- parent.removeLimitConsumer(c);
- }
- }else{
- if(logger.isDebugEnabled()) {
- String fragStr = c == null ? "[Null Context]" : c.getIdentifier();
- fragStr += " (Object Id: " + System.identityHashCode(c) + ")";
- logger.trace("Fragment " + fragStr + " removed from root accountor");
- }
- synchronized(this) {
- limitConsumers.remove(c);
- }
- }
- }
-
- public long resetFragmentLimits(){
- // returns the new capacity
- if(!this.enableFragmentLimit){
- return getCapacity();
- }
-
- if(parent!=null){
- parent.resetFragmentLimits();
- }else {
- //Get remaining memory available per fragment and distribute it EQUALLY among all the fragments.
- //Fragments get the memory limit added to the amount already allocated.
- //This favours fragments that are already running which will get a limit greater than newly started fragments.
- //If the already running fragments end quickly, their limits will be assigned back to the remaining fragments
- //quickly. If they are long running, then we want to favour them with larger limits anyway.
- synchronized (this) {
- int nFragments = limitConsumers.size();
- long allocatedMemory=0;
- for (LimitConsumer fragment : limitConsumers) {
- allocatedMemory += fragment.getAllocated();
- }
- if(logger.isTraceEnabled()) {
- logger.trace("Resetting Fragment Memory Limit: total Available memory== "+total
- +" Total Allocated Memory :"+allocatedMemory
- +" Number of fragments: "+nFragments
- + " fragmentMemOvercommitFactor: "+fragmentMemOvercommitFactor
- + " Root fragment limit: "+this.fragmentLimit + "(Root obj: "+System.identityHashCode(this)+")"
- );
- }
- if(nFragments>0) {
- long rem = (total - allocatedMemory) / nFragments;
- for (LimitConsumer fragment : limitConsumers) {
- fragment.setLimit((long) (rem * fragmentMemOvercommitFactor));
- }
- }
- if(logger.isTraceEnabled() && false){
- StringBuffer sb= new StringBuffer();
- sb.append("[root](0:0)");
- sb.append("Allocated memory: ");
- sb.append(this.getAllocation());
- sb.append(" Fragment Limit: ");
- sb.append(this.getFragmentLimit());
- logger.trace(sb.toString());
- for (LimitConsumer fragment : limitConsumers) {
- sb= new StringBuffer();
- sb.append('[');
- sb.append(fragment.getIdentifier());
- sb.append(']');
- sb.append("Allocated memory: ");
- sb.append(fragment.getAllocated());
- sb.append(" Fragment Limit: ");
- sb.append(fragment.getLimit());
- logger.trace(sb.toString());
- }
- logger.trace("Resetting Complete");
- }
- }
- }
- return getCapacity();
- }
-
- public void close() {
- // remove the fragment context and reset fragment limits whenever an allocator closes
- if (parent != null && parent.parent == null && limitConsumer != null) {
- logger.debug("Fragment " + limitConsumer.getIdentifier() + " accountor being closed");
- removeLimitConsumer(limitConsumer);
- }
- resetFragmentLimits();
-
- if (ENABLE_ACCOUNTING && !buffers.isEmpty()) {
- StringBuffer sb = new StringBuffer();
- sb.append("Attempted to close accountor with ");
- sb.append(buffers.size());
- sb.append(" buffer(s) still allocated for ");
- sb.append(limitConsumer.getIdentifier());
- sb.append(".\n");
-
- Multimap<DebugStackTrace, DebugStackTrace> multi = LinkedListMultimap.create();
- for (DebugStackTrace t : buffers.values()) {
- multi.put(t, t);
- }
-
- for (DebugStackTrace entry : multi.keySet()) {
- Collection<DebugStackTrace> allocs = multi.get(entry);
-
- sb.append("\n\n\tTotal ");
- sb.append(allocs.size());
- sb.append(" allocation(s) of byte size(s): ");
- for (DebugStackTrace alloc : allocs) {
- sb.append(alloc.size);
- sb.append(", ");
- }
-
- sb.append("at stack location:\n");
- entry.addToString(sb);
- }
- if (!buffers.isEmpty()) {
- IllegalStateException e = new IllegalStateException(sb.toString());
- if (errorOnLeak) {
- throw e;
- } else {
- logger.warn("Memory leaked.", e);
- }
- }
- }
-
- remainder.close();
-
- }
-
- public void setFragmentLimit(long add) {
- // We ADD the limit to the current allocation. If none has been allocated, this
- // sets a new limit. If memory has already been allocated, the fragment gets its
- // limit based on the allocation, though this might still result in reducing the
- // limit.
-
- if (parent != null && parent.parent==null) { // This is a fragment level accountor
- this.fragmentLimit=getAllocation()+add;
- this.remainder.setLimit(this.fragmentLimit);
- logger.trace("Fragment " + limitConsumer.getIdentifier() + " memory limit set to " + this.fragmentLimit);
- }
- }
-
- public long getFragmentLimit(){
- return this.fragmentLimit;
- }
-
- public class DebugStackTrace {
-
- private StackTraceElement[] elements;
- private long size;
-
- public DebugStackTrace(long size, StackTraceElement[] elements) {
- super();
- this.elements = elements;
- this.size = size;
- }
-
- public void addToString(StringBuffer sb) {
- for (int i = 3; i < elements.length; i++) {
- sb.append("\t\t");
- sb.append(elements[i]);
- sb.append("\n");
- }
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + Arrays.hashCode(elements);
-// result = prime * result + (int) (size ^ (size >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- DebugStackTrace other = (DebugStackTrace) obj;
- if (!Arrays.equals(elements, other.elements)) {
- return false;
- }
- // weird equal where size doesn't matter for multimap purposes.
-// if (size != other.size)
-// return false;
- return true;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
deleted file mode 100644
index 4f1a1bd..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicy.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-/**
- * Implicitly specifies an allocation policy by providing a factory method to
- * create an enforcement agent.
- *
- * <p>Allocation policies are meant to be global, and may not work properly if
- * different allocators are given different policies. These are designed to
- * be supplied to the root-most allocator only, and then shared with descendant
- * (child) allocators.</p>
- */
-public interface AllocationPolicy {
- /**
- * Create an allocation policy enforcement agent. Each newly created allocator should
- * call this in order to obtain its own agent.
- *
- * @return the newly instantiated agent; if an agent's implementation is stateless,
- * this may return a sharable singleton
- */
- AllocationPolicyAgent newAgent();
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
deleted file mode 100644
index ad51ee6..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationPolicyAgent.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-/**
- * Per-allocator enforcement agent for allocation policies; created by
- * {@link AllocationPolicy#newAgent()}.
- */
-public interface AllocationPolicyAgent extends AutoCloseable {
- /**
- * Checks to see if creating a new allocator using the given specifications
- * is allowed; should throw an exception if not.
- *
- * @param parentAllocator the parent allocator
- * @param initReservation initial reservation the allocator should have
- * @param maxAllocation the maximum allocation the allocator will allow
- * @param flags the allocation option flags
- * @throws OutOfMemoryException if the new allocator shouldn't be created
- */
- void checkNewAllocator(BufferAllocator parentAllocator,
- long initReservation, long maxAllocation, int flags);
-
- /**
- * Get the currently applicable memory limit for the provided allocator.
- * The interpretation of this value varies with the allocation policy in
- * use, and each policy should describe what to expect.
- *
- * @param bufferAllocator the allocator
- * @return the memory limit
- */
- long getMemoryLimit(BufferAllocator bufferAllocator);
-
- /**
- * Initialize the agent for a newly created allocator. Should be called from
- * the allocator's constructor to initialize the agent for the allocator.
- *
- * @param bufferAllocator the newly created allocator.
- */
- void initializeAllocator(BufferAllocator bufferAllocator);
-
- /**
- * Indicate if any available memory owned by this allocator should
- * be released to its parent. Allocators may use this to limit the
- * amount of unused memory they retain for future requests; agents may
- * request that memory be returned if there is currently a high demand
- * for memory that other allocators could use if this allocator
- * doesn't need it.
- *
- * @param bufferAllocator
- * @return true if available memory owned by this allocator should be given
- * back to its parent
- */
- boolean shouldReleaseToParent(BufferAllocator bufferAllocator);
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
deleted file mode 100644
index 1803572..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocationReservation.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import com.google.common.base.Preconditions;
-
-import io.netty.buffer.DrillBuf;
-
-/**
- * Supports cumulative allocation reservation. Clients may increase the size of
- * the reservation repeatedly until they call for an allocation of the current
- * total size. The reservation can only be used once, and will throw an exception
- * if it is used more than once.
- *
- * <p>For the purposes of airtight memory accounting, the reservation must be close()d
- * whether it is used or not.
- */
-public abstract class AllocationReservation implements AutoCloseable {
- private int nBytes = 0;
- private boolean used = false;
- private boolean closed = false;
-
- /**
- * Constructor. Prevent construction except by derived classes.
- *
- * <p>The expectation is that the derived class will be a non-static inner
- * class in an allocator.
- */
- protected AllocationReservation() {
- }
-
- /**
- * Add to the current reservation.
- *
- * <p>Adding may fail if the allocator is not allowed to consume any more space.
- *
- * @param nBytes the number of bytes to add
- * @return true if the addition is possible, false otherwise
- * @throws IllegalStateException if called after buffer() is used to allocate the reservation
- */
- public boolean add(final int nBytes) {
- Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes);
- Preconditions.checkState(!closed, "Attempt to increase reservation after reservation has been closed");
- Preconditions.checkState(!used, "Attempt to increase reservation after reservation has been used");
-
- if (!reserve(nBytes)) {
- return false;
- }
-
- this.nBytes += nBytes;
- return true;
- }
-
- /**
- * Requests a reservation of additional space.
- *
- * <p>The implementation of the allocator's inner class provides this.
- *
- * @param nBytes the amount to reserve
- * @return true if the reservation can be satisfied, false otherwise
- */
- protected abstract boolean reserve(int nBytes);
-
- /**
- * Allocate a buffer whose size is the total of all the add()s made.
- *
- * <p>The allocation request can still fail, even if the amount of space
- * requested is available, if the allocation cannot be made contiguously.
- *
- * @return the buffer, or null, if the request cannot be satisfied
- * @throws IllegalStateException if called called more than once
- */
- public DrillBuf buffer() {
- Preconditions.checkState(!closed, "Attempt to allocate after closed");
- Preconditions.checkState(!used, "Attempt to allocate more than once");
-
- final DrillBuf drillBuf = allocate(nBytes);
- used = true;
- return drillBuf;
- }
-
- /**
- * Allocate the a buffer of the requested size.
- *
- * <p>The implementation of the allocator's inner class provides this.
- *
- * @param nBytes the size of the buffer requested
- * @return the buffer, or null, if the request cannot be satisfied
- */
- protected abstract DrillBuf allocate(int nBytes);
-
- @Override
- public void close() {
- if (closed) {
- return;
- }
- if (!used) {
- releaseReservation(nBytes);
- }
-
- closed = true;
- }
-
- /**
- * Return the reservation back to the allocator without having used it.
- *
- * @param nBytes the size of the reservation
- */
- protected abstract void releaseReservation(int nBytes);
-
- /**
- * Get the current size of the reservation (the sum of all the add()s).
- *
- * @return size of the current reservation
- */
- public int getSize() {
- return nBytes;
- }
-
- /**
- * Return whether or not the reservation has been used.
- *
- * @return whether or not the reservation has been used
- */
- public boolean isUsed() {
- return used;
- }
-
- /**
- * Return whether or not the reservation has been closed.
- *
- * @return whether or not the reservation has been closed
- */
- public boolean isClosed() {
- return closed;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
deleted file mode 100644
index 8bf2a99..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorClosedException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-/**
- * Exception thrown when a closed BufferAllocator is used. Note
- * this is an unchecked exception.
- *
- * @param message string associated with the cause
- */
-@SuppressWarnings("serial")
-public class AllocatorClosedException extends RuntimeException {
- public AllocatorClosedException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
deleted file mode 100644
index f2d3df9..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorOwner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.testing.ExecutionControls;
-
-/**
- * This interface provides a means for allocator owners to inject services
- * required by allocators, as well as to identify themselves for debugging purposes.
- * Identification is done by overriding the implementation of
- * {#link {@link Object#toString()}.
- */
-public interface AllocatorOwner {
- /**
- * Get the current ExecutionControls from the allocator's owner.
- *
- * @return the current execution controls; may return null if this isn't
- * possible
- */
- ExecutionControls getExecutionControls();
-
- @Deprecated // Only for TopLevelAllocator and its friends.
- FragmentContext getFragmentContext();
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
deleted file mode 100644
index 00d8c4f..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AllocatorsStatsMXBean.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-/**
- * JMX bean interface for global allocator statistics.
- */
-// TODO use Stats infrastructure instead of JMX beans
-public interface AllocatorsStatsMXBean {
- /**
- * Get the maximum amount of direct memory that can be used.
- *
- * <p>This is determined by what is available, or by the drillbit
- * configuration, if it specifies a value.</p>
- *
- * @return the amount of direct memory that can be used
- */
- public long getMaxDirectMemory();
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
----------------------------------------------------------------------
diff --git a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
deleted file mode 100644
index 0f6b8b0..0000000
--- a/exec/memory/impl/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.memory;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- *
- *
- * TODO: Fix this so that preallocation can never be released back to general pool until allocator is closed.
- */
-public class AtomicRemainder {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AtomicRemainder.class);
-
- private final AtomicRemainder parent;
- private final AtomicLong availableShared;
- private final AtomicLong availablePrivate;
- private final long initTotal;
- private final long initShared;
- private final long initPrivate;
- private long limit; // An Allocator can set a variable limit less than or equal to the initTotal
- private boolean hasLimit; // True for Atomic Remainders associated with a Fragment. May be true for Operator Level allocators some day.
- private boolean closed = false;
- private final boolean errorOnLeak;
- private final boolean applyFragmentLimit;
-
- public AtomicRemainder(boolean errorOnLeak, AtomicRemainder parent, long max, long pre, boolean applyFragLimit) {
- this.errorOnLeak = errorOnLeak;
- this.parent = parent;
- this.availableShared = new AtomicLong(max - pre);
- this.availablePrivate = new AtomicLong(pre);
- this.initTotal = max;
- this.initShared = max - pre;
- this.initPrivate = pre;
- this.limit = max;
- this.hasLimit=false;
- this.applyFragmentLimit=applyFragLimit; // If this is an operator that is exempt from the fragment limit, set this to false.
-// logger.info("new AtomicRemainder. a.s. {} a.p. {} hashcode {}", availableShared, availablePrivate, hashCode(), new Exception());
- }
-
- public long getRemainder() {
- return availableShared.get() + availablePrivate.get();
- }
-
- public long getUsed() {
- return initTotal - getRemainder();
- }
-
- /**
- * Allow an allocator to constrain the remainder to a particular limit that is lower than the initTotal.
- * If limit is larger than initTotal, then the function will do nothing and the hasLimit flag will not be set.
- * @param limit new remainder limit
- */
- public void setLimit(long limit) {
- if(limit<initTotal){
- this.hasLimit=true;
- this.limit=limit;
- }
-
- }
- /**
- * Automatically allocate memory. This is used when an actual allocation happened to be larger than requested, or when
- * a buffer has it's ownership passed to another allocator.<br>
- * This memory has already been used up so it must be accurately accounted for in future allocations.
- *
- * @param size extra allocated memory that needs to be accounted for
- */
- public boolean forceGet(long size) {
- if (get(size, this.applyFragmentLimit)) {
- return true;
- } else {
- availableShared.addAndGet(-size);
- if (parent != null) {
- parent.forceGet(size);
- }
- return false;
- }
- }
-
- public boolean get(long size, boolean applyFragmentLimitForChild) {
- if (availablePrivate.get() < 1) {
- // if there is no preallocated memory, we can operate normally.
-
- // if there is a parent allocator, check it before allocating.
- if (parent != null && !parent.get(size, this.applyFragmentLimit)) {
- return false;
- }
-
- // If we need to allocate memory beyond the allowed Fragment Limit
- if(applyFragmentLimitForChild && this.applyFragmentLimit && this.hasLimit && (getUsed()+size > this.limit)){
- if (parent != null) {
- parent.returnAllocation(size);
- }
- StackTraceElement[] ste = (new Throwable()).getStackTrace();
- StringBuilder sb = new StringBuilder();
- for (StackTraceElement s : ste) {
- sb.append(s.toString());
- sb.append("\n");
- }
- logger.warn("No more memory. Fragment limit ({} bytes) reached. Trying to allocate {} bytes. {} bytes " +
- "already allocated.\n{}", limit, size, getUsed(), sb.toString());
- return false;
- }
-
- // attempt to get shared memory, if fails, return false.
- long outcome = availableShared.addAndGet(-size);
-// assert outcome <= initShared;
- if (outcome < 0) {
- availableShared.addAndGet(size);
- if (parent != null) {
- parent.returnAllocation(size);
- }
- return false;
- } else {
-// if (DEBUG)
-// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
- return true;
- }
-
- } else {
- // if there is preallocated memory, use that first.
- long unaccount = availablePrivate.addAndGet(-size);
- if (unaccount >= 0) {
-// if (DEBUG)
-// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
- return true;
- } else {
-
- long additionalSpaceNeeded = -unaccount;
- // if there is a parent allocator, check it before allocating.
- if (parent != null && !parent.get(additionalSpaceNeeded, this.applyFragmentLimit)) {
- // parent allocation failed, return space to private pool.
- availablePrivate.getAndAdd(size);
- return false;
- }
-
- // we got space from parent pool. lets make sure we have space locally available.
- long account = availableShared.addAndGet(-additionalSpaceNeeded);
- if (account >= 0) {
- // we were succesful, move private back to zero (since we allocated using shared).
- availablePrivate.addAndGet(additionalSpaceNeeded);
-// if (DEBUG)
-// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
- return true;
- } else {
- // we failed to get space from available shared. Return allocations to initial state.
- availablePrivate.addAndGet(size);
- availableShared.addAndGet(additionalSpaceNeeded);
- if (parent != null) {
- parent.returnAllocation(additionalSpaceNeeded);
- }
- return false;
- }
- }
-
- }
-
- }
-
- /**
- * Return the memory accounting to the allocation pool. Make sure to first maintain hold of the preallocated memory.
- *
- * @param size amount of memory returned
- */
- public void returnAllocation(long size) {
- long privateSize = availablePrivate.get();
- long privateChange = Math.min(size, initPrivate - privateSize);
- long sharedChange = size - privateChange;
- availablePrivate.addAndGet(privateChange);
- availableShared.addAndGet(sharedChange);
-// if (DEBUG)
-// logger.info("Return allocation {}, a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
- if (parent != null) {
- parent.returnAllocation(sharedChange);
- }
- }
-
- public void close() {
- if (closed) {
- logger.warn("Tried to close remainder, but it has already been closed", new Exception());
- return;
- }
- if (availablePrivate.get() != initPrivate || availableShared.get() != initShared) {
- IllegalStateException e = new IllegalStateException(
- String
- .format(ERROR, initPrivate, availablePrivate.get(), initPrivate - availablePrivate.get(), initShared, availableShared.get(), initShared - availableShared.get()));
- if (errorOnLeak) {
- throw e;
- } else {
- logger.warn("Memory leaked during query.", e);
- }
- }
- if (parent != null) {
- parent.returnAllocation(initPrivate);
- }
- closed = true;
- }
-
- static final String ERROR = "Failure while closing accountor. Expected private and shared pools to be set to initial values. However, one or more were not. Stats are\n\tzone\tinit\tallocated\tdelta \n\tprivate\t%d\t%d\t%d \n\tshared\t%d\t%d\t%d.";
-}