You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2016/02/26 06:54:02 UTC
[05/11] incubator-asterixdb-hyracks git commit: Implemented the
memory-bounded HashGroupby and HashJoin for BigObject
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
deleted file mode 100644
index 1cf454d..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFrameBufferManager {
-
- /**
- * Reset the counters and flags to initial status. This method should not release the pre-allocated resources
- *
- * @throws org.apache.hyracks.api.exceptions.HyracksDataException
- */
- void reset() throws HyracksDataException;
-
- /**
- * @param frameIndex
- * @return the specified frame, from the set of memory buffers, being
- * managed by this memory manager
- */
- ByteBuffer getFrame(int frameIndex);
-
- /**
- * Get the startOffset of the specific frame inside buffer
- *
- * @param frameIndex
- * @return the start offset of the frame returned by {@link #getFrame(int)} method.
- */
- int getFrameStartOffset(int frameIndex);
-
- /**
- * Get the size of the specific frame inside buffer
- *
- * @param frameIndex
- * @return the length of the specific frame
- */
- int getFrameSize(int frameIndex);
-
- /**
- * @return the number of frames in this buffer
- */
- int getNumFrames();
-
- /**
- * Writes the whole frame into the buffer.
- *
- * @param frame source frame
- * @return the id of the inserted frame. if failed to return it will be -1.
- */
- int insertFrame(ByteBuffer frame) throws HyracksDataException;
-
- void close();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
deleted file mode 100644
index fab1706..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-public interface IFrameFreeSlotPolicy {
-
- /**
- * Find the best fit frame id which can hold the data, and then pop it out from the index.
- * Return -1 is failed to find any.
- *
- * @param tobeInsertedSize the actual size of the data which should include
- * the meta data like the field offset and the tuple
- * count extra size
- * @return the best fit frame id
- */
- int popBestFit(int tobeInsertedSize);
-
- /**
- * Register the new free slot into the index
- *
- * @param frameID
- * @param freeSpace
- */
- void pushNewFrame(int frameID, int freeSpace);
-
- /**
- * Clear all the existing free slot information.
- */
- void reset();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
deleted file mode 100644
index f555971..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFramePool {
-
- int getMinFrameSize();
-
- int getMemoryBudgetBytes();
-
- /**
- * Get a frame of given size. <br>
- * Returns {@code null} if failed to allocate the required size of frame
- *
- * @param frameSize the actual size of the frame.
- * @return the allocated frame
- * @throws HyracksDataException
- */
- ByteBuffer allocateFrame(int frameSize) throws HyracksDataException;
-
- /**
- * Reset the counters to initial status. This method should not release the pre-allocated resources.
- */
- void reset();
-
- /**
- * Release the pre-allocated resources.
- */
- void close();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
deleted file mode 100644
index 00decb9..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public interface ITupleBufferAccessor {
-
- void reset(TuplePointer tuplePointer);
-
- ByteBuffer getTupleBuffer();
-
- int getTupleStartOffset();
-
- int getTupleLength();
-
- int getAbsFieldStartOffset(int fieldId);
-
- int getFieldLength(int fieldId);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
deleted file mode 100644
index ae502a0..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public interface ITupleBufferManager {
- /**
- * Reset the counters and flags to initial status. This method should not release the pre-allocated resources
- *
- * @throws org.apache.hyracks.api.exceptions.HyracksDataException
- */
- void reset() throws HyracksDataException;
-
- /**
- * @return the number of tuples in this buffer
- */
- int getNumTuples();
-
- boolean insertTuple(IFrameTupleAccessor accessor, int idx, TuplePointer tuplePointer) throws HyracksDataException;
-
- void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException;
-
- void close();
-
- ITupleBufferAccessor getTupleAccessor();
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
deleted file mode 100644
index 444b0b6..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hyracks.api.comm.FixedSizeFrame;
-import org.apache.hyracks.api.comm.FrameHelper;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
-
-public class VariableFrameMemoryManager implements IFrameBufferManager {
-
- private class PhysicalFrameOffset {
- IFrame physicalFrame;
- int physicalOffset;
-
- PhysicalFrameOffset(IFrame frame, int offset) {
- physicalFrame = frame;
- physicalOffset = offset;
- }
- }
-
- private class LogicalFrameStartSize {
- ByteBuffer logicalFrame;
- int logicalStart;
- int logicalSize;
-
- LogicalFrameStartSize(ByteBuffer frame, int start, int size) {
- logicalFrame = frame;
- logicalStart = start;
- logicalSize = size;
- }
- }
-
- private final IFramePool framePool;
- private List<PhysicalFrameOffset> physicalFrameOffsets;
- private List<LogicalFrameStartSize> logicalFrameStartSizes;
- private final IFrameFreeSlotPolicy freeSlotPolicy;
-
- public VariableFrameMemoryManager(IFramePool framePool, IFrameFreeSlotPolicy freeSlotPolicy) {
- this.framePool = framePool;
- this.freeSlotPolicy = freeSlotPolicy;
- int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
- this.physicalFrameOffsets = new ArrayList<>(maxFrames);
- this.logicalFrameStartSizes = new ArrayList<>(maxFrames);
- }
-
- private int findAvailableFrame(int frameSize) throws HyracksDataException {
- int frameId = freeSlotPolicy.popBestFit(frameSize);
- if (frameId >= 0) {
- return frameId;
- }
- ByteBuffer buffer = framePool.allocateFrame(frameSize);
- if (buffer != null) {
- IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), 0);
- physicalFrameOffsets.add(new PhysicalFrameOffset(new FixedSizeFrame(buffer), 0));
- return physicalFrameOffsets.size() - 1;
- }
- return -1;
- }
-
- @Override
- public void reset() throws HyracksDataException {
- physicalFrameOffsets.clear();
- logicalFrameStartSizes.clear();
- freeSlotPolicy.reset();
- framePool.reset();
- }
-
- @Override
- public ByteBuffer getFrame(int frameIndex) {
- return logicalFrameStartSizes.get(frameIndex).logicalFrame;
- }
-
- @Override
- public int getFrameStartOffset(int frameIndex) {
- return logicalFrameStartSizes.get(frameIndex).logicalStart;
- }
-
- @Override
- public int getFrameSize(int frameIndex) {
- return logicalFrameStartSizes.get(frameIndex).logicalSize;
- }
-
- @Override
- public int getNumFrames() {
- return logicalFrameStartSizes.size();
- }
-
- @Override
- public int insertFrame(ByteBuffer frame) throws HyracksDataException {
- int frameSize = frame.capacity();
- int physicalFrameId = findAvailableFrame(frameSize);
- if (physicalFrameId < 0) {
- return -1;
- }
- ByteBuffer buffer = physicalFrameOffsets.get(physicalFrameId).physicalFrame.getBuffer();
- int offset = physicalFrameOffsets.get(physicalFrameId).physicalOffset;
- System.arraycopy(frame.array(), 0, buffer.array(), offset, frameSize);
- if (offset + frameSize < buffer.capacity()) {
- freeSlotPolicy.pushNewFrame(physicalFrameId, buffer.capacity() - offset - frameSize);
- }
- physicalFrameOffsets.get(physicalFrameId).physicalOffset = offset + frameSize;
- logicalFrameStartSizes.add(new LogicalFrameStartSize(buffer, offset, frameSize));
- return logicalFrameStartSizes.size() - 1;
- }
-
- @Override
- public void close() {
- physicalFrameOffsets.clear();
- logicalFrameStartSizes.clear();
- freeSlotPolicy.reset();
- framePool.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
deleted file mode 100644
index 344f961..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class VariableFramePool implements IFramePool {
- public static final int UNLIMITED_MEMORY = -1;
-
- private final IHyracksFrameMgrContext ctx;
- private final int minFrameSize;
- private final int memBudget;
-
- private int allocateMem;
- private ArrayList<ByteBuffer> buffers; // the unused slots were sorted by size increasingly.
- private BitSet used; // the merged one also marked as used.
-
- /**
- * The constructor of the VariableFramePool.
- *
- * @param ctx
- * @param memBudgetInBytes the given memory budgets to allocate the frames. If it less than 0, it will be treated as unlimited budgets
- */
- public VariableFramePool(IHyracksFrameMgrContext ctx, int memBudgetInBytes) {
- this.ctx = ctx;
- this.minFrameSize = ctx.getInitialFrameSize();
- this.allocateMem = 0;
- if (memBudgetInBytes == UNLIMITED_MEMORY) {
- this.memBudget = Integer.MAX_VALUE;
- this.buffers = new ArrayList<>();
- this.used = new BitSet();
- } else {
- this.memBudget = memBudgetInBytes;
- this.buffers = new ArrayList<>(memBudgetInBytes / minFrameSize);
- this.used = new BitSet(memBudgetInBytes / minFrameSize);
- }
- }
-
- @Override
- public int getMinFrameSize() {
- return minFrameSize;
- }
-
- @Override
- public int getMemoryBudgetBytes() {
- return memBudget;
- }
-
- @Override
- public ByteBuffer allocateFrame(int frameSize) throws HyracksDataException {
- int frameId = findExistingFrame(frameSize);
- if (frameId >= 0) {
- return reuseFrame(frameId);
- }
- if (haveEnoughFreeSpace(frameSize)) {
- return createNewFrame(frameSize);
- }
- return mergeExistingFrames(frameSize);
-
- }
-
- private boolean haveEnoughFreeSpace(int frameSize) {
- return frameSize + allocateMem <= memBudget;
- }
-
- private static int getFirstUnUsedPos(BitSet used) {
- return used.nextClearBit(0);
- }
-
- private static int getLastUnUsedPos(BitSet used, int lastPos) {
- return used.previousClearBit(lastPos);
- }
-
- private static int binarySearchUnusedBuffer(ArrayList<ByteBuffer> buffers, BitSet used, int frameSize) {
- int l = getFirstUnUsedPos(used); // to skip the merged null buffers
- int h = getLastUnUsedPos(used, (buffers.size() - 1)) + 1; // to skip the newly created buffers
- if (l >= h) {
- return -1;
- }
- int highest = h;
- int mid = (l + h) / 2;
- while (l < h) {
- ByteBuffer buffer = buffers.get(mid);
- if (buffer.capacity() == frameSize) {
- break;
- }
- if (buffer.capacity() < frameSize) {
- l = mid + 1;
- } else {
- h = mid;
- }
- mid = (l + h) / 2;
- }
- mid = used.nextClearBit(mid);
- return mid < highest ? mid : -1;
- }
-
- private int findExistingFrame(int frameSize) {
- return binarySearchUnusedBuffer(buffers, used, frameSize);
- }
-
- private ByteBuffer reuseFrame(int id) {
- used.set(id);
- buffers.get(id).clear();
- return buffers.get(id);
- }
-
- private ByteBuffer createNewFrame(int frameSize) throws HyracksDataException {
- buffers.add(ctx.allocateFrame(frameSize));
- allocateMem += frameSize;
- return reuseFrame(buffers.size() - 1);
- }
-
- /**
- * The merging sequence is from the smallest to the largest order.
- * Once the buffer get merged, it will be remove from the list in order to free the object.
- * And the index spot of it will be marked as used.
- *
- * @param frameSize
- * @return
- * @throws HyracksDataException
- */
- private ByteBuffer mergeExistingFrames(int frameSize) throws HyracksDataException {
- int mergedSize = memBudget - allocateMem;
- int highBound = getLastUnUsedPos(used, buffers.size() - 1) + 1;
- for (int i = getFirstUnUsedPos(used); i < highBound; ++i) {
- if (!used.get(i)) {
- mergedSize += deAllocateFrame(i);
- if (mergedSize >= frameSize) {
- return createNewFrame(mergedSize);
- }
- }
- }
- return null;
- }
-
- private int deAllocateFrame(int id) {
- ByteBuffer frame = buffers.get(id);
- ctx.deallocateFrames(frame.capacity());
- buffers.set(id, null);
- used.set(id);
- allocateMem -= frame.capacity();
- return frame.capacity();
- }
-
- @Override
- public void reset() {
- removeEmptySpot(buffers);
- Collections.sort(buffers, sizeByteBufferComparator);
- used.clear();
- }
-
- private static void removeEmptySpot(List<ByteBuffer> buffers) {
- for (int i = 0; i < buffers.size(); ) {
- if (buffers.get(i) == null) {
- buffers.remove(i);
- } else {
- i++;
- }
- }
- }
-
- @Override
- public void close() {
- buffers.clear();
- used.clear();
- allocateMem = 0;
- }
-
- private static Comparator<ByteBuffer> sizeByteBufferComparator = new Comparator<ByteBuffer>() {
- @Override
- public int compare(ByteBuffer o1, ByteBuffer o2) {
- if (o1.capacity() == o2.capacity()) {
- return 0;
- }
- return o1.capacity() < o2.capacity() ? -1 : 1;
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
deleted file mode 100644
index 20642bf..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
-import org.apache.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public class VariableTupleMemoryManager implements ITupleBufferManager {
-
- private final static Logger LOG = Logger.getLogger(VariableTupleMemoryManager.class.getName());
-
- private final int MIN_FREE_SPACE;
- private final IFramePool pool;
- private final IFrameFreeSlotPolicy policy;
- private final IAppendDeletableFrameTupleAccessor accessor;
- private final ArrayList<ByteBuffer> frames;
- private final RecordDescriptor recordDescriptor;
- private int numTuples;
- private int statsReOrg;
-
- public VariableTupleMemoryManager(IFramePool framePool, RecordDescriptor recordDescriptor) {
- this.pool = framePool;
- int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
- this.policy = new FrameFreeSlotLastFit(maxFrames);
- this.accessor = new DeletableFrameTupleAppender(recordDescriptor);
- this.frames = new ArrayList<>();
- this.MIN_FREE_SPACE = calculateMinFreeSpace(recordDescriptor);
- this.recordDescriptor = recordDescriptor;
- this.numTuples = 0;
- this.statsReOrg = 0;
- }
-
- @Override
- public void reset() throws HyracksDataException {
- pool.reset();
- policy.reset();
- frames.clear();
- numTuples = 0;
- }
-
- @Override
- public int getNumTuples() {
- return numTuples;
- }
-
- @Override
- public boolean insertTuple(IFrameTupleAccessor fta, int idx, TuplePointer tuplePointer)
- throws HyracksDataException {
- int requiredFreeSpace = calculatePhysicalSpace(fta, idx);
- int frameId = findAvailableFrame(requiredFreeSpace);
- if (frameId < 0) {
- if (canBeInsertedAfterCleanUpFragmentation(requiredFreeSpace)) {
- reOrganizeFrames();
- frameId = findAvailableFrame(requiredFreeSpace);
- statsReOrg++;
- } else {
- return false;
- }
- }
- assert frameId >= 0;
- accessor.reset(frames.get(frameId));
- assert accessor.getContiguousFreeSpace() >= requiredFreeSpace;
- int tid = accessor.append(fta, idx);
- assert tid >= 0;
- tuplePointer.reset(frameId, tid);
- if (accessor.getContiguousFreeSpace() > MIN_FREE_SPACE) {
- policy.pushNewFrame(frameId, accessor.getContiguousFreeSpace());
- }
- numTuples++;
- return true;
- }
-
- private void reOrganizeFrames() {
- policy.reset();
- for (int i = 0; i < frames.size(); i++) {
- accessor.reset(frames.get(i));
- accessor.reOrganizeBuffer();
- policy.pushNewFrame(i, accessor.getContiguousFreeSpace());
- }
- }
-
- private boolean canBeInsertedAfterCleanUpFragmentation(int requiredFreeSpace) {
- for (int i = 0; i < frames.size(); i++) {
- accessor.reset(frames.get(i));
- if (accessor.getTotalFreeSpace() >= requiredFreeSpace) {
- return true;
- }
- }
- return false;
- }
-
- private int findAvailableFrame(int requiredFreeSpace) throws HyracksDataException {
- int frameId = policy.popBestFit(requiredFreeSpace);
- if (frameId >= 0) {
- return frameId;
- }
-
- int frameSize = calculateMinFrameSizeToPlaceTuple(requiredFreeSpace, pool.getMinFrameSize());
- ByteBuffer buffer = pool.allocateFrame(frameSize);
- if (buffer != null) {
- accessor.clear(buffer);
- frames.add(buffer);
- return frames.size() - 1;
- }
- return -1;
- }
-
- private static int calculateMinFrameSizeToPlaceTuple(int requiredFreeSpace, int minFrameSize) {
- return (1 + (requiredFreeSpace + 4 - 1) / minFrameSize) * minFrameSize;
- }
-
- private static int calculatePhysicalSpace(IFrameTupleAccessor fta, int idx) {
- // 4 bytes to store the offset
- return 4 + fta.getTupleLength(idx);
- }
-
- private static int calculateMinFreeSpace(RecordDescriptor recordDescriptor) {
- // + 4 for the tuple offset
- return recordDescriptor.getFieldCount() * 4 + 4;
- }
-
- @Override
- public void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException {
- accessor.reset(frames.get(tuplePointer.frameIndex));
- accessor.delete(tuplePointer.tupleIndex);
- numTuples--;
- }
-
- @Override
- public void close() {
- pool.close();
- policy.reset();
- frames.clear();
- numTuples = 0;
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("VariableTupleMemoryManager has reorganized " + statsReOrg + " times");
- }
- statsReOrg = 0;
- }
-
- @Override
- public ITupleBufferAccessor getTupleAccessor() {
- return new ITupleBufferAccessor() {
- private IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender(
- recordDescriptor);
- private int tid;
-
- @Override
- public void reset(TuplePointer tuplePointer) {
- bufferAccessor.reset(frames.get(tuplePointer.frameIndex));
- tid = tuplePointer.tupleIndex;
- }
-
- @Override
- public ByteBuffer getTupleBuffer() {
- return bufferAccessor.getBuffer();
- }
-
- @Override
- public int getTupleStartOffset() {
- return bufferAccessor.getTupleStartOffset(tid);
- }
-
- @Override
- public int getTupleLength() {
- return bufferAccessor.getTupleLength(tid);
- }
-
- @Override
- public int getAbsFieldStartOffset(int fieldId) {
- return bufferAccessor.getAbsoluteFieldStartOffset(tid, fieldId);
- }
-
- @Override
- public int getFieldLength(int fieldId) {
- return bufferAccessor.getFieldLength(tid, fieldId);
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
index f77ec63..31ea07d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
@@ -26,7 +26,7 @@ import org.apache.hyracks.api.comm.IFrameTupleAppender;
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
- * Basically it a union of the {@link IFrameTupleAccessor} and {@link IFrameTupleAppender}.
+ * Basically, it is an union of the {@link IFrameTupleAccessor} and {@link IFrameTupleAppender}.
* Moreover, it has the delete function as well.
* This is a special TupleAccessor used for TopK sorting.
* In HeapSort, or other Tuple-based operators, we need to append the tuple, access the arbitrary previously
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java
index c6ca09b..d9a62c8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java
@@ -72,9 +72,7 @@ public abstract class AbstractHeap implements IHeap<IResetableComparable> {
@Override
public void reset() {
- for (int i = 0; i < numEntry; i++) {
- entries[i] = null;
- }
+ Arrays.fill(entries, null);
numEntry = 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
index ceae0f1..8cd6792 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
@@ -22,15 +22,19 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface ISerializableTable {
- public void insert(int entry, TuplePointer tuplePointer) throws HyracksDataException;
+ void insert(int entry, TuplePointer tuplePointer) throws HyracksDataException;
- public void getTuplePointer(int entry, int offset, TuplePointer tuplePointer);
+ void delete(int entry);
- public int getFrameCount();
+ boolean getTuplePointer(int entry, int offset, TuplePointer tuplePointer);
- public int getTupleCount();
+ int getFrameCount();
- public void reset();
+ int getTupleCount();
- public void close();
+ int getTupleCount(int entry);
+
+ void reset();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index 7db57c0..1f2ebef 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.std.structures;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -36,14 +37,14 @@ public class SerializableHashTable implements ISerializableTable {
private IntSerDeBuffer[] headers;
private List<IntSerDeBuffer> contents = new ArrayList<IntSerDeBuffer>();
private List<Integer> frameCurrentIndex = new ArrayList<Integer>();
- private final IHyracksTaskContext ctx;
- private int frameCapacity = 0;
+ private final IHyracksFrameMgrContext ctx;
+ private final int frameCapacity;
private int currentLargestFrameIndex = 0;
private int tupleCount = 0;
private int headerFrameCount = 0;
private TuplePointer tempTuplePointer = new TuplePointer();
- public SerializableHashTable(int tableSize, final IHyracksTaskContext ctx) throws HyracksDataException {
+ public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException {
this.ctx = ctx;
int frameSize = ctx.getInitialFrameSize();
@@ -81,28 +82,45 @@ public class SerializableHashTable implements ISerializableTable {
}
@Override
- public void getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
+ public void delete(int entry) {
+ int hFrameIndex = getHeaderFrameIndex(entry);
+ int headerOffset = getHeaderFrameOffset(entry);
+ IntSerDeBuffer header = headers[hFrameIndex];
+ if (header != null) {
+ int frameIndex = header.getInt(headerOffset);
+ int offsetIndex = header.getInt(headerOffset + 1);
+ if (frameIndex >= 0) {
+ IntSerDeBuffer frame = contents.get(frameIndex);
+ int entryUsedItems = frame.getInt(offsetIndex + 1);
+ frame.writeInt(offsetIndex + 1, 0);
+ tupleCount -= entryUsedItems;
+ }
+ }
+ }
+
+ @Override
+ public boolean getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
int hFrameIndex = getHeaderFrameIndex(entry);
int headerOffset = getHeaderFrameOffset(entry);
IntSerDeBuffer header = headers[hFrameIndex];
if (header == null) {
dataPointer.frameIndex = -1;
dataPointer.tupleIndex = -1;
- return;
+ return false;
}
int frameIndex = header.getInt(headerOffset);
int offsetIndex = header.getInt(headerOffset + 1);
if (frameIndex < 0) {
dataPointer.frameIndex = -1;
dataPointer.tupleIndex = -1;
- return;
+ return false;
}
IntSerDeBuffer frame = contents.get(frameIndex);
int entryUsedItems = frame.getInt(offsetIndex + 1);
if (offset > entryUsedItems - 1) {
dataPointer.frameIndex = -1;
dataPointer.tupleIndex = -1;
- return;
+ return false;
}
int startIndex = offsetIndex + 2 + offset * 2;
while (startIndex >= frameCapacity) {
@@ -112,6 +130,7 @@ public class SerializableHashTable implements ISerializableTable {
frame = contents.get(frameIndex);
dataPointer.frameIndex = frame.getInt(startIndex);
dataPointer.tupleIndex = frame.getInt(startIndex + 1);
+ return true;
}
@Override
@@ -139,9 +158,26 @@ public class SerializableHashTable implements ISerializableTable {
}
@Override
+ public int getTupleCount(int entry) {
+ int hFrameIndex = getHeaderFrameIndex(entry);
+ int headerOffset = getHeaderFrameOffset(entry);
+ IntSerDeBuffer header = headers[hFrameIndex];
+ if (header != null) {
+ int frameIndex = header.getInt(headerOffset);
+ int offsetIndex = header.getInt(headerOffset + 1);
+ if (frameIndex >= 0) {
+ IntSerDeBuffer frame = contents.get(frameIndex);
+ int entryUsedItems = frame.getInt(offsetIndex + 1);
+ return entryUsedItems;
+ }
+ }
+ return 0;
+ }
+
+ @Override
public void close() {
int nFrames = contents.size();
- for (int i = 0; i < headers.length; i++)
+ for (int i = 0; i < headers.length; i++)
headers[i] = null;
contents.clear();
frameCurrentIndex.clear();
@@ -259,31 +295,30 @@ public class SerializableHashTable implements ISerializableTable {
return offset;
}
-}
-
-class IntSerDeBuffer {
+ class IntSerDeBuffer {
- private byte[] bytes;
+ private byte[] bytes;
- public IntSerDeBuffer(byte[] data) {
- this.bytes = data;
- }
+ public IntSerDeBuffer(byte[] data) {
+ this.bytes = data;
+ }
- public int getInt(int pos) {
- int offset = pos * 4;
- return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
- + ((bytes[offset + 3] & 0xff) << 0);
- }
+ public int getInt(int pos) {
+ int offset = pos * 4;
+ return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
+ + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0);
+ }
- public void writeInt(int pos, int value) {
- int offset = pos * 4;
- bytes[offset++] = (byte) (value >> 24);
- bytes[offset++] = (byte) (value >> 16);
- bytes[offset++] = (byte) (value >> 8);
- bytes[offset++] = (byte) (value);
- }
+ public void writeInt(int pos, int value) {
+ int offset = pos * 4;
+ bytes[offset++] = (byte) (value >> 24);
+ bytes[offset++] = (byte) (value >> 16);
+ bytes[offset++] = (byte) (value >> 8);
+ bytes[offset++] = (byte) (value);
+ }
- public int capacity() {
- return bytes.length / 4;
+ public int capacity() {
+ return bytes.length / 4;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java
new file mode 100644
index 0000000..c74fe04
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.util;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
+
+public class FrameTuplePairComparator {
+ private final int[] keys0;
+ private final int[] keys1;
+ private final IBinaryComparator[] comparators;
+
+ public FrameTuplePairComparator(int[] keys0, int[] keys1, IBinaryComparator[] comparators) {
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.comparators = comparators;
+ }
+
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+ throws HyracksDataException {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ for (int i = 0; i < keys0.length; ++i) {
+ int fIdx0 = keys0[i];
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fIdx1 = keys1[i];
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, fIdx1);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+ .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0, ITuplePointerAccessor bufferAccessor)
+ throws HyracksDataException {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ for (int i = 0; i < keys0.length; ++i) {
+ int fIdx0 = keys0[i];
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = bufferAccessor.getAbsFieldStartOffset(keys1[i]);
+ int fLen1 = bufferAccessor.getFieldLength(keys1[i]);
+
+ int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, bufferAccessor
+ .getBuffer().array(), fStart1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java
new file mode 100644
index 0000000..3e4e578
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.BUDGET;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.MIN_FRAME_SIZE;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.NUM_MIN_FRAME;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.commonFrameManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractFramePoolTest {
+ IFramePool pool;
+
+ @Test
+ public void testGetMinFrameSize() throws Exception {
+ assertEquals(MIN_FRAME_SIZE, commonFrameManager.getInitialFrameSize());
+ assertEquals(MIN_FRAME_SIZE, pool.getMinFrameSize());
+ }
+
+ @Test
+ public void testGetMemoryBudgetBytes() throws Exception {
+ assertEquals(BUDGET, pool.getMemoryBudgetBytes());
+ }
+
+ protected void testAllocateShouldFailAfterAllSpaceGetUsed() throws HyracksDataException {
+ for (int i = 0; i < NUM_MIN_FRAME; i++) {
+ assertNull(pool.allocateFrame(MIN_FRAME_SIZE));
+ }
+ }
+
+ protected HashSet<ByteBufferPtr> testAllocateAllSpacesWithMinFrames() throws HyracksDataException {
+ HashSet<ByteBufferPtr> set = new HashSet<>();
+ for (int i = 0; i < NUM_MIN_FRAME; i++) {
+ testAllocateNewBuffer(set, MIN_FRAME_SIZE);
+ }
+ return set;
+ }
+
+ protected void testAllocateNewBuffer(HashSet<ByteBufferPtr> set, int frameSize) throws HyracksDataException {
+ ByteBuffer buffer = pool.allocateFrame(frameSize);
+ assertNotNull(buffer);
+ assertEquals(buffer.capacity(), frameSize);
+ assertTrue(!set.contains(new ByteBufferPtr(buffer)));
+ set.add(new ByteBufferPtr(buffer));
+ }
+
+ /**
+ * Pool will become 1,2,3,4,5
+ *
+ * @throws HyracksDataException
+ */
+ protected Set<ByteBufferPtr> testAllocateVariableFrames() throws HyracksDataException {
+ int budget = BUDGET;
+ int allocate = 0;
+ int i = 1;
+ Set<ByteBufferPtr> set = new HashSet<>();
+ while (budget - allocate >= i * MIN_FRAME_SIZE) {
+ ByteBuffer buffer = pool.allocateFrame(i * MIN_FRAME_SIZE);
+ assertNotNull(buffer);
+ set.add(new ByteBufferPtr(buffer));
+ allocate += i++ * MIN_FRAME_SIZE;
+ }
+ return set;
+ }
+
+ protected void testShouldFindTheMatchFrames(Set<?> set) throws HyracksDataException {
+ pool.reset();
+ List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
+
+ for (int i = 0; i < list.size(); i++) {
+ ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE);
+ assertNotNull(buffer);
+ assertTrue(set.contains(new ByteBufferPtr(buffer)));
+ assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity());
+ }
+ pool.reset();
+ for (int i = list.size() - 1; i >= 0; i--) {
+ ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE);
+ assertNotNull(buffer);
+ assertTrue(set.contains(new ByteBufferPtr(buffer)));
+ assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity());
+ }
+
+ Collections.shuffle(list);
+ pool.reset();
+ for (int i = 0; i < list.size(); i++) {
+ ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE);
+ assertNotNull(buffer);
+ assertTrue(set.contains(new ByteBufferPtr(buffer)));
+ assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity());
+ }
+
+ }
+
+ public static class ByteBufferPtr {
+ ByteBuffer bytebuffer;
+
+ public ByteBufferPtr(ByteBuffer buffer) {
+ bytebuffer = buffer;
+ }
+
+ @Override
+ public int hashCode() {
+ return bytebuffer.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this.bytebuffer == ((ByteBufferPtr) obj).bytebuffer;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java
new file mode 100644
index 0000000..cdf8834
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+import org.apache.hyracks.dataflow.std.sort.Utility;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public abstract class AbstractTupleMemoryManagerTest {
+ ISerializerDeserializer[] fieldsSerDer = new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+ RecordDescriptor recordDescriptor = new RecordDescriptor(fieldsSerDer);
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptor.getFieldCount());
+ FrameTupleAccessor inFTA = new FrameTupleAccessor(recordDescriptor);
+ Random random = new Random(System.currentTimeMillis());
+
+ abstract ITuplePointerAccessor getTupleAccessor();
+
+ protected void assertEachTupleInFTAIsInBuffer(Map<Integer, Integer> map, Map<TuplePointer, Integer> mapInserted) {
+ ITuplePointerAccessor accessor = getTupleAccessor();
+ for (Map.Entry<TuplePointer, Integer> entry : mapInserted.entrySet()) {
+ accessor.reset(entry.getKey());
+ int dataLength = map.get(entry.getValue());
+ assertEquals((int) entry.getValue(),
+ IntSerDeUtils.getInt(accessor.getBuffer().array(), accessor.getAbsFieldStartOffset(0)));
+ assertEquals(dataLength, accessor.getTupleLength());
+ }
+ assertEquals(map.size(), mapInserted.size());
+ }
+
+ protected Map<Integer, Integer> prepareFixedSizeTuples(
+ int tuplePerFrame,
+ int extraMetaBytePerFrame,
+ int extraMetaBytePerRecord) throws HyracksDataException {
+ Map<Integer, Integer> dataSet = new HashMap<>();
+ ByteBuffer buffer = ByteBuffer.allocate(Common.BUDGET);
+ FixedSizeFrame frame = new FixedSizeFrame(buffer);
+ FrameTupleAppender appender = new FrameTupleAppender();
+ appender.reset(frame, true);
+
+ int sizePerTuple = (Common.MIN_FRAME_SIZE - 1 - tuplePerFrame * 4 - 4 - extraMetaBytePerFrame) / tuplePerFrame;
+ int sizeChar =
+ sizePerTuple - extraMetaBytePerRecord - fieldsSerDer.length * 4 - 4 - 2; //2byte to write str length
+ assert (sizeChar > 0);
+ for (int i = 0; i < Common.NUM_MIN_FRAME * tuplePerFrame; i++) {
+ tupleBuilder.reset();
+ tupleBuilder.addField(fieldsSerDer[0], i);
+ tupleBuilder.addField(fieldsSerDer[1], Utility.repeatString('a', sizeChar));
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ assert false;
+ }
+ dataSet.put(i, tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * 4);
+ }
+ inFTA.reset(buffer);
+ return dataSet;
+ }
+
+ protected Map<Integer, Integer> prepareVariableSizeTuples() throws HyracksDataException {
+ Map<Integer, Integer> dataSet = new HashMap<>();
+ ByteBuffer buffer = ByteBuffer.allocate(Common.BUDGET);
+ FixedSizeFrame frame = new FixedSizeFrame(buffer);
+ FrameTupleAppender appender = new FrameTupleAppender();
+ appender.reset(frame, true);
+
+ for (int i = 0; true; i++) {
+ tupleBuilder.reset();
+ tupleBuilder.addField(fieldsSerDer[0], i);
+ tupleBuilder.addField(fieldsSerDer[1], Utility.repeatString('a', i));
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ break;
+ }
+ dataSet.put(i, tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * 4);
+ }
+ inFTA.reset(buffer);
+ return dataSet;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java
new file mode 100644
index 0000000..7389aab
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+
+public class Common {
+ static int MIN_FRAME_SIZE = 256;
+ static int NUM_MIN_FRAME = 15;
+ static int BUDGET = NUM_MIN_FRAME * MIN_FRAME_SIZE;
+
+ static FrameManager commonFrameManager = new FrameManager(MIN_FRAME_SIZE);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java
new file mode 100644
index 0000000..2a84e0e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.BUDGET;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.MIN_FRAME_SIZE;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.commonFrameManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.HashSet;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DeletableFramePoolTest extends AbstractFramePoolTest {
+
+ @Before
+ public void setUp() {
+ pool = new DeallocatableFramePool(commonFrameManager, BUDGET);
+ }
+
+ DeallocatableFramePool getPool() {
+ return (DeallocatableFramePool) pool;
+ }
+
+ @Test
+ public void testAllocateBuffers() throws HyracksDataException {
+ testAllocateAllSpacesWithMinFrames();
+ }
+
+ @Test
+ public void testCanNotAllocateMore() throws HyracksDataException {
+ testAllocateAllSpacesWithMinFrames();
+ assertNull(pool.allocateFrame(MIN_FRAME_SIZE));
+ }
+
+ @Test
+ public void testReusePreAllocatedBuffer() throws HyracksDataException {
+ HashSet<ByteBufferPtr> set = testAllocateAllSpacesWithMinFrames();
+ for (ByteBufferPtr ptr : set) {
+ getPool().deAllocateBuffer(ptr.bytebuffer);
+ }
+ HashSet<ByteBufferPtr> set2 = testAllocateAllSpacesWithMinFrames();
+ assertEquals(set, set2);
+ }
+
+ @Test
+ public void testMergeCase() throws HyracksDataException {
+ HashSet<ByteBufferPtr> set = testAllocateAllSpacesWithMinFrames();
+ for (ByteBufferPtr ptr : set) {
+ getPool().deAllocateBuffer(ptr.bytebuffer);
+ }
+ set.clear();
+ int i = 1;
+ for (int sum = 0; sum + MIN_FRAME_SIZE * i <= BUDGET; i++) {
+ sum += MIN_FRAME_SIZE * i;
+ testAllocateNewBuffer(set, MIN_FRAME_SIZE * i);
+ }
+ assertNull(pool.allocateFrame(MIN_FRAME_SIZE * i));
+ for (ByteBufferPtr ptr : set) {
+ getPool().deAllocateBuffer(ptr.bytebuffer);
+ }
+ set.clear();
+ testAllocateNewBuffer(set, BUDGET);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java
new file mode 100644
index 0000000..992c7f6
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class FrameFreeSlotBestFitUsingTreeMapTest {
+
+ static int size = 10;
+
+ FrameFreeSlotSmallestFit policy;
+
+ @Before
+ public void intial() {
+ policy = new FrameFreeSlotSmallestFit();
+ }
+
+ @Test
+ public void testAll() {
+
+ for (int i = 0; i < size; i++) {
+ policy.pushNewFrame(i, i);
+ assertEquals(i, policy.popBestFit(i));
+ }
+ assertEquals(-1, policy.popBestFit(0));
+
+ for (int i = 0; i < size; i++) {
+ policy.pushNewFrame(i, i);
+ }
+ for (int i = 0; i < size; i++) {
+ assertEquals(i, policy.popBestFit(i));
+ }
+
+ }
+
+ @Test
+ public void testReset(){
+ testAll();
+ policy.reset();
+ testAll();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java
new file mode 100644
index 0000000..88a54bd
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static junit.framework.Assert.assertEquals;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class FrameFreeSlotBiggestFirstTest {
+
+ static int size = 10;
+
+ FrameFreeSlotBiggestFirst policy;
+
+ @Before
+ public void intial() {
+ policy = new FrameFreeSlotBiggestFirst(size);
+ }
+
+ @Test
+ public void testAll() {
+
+ for (int i = 0; i < size; i++) {
+ policy.pushNewFrame(i, i);
+ assertEquals(i, policy.popBestFit(i));
+ }
+ assertEquals(-1, policy.popBestFit(0));
+
+ for (int i = 0; i < size; i++) {
+ policy.pushNewFrame(i, i);
+ }
+ for (int i = 0; i < size; i++) {
+ assertEquals(size - i - 1, policy.popBestFit(0));
+ }
+
+ for (int i = 0; i < size; i++) {
+ policy.pushNewFrame(i, i);
+ }
+ for (int i = 0; i < size / 2; i++) {
+ assertEquals(size - i - 1, policy.popBestFit(size / 2));
+ }
+ assertEquals(-1, policy.popBestFit(size / 2));
+ for (int i = 0; i < size / 2; i++) {
+ assertEquals(size / 2 - i - 1, policy.popBestFit(0));
+ }
+
+ }
+
+ @Test
+ public void testReset() {
+ testAll();
+ policy.reset();
+ testAll();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java
new file mode 100644
index 0000000..87b1b91
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class FrameFreeSlotLastFitTest {
+
+ FrameFreeSlotLastFit zeroPolicy;
+ FrameFreeSlotLastFit unifiedPolicy;
+ FrameFreeSlotLastFit ascPolicy;
+ FrameFreeSlotLastFit dscPolicy;
+
+ static final int size = 10;
+ static final int medium = 5;
+
+ @Before
+ public void setUp() throws Exception {
+ zeroPolicy = new FrameFreeSlotLastFit(0);
+ unifiedPolicy = new FrameFreeSlotLastFit(size);
+ ascPolicy = new FrameFreeSlotLastFit(size);
+ dscPolicy = new FrameFreeSlotLastFit(size);
+ }
+
+ @Test
+ public void testPushAndPop() throws Exception {
+ for (int i = 0; i < size; i++) {
+ unifiedPolicy.pushNewFrame(i, medium);
+ }
+ for (int i = 0; i < size; i++) {
+ assertTrue(unifiedPolicy.popBestFit(medium) == size - i - 1);
+ }
+ assertTrue(unifiedPolicy.popBestFit(0) == -1);
+
+ for (int i = 0; i < size / 2; i++) {
+ ascPolicy.pushNewFrame(i, i);
+ assertEquals(ascPolicy.popBestFit(medium), -1);
+ dscPolicy.pushNewFrame(i, size - i - 1);
+ assertEquals(dscPolicy.popBestFit(medium), i);
+ }
+
+ for (int i = size / 2; i < size; i++) {
+ ascPolicy.pushNewFrame(i, i);
+ assertEquals(ascPolicy.popBestFit(medium), i);
+ dscPolicy.pushNewFrame(i, size - i - 1);
+ assertEquals(dscPolicy.popBestFit(medium), -1);
+ }
+
+ ascPolicy.reset();
+ for (int i = 0; i < size; i++) {
+ ascPolicy.pushNewFrame(size - i, size - i);
+ }
+
+ for (int i = 0; i < size; i++) {
+ assertEquals(size - i, ascPolicy.popBestFit(size - i));
+ }
+ }
+
+ @Test
+ public void testReset() throws Exception {
+ testPushAndPop();
+
+ zeroPolicy.reset();
+ unifiedPolicy.reset();
+ ascPolicy.reset();
+ dscPolicy.reset();
+ testPushAndPop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java
new file mode 100644
index 0000000..ce31108
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class VPartitionTupleBufferManagerTest extends AbstractTupleMemoryManagerTest {
+
+ VPartitionTupleBufferManager bufferManager;
+ final int PARTITION = 4;
+
+ @Before
+ public void setup() throws HyracksDataException {
+ IPartitionedMemoryConstrain constrain = new IPartitionedMemoryConstrain() {
+ @Override
+ public int frameLimit(int partitionId) {
+ return Integer.MAX_VALUE;
+ }
+ };
+ bufferManager = new VPartitionTupleBufferManager(Common.commonFrameManager, constrain, PARTITION,
+ Common.BUDGET);
+ }
+
+ @Test
+ public void testGetNumPartitions() throws Exception {
+ assertEquals(PARTITION, bufferManager.getNumPartitions());
+ }
+
+ @Test
+ public void testGetNumTuples() throws Exception {
+ testNumTuplesAndSizeIsZero();
+ }
+
+ @Test
+ public void testInsertToFull() throws HyracksDataException {
+ Map<Integer, Integer> inMap = prepareFixedSizeTuples(10, 0, 0);
+ for (int pid = 0; pid < PARTITION; pid++) {
+ assertInsertOnePartitionToFull(pid, inMap);
+ bufferManager.reset();
+ }
+ }
+
+ @Test
+ public void testInsertClearSequence() throws HyracksDataException {
+ Map<Integer, Integer> inMap = prepareFixedSizeTuples(10, 0, 0);
+ for (int pid = 0; pid < PARTITION; pid++) {
+ assertInsertOnePartitionToFull(pid, inMap);
+ bufferManager.reset();
+ }
+ }
+
+ private void assertInsertOnePartitionToFull(int pid, Map<Integer, Integer> inMap) throws HyracksDataException {
+ testNumTuplesAndSizeIsZero();
+
+ Map<TuplePointer, Integer> outMap = testInsertOnePartitionToFull(pid);
+ assertEquals(outMap.size(), bufferManager.getNumTuples(pid));
+ assertEquals(Common.BUDGET, bufferManager.getPhysicalSize(pid));
+ testCanNotInsertToAnyPartitions();
+ assertEachTupleInFTAIsInBuffer(inMap, outMap);
+
+ }
+
+ private void testCanNotInsertToAnyPartitions() throws HyracksDataException {
+ for (int i = 0; i < PARTITION; i++) {
+ assertFalse(bufferManager.insertTuple(i, tupleBuilder.getByteArray(), tupleBuilder.getFieldEndOffsets(), 0,
+ tupleBuilder.getSize(), null));
+ }
+ }
+
+ private Map<TuplePointer, Integer> testInsertOnePartitionToFull(int idpart) throws HyracksDataException {
+ Map<TuplePointer, Integer> tuplePointerIntegerMap = new HashMap<>();
+
+ for (int i = 0; i < inFTA.getTupleCount(); i++) {
+ TuplePointer tuplePointer = new TuplePointer();
+ copyDataToTupleBuilder(inFTA, i, tupleBuilder);
+ if (!bufferManager.insertTuple(idpart, tupleBuilder.getByteArray(), tupleBuilder.getFieldEndOffsets(), 0,
+ tupleBuilder.getSize(), tuplePointer)) {
+ assert false;
+ }
+ tuplePointerIntegerMap.put(tuplePointer,
+ IntSerDeUtils.getInt(inFTA.getBuffer().array(), inFTA.getAbsoluteFieldStartOffset(i, 0)));
+ }
+ return tuplePointerIntegerMap;
+
+ }
+
+ private static void copyDataToTupleBuilder(FrameTupleAccessor inFTA, int tid, ArrayTupleBuilder tupleBuilder)
+ throws HyracksDataException {
+ tupleBuilder.reset();
+ for (int fid = 0; fid < inFTA.getFieldCount(); fid++) {
+ tupleBuilder.addField(inFTA.getBuffer().array(), inFTA.getAbsoluteFieldStartOffset(tid, fid),
+ inFTA.getFieldLength(tid, fid));
+ }
+ }
+
+ private void testNumTuplesAndSizeIsZero() {
+ for (int i = 0; i < bufferManager.getNumPartitions(); ++i) {
+ assertEquals(0, bufferManager.getNumTuples(i));
+ assertEquals(0, bufferManager.getPhysicalSize(0));
+ }
+ }
+
+ @Test
+ public void testClearPartition() throws Exception {
+
+ Map<Integer, Integer> inMap = prepareFixedSizeTuples(10, 0, 0);
+ for (int pid = 0; pid < PARTITION; pid++) {
+ assertInsertOnePartitionToFull(pid, inMap);
+ assertClearFullPartitionIsTheSameAsReset();
+ }
+ }
+
+ private void assertClearFullPartitionIsTheSameAsReset() throws HyracksDataException {
+ for (int i = 0; i < PARTITION; i++) {
+ bufferManager.clearPartition(i);
+ }
+ testNumTuplesAndSizeIsZero();
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ testInsertToFull();
+ bufferManager.close();
+ }
+
+ @Override
+ ITuplePointerAccessor getTupleAccessor() {
+ return bufferManager.getTupleAccessor(recordDescriptor);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java
new file mode 100644
index 0000000..8dbe1f0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.BUDGET;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.MIN_FRAME_SIZE;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.NUM_MIN_FRAME;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.commonFrameManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class VariableFramePoolTest extends AbstractFramePoolTest {
+
+ @Before
+ public void setUp() throws Exception {
+ pool = new VariableFramePool(commonFrameManager, BUDGET);
+ }
+
+ @Test
+ public void testAllocateUniformFrameShouldSuccess() throws Exception {
+ testAllocateAllSpacesWithMinFrames();
+ testAllocateShouldFailAfterAllSpaceGetUsed();
+ pool.reset();
+ testAllocateAllSpacesWithMinFrames();
+ pool.close();
+ }
+
+ @Test
+ public void testResetShouldReuseExistingFrames() throws HyracksDataException {
+ Set<?> set1 = testAllocateAllSpacesWithMinFrames();
+ pool.reset();
+ Set<?> set2 = testAllocateAllSpacesWithMinFrames();
+ assertEquals(set1, set2);
+ pool.close();
+ }
+
+ @Test
+ public void testCloseShouldNotReuseExistingFrames() throws HyracksDataException {
+ Set<?> set1 = testAllocateAllSpacesWithMinFrames();
+ pool.close();
+ Set<?> set2 = testAllocateAllSpacesWithMinFrames();
+ assertFalse(set1.equals(set2));
+ pool.close();
+ }
+
+ @Test
+ public void testShouldReturnLargerFramesIfFitOneIsUsed() throws HyracksDataException {
+ Set<?> set = testAllocateVariableFrames();
+ pool.reset();
+ testShouldFindTheMatchFrames(set);
+ pool.reset();
+
+ // allocate seq: 1, 1, 2, 3, 4
+ ByteBuffer placeBuffer = pool.allocateFrame(MIN_FRAME_SIZE);
+ assertTrue(set.contains(new ByteBufferPtr(placeBuffer)));
+ for (int i = 1; i <= 4; i++) {
+ ByteBuffer buffer = pool.allocateFrame(i * MIN_FRAME_SIZE);
+ assertNotNull(buffer);
+ assertTrue(set.contains(new ByteBufferPtr(buffer)));
+ }
+ assertNull(pool.allocateFrame(MIN_FRAME_SIZE));
+ pool.close();
+ }
+
+ @Test
+ public void testShouldMergeIfNoLargerFrames() throws HyracksDataException {
+ Set<?> set = testAllocateAllSpacesWithMinFrames();
+ pool.reset();
+ int chunks = 5;
+ for (int i = 0; i < NUM_MIN_FRAME; i += chunks) {
+ ByteBuffer buffer = pool.allocateFrame(chunks * MIN_FRAME_SIZE);
+ assertNotNull(buffer);
+ assertTrue(!set.contains(new ByteBufferPtr(buffer)));
+ }
+ }
+
+ @Test
+ public void testUseMiddleSizeFrameAndNeedToMergeSmallAndBigger() throws HyracksDataException {
+ Set<?> set = testAllocateVariableFrames();
+ pool.reset();
+ // allocate seq: 3, 6, 1;
+ ByteBuffer buffer = pool.allocateFrame(3 * MIN_FRAME_SIZE);
+ assertTrue(set.contains(new ByteBufferPtr(buffer)));
+ buffer = pool.allocateFrame(6 * MIN_FRAME_SIZE);
+ assertFalse(set.contains(new ByteBufferPtr(buffer)));
+ buffer = pool.allocateFrame(1 * MIN_FRAME_SIZE);
+ assertTrue(set.contains(new ByteBufferPtr(buffer)));
+ assertEquals(5 * MIN_FRAME_SIZE, buffer.capacity());
+ pool.reset();
+ }
+}
\ No newline at end of file