You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2016/02/26 06:54:07 UTC
[10/11] incubator-asterixdb-hyracks git commit: Implemented the
memory-bounded HashGroupby and HashJoin for BigObject
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
new file mode 100644
index 0000000..1118bf3
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Manages buffer space in units of whole frames.
+ */
+public interface IFrameBufferManager {
+
+    /**
+     * Reports how many frames this manager currently holds.
+     *
+     * @return the number of frames in this buffer
+     */
+    int getNumFrames();
+
+    /**
+     * Describes the frame at {@code frameIndex} through a caller-supplied holder.
+     *
+     * @param frameIndex
+     *            index of the frame to look up
+     * @param bufferInfo
+     *            reusable holder that is reset to describe the requested frame
+     * @return the same {@code bufferInfo} instance, now filled in, so calls can be chained
+     */
+    BufferInfo getFrame(int frameIndex, BufferInfo bufferInfo);
+
+    /**
+     * Stores an entire frame in this buffer.
+     *
+     * @param frame
+     *            the source frame
+     * @return the id assigned to the stored frame, or -1 if it could not be inserted
+     * @throws HyracksDataException
+     *             if the insertion fails
+     */
+    int insertFrame(ByteBuffer frame) throws HyracksDataException;
+
+    /**
+     * Returns counters and flags to their initial state. Pre-allocated resources must NOT be released here.
+     *
+     * @throws HyracksDataException
+     *             if the reset fails
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * Closes this manager. What is released is implementation-defined.
+     */
+    void close();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
new file mode 100644
index 0000000..8a1e004
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+/**
+ * Indexes frames by the free space they still have, so that new data can be placed into a frame that fits it.
+ */
+public interface IFrameFreeSlotPolicy {
+
+    /**
+     * Registers a frame and the amount of free space it offers.
+     *
+     * @param frameID
+     *            id of the frame that has free space
+     * @param freeSpace
+     *            number of bytes still available in that frame
+     */
+    void pushNewFrame(int frameID, int freeSpace);
+
+    /**
+     * Picks the frame that best fits the data to be inserted and removes it from the index.
+     *
+     * @param tobeInsertedSize
+     *            the actual size of the data, including metadata such as field offsets and the extra
+     *            tuple-count bytes
+     * @return the id of the best-fit frame, or -1 if no registered frame can hold the data
+     */
+    int popBestFit(int tobeInsertedSize);
+
+    /**
+     * Forgets every registered free slot.
+     */
+    void reset();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFramePool.java
new file mode 100644
index 0000000..2c6b01e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFramePool.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A bounded pool from which frames of varying sizes are allocated.
+ */
+public interface IFramePool {
+
+    /**
+     * @return the minimum frame size of this pool; callers align requested frame sizes to it
+     */
+    int getMinFrameSize();
+
+    /**
+     * @return the memory budget of this pool in bytes (as the name suggests; confirm against implementations)
+     */
+    int getMemoryBudgetBytes();
+
+    /**
+     * Allocates a frame of the requested size. <br>
+     * Returns {@code null} when a frame of that size cannot be allocated.
+     *
+     * @param frameSize
+     *            the actual size of the frame
+     * @return the allocated frame, or {@code null}
+     * @throws HyracksDataException
+     *             if the allocation fails
+     */
+    ByteBuffer allocateFrame(int frameSize) throws HyracksDataException;
+
+    /**
+     * Returns the counters to their initial state without releasing pre-allocated resources.
+     */
+    void reset();
+
+    /**
+     * Releases the pre-allocated resources.
+     */
+    void close();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedMemoryConstrain.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedMemoryConstrain.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedMemoryConstrain.java
new file mode 100644
index 0000000..1ba0745
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedMemoryConstrain.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+/**
+ * Supplies a per-partition constraint on how much memory each partition may occupy.
+ * <p>
+ * This type has a single abstract method, so implementations may be written as lambdas.
+ */
+@FunctionalInterface
+public interface IPartitionedMemoryConstrain {
+    /**
+     * Returns the maximum number of frames that the given partition is allowed to use.
+     *
+     * @param partitionId
+     *            id of the partition being queried
+     * @return the frame limit for that partition ({@link Integer#MAX_VALUE} can be used to express "unbounded")
+     */
+    int frameLimit(int partitionId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
new file mode 100644
index 0000000..70ca2dd
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * A tuple buffer whose space is split into a fixed number of partitions that are filled, flushed and cleared
+ * independently.
+ */
+public interface IPartitionedTupleBufferManager {
+
+    /**
+     * @return how many partitions this manager handles
+     */
+    int getNumPartitions();
+
+    /**
+     * @param partition
+     *            the partition id
+     * @return the number of tuples currently stored in {@code partition}
+     */
+    int getNumTuples(int partition);
+
+    /**
+     * @param partition
+     *            the partition id
+     * @return the memory, in bytes, occupied by the frames of {@code partition}
+     */
+    int getPhysicalSize(int partition);
+
+    /**
+     * Copies the tuple held in {@code byteArray[start, start + size)} into the given partition and writes its
+     * handle into {@code pointer}.
+     * <br>
+     * When {@code byteArray} already carries the field end offsets, pass {@code null} as {@code fieldEndOffsets}.
+     *
+     * @param partition
+     *            target partition id
+     * @param byteArray
+     *            the bytes that contain the tuple
+     * @param fieldEndOffsets
+     *            the field end offsets (e.g. from an ArrayTupleBuilder), or {@code null} if {@code byteArray}
+     *            already contains them
+     * @param start
+     *            offset of the tuple inside {@code byteArray}
+     * @param size
+     *            length of the tuple in bytes
+     * @param pointer
+     *            receives the handle of the stored tuple inside this buffer manager
+     * @return {@code true} if the tuple was stored, {@code false} otherwise
+     * @throws HyracksDataException
+     *             if the insertion fails
+     */
+    boolean insertTuple(int partition, byte[] byteArray, int[] fieldEndOffsets, int start, int size,
+            TuplePointer pointer) throws HyracksDataException;
+
+    /**
+     * Copies tuple {@code tupleId} of {@code tupleAccessor} into the given partition and writes its handle into
+     * {@code pointer}.
+     *
+     * @param partition
+     *            target partition id
+     * @param tupleAccessor
+     *            accessor over the frame holding the source tuple
+     * @param tupleId
+     *            index of the tuple inside {@code tupleAccessor}
+     * @param pointer
+     *            receives the handle used later to fetch the tuple from this buffer manager
+     * @return {@code true} if the tuple was stored, {@code false} otherwise
+     * @throws HyracksDataException
+     *             if the insertion fails
+     */
+    boolean insertTuple(int partition, IFrameTupleAccessor tupleAccessor, int tupleId, TuplePointer pointer)
+            throws HyracksDataException;
+
+    /**
+     * Creates an accessor able to resolve the tuple pointers produced by this manager.
+     *
+     * @param recordDescriptor
+     *            layout of the stored records
+     * @return a new tuple-pointer accessor
+     */
+    ITuplePointerAccessor getTupleAccessor(RecordDescriptor recordDescriptor);
+
+    /**
+     * Sends every frame of partition {@code pid} to {@code writer} while keeping the partition's contents.
+     * Join uses this to feed the inner partition to the join (acting as a frame writer) and still retain it for the
+     * next outer partition.
+     *
+     * @param pid
+     *            partition to flush
+     * @param writer
+     *            destination of the frames
+     * @throws HyracksDataException
+     *             if writing fails
+     */
+    void flushPartition(int pid, IFrameWriter writer) throws HyracksDataException;
+
+    /**
+     * Releases the memory held by one partition.
+     *
+     * @param partition
+     *            the partition to clear
+     * @throws HyracksDataException
+     *             if clearing fails
+     */
+    void clearPartition(int partition) throws HyracksDataException;
+
+    /**
+     * Returns to the initial state. Resources allocated so far are kept so they can be reused in the next round.
+     *
+     * @throws HyracksDataException
+     *             if the reset fails
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * Closes the manager, explicitly releasing every allocated resource.
+     */
+    void close();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferManager.java
new file mode 100644
index 0000000..324401a
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * Manages buffer space at the granularity of individual records, unlike {@link IFrameBufferManager}, which works
+ * on whole frames.
+ */
+public interface ITupleBufferManager {
+
+    /**
+     * Copies tuple {@code idx} of {@code accessor} into this buffer.
+     *
+     * @param accessor
+     *            accessor over the source frame
+     * @param idx
+     *            index of the tuple to copy
+     * @param tuplePointer
+     *            receives the handle of the stored tuple
+     * @return whether the tuple was stored (presumably {@code false} when there is no room; confirm with
+     *         implementations)
+     * @throws HyracksDataException
+     *             if the insertion fails
+     */
+    boolean insertTuple(IFrameTupleAccessor accessor, int idx, TuplePointer tuplePointer) throws HyracksDataException;
+
+    /**
+     * @return how many tuples this buffer currently holds
+     */
+    int getNumTuples();
+
+    /**
+     * @return a new accessor able to resolve the tuple pointers produced by this manager
+     */
+    ITuplePointerAccessor createTupleAccessor();
+
+    /**
+     * Puts counters and flags back to their initial state without releasing pre-allocated resources.
+     *
+     * @throws HyracksDataException
+     *             if the reset fails
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * Closes this buffer manager.
+     *
+     * @throws HyracksDataException
+     *             if closing fails
+     */
+    void close() throws HyracksDataException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITuplePointerAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITuplePointerAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITuplePointerAccessor.java
new file mode 100644
index 0000000..a8ab7eb
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITuplePointerAccessor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * A cursor-like, tuple-level accessor that locates a tuple's physical bytes inside an {@link ITupleBufferManager}.
+ * <p>
+ * Some buffer managers (e.g. {@link VariableDeletableTupleMemoryManager}) lay out their frames differently from
+ * the common {@link org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor}. This accessor hides that
+ * difference so clients can navigate the stored records without knowing the internal layout.
+ */
+public interface ITuplePointerAccessor extends IFrameTupleAccessor {
+
+    /**
+     * Moves the cursor to the tuple identified by {@code tuplePointer}.
+     *
+     * @param tuplePointer
+     *            handle returned by the owning buffer manager
+     */
+    void reset(TuplePointer tuplePointer);
+
+    /**
+     * @return the start offset of the current tuple
+     */
+    int getTupleStartOffset();
+
+    /**
+     * @return the length of the current tuple in bytes
+     */
+    int getTupleLength();
+
+    /**
+     * @param fieldId
+     *            field index within the current tuple
+     * @return the start offset of the field; the "Abs" prefix presumably means it is measured from the start of
+     *         the underlying buffer rather than from the tuple (confirm against implementations)
+     */
+    int getAbsFieldStartOffset(int fieldId);
+
+    /**
+     * @param fieldId
+     *            field index within the current tuple
+     * @return the length of that field in bytes
+     */
+    int getFieldLength(int fieldId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
new file mode 100644
index 0000000..f9387a9
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.util.BitSet;
+import java.util.function.IntUnaryOperator;
+
+/**
+ * Decides which partition of a {@link VPartitionTupleBufferManager} should be spilled (the victim) when there is
+ * not enough space to insert a new element.
+ * <p>
+ * The policy tries to avoid flushing half-full frames. It prefers, in order:
+ * <ol>
+ * <li>the partition that failed the insertion, if it already holds tuples;</li>
+ * <li>the already-spilled partition using the most memory;</li>
+ * <li>the in-memory partition using the most memory, when no spilled partition holds memory or the largest one
+ * holds only a single minimum-size frame.</li>
+ * </ol>
+ */
+public class PreferToSpillFullyOccupiedFramePolicy {
+
+    private final IPartitionedTupleBufferManager bufferManager;
+    // Bit i is set when partition i has already been spilled.
+    private final BitSet spilledStatus;
+    private final int minFrameSize;
+
+    /**
+     * @param bufferManager
+     *            the partitioned buffer whose partitions are candidates for spilling
+     * @param spilledStatus
+     *            bit set marking spilled partitions; it is read live, not copied
+     * @param minFrameSize
+     *            the size of a single minimum-size frame, in bytes
+     */
+    public PreferToSpillFullyOccupiedFramePolicy(IPartitionedTupleBufferManager bufferManager, BitSet spilledStatus,
+            int minFrameSize) {
+        this.bufferManager = bufferManager;
+        this.spilledStatus = spilledStatus;
+        this.minFrameSize = minFrameSize;
+    }
+
+    /**
+     * Selects the partition to spill after an insertion into {@code failedToInsertPartition} failed.
+     *
+     * @param failedToInsertPartition
+     *            the partition that could not accept the new tuple
+     * @return the id of the partition to spill, or -1 if no partition occupies any memory
+     */
+    public int selectVictimPartition(int failedToInsertPartition) {
+        // To avoid flushing another partition's half-full frame, it is better to spill the failing one itself.
+        if (bufferManager.getNumTuples(failedToInsertPartition) > 0) {
+            return failedToInsertPartition;
+        }
+        int partitionToSpill = findSpilledPartitionWithMaxMemoryUsage();
+        int maxToSpillPartSize = partitionToSpill < 0 ? 0 : bufferManager.getPhysicalSize(partitionToSpill);
+        // If there is no spilled partition holding memory, or the largest one is a single minimum-size frame
+        // (too small to be worth flushing), try an in-memory partition that frees more memory.
+        if (partitionToSpill < 0 || maxToSpillPartSize == minFrameSize) {
+            int partitionInMem = findInMemPartitionWithMaxMemoryUsage();
+            if (partitionInMem >= 0 && bufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
+                partitionToSpill = partitionInMem;
+            }
+        }
+        return partitionToSpill;
+    }
+
+    /**
+     * @return the not-yet-spilled partition with the largest physical size, or -1 if none occupies memory
+     */
+    public int findInMemPartitionWithMaxMemoryUsage() {
+        return findMaxSize(spilledStatus.nextClearBit(0), (i) -> spilledStatus.nextClearBit(i + 1));
+    }
+
+    /**
+     * @return the spilled partition with the largest physical size, or -1 if none occupies memory
+     */
+    public int findSpilledPartitionWithMaxMemoryUsage() {
+        return findMaxSize(spilledStatus.nextSetBit(0), (i) -> spilledStatus.nextSetBit(i + 1));
+    }
+
+    // Walks the partition ids produced by nextIndexOp and returns the one with the largest non-zero size (or -1).
+    private int findMaxSize(int startIndex, IntUnaryOperator nextIndexOp) {
+        final int numPartitions = bufferManager.getNumPartitions();
+        int pid = -1;
+        int max = 0;
+        // nextSetBit returns -1 when exhausted, while nextClearBit keeps growing; both cases are bounded here.
+        for (int i = startIndex; i >= 0 && i < numPartitions; i = nextIndexOp.applyAsInt(i)) {
+            int partSize = bufferManager.getPhysicalSize(i);
+            if (partSize > max) {
+                max = partSize;
+                pid = i;
+            }
+        }
+        return pid;
+    }
+
+    /**
+     * Creates a constraint that limits every already-spilled partition to at most one frame and leaves the
+     * other partitions unbounded.
+     *
+     * @param spillStatus
+     *            bit set marking spilled partitions; it is read live on every query
+     * @return a constraint returning 1 for spilled partitions and {@link Integer#MAX_VALUE} otherwise
+     */
+    public static IPartitionedMemoryConstrain createAtMostOneFrameForSpilledPartitionConstrain(BitSet spillStatus) {
+        return partitionId -> spillStatus.get(partitionId) ? 1 : Integer.MAX_VALUE;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
new file mode 100644
index 0000000..ed54973
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FixedSizeFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * This buffer manager divides the buffers of one shared frame pool into a given number of partitions.
+ * <p>
+ * How many frames each partition may use is decided by the supplied {@link IPartitionedMemoryConstrain}. For
+ * example, a caller can restrict partitions it has cleared (spilled) to no more than one frame.
+ * <p>
+ * Tuple pointers produced here encode both the partition and the frame index inside that partition as
+ * {@code frameIdInPartition * numPartitions + partition}.
+ */
+public class VPartitionTupleBufferManager implements IPartitionedTupleBufferManager {
+
+    private final IDeallocatableFramePool framePool;
+    // Per-partition frame lists; an entry stays null until its partition receives its first tuple.
+    private final IFrameBufferManager[] partitionArray;
+    private final int[] numTuples;
+    private final FixedSizeFrame appendFrame;
+    private final FixedSizeFrameTupleAppender appender;
+    // Scratch holder reused by every getFrame() call so that no BufferInfo is allocated per lookup.
+    private final BufferInfo tempInfo;
+    private final IPartitionedMemoryConstrain constrain;
+
+    /**
+     * @param ctx
+     *            frame-manager context handed to the underlying {@link DeallocatableFramePool}
+     * @param constrain
+     *            per-partition limit on the number of frames
+     * @param partitions
+     *            number of partitions to manage
+     * @param frameLimitInBytes
+     *            memory budget, in bytes, handed to the underlying frame pool
+     */
+    public VPartitionTupleBufferManager(IHyracksFrameMgrContext ctx, IPartitionedMemoryConstrain constrain,
+            int partitions, int frameLimitInBytes) throws HyracksDataException {
+        this.constrain = constrain;
+        this.framePool = new DeallocatableFramePool(ctx, frameLimitInBytes);
+        this.partitionArray = new IFrameBufferManager[partitions];
+        this.numTuples = new int[partitions];
+        this.appendFrame = new FixedSizeFrame();
+        this.appender = new FixedSizeFrameTupleAppender();
+        this.tempInfo = new BufferInfo(null, -1, -1);
+    }
+
+    /**
+     * Returns every frame of every partition to the pool and empties all partitions. The pool itself is not closed.
+     */
+    @Override
+    public void reset() throws HyracksDataException {
+        for (IFrameBufferManager part : partitionArray) {
+            if (part != null) {
+                deallocateFrames(part);
+                part.reset();
+            }
+        }
+        Arrays.fill(numTuples, 0);
+        appendFrame.reset(null);
+    }
+
+    @Override
+    public int getNumPartitions() {
+        return partitionArray.length;
+    }
+
+    @Override
+    public int getNumTuples(int partition) {
+        return numTuples[partition];
+    }
+
+    /**
+     * @return the summed length of all frames of the partition, or 0 if it has never received a tuple
+     */
+    @Override
+    public int getPhysicalSize(int partitionId) {
+        int size = 0;
+        IFrameBufferManager partition = partitionArray[partitionId];
+        if (partition != null) {
+            for (int i = 0; i < partition.getNumFrames(); ++i) {
+                size += partition.getFrame(i, tempInfo).getLength();
+            }
+        }
+        return size;
+    }
+
+    /**
+     * Returns the partition's frames to the pool and resets its tuple count. A partition that never received a
+     * tuple (and therefore has no frame list yet) is simply left empty.
+     */
+    @Override
+    public void clearPartition(int partitionId) throws HyracksDataException {
+        IFrameBufferManager partition = partitionArray[partitionId];
+        if (partition != null) {
+            deallocateFrames(partition);
+            // Must stay inside the null check: an untouched partition has no frame manager to reset.
+            partition.reset();
+        }
+        numTuples[partitionId] = 0;
+    }
+
+    /**
+     * Appends the tuple to the last frame of the partition, allocating a new frame when it does not fit and the
+     * partition's frame limit allows it.
+     *
+     * @return {@code true} on success; {@code false} if the pool is exhausted or the partition reached its limit
+     */
+    @Override
+    public boolean insertTuple(int partition, byte[] byteArray, int[] fieldEndOffsets, int start, int size,
+            TuplePointer pointer) throws HyracksDataException {
+        int actualSize = calculateActualSize(fieldEndOffsets, size);
+        int fid = getLastBufferOrCreateNewIfNotExist(partition, actualSize);
+        if (fid < 0) {
+            return false;
+        }
+        partitionArray[partition].getFrame(fid, tempInfo);
+        int tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
+        if (tid < 0) {
+            // The last frame is full: a new one is needed, but only if the constraint permits another frame.
+            if (partitionArray[partition].getNumFrames() >= constrain.frameLimit(partition)) {
+                return false;
+            }
+            fid = createNewBuffer(partition, actualSize);
+            if (fid < 0) {
+                return false;
+            }
+            partitionArray[partition].getFrame(fid, tempInfo);
+            tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
+        }
+        pointer.reset(makeGroupFrameId(partition, fid), tid);
+        numTuples[partition]++;
+        return true;
+    }
+
+    @Override
+    public boolean insertTuple(int partition, IFrameTupleAccessor tupleAccessor, int tupleId, TuplePointer pointer)
+            throws HyracksDataException {
+        // The source frame already stores the field offsets, hence fieldEndOffsets == null.
+        return insertTuple(partition, tupleAccessor.getBuffer().array(), null,
+                tupleAccessor.getTupleStartOffset(tupleId), tupleAccessor.getTupleLength(tupleId), pointer);
+    }
+
+    // Space the tuple needs inside a frame; the field offsets only count when they are supplied separately.
+    private static int calculateActualSize(int[] fieldEndOffsets, int size) {
+        if (fieldEndOffsets != null) {
+            return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size);
+        }
+        return FrameHelper.calcRequiredSpace(0, size);
+    }
+
+    // Encodes (partition, frame index within the partition) into one external frame id.
+    private int makeGroupFrameId(int partition, int fid) {
+        return fid * getNumPartitions() + partition;
+    }
+
+    private int parsePartitionId(int externalFrameId) {
+        return externalFrameId % getNumPartitions();
+    }
+
+    private int parseFrameIdInPartition(int externalFrameId) {
+        return externalFrameId / getNumPartitions();
+    }
+
+    // Returns every frame held by the given partition to the pool; the partition's own list is not modified.
+    private void deallocateFrames(IFrameBufferManager partition) throws HyracksDataException {
+        for (int i = 0; i < partition.getNumFrames(); ++i) {
+            framePool.deAllocateBuffer(partition.getFrame(i, tempInfo).getBuffer());
+        }
+    }
+
+    // Allocates a frame large enough for `size`, binds the appender to it, and registers it with the partition.
+    private int createNewBuffer(int partition, int size) throws HyracksDataException {
+        ByteBuffer newBuffer = requestNewBufferFromPool(size);
+        if (newBuffer == null) {
+            return -1;
+        }
+        appendFrame.reset(newBuffer);
+        appender.reset(appendFrame, true);
+        return partitionArray[partition].insertFrame(newBuffer);
+    }
+
+    private ByteBuffer requestNewBufferFromPool(int recordSize) throws HyracksDataException {
+        int frameSize = FrameHelper.calcAlignedFrameSizeToStore(0, recordSize, framePool.getMinFrameSize());
+        return framePool.allocateFrame(frameSize);
+    }
+
+    /**
+     * Appends the tuple to the frame described by {@code bufferInfo}.
+     *
+     * @return the tuple index inside the frame, or -1 if the tuple does not fit
+     */
+    private int appendTupleToBuffer(BufferInfo bufferInfo, int[] fieldEndOffsets, byte[] byteArray, int start, int size)
+            throws HyracksDataException {
+        assert (bufferInfo.getStartOffset() == 0) : "Haven't supported yet in FrameTupleAppender";
+        // Rebind the appender only when switching frames; the reset flag false presumably keeps existing tuples.
+        if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
+            appendFrame.reset(bufferInfo.getBuffer());
+            appender.reset(appendFrame, false);
+        }
+        if (fieldEndOffsets == null) {
+            if (appender.append(byteArray, start, size)) {
+                return appender.getTupleCount() - 1;
+            }
+        } else {
+            if (appender.append(fieldEndOffsets, byteArray, start, size)) {
+                return appender.getTupleCount() - 1;
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Returns the index of the partition's last frame. If the partition has no frames yet, its frame list is
+     * created on demand (and reused afterwards) and a first frame is allocated.
+     *
+     * @return the frame index, or -1 if a first frame could not be allocated
+     */
+    private int getLastBufferOrCreateNewIfNotExist(int partition, int actualSize) throws HyracksDataException {
+        if (partitionArray[partition] == null) {
+            partitionArray[partition] = new PartitionFrameBufferManager();
+        }
+        if (partitionArray[partition].getNumFrames() == 0) {
+            return createNewBuffer(partition, actualSize);
+        }
+        return partitionArray[partition].getNumFrames() - 1;
+    }
+
+    @Override
+    public void close() {
+        framePool.close();
+        Arrays.fill(partitionArray, null);
+    }
+
+    /**
+     * Plain list of the frames owned by one partition. It keeps references only; it never allocates or frees.
+     */
+    private static class PartitionFrameBufferManager implements IFrameBufferManager {
+
+        private ArrayList<ByteBuffer> buffers = new ArrayList<>();
+
+        @Override
+        public void reset() throws HyracksDataException {
+            buffers.clear();
+        }
+
+        @Override
+        public BufferInfo getFrame(int frameIndex, BufferInfo returnedInfo) {
+            ByteBuffer frame = buffers.get(frameIndex);
+            returnedInfo.reset(frame, 0, frame.capacity());
+            return returnedInfo;
+        }
+
+        @Override
+        public int getNumFrames() {
+            return buffers.size();
+        }
+
+        @Override
+        public int insertFrame(ByteBuffer frame) throws HyracksDataException {
+            buffers.add(frame);
+            return buffers.size() - 1;
+        }
+
+        @Override
+        public void close() {
+            buffers = null;
+        }
+
+    }
+
+    @Override
+    public ITuplePointerAccessor getTupleAccessor(final RecordDescriptor recordDescriptor) {
+        return new AbstractTuplePointerAccessor() {
+            FrameTupleAccessor innerAccessor = new FrameTupleAccessor(recordDescriptor);
+
+            @Override
+            IFrameTupleAccessor getInnerAccessor() {
+                return innerAccessor;
+            }
+
+            @Override
+            void resetInnerAccessor(TuplePointer tuplePointer) {
+                // Decode the external frame id produced by makeGroupFrameId().
+                partitionArray[parsePartitionId(tuplePointer.frameIndex)]
+                        .getFrame(parseFrameIdInPartition(tuplePointer.frameIndex), tempInfo);
+                innerAccessor.reset(tempInfo.getBuffer(), tempInfo.getStartOffset(), tempInfo.getLength());
+            }
+        };
+    }
+
+    /**
+     * Writes every frame of partition {@code pid} to {@code writer} without clearing the partition. Each frame's
+     * position and limit are set to cover its content before it is written. Nothing happens if the partition holds
+     * no tuples.
+     */
+    @Override
+    public void flushPartition(int pid, IFrameWriter writer) throws HyracksDataException {
+        IFrameBufferManager partition = partitionArray[pid];
+        if (partition != null && getNumTuples(pid) > 0) {
+            for (int i = 0; i < partition.getNumFrames(); ++i) {
+                partition.getFrame(i, tempInfo);
+                tempInfo.getBuffer().position(tempInfo.getStartOffset());
+                tempInfo.getBuffer().limit(tempInfo.getStartOffset() + tempInfo.getLength());
+                writer.nextFrame(tempInfo.getBuffer());
+            }
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
new file mode 100644
index 0000000..4359e49
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
+import org.apache.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * Tuple memory manager that, besides inserting tuples, supports deleting them ({@link #deleteTuple}).
+ * Frames are obtained from an {@link IFramePool}, and a {@link FrameFreeSlotLastFit} policy tracks frames
+ * that still have usable contiguous free space. When no frame can take a new tuple but some frame's total
+ * free space (presumably including space left by deleted tuples) is large enough, every frame is
+ * reorganized via {@link IAppendDeletableFrameTupleAccessor#reOrganizeBuffer()} and the insertion is retried.
+ * <p>
+ * This is only used in the {@link org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator}.
+ */
+public class VariableDeletableTupleMemoryManager implements IDeletableTupleBufferManager {
+
+    private static final Logger LOG = Logger.getLogger(VariableDeletableTupleMemoryManager.class.getName());
+
+    /** After an insert, a frame is pushed back to the free-slot policy only if more contiguous space than this remains. */
+    private final int minFreeSpace;
+    private final IFramePool pool;
+    private final IFrameFreeSlotPolicy policy;
+    /** Scratch accessor that is re-pointed at whichever frame is currently being read or modified. */
+    private final IAppendDeletableFrameTupleAccessor accessor;
+    private final ArrayList<ByteBuffer> frames;
+    private final RecordDescriptor recordDescriptor;
+    private int numTuples;
+    /** Number of times the frames were reorganized; logged at FINE level on close. */
+    private int statsReOrg;
+
+    /**
+     * @param framePool
+     *            source of the frames; its memory budget divided by its minimum frame size is passed to the
+     *            free-slot policy (presumably as the maximum number of frames)
+     * @param recordDescriptor
+     *            layout of the tuples to be stored
+     */
+    public VariableDeletableTupleMemoryManager(IFramePool framePool, RecordDescriptor recordDescriptor) {
+        this.pool = framePool;
+        int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
+        this.policy = new FrameFreeSlotLastFit(maxFrames);
+        this.accessor = new DeletableFrameTupleAppender(recordDescriptor);
+        this.frames = new ArrayList<>();
+        this.minFreeSpace = calculateMinFreeSpace(recordDescriptor);
+        this.recordDescriptor = recordDescriptor;
+        this.numTuples = 0;
+        this.statsReOrg = 0;
+    }
+
+    /** Resets the pool and the free-slot policy and forgets all frames and tuples; the reorganization count is kept. */
+    @Override
+    public void reset() throws HyracksDataException {
+        pool.reset();
+        policy.reset();
+        frames.clear();
+        numTuples = 0;
+    }
+
+    @Override
+    public int getNumTuples() {
+        return numTuples;
+    }
+
+    /**
+     * Copies tuple {@code idx} of {@code fta} into one of the managed frames.
+     *
+     * @param fta
+     *            accessor over the source frame
+     * @param idx
+     *            index of the tuple to copy
+     * @param tuplePointer
+     *            set to the (frame id, tuple id) of the stored copy on success
+     * @return {@code true} if the tuple was stored, {@code false} if no frame can hold it and the pool cannot
+     *         provide a new one
+     */
+    @Override
+    public boolean insertTuple(IFrameTupleAccessor fta, int idx, TuplePointer tuplePointer)
+            throws HyracksDataException {
+        int requiredFreeSpace = calculatePhysicalSpace(fta, idx);
+        int frameId = findAvailableFrame(requiredFreeSpace);
+        if (frameId < 0) {
+            if (!canBeInsertedAfterCleanUpFragmentation(requiredFreeSpace)) {
+                return false;
+            }
+            reOrganizeFrames();
+            frameId = findAvailableFrame(requiredFreeSpace);
+            statsReOrg++;
+            if (frameId < 0) {
+                // Not expected after the fragmentation check succeeded, but report "no space" instead of
+                // calling frames.get(-1) when assertions are disabled.
+                return false;
+            }
+        }
+        accessor.reset(frames.get(frameId));
+        assert accessor.getContiguousFreeSpace() >= requiredFreeSpace;
+        int tid = accessor.append(fta, idx);
+        assert tid >= 0;
+        tuplePointer.reset(frameId, tid);
+        if (accessor.getContiguousFreeSpace() > minFreeSpace) {
+            policy.pushNewFrame(frameId, accessor.getContiguousFreeSpace());
+        }
+        numTuples++;
+        return true;
+    }
+
+    /** Reorganizes every frame and re-registers each one, with its new contiguous free space, in the policy. */
+    private void reOrganizeFrames() {
+        policy.reset();
+        for (int i = 0; i < frames.size(); i++) {
+            accessor.reset(frames.get(i));
+            accessor.reOrganizeBuffer();
+            policy.pushNewFrame(i, accessor.getContiguousFreeSpace());
+        }
+    }
+
+    /** Returns whether some frame's total (not necessarily contiguous) free space is at least {@code requiredFreeSpace}. */
+    private boolean canBeInsertedAfterCleanUpFragmentation(int requiredFreeSpace) {
+        for (int i = 0; i < frames.size(); i++) {
+            accessor.reset(frames.get(i));
+            if (accessor.getTotalFreeSpace() >= requiredFreeSpace) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns the id of a frame for a tuple needing {@code requiredFreeSpace} bytes: first the best fit popped
+     * from the policy, otherwise a newly allocated (and cleared) frame. Returns -1 if the pool cannot supply one.
+     */
+    private int findAvailableFrame(int requiredFreeSpace) throws HyracksDataException {
+        int frameId = policy.popBestFit(requiredFreeSpace);
+        if (frameId >= 0) {
+            return frameId;
+        }
+
+        int frameSize = calculateMinFrameSizeToPlaceTuple(requiredFreeSpace, pool.getMinFrameSize());
+        ByteBuffer buffer = pool.allocateFrame(frameSize);
+        if (buffer != null) {
+            accessor.clear(buffer);
+            frames.add(buffer);
+            return frames.size() - 1;
+        }
+        return -1;
+    }
+
+    /**
+     * Smallest multiple of {@code minFrameSize} that is at least {@code requiredFreeSpace + 4} bytes.
+     * NOTE(review): the extra 4 bytes presumably cover the frame's own metadata - confirm against the appender.
+     */
+    private static int calculateMinFrameSizeToPlaceTuple(int requiredFreeSpace, int minFrameSize) {
+        return (1 + (requiredFreeSpace + 4 - 1) / minFrameSize) * minFrameSize;
+    }
+
+    private static int calculatePhysicalSpace(IFrameTupleAccessor fta, int idx) {
+        // 4 bytes to store the offset
+        return 4 + fta.getTupleLength(idx);
+    }
+
+    /** Four bytes per field plus four bytes for the tuple offset. */
+    private static int calculateMinFreeSpace(RecordDescriptor recordDescriptor) {
+        // + 4 for the tuple offset
+        return recordDescriptor.getFieldCount() * 4 + 4;
+    }
+
+    /** Deletes the pointed-to tuple from its frame and decrements the tuple count; the policy is not updated here. */
+    @Override
+    public void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException {
+        accessor.reset(frames.get(tuplePointer.frameIndex));
+        accessor.delete(tuplePointer.tupleIndex);
+        numTuples--;
+    }
+
+    /** Closes the pool, forgets all frames and tuples, and logs the reorganization count at FINE level. */
+    @Override
+    public void close() {
+        pool.close();
+        policy.reset();
+        frames.clear();
+        numTuples = 0;
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("VariableDeletableTupleMemoryManager has reorganized " + statsReOrg + " times");
+        }
+        statsReOrg = 0;
+    }
+
+    /** Creates an accessor that reads tuples through their {@link TuplePointer}s into the managed frames. */
+    @Override
+    public ITuplePointerAccessor createTupleAccessor() {
+        return new AbstractTuplePointerAccessor() {
+            private final IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender(
+                    recordDescriptor);
+
+            @Override
+            IFrameTupleAccessor getInnerAccessor() {
+                return bufferAccessor;
+            }
+
+            @Override
+            void resetInnerAccessor(TuplePointer tuplePointer) {
+                bufferAccessor.reset(frames.get(tuplePointer.frameIndex));
+            }
+        };
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
new file mode 100644
index 0000000..fe7de23
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+/**
+ * {@link IFrameBufferManager} that packs variable-sized logical frames into physical frames obtained from an
+ * {@link IFramePool}. Each inserted frame is copied into the unused tail of a physical frame chosen by the
+ * {@link IFrameFreeSlotPolicy}, or into a newly allocated physical frame; any space remaining after the copy
+ * is offered back to the policy for later inserts.
+ */
+public class VariableFrameMemoryManager implements IFrameBufferManager {
+
+    /** A physical frame together with the offset of its first unused byte. */
+    static class PhysicalFrameOffset {
+        ByteBuffer physicalFrame;
+        int physicalOffset;
+
+        PhysicalFrameOffset(ByteBuffer frame, int offset) {
+            physicalFrame = frame;
+            physicalOffset = offset;
+        }
+    }
+
+    private final IFramePool framePool;
+    /** Physical frames; their list indexes are the ids handed to the free-slot policy. */
+    private final List<PhysicalFrameOffset> physicalFrameOffsets;
+    /** Logical frames in insertion order, each as (physical buffer, start offset, length). */
+    private final List<BufferInfo> logicalFrameStartSizes;
+    private final IFrameFreeSlotPolicy freeSlotPolicy;
+
+    public VariableFrameMemoryManager(IFramePool framePool, IFrameFreeSlotPolicy freeSlotPolicy) {
+        this.framePool = framePool;
+        this.freeSlotPolicy = freeSlotPolicy;
+        this.physicalFrameOffsets = new ArrayList<>();
+        this.logicalFrameStartSizes = new ArrayList<>();
+    }
+
+    /**
+     * Returns the id of a physical frame with at least {@code frameSize} unused bytes: the best fit popped from
+     * the policy, or else a newly allocated frame. Returns -1 if the pool cannot supply one.
+     */
+    private int findAvailableFrame(int frameSize) throws HyracksDataException {
+        int frameId = freeSlotPolicy.popBestFit(frameSize);
+        if (frameId >= 0) {
+            return frameId;
+        }
+        ByteBuffer buffer = framePool.allocateFrame(frameSize);
+        if (buffer != null) {
+            // Write 0 into the tuple-count slot located by FrameHelper for this capacity.
+            IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), 0);
+            physicalFrameOffsets.add(new PhysicalFrameOffset(buffer, 0));
+            return physicalFrameOffsets.size() - 1;
+        }
+        return -1;
+    }
+
+    /** Forgets all frames and resets both the free-slot policy and the frame pool. */
+    @Override
+    public void reset() throws HyracksDataException {
+        physicalFrameOffsets.clear();
+        logicalFrameStartSizes.clear();
+        freeSlotPolicy.reset();
+        framePool.reset();
+    }
+
+    /** Copies the location (buffer, start offset, length) of logical frame {@code frameIndex} into {@code info}. */
+    @Override
+    public BufferInfo getFrame(int frameIndex, BufferInfo info) {
+        info.reset(logicalFrameStartSizes.get(frameIndex));
+        return info;
+    }
+
+    @Override
+    public int getNumFrames() {
+        return logicalFrameStartSizes.size();
+    }
+
+    /**
+     * Copies the whole of {@code frame} ({@code capacity()} bytes of its backing array, from array index 0) into
+     * a physical frame; the caller's buffer is not retained.
+     * NOTE(review): assumes {@code frame} is heap-backed with an array offset of 0.
+     *
+     * @return the index of the new logical frame, or -1 if no physical frame could be obtained
+     */
+    @Override
+    public int insertFrame(ByteBuffer frame) throws HyracksDataException {
+        int frameSize = frame.capacity();
+        int physicalFrameId = findAvailableFrame(frameSize);
+        if (physicalFrameId < 0) {
+            return -1;
+        }
+        PhysicalFrameOffset frameOffset = physicalFrameOffsets.get(physicalFrameId);
+        ByteBuffer buffer = frameOffset.physicalFrame;
+        int offset = frameOffset.physicalOffset;
+        System.arraycopy(frame.array(), 0, buffer.array(), offset, frameSize);
+        if (offset + frameSize < buffer.capacity()) {
+            // Offer the remaining tail of this physical frame to later inserts.
+            freeSlotPolicy.pushNewFrame(physicalFrameId, buffer.capacity() - offset - frameSize);
+        }
+        frameOffset.physicalOffset = offset + frameSize;
+        logicalFrameStartSizes.add(new BufferInfo(buffer, offset, frameSize));
+        return logicalFrameStartSizes.size() - 1;
+    }
+
+    /** Forgets all frames, resets the free-slot policy and closes the frame pool. */
+    @Override
+    public void close() {
+        physicalFrameOffsets.clear();
+        logicalFrameStartSizes.clear();
+        freeSlotPolicy.reset();
+        framePool.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePool.java
new file mode 100644
index 0000000..f83eb8b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePool.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class VariableFramePool implements IFramePool {
+ public static final int UNLIMITED_MEMORY = -1;
+
+ private final IHyracksFrameMgrContext ctx;
+ private final int minFrameSize;
+ private final int memBudget;
+
+ private int allocateMem;
+ private ArrayList<ByteBuffer> buffers; // the unused slots were sorted by size increasingly.
+ private BitSet used; // the merged one also marked as used.
+
+ /**
+ * The constructor of the VariableFramePool.
+ *
+ * @param ctx
+ * @param memBudgetInBytes
+ * the given memory budgets to allocate the frames. If it less than 0, it will be treated as unlimited budgets
+ */
+ public VariableFramePool(IHyracksFrameMgrContext ctx, int memBudgetInBytes) {
+ this.ctx = ctx;
+ this.minFrameSize = ctx.getInitialFrameSize();
+ this.allocateMem = 0;
+ if (memBudgetInBytes == UNLIMITED_MEMORY) {
+ this.memBudget = Integer.MAX_VALUE;
+ this.buffers = new ArrayList<>();
+ this.used = new BitSet();
+ } else {
+ this.memBudget = memBudgetInBytes;
+ this.buffers = new ArrayList<>(memBudgetInBytes / minFrameSize);
+ this.used = new BitSet(memBudgetInBytes / minFrameSize);
+ }
+ }
+
+ @Override
+ public int getMinFrameSize() {
+ return minFrameSize;
+ }
+
+ @Override
+ public int getMemoryBudgetBytes() {
+ return memBudget;
+ }
+
+ @Override
+ public ByteBuffer allocateFrame(int frameSize) throws HyracksDataException {
+ int frameId = findExistingFrame(frameSize);
+ if (frameId >= 0) {
+ return reuseFrame(frameId);
+ }
+ if (haveEnoughFreeSpace(frameSize)) {
+ return createNewFrame(frameSize);
+ }
+ return mergeExistingFrames(frameSize);
+
+ }
+
+ private boolean haveEnoughFreeSpace(int frameSize) {
+ return frameSize + allocateMem <= memBudget;
+ }
+
+ private static int getFirstUnusedPos(BitSet used) {
+ return used.nextClearBit(0);
+ }
+
+ private static int getLastUnusedPos(BitSet used, int lastPos) {
+ return used.previousClearBit(lastPos);
+ }
+
+ private static int binarySearchUnusedBuffer(ArrayList<ByteBuffer> buffers, BitSet used, int frameSize) {
+ int l = getFirstUnusedPos(used); // to skip the merged null buffers
+ int h = getLastUnusedPos(used, (buffers.size() - 1)) + 1; // to skip the newly created buffers
+ if (l >= h) {
+ return -1;
+ }
+ int highest = h;
+ int mid = (l + h) / 2;
+ while (l < h) {
+ ByteBuffer buffer = buffers.get(mid);
+ if (buffer.capacity() == frameSize) {
+ break;
+ }
+ if (buffer.capacity() < frameSize) {
+ l = mid + 1;
+ } else {
+ h = mid;
+ }
+ mid = (l + h) / 2;
+ }
+ mid = used.nextClearBit(mid);
+ return mid < highest ? mid : -1;
+ }
+
+ private int findExistingFrame(int frameSize) {
+ return binarySearchUnusedBuffer(buffers, used, frameSize);
+ }
+
+ private ByteBuffer reuseFrame(int id) {
+ used.set(id);
+ buffers.get(id).clear();
+ return buffers.get(id);
+ }
+
+ private ByteBuffer createNewFrame(int frameSize) throws HyracksDataException {
+ buffers.add(ctx.allocateFrame(frameSize));
+ allocateMem += frameSize;
+ return reuseFrame(buffers.size() - 1);
+ }
+
+ /**
+ * The merging sequence is from the smallest to the largest order.
+ * Once the buffer get merged, it will be remove from the list in order to free the object.
+ * And the index spot of it will be marked as used.
+ *
+ * @param frameSize
+ * @return
+ * @throws HyracksDataException
+ */
+ private ByteBuffer mergeExistingFrames(int frameSize) throws HyracksDataException {
+ int mergedSize = memBudget - allocateMem;
+ int highBound = getLastUnusedPos(used, buffers.size() - 1) + 1;
+ for (int i = getFirstUnusedPos(used); i < highBound; ++i) {
+ if (!used.get(i)) {
+ mergedSize += deAllocateFrame(i);
+ if (mergedSize >= frameSize) {
+ return createNewFrame(mergedSize);
+ }
+ }
+ }
+ return null;
+ }
+
+ private int deAllocateFrame(int id) {
+ ByteBuffer frame = buffers.get(id);
+ ctx.deallocateFrames(frame.capacity());
+ buffers.set(id, null);
+ used.set(id);
+ allocateMem -= frame.capacity();
+ return frame.capacity();
+ }
+
+ @Override
+ public void reset() {
+ removeEmptySpot(buffers);
+ Collections.sort(buffers, sizeByteBufferComparator);
+ used.clear();
+ }
+
+ private static void removeEmptySpot(List<ByteBuffer> buffers) {
+ for (int i = 0; i < buffers.size();) {
+ if (buffers.get(i) == null) {
+ buffers.remove(i);
+ } else {
+ i++;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ buffers.clear();
+ used.clear();
+ allocateMem = 0;
+ }
+
+ private static Comparator<ByteBuffer> sizeByteBufferComparator = new Comparator<ByteBuffer>() {
+ @Override
+ public int compare(ByteBuffer o1, ByteBuffer o2) {
+ if (o1.capacity() == o2.capacity()) {
+ return 0;
+ }
+ return o1.capacity() < o2.capacity() ? -1 : 1;
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
deleted file mode 100644
index 06c46b0..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-/**
- * Base class for aggregator descriptors whose partial- and final-result output methods do nothing: both ignore
- * their arguments and return {@code false}. Subclasses must implement the remaining IAggregatorDescriptor
- * methods. NOTE(review): what a {@code false} return signals to callers is defined by IAggregatorDescriptor,
- * which is not visible here.
- */
-public abstract class AbstractRunningAggregatorDescriptor implements IAggregatorDescriptor {
-
- /**
- * Writes nothing to {@code tupleBuilder} and returns {@code false}.
- *
- * @see org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor#outputPartialResult(org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, org.apache.hyracks.api.comm.IFrameTupleAccessor, int, org.apache.hyracks.dataflow.std.group.AggregateState)
- */
- @Override
- public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- return false;
- }
-
- /**
- * Writes nothing to {@code tupleBuilder} and returns {@code false}.
- *
- * @see org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor#outputFinalResult(org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, org.apache.hyracks.api.comm.IFrameTupleAccessor, int, org.apache.hyracks.dataflow.std.group.AggregateState)
- */
- @Override
- public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AggregateType.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AggregateType.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AggregateType.java
new file mode 100644
index 0000000..830ab89
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AggregateType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.group;
+
+/**
+ * Kind of aggregation output.
+ * NOTE(review): presumably PARTIAL marks intermediate results that will be aggregated further and FINAL marks
+ * the finished aggregate - confirm against the aggregator implementations that use this enum.
+ */
+public enum AggregateType {
+    PARTIAL,
+    FINAL;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/FrameToolsForGroupers.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/FrameToolsForGroupers.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/FrameToolsForGroupers.java
deleted file mode 100644
index 7416677..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/FrameToolsForGroupers.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.FrameHelper;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-/**
- * Static helpers for writing a tuple's field offsets and data directly into a byte array, and for maintaining
- * a frame's tuple count and per-tuple end offsets, which are located relative to
- * {@link FrameHelper#getTupleCountOffset(int)} for the frame's capacity.
- */
-public class FrameToolsForGroupers {
-
- /**
- * Writes the field end offsets and the data bytes held by {@code tupleBuilder} into {@code buf}, starting at
- * {@code offset} and bounded by {@code length} bytes.
- *
- * @throws HyracksDataException
- * if the offsets plus the data do not fit in {@code length} bytes
- */
- public static void writeFields(byte[] buf, int offset, int length, ArrayTupleBuilder tupleBuilder)
- throws HyracksDataException {
- writeFields(buf, offset, length, tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize());
- }
-
- /**
- * Writes each entry of {@code fieldsOffset} as a 4-byte int starting at {@code buf[offset]}, followed by
- * {@code dataLength} bytes of {@code data} taken from {@code dataOffset}.
- *
- * @throws HyracksDataException
- * if {@code dataLength + 4 * fieldsOffset.length} exceeds {@code length}
- */
- public static void writeFields(byte[] buf, int offset, int length, int[] fieldsOffset, byte[] data, int dataOffset,
- int dataLength) throws HyracksDataException {
- if (dataLength + 4 * fieldsOffset.length > length) {
- throw new HyracksDataException("Out of buffer bound: try to write too much data (" + dataLength
- + ") to the given bound (" + length + ").");
- }
-
- // The wrapped buffer's position starts at `offset`, so the relative puts below write from there.
- ByteBuffer buffer = ByteBuffer.wrap(buf, offset, length);
- for (int i = 0; i < fieldsOffset.length; i++) {
- buffer.putInt(fieldsOffset[i]);
- }
- buffer.put(data, dataOffset, dataLength);
- }
-
- /**
- * Records a newly appended tuple of {@code addedTupleLength} bytes in the frame metadata: stores its end
- * offset (previous end offset plus the length) in the next end-offset slot and increments the tuple count.
- */
- public static void updateFrameMetaForNewTuple(ByteBuffer buffer, int addedTupleLength) throws HyracksDataException {
- int currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
- // End-offset slot i (1-based) sits 4 * i bytes before the tuple-count slot. With a count of 0 this reads
- // the tuple-count slot itself, which holds 0, so an empty frame's "last end offset" is 0.
- int currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
- * currentTupleCount);
- int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
-
- // update tuple end offset
- buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4 * (currentTupleCount + 1),
- newTupleEndOffset);
- // Update the tuple count
- buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()), currentTupleCount + 1);
- }
-
- /**
- * Same as {@link #updateFrameMetaForNewTuple(ByteBuffer, int)}, except that when {@code isReset} is true the
- * frame's existing metadata is ignored and the tuple is recorded as the first one (count 0, end offset 0).
- */
- public static void updateFrameMetaForNewTuple(ByteBuffer buffer, int addedTupleLength, boolean isReset)
- throws HyracksDataException {
- int currentTupleCount;
- int currentTupleEndOffset;
- if (isReset) {
- currentTupleCount = 0;
- currentTupleEndOffset = 0;
- } else {
- currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
- currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
- * currentTupleCount);
- }
- int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
-
- // update tuple end offset
- buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4 * (currentTupleCount + 1),
- newTupleEndOffset);
- // Update the tuple count
- buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()), currentTupleCount + 1);
- }
-
- /**
- * Returns {@code true} if appending a tuple of {@code length} bytes would exceed the frame's capacity.
- * The space counted is: the existing tuple data, the new tuple, the 4-byte tuple count, and one 4-byte
- * end-offset slot per tuple including the new one. With {@code isReset}, or when the frame holds no tuples,
- * the frame is treated as empty.
- */
- public static boolean isFrameOverflowing(ByteBuffer buffer, int length, boolean isReset)
- throws HyracksDataException {
-
- int currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
- if (currentTupleCount == 0 || isReset) {
- // new tuple + its end-offset slot + the tuple count
- return length + 4 + 4 > buffer.capacity();
- }
- int currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
- * currentTupleCount);
- return currentTupleEndOffset + length + 4 + (currentTupleCount + 1) * 4 > buffer.capacity();
- }
-}