You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:00 UTC
[10/50] [abbrv] asterixdb git commit: switch merge memory tracking method.
switch merge memory tracking method.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4f9e6a82
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4f9e6a82
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4f9e6a82
Branch: refs/heads/ecarm002/interval_join_merge
Commit: 4f9e6a82eaff790a50f86848e770e4b1dedf4069
Parents: 1f7ac98
Author: Preston Carman <pr...@apache.org>
Authored: Thu Jul 14 09:11:57 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Jul 14 09:11:57 2016 -0700
----------------------------------------------------------------------
.../joins/AbstractIntervalMergeJoinChecker.java | 14 +++++++
.../IntervalPartitionJoiner.java | 2 +-
.../dataflow/std/join/IMergeJoinChecker.java | 3 ++
.../hyracks/dataflow/std/join/MergeJoiner.java | 41 +++++++++++---------
4 files changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
index 0a25c25..cf0bf6a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
@@ -105,6 +105,20 @@ public abstract class AbstractIntervalMergeJoinChecker implements IIntervalMerge
}
@Override
+ public boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
+ IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException {
+ try {
+ IntervalJoinUtil.getIntervalPointable(accessorLeft, leftTupleIndex, idLeft, tvp, ipLeft);
+ IntervalJoinUtil.getIntervalPointable(accessorRight, rightTupleIndex, idRight, tvp, ipRight);
+ ipLeft.getStart(startLeft);
+ ipRight.getEnd(endRight);
+ return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) <= 0);
+ } catch (AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
public boolean checkToSaveInResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
IFrameTupleAccessor accessorRight, int rightTupleIndex, boolean reversed) throws HyracksDataException {
try {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index 5df7b0a..fe49d2f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -320,7 +320,7 @@ public class IntervalPartitionJoiner {
private int selectPartitionsToReload(int freeSpace, int pid) {
for (int id = ipjd.buildNextSpilled(0); id >= 0; id = ipjd.buildNextSpilled(id + 1)) {
- assert buildRFWriters[id].getFileSize() > 0 : "How comes a spilled partition have size 0?";
+ assert buildRFWriters[id].getFileSize() > 0 : "How come a spilled partition have size 0?";
if (freeSpace >= buildRFWriters[id].getFileSize()) {
return id;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
index ddf04f3..49a3763 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
@@ -51,6 +51,9 @@ public interface IMergeJoinChecker extends Serializable {
boolean checkToRemoveInMemory(ITupleAccessor accessorLeft, ITupleAccessor accessorRight)
throws HyracksDataException;
+ boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
+ IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException;
+
/**
* Check to see if the next right tuple should be loaded during the merge join.
* The check is true if the left tuple could match with the next right tuple.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index 625a24c..d94a63e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -18,9 +18,11 @@
*/
package org.apache.hyracks.dataflow.std.join;
+import java.util.LinkedList;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +34,7 @@ import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor;
import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
@@ -47,10 +50,10 @@ public class MergeJoiner extends AbstractMergeJoiner {
private MergeStatus status;
- private final TuplePointer tp;
private final IDeallocatableFramePool framePool;
private IDeletableTupleBufferManager bufferManager;
- private ITupleAccessor memoryAccessor;
+ private ITuplePointerAccessor memoryAccessor;
+ private LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
private int leftStreamIndex;
private RunFileStream runFileStream;
@@ -71,9 +74,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
"MergeJoiner does not have enough memory (needs > 0, got " + memorySize + ").");
}
framePool = new DeallocatableFramePool(ctx, (memorySize) * ctx.getInitialFrameSize());
- tp = new TuplePointer();
bufferManager = new VariableDeletableTupleMemoryManager(framePool, rightRd);
- memoryAccessor = bufferManager.createTupleAccessor();
+ memoryAccessor = bufferManager.createTuplePointerAccessor();
// Run File and frame cache (left buffer)
leftStreamIndex = TupleAccessor.UNSET;
@@ -88,21 +90,23 @@ public class MergeJoiner extends AbstractMergeJoiner {
}
private boolean addToMemory(ITupleAccessor accessor) throws HyracksDataException {
+ TuplePointer tp = new TuplePointer();
if (bufferManager.insertTuple(accessor, accessor.getTupleId(), tp)) {
+ memoryBuffer.add(tp);
return true;
}
return false;
}
- private void removeFromMemory() throws HyracksDataException {
- memoryAccessor.getTuplePointer(tp);
+ private void removeFromMemory(TuplePointer tp) throws HyracksDataException {
+ memoryBuffer.remove(tp);
bufferManager.deleteTuple(tp);
}
- private void addToResult(ITupleAccessor accessor1, ITupleAccessor accessor2, IFrameWriter writer)
- throws HyracksDataException {
- FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, accessor1.getTupleId(), accessor2,
- accessor2.getTupleId());
+ private void addToResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight,
+ int rightTupleIndex, IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, leftTupleIndex, accessorRight,
+ rightTupleIndex);
}
@Override
@@ -197,21 +201,22 @@ public class MergeJoiner extends AbstractMergeJoiner {
private void processLeftTuple(IFrameWriter writer) throws HyracksDataException {
// Check against memory (right)
if (memoryHasTuples()) {
- memoryAccessor.reset();
- memoryAccessor.next();
- while (memoryAccessor.exists()) {
+ for (int i = memoryBuffer.size() - 1; i > -1; --i) {
+ memoryAccessor.reset(memoryBuffer.get(i));
// TuplePrinterUtil.printTuple(" --- A outer", inputAccessor[LEFT_PARTITION]);
// TuplePrinterUtil.printTuple(" --- A inner", memoryAccessor);
- if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], memoryAccessor)) {
+ if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+ memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) {
// add to result
// System.err.println(" -- Matched --");
- addToResult(inputAccessor[LEFT_PARTITION], memoryAccessor, writer);
+ addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+ memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer);
}
- if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], memoryAccessor)) {
+ if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+ memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
// remove from memory
- removeFromMemory();
+ removeFromMemory(memoryBuffer.get(i));
}
- memoryAccessor.next();
}
}
inputAccessor[LEFT_PARTITION].next();