You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:25 UTC
[35/50] [abbrv] asterixdb git commit: snapshot - reuse frames in
deletable partition manager.
snapshot - reuse frames in deletable partition manager.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/6c312141
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/6c312141
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/6c312141
Branch: refs/heads/ecarm002/interval_join_merge
Commit: 6c3121413762f3556d884e16642cefa654e42868
Parents: f186da9
Author: Preston Carman <pr...@apache.org>
Authored: Tue Sep 13 19:37:32 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Sep 13 19:37:32 2016 -0700
----------------------------------------------------------------------
.../operators/joins/IntervalJoinUtil.java | 1 +
.../intervalindex/IntervalIndexJoiner.java | 70 ++++++++++++--------
.../InMemoryIntervalPartitionJoin.java | 5 +-
.../std/buffermanager/IFrameBufferManager.java | 24 ++++++-
...IPartitionedDeletableTupleBufferManager.java | 2 +
.../VPartitionDeletableTupleBufferManager.java | 50 ++++++++++++--
.../VPartitionTupleBufferManager.java | 57 ++++++++++++++--
.../VariableFrameMemoryManager.java | 23 +++++++
.../hyracks/dataflow/std/join/MergeJoiner.java | 53 ++++++++-------
.../dataflow/std/join/RunFileStream.java | 1 +
.../sort/util/DeletableFrameTupleAppender.java | 26 +++++++-
.../IAppendDeletableFrameTupleAccessor.java | 3 +
12 files changed, 249 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java
index 3e380c7..7c37f08 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java
@@ -48,6 +48,7 @@ public class IntervalJoinUtil {
}
public static long getIntervalStart(IFrameTupleAccessor accessor, int tupleId, int fieldId) {
+ int length = accessor.getTupleLength(tupleId);
int start = accessor.getTupleStartOffset(tupleId) + accessor.getFieldSlotsLength()
+ accessor.getFieldStartOffset(tupleId, fieldId) + 1;
long intervalStart = AIntervalSerializerDeserializer.getIntervalStart(accessor.getBuffer().array(), start);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index 965411b..d3aaa65 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -73,7 +73,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
private long joinComparisonCount = 0;
private long joinResultCount = 0;
- private long spillCount = 0;
private long leftSpillCount = 0;
private long rightSpillCount = 0;
private long[] spillFileCount = { 0, 0 };
@@ -104,16 +103,21 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
throw new HyracksDataException(
"IntervalIndexJoiner does not have enough memory (needs > 4, got " + memorySize + ").");
}
+ // bufferManager = new VPartitionDeletableTupleBufferManager(ctx,
+ // VPartitionDeletableTupleBufferManager.NO_CONSTRAIN, JOIN_PARTITIONS,
+ // (memorySize - 4) * ctx.getInitialFrameSize(), recordDescriptors);
bufferManager = new VPartitionDeletableTupleBufferManager(ctx,
VPartitionDeletableTupleBufferManager.NO_CONSTRAIN, JOIN_PARTITIONS,
- (memorySize - 4) * ctx.getInitialFrameSize(), recordDescriptors);
+ memorySize * ctx.getInitialFrameSize(), recordDescriptors);
memoryAccessor = new ITuplePointerAccessor[JOIN_PARTITIONS];
memoryAccessor[LEFT_PARTITION] = bufferManager.getTuplePointerAccessor(leftRd);
memoryAccessor[RIGHT_PARTITION] = bufferManager.getTuplePointerAccessor(rightRd);
activeManager = new ActiveSweepManager[JOIN_PARTITIONS];
- activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, leftKey, LEFT_PARTITION, endPointComparator);
- activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, rightKey, RIGHT_PARTITION, endPointComparator);
+ activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, leftKey, LEFT_PARTITION,
+ endPointComparator);
+ activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, rightKey, RIGHT_PARTITION,
+ endPointComparator);
// Run files for both branches
runFileStream = new RunFileStream[JOIN_PARTITIONS];
@@ -144,11 +148,12 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
runFileStream[LEFT_PARTITION].close();
runFileStream[RIGHT_PARTITION].close();
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
- + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount() + " files, "
- + runFileStream[LEFT_PARTITION].getWriteCount() + " written, " + runFileStream[LEFT_PARTITION].getReadCount()
- + " read]. right[" + rightSpillCount + " spills, " + runFileStream[RIGHT_PARTITION].getFileCount()
- + " files, " + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, "
+ LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
+ + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount()
+ + " files, " + runFileStream[LEFT_PARTITION].getWriteCount() + " written, "
+ + runFileStream[LEFT_PARTITION].getReadCount() + " read]. right[" + rightSpillCount + " spills, "
+ + runFileStream[RIGHT_PARTITION].getFileCount() + " files, "
+ + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, "
+ runFileStream[RIGHT_PARTITION].getReadCount() + " read].");
}
}
@@ -241,14 +246,16 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
long leftStart = IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey);
long rightStart = IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey);
if (leftStart < rightStart) {
- return activeManager[RIGHT_PARTITION].hasRecords() && activeManager[RIGHT_PARTITION].getTopPoint() < leftStart;
+ return activeManager[RIGHT_PARTITION].hasRecords()
+ && activeManager[RIGHT_PARTITION].getTopPoint() < leftStart;
} else {
- return !(activeManager[LEFT_PARTITION].hasRecords() && activeManager[LEFT_PARTITION].getTopPoint() < rightStart);
+ return !(activeManager[LEFT_PARTITION].hasRecords()
+ && activeManager[LEFT_PARTITION].getTopPoint() < rightStart);
}
}
private boolean checkToProcessAdd(long startMemory, long endMemory) {
- return startMemory <= endMemory;
+ return startMemory < endMemory;
}
private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException {
@@ -260,8 +267,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
if (checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), sweep)
|| !imjc.checkToRemoveRightActive()) {
// Add individual tuples.
- processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], inputAccessor[LEFT_PARTITION], true,
- writer);
+ processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION],
+ inputAccessor[LEFT_PARTITION], true, writer);
runFileStream[LEFT_PARTITION].addToRunFile(inputAccessor[LEFT_PARTITION]);
inputAccessor[LEFT_PARTITION].next();
ts = loadLeftTuple();
@@ -293,8 +300,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
if (checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey), sweep)
|| !imjc.checkToRemoveLeftActive()) {
// Add individual tuples.
- processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION], false,
- writer);
+ processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION],
+ inputAccessor[RIGHT_PARTITION], false, writer);
runFileStream[RIGHT_PARTITION].addToRunFile(inputAccessor[RIGHT_PARTITION]);
inputAccessor[RIGHT_PARTITION].next();
ts = loadRightTuple();
@@ -320,8 +327,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
private void processLeftTuple(IFrameWriter writer) throws HyracksDataException {
// Process endpoints
do {
- if ((!activeManager[LEFT_PARTITION].hasRecords() || checkToProcessAdd(
- IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), activeManager[LEFT_PARTITION].getTopPoint()))
+ if ((!activeManager[LEFT_PARTITION].hasRecords()
+ || checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey),
+ activeManager[LEFT_PARTITION].getTopPoint()))
|| !imjc.checkToRemoveLeftActive()) {
// Add to active, end point index and buffer.
TuplePointer tp = new TuplePointer();
@@ -341,8 +349,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
// Add Results
if (!buffer.isEmpty()) {
- processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, memoryAccessor[LEFT_PARTITION],
- true, writer);
+ processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer,
+ memoryAccessor[LEFT_PARTITION], true, writer);
}
}
@@ -371,8 +379,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
// Add Results
if (!buffer.isEmpty()) {
- processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, memoryAccessor[RIGHT_PARTITION],
- false, writer);
+ processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer,
+ memoryAccessor[RIGHT_PARTITION], false, writer);
}
}
@@ -411,6 +419,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
}
private void freezeAndSpill() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("freeze snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
+ + " left, left[" + bufferManager.getNumTuples(LEFT_PARTITION) + " memory]. right["
+ + bufferManager.getNumTuples(RIGHT_PARTITION) + " memory].");
+ }
if (bufferManager.getNumTuples(LEFT_PARTITION) > bufferManager.getNumTuples(RIGHT_PARTITION)) {
runFileStream[RIGHT_PARTITION].startRunFile();
if (LOGGER.isLoggable(Level.FINE)) {
@@ -418,6 +431,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
+ bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: "
+ bufferManager.getNumTuples(RIGHT_PARTITION) + ")");
}
+ bufferManager.printStats("memory details");
rightSpillCount++;
} else {
runFileStream[LEFT_PARTITION].startRunFile();
@@ -426,9 +440,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
+ bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: "
+ bufferManager.getNumTuples(RIGHT_PARTITION) + ")");
}
+ bufferManager.printStats("memory details");
leftSpillCount++;
}
- spillCount++;
}
private void continueStream(int diskPartition, ITupleAccessor accessor) throws HyracksDataException {
@@ -443,12 +457,14 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor) throws HyracksDataException {
int flushPartition = frozenPartition == LEFT_PARTITION ? RIGHT_PARTITION : LEFT_PARTITION;
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("snapshot(" + frozenPartition + "): " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
- + " left, left[" + leftSpillCount + " spills, "
+ LOGGER.warning("snapshot(" + frozenPartition + "): " + frameCounts[RIGHT_PARTITION] + " right, "
+ + frameCounts[LEFT_PARTITION] + " left, left[" + bufferManager.getNumTuples(LEFT_PARTITION)
+ + " memory, " + leftSpillCount + " spills, "
+ (runFileStream[LEFT_PARTITION].getFileCount() - spillFileCount[LEFT_PARTITION]) + " files, "
+ (runFileStream[LEFT_PARTITION].getWriteCount() - spillWriteCount[LEFT_PARTITION]) + " written, "
- + (runFileStream[LEFT_PARTITION].getReadCount() - spillReadCount[LEFT_PARTITION]) + " read]. right[" + rightSpillCount
- + " spills, " + (runFileStream[RIGHT_PARTITION].getFileCount() - spillFileCount[RIGHT_PARTITION]) + " files, "
+ + (runFileStream[LEFT_PARTITION].getReadCount() - spillReadCount[LEFT_PARTITION]) + " read]. right["
+ + bufferManager.getNumTuples(RIGHT_PARTITION) + " memory, " + +rightSpillCount + " spills, "
+ + (runFileStream[RIGHT_PARTITION].getFileCount() - spillFileCount[RIGHT_PARTITION]) + " files, "
+ (runFileStream[RIGHT_PARTITION].getWriteCount() - spillWriteCount[RIGHT_PARTITION]) + " written, "
+ (runFileStream[RIGHT_PARTITION].getReadCount() - spillReadCount[RIGHT_PARTITION]) + " read].");
spillFileCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getFileCount();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
index 88ff727..aeea209 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
@@ -69,7 +69,9 @@ public class InMemoryIntervalPartitionJoin {
public void join(IFrameTupleAccessor accessorProbe, int probeTupleIndex, IFrameWriter writer)
throws HyracksDataException {
if (fbm.getNumFrames() != 0) {
- for (int frameIndex = 0; frameIndex < fbm.getNumFrames(); ++frameIndex) {
+ fbm.resetIterator();
+ int frameIndex = fbm.next();
+ while (fbm.exists()) {
fbm.getFrame(frameIndex, bufferInfo);
accessorBuild.reset(bufferInfo.getBuffer());
for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
@@ -79,6 +81,7 @@ public class InMemoryIntervalPartitionJoin {
}
joinComparisonCount++;
}
+ frameIndex = fbm.next();
}
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
index 1118bf3..5741071 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
@@ -36,7 +36,8 @@ public interface IFrameBufferManager {
/**
* @param frameIndex
- * @param bufferInfo the given object need to be reset
+ * @param bufferInfo
+ * the given object need to be reset
* @return the filled bufferInfo to facilitate the chain access
*/
BufferInfo getFrame(int frameIndex, BufferInfo bufferInfo);
@@ -49,11 +50,30 @@ public interface IFrameBufferManager {
/**
* Writes the whole frame into the buffer.
*
- * @param frame source frame
+ * @param frame
+ * source frame
* @return the id of the inserted frame. return -1 if it failed to insert
*/
int insertFrame(ByteBuffer frame) throws HyracksDataException;
+ /**
+ * Removes the frame from the buffer manager
+ *
+ * @param frameIndex
+ */
+ void removeFrame(int frameIndex);
+
void close();
+ /**
+ * Create a iterator for frames.
+ *
+ * Allows the reuse of frame ids.
+ */
+ int next();
+
+ boolean exists();
+
+ void resetIterator();
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java
index df98e88..52a0918 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java
@@ -26,4 +26,6 @@ public interface IPartitionedDeletableTupleBufferManager extends IPartitionedTup
void deleteTuple(int partition, TuplePointer tuplePointer) throws HyracksDataException;
+ void printStats(String string);
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java
index 1e1127d..6802e19 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java
@@ -34,7 +34,7 @@ import org.apache.hyracks.dataflow.std.structures.TuplePointer;
public class VPartitionDeletableTupleBufferManager extends VPartitionTupleBufferManager
implements IPartitionedDeletableTupleBufferManager {
- private static int[] minFreeSpace;
+ private final int[] minFreeSpace;
private final IAppendDeletableFrameTupleAccessor[] accessor;
private final IFrameFreeSlotPolicy[] policy;
@@ -85,8 +85,11 @@ public class VPartitionDeletableTupleBufferManager extends VPartitionTupleBuffer
public void clearPartition(int partitionId) throws HyracksDataException {
IFrameBufferManager partition = partitionArray[partitionId];
if (partition != null) {
- for (int i = 0; i < partition.getNumFrames(); ++i) {
+ partition.resetIterator();
+ int i = partition.next();
+ while (partition.exists()) {
accessor[partitionId].clear(partition.getFrame(i, tempInfo).getBuffer());
+ i = partition.next();
}
}
policy[partitionId].reset();
@@ -94,22 +97,36 @@ public class VPartitionDeletableTupleBufferManager extends VPartitionTupleBuffer
}
private void reOrganizeFrames(int partition) {
+ System.err.printf("reOrganizeFrames -- %d:[", partition);
policy[partition].reset();
- for (int i = 0; i < partitionArray[partition].getNumFrames(); i++) {
- partitionArray[partition].getFrame(i, tempInfo);
+ partitionArray[partition].resetIterator();
+ int f = partitionArray[partition].next();
+ while (partitionArray[partition].exists()) {
+ partitionArray[partition].getFrame(f, tempInfo);
accessor[partition].reset(tempInfo.getBuffer());
accessor[partition].reOrganizeBuffer();
- policy[partition].pushNewFrame(i, accessor[partition].getContiguousFreeSpace());
+ if (accessor[partition].getTupleCount() == 0) {
+ partitionArray[partition].removeFrame(f);
+ framePool.deAllocateBuffer(tempInfo.getBuffer());
+ } else {
+ policy[partition].pushNewFrame(f, accessor[partition].getContiguousFreeSpace());
+ accessor[partition].printStats(System.err);
+ }
+ f = partitionArray[partition].next();
}
+ System.err.println("] ");
}
private boolean canBeInsertedAfterCleanUpFragmentation(int partition, int requiredFreeSpace) {
- for (int i = 0; i < partitionArray[partition].getNumFrames(); i++) {
+ partitionArray[partition].resetIterator();
+ int i = partitionArray[partition].next();
+ while (partitionArray[partition].exists()) {
partitionArray[partition].getFrame(i, tempInfo);
accessor[partition].reset(tempInfo.getBuffer());
if (accessor[partition].getTotalFreeSpace() >= requiredFreeSpace) {
return true;
}
+ i = partitionArray[partition].next();
}
return false;
}
@@ -135,6 +152,27 @@ public class VPartitionDeletableTupleBufferManager extends VPartitionTupleBuffer
return recordDescriptor.getFieldCount() * 4 + 4;
}
+ public void printStats(String message) {
+ System.err.print(String.format("%1$-" + 15 + "s", message) + " --");
+
+ for (int p = 0; p < partitionArray.length; ++p) {
+ System.err.printf("%d:[", p);
+ IFrameBufferManager partition = partitionArray[p];
+ if (partition != null) {
+ partitionArray[p].resetIterator();
+ int f = partitionArray[p].next();
+ while (partitionArray[p].exists()) {
+ partitionArray[p].getFrame(f, tempInfo);
+ accessor[p].reset(tempInfo.getBuffer());
+ accessor[p].printStats(System.err);
+ f = partitionArray[p].next();
+ }
+ }
+ System.err.printf("] ");
+ }
+ System.err.println();
+ }
+
@Override
public void deleteTuple(int partition, TuplePointer tuplePointer) throws HyracksDataException {
partitionArray[parsePartitionId(tuplePointer.getFrameIndex())]
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4a4cb5d..a094061 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -110,8 +110,11 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
public void clearPartition(int partitionId) throws HyracksDataException {
IFrameBufferManager partition = partitionArray[partitionId];
if (partition != null) {
- for (int i = 0; i < partition.getNumFrames(); ++i) {
+ partition.resetIterator();
+ int i = partition.next();
+ while (partition.exists()) {
framePool.deAllocateBuffer(partition.getFrame(i, tempInfo).getBuffer());
+ i = partition.next();
}
partition.reset();
}
@@ -221,11 +224,13 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
static class PartitionFrameBufferManager implements IFrameBufferManager {
+ int size = 0;
ArrayList<ByteBuffer> buffers = new ArrayList<>();
@Override
public void reset() throws HyracksDataException {
buffers.clear();
+ size = 0;
}
@Override
@@ -236,13 +241,35 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
@Override
public int getNumFrames() {
- return buffers.size();
+ return size;
}
@Override
public int insertFrame(ByteBuffer frame) throws HyracksDataException {
- buffers.add(frame);
- return buffers.size() - 1;
+ int index = -1;
+ if (buffers.size() == size) {
+ buffers.add(frame);
+ index = buffers.size() - 1;
+ } else {
+ for (int i = 0; i < buffers.size(); ++i) {
+ if (buffers.get(i) == null) {
+ buffers.set(i, frame);
+ index = i;
+ break;
+ }
+ }
+ }
+ if (index == -1) {
+ throw new HyracksDataException("Did not insert frame.");
+ }
+ size++;
+ return index;
+ }
+
+ @Override
+ public void removeFrame(int frameIndex) {
+ buffers.set(frameIndex, null);
+ size--;
}
@Override
@@ -250,6 +277,28 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
buffers = null;
}
+ int iterator = -1;
+
+ @Override
+ public int next() {
+ while (++iterator < buffers.size()) {
+ if (buffers.get(iterator) != null) {
+ break;
+ }
+ }
+ return iterator;
+ }
+
+ @Override
+ public boolean exists() {
+ return iterator < buffers.size() && buffers.get(iterator) != null;
+ }
+
+ @Override
+ public void resetIterator() {
+ iterator = -1;
+ }
+
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
index 6604ba8..7c750d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
@@ -104,10 +104,33 @@ public class VariableFrameMemoryManager implements IFrameBufferManager {
}
@Override
+ public void removeFrame(int frameIndex) {
+ logicalFrameStartSizes.remove(frameIndex);
+ }
+
+ @Override
public void close() {
physicalFrameOffsets.clear();
logicalFrameStartSizes.clear();
freeSlotPolicy.reset();
framePool.close();
}
+
+ @Override
+ public int next() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public boolean exists() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void resetIterator() {
+ // TODO Auto-generated method stub
+
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index c1a828f..9c93828 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -60,9 +60,9 @@ public class MergeJoiner extends AbstractMergeJoiner {
private long joinComparisonCount = 0;
private long joinResultCount = 0;
-// private long spillFileCount = 0;
-// private long spillWriteCount = 0;
-// private long spillReadCount = 0;
+ private long spillFileCount = 0;
+ private long spillWriteCount = 0;
+ private long spillReadCount = 0;
private long spillCount = 0;
public MergeJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks,
@@ -196,7 +196,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
if (status.branch[LEFT_PARTITION].isRunFileWriting()) {
// Left side from disk
leftTs = processLeftTupleSpill(writer);
- } else if (rightTs.isLoaded() && mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) {
+ } else if (rightTs.isLoaded()
+ && mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) {
// Right side from stream
processRightTuple();
rightTs = loadRightTuple();
@@ -227,15 +228,15 @@ public class MergeJoiner extends AbstractMergeJoiner {
if (memoryHasTuples()) {
for (int i = memoryBuffer.size() - 1; i > -1; --i) {
memoryAccessor.reset(memoryBuffer.get(i));
- if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
- memoryBuffer.get(i).getTupleIndex(), false)) {
+ if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+ memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) {
// add to result
- addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
- memoryBuffer.get(i).getTupleIndex(), writer);
+ addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+ memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer);
}
joinComparisonCount++;
- if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
- memoryBuffer.get(i).getTupleIndex())) {
+ if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+ memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
// remove from memory
// TuplePrinterUtil.printTuple("Remove Memory", memoryAccessor, memoryBuffer.get(i).getTupleIndex());
removeFromMemory(memoryBuffer.get(i));
@@ -259,7 +260,12 @@ public class MergeJoiner extends AbstractMergeJoiner {
}
private void freezeAndSpill() throws HyracksDataException {
-// System.err.println("freezeAndSpill");
+ // System.err.println("freezeAndSpill");
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("freeze snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
+ + " left, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, ["
+ + bufferManager.getNumTuples() + " tuples memory].");
+ }
runFileStream.startRunFile();
if (LOGGER.isLoggable(Level.FINE)) {
@@ -270,7 +276,7 @@ public class MergeJoiner extends AbstractMergeJoiner {
}
private void continueStream(ITupleAccessor accessor) throws HyracksDataException {
-// System.err.println("continueStream");
+ // System.err.println("continueStream");
runFileStream.closeRunFile();
accessor.reset(inputBuffer[LEFT_PARTITION]);
@@ -281,17 +287,18 @@ public class MergeJoiner extends AbstractMergeJoiner {
}
private void unfreezeAndContinue(ITupleAccessor accessor) throws HyracksDataException {
-// System.err.println("unfreezeAndContinue");
-// if (LOGGER.isLoggable(Level.WARNING)) {
-// LOGGER.warning("snapshot: " + frameCounts[RIGHT] + " right, " + frameCounts[LEFT] + " left, "
-// + joinComparisonCount + " comparisons, " + joinResultCount + " results, " + spillCount + " spills, "
-// + (runFileStream.getFileCount() - spillFileCount) + " files, "
-// + (runFileStream.getWriteCount() - spillWriteCount) + " spill frames written, "
-// + (runFileStream.getReadCount() - spillReadCount) + " spill frames read.");
-// spillFileCount = runFileStream.getFileCount();
-// spillReadCount = runFileStream.getReadCount();
-// spillWriteCount = runFileStream.getWriteCount();
-// }
+ // System.err.println("unfreezeAndContinue");
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
+ + " left, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, ["
+ + bufferManager.getNumTuples() + " tuples memory, " + spillCount + " spills, "
+ + (runFileStream.getFileCount() - spillFileCount) + " files, "
+ + (runFileStream.getWriteCount() - spillWriteCount) + " written, "
+ + (runFileStream.getReadCount() - spillReadCount) + " read].");
+ spillFileCount = runFileStream.getFileCount();
+ spillReadCount = runFileStream.getReadCount();
+ spillWriteCount = runFileStream.getWriteCount();
+ }
runFileStream.flushAndStopRunFile(accessor);
flushMemory();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index 042b85e..2513b1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -145,6 +145,7 @@ public class RunFileStream {
loadNextBuffer(accessor);
}
}
+ runFileReader.close();
}
// Flush buffer.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
index 7d4db64..79aba3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.dataflow.std.sort.util;
+import java.io.PrintStream;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.comm.FrameHelper;
@@ -42,7 +43,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
private int tupleCount;
private int freeDataEndOffset;
private int deletedSpace;
- private byte[] array; // to speed up the array visit a little
+ private byte[] array; // to speed up the array visit a little
public DeletableFrameTupleAppender(RecordDescriptor recordDescriptor) {
this.recordDescriptor = recordDescriptor;
@@ -146,7 +147,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
endOffset = getTupleEndOffset(i);
if (endOffset >= 0) {
int length = endOffset - startOffset;
- assert ( length >= 0);
+ assert length >= 0;
if (freeDataEndOffset != startOffset) {
System.arraycopy(array, startOffset, array, freeDataEndOffset, length);
}
@@ -162,7 +163,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
private void reclaimDeletedEnding() {
for (int i = tupleCount - 1; i >= 0; i--) {
int endOffset = getTupleEndOffset(i);
- if (endOffset < 0) {
+ if (endOffset <= 0) {
tupleCount--;
} else {
break;
@@ -240,9 +241,28 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
return tupleCount;
}
+    /**
+     * Counts the tuple slots in this frame whose end offset is strictly positive.
+     * Slots with a non-positive end offset are not counted; reclaimDeletedEnding()
+     * treats such slots as removable, so this is the number of slots still holding data.
+     *
+     * @return number of slots in [0, tupleCount) with a positive end offset
+     */
+    private int getLiveTupleCount() {
+        int liveCount = 0;
+        for (int slot = 0; slot < tupleCount; slot++) {
+            if (getTupleEndOffset(slot) > 0) {
+                ++liveCount;
+            }
+        }
+        return liveCount;
+    }
+
+ /**
+ * Returns the {@code buffer} field held by this appender (the same instance, not a copy).
+ */
@Override
public ByteBuffer getBuffer() {
return buffer;
}
+    /**
+     * Prints this frame's tuple statistics to {@code ps} in the form {@code "(live, physical)"}.
+     * {@code live} is the number of slots with a positive end offset, and {@code physical}
+     * is {@link #getPhysicalTupleCount()}. No trailing newline is written.
+     *
+     * @param ps stream to print the statistics to
+     */
+    @Override
+    public void printStats(PrintStream ps) {
+        // The live count is an O(tupleCount) scan of the slot directory; compute it once.
+        int live = getLiveTupleCount();
+        ps.printf("(%d, %d)", live, getPhysicalTupleCount());
+    }
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
index 31ea07d..e7d1ceb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.dataflow.std.sort.util;
+import java.io.PrintStream;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -73,4 +74,6 @@ public interface IAppendDeletableFrameTupleAccessor extends IFrameTupleAccessor
* @return how many contiguous free space in the buffer.
*/
int getContiguousFreeSpace();
+
+ /**
+ * Writes a short summary of this accessor's tuple statistics to {@code ps}.
+ * NOTE(review): appears to be a debugging aid; the output format is left to implementations.
+ *
+ * @param ps the stream to write the summary to
+ */
+ void printStats(PrintStream ps);
}