You are viewing a plain text version of this content; the canonical link is available in the original HTML version of this page.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:18 UTC
[28/50] [abbrv] asterixdb git commit: snapshot with logging
snapshot with logging
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1df9a9c7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1df9a9c7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1df9a9c7
Branch: refs/heads/ecarm002/interval_join_merge
Commit: 1df9a9c7db4b686e75b870ee02080f8fe15ca666
Parents: 1fae6ac
Author: Preston Carman <pr...@apache.org>
Authored: Tue Sep 6 15:51:12 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Sep 6 15:51:12 2016 -0700
----------------------------------------------------------------------
.../joins/AbstractIntervalMergeJoinChecker.java | 6 +-
.../IntervalIndexJoinOperatorDescriptor.java | 6 +-
.../intervalindex/IntervalIndexJoiner.java | 86 +++++++++++++-------
.../dataflow/std/join/AbstractMergeJoiner.java | 24 +++---
.../hyracks/dataflow/std/join/IMergeJoiner.java | 2 +
.../std/join/MergeJoinOperatorDescriptor.java | 18 +++-
.../hyracks/dataflow/std/join/MergeJoiner.java | 80 +++++++++++++-----
.../dataflow/std/join/NestedLoopJoin.java | 8 +-
.../dataflow/std/join/RunFileStream.java | 9 +-
9 files changed, 162 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
index b461799..ec8ecda 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
@@ -69,7 +69,7 @@ public abstract class AbstractIntervalMergeJoinChecker implements IIntervalMerge
IntervalJoinUtil.getIntervalPointable(accessorRight, idRight, tvp, ipRight);
ipLeft.getEnd(endLeft);
ipRight.getStart(startRight);
- return ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), endLeft, startRight) >= 0;
+ return ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), endLeft, startRight) > 0;
}
@Override
@@ -79,7 +79,7 @@ public abstract class AbstractIntervalMergeJoinChecker implements IIntervalMerge
IntervalJoinUtil.getIntervalPointable(accessorRight, idRight, tvp, ipRight);
ipLeft.getStart(startLeft);
ipRight.getEnd(endRight);
- return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) <= 0);
+ return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) < 0);
}
@Override
@@ -102,7 +102,7 @@ public abstract class AbstractIntervalMergeJoinChecker implements IIntervalMerge
IntervalJoinUtil.getIntervalPointable(accessorRight, rightTupleIndex, idRight, tvp, ipRight);
ipLeft.getStart(startLeft);
ipRight.getEnd(endRight);
- return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) <= 0);
+ return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) < 0);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
index be44df3..d84fabc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -191,7 +192,7 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip
private static final long serialVersionUID = 1L;
private final ActivityId joinAid;
- private MergeJoinLocks locks;
+ private final MergeJoinLocks locks;
public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
super(id);
@@ -253,7 +254,8 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip
first = false;
}
try {
- while (!state.status.continueRightLoad && state.status.branch[LEFT_ACTIVITY_ID].hasMore()) {
+ while (!state.status.continueRightLoad
+ && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
// Wait for the state to request right frame unless left has finished.
locks.getRight(partition).await();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index e4c4cbe..965411b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -74,8 +74,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
private long joinComparisonCount = 0;
private long joinResultCount = 0;
private long spillCount = 0;
- private long spillReadCount = 0;
- private long spillWriteCount = 0;
+ private long leftSpillCount = 0;
+ private long rightSpillCount = 0;
+ private long[] spillFileCount = { 0, 0 };
+ private long[] spillReadCount = { 0, 0 };
+ private long[] spillWriteCount = { 0, 0 };
public IntervalIndexJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status,
MergeJoinLocks locks, Comparator<EndPointIndexItem> endPointComparator,
@@ -109,10 +112,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
memoryAccessor[RIGHT_PARTITION] = bufferManager.getTuplePointerAccessor(rightRd);
activeManager = new ActiveSweepManager[JOIN_PARTITIONS];
- activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, leftKey, LEFT_PARTITION,
- endPointComparator);
- activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, rightKey, RIGHT_PARTITION,
- endPointComparator);
+ activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, leftKey, LEFT_PARTITION, endPointComparator);
+ activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, rightKey, RIGHT_PARTITION, endPointComparator);
// Run files for both branches
runFileStream = new RunFileStream[JOIN_PARTITIONS];
@@ -143,9 +144,12 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
runFileStream[LEFT_PARTITION].close();
runFileStream[RIGHT_PARTITION].close();
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
- + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, "
- + spillReadCount + " spill frames read.");
+ LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
+ + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount() + " files, "
+ + runFileStream[LEFT_PARTITION].getWriteCount() + " written, " + runFileStream[LEFT_PARTITION].getReadCount()
+ + " read]. right[" + rightSpillCount + " spills, " + runFileStream[RIGHT_PARTITION].getFileCount()
+ + " files, " + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, "
+ + runFileStream[RIGHT_PARTITION].getReadCount() + " read].");
}
}
@@ -237,11 +241,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
long leftStart = IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey);
long rightStart = IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey);
if (leftStart < rightStart) {
- return activeManager[RIGHT_PARTITION].hasRecords()
- && activeManager[RIGHT_PARTITION].getTopPoint() < leftStart;
+ return activeManager[RIGHT_PARTITION].hasRecords() && activeManager[RIGHT_PARTITION].getTopPoint() < leftStart;
} else {
- return !(activeManager[LEFT_PARTITION].hasRecords()
- && activeManager[LEFT_PARTITION].getTopPoint() < rightStart);
+ return !(activeManager[LEFT_PARTITION].hasRecords() && activeManager[LEFT_PARTITION].getTopPoint() < rightStart);
}
}
@@ -258,8 +260,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
if (checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), sweep)
|| !imjc.checkToRemoveRightActive()) {
// Add individual tuples.
- processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION],
- inputAccessor[LEFT_PARTITION], true, writer);
+ processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], inputAccessor[LEFT_PARTITION], true,
+ writer);
runFileStream[LEFT_PARTITION].addToRunFile(inputAccessor[LEFT_PARTITION]);
inputAccessor[LEFT_PARTITION].next();
ts = loadLeftTuple();
@@ -276,7 +278,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
// Memory is empty and we can start processing the run file.
if (activeManager[RIGHT_PARTITION].isEmpty() || ts.isEmpty()) {
- unfreezeAndContinue(LEFT_PARTITION, inputAccessor[LEFT_PARTITION], RIGHT_PARTITION);
+ unfreezeAndContinue(LEFT_PARTITION, inputAccessor[LEFT_PARTITION]);
ts = loadLeftTuple();
}
return ts;
@@ -291,8 +293,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
if (checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey), sweep)
|| !imjc.checkToRemoveLeftActive()) {
// Add individual tuples.
- processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION],
- inputAccessor[RIGHT_PARTITION], false, writer);
+ processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION], false,
+ writer);
runFileStream[RIGHT_PARTITION].addToRunFile(inputAccessor[RIGHT_PARTITION]);
inputAccessor[RIGHT_PARTITION].next();
ts = loadRightTuple();
@@ -309,7 +311,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
// Memory is empty and we can start processing the run file.
if (!activeManager[LEFT_PARTITION].hasRecords() || ts.isEmpty()) {
- unfreezeAndContinue(RIGHT_PARTITION, inputAccessor[RIGHT_PARTITION], LEFT_PARTITION);
+ unfreezeAndContinue(RIGHT_PARTITION, inputAccessor[RIGHT_PARTITION]);
ts = loadRightTuple();
}
return ts;
@@ -318,9 +320,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
private void processLeftTuple(IFrameWriter writer) throws HyracksDataException {
// Process endpoints
do {
- if ((!activeManager[LEFT_PARTITION].hasRecords()
- || checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey),
- activeManager[LEFT_PARTITION].getTopPoint()))
+ if ((!activeManager[LEFT_PARTITION].hasRecords() || checkToProcessAdd(
+ IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), activeManager[LEFT_PARTITION].getTopPoint()))
|| !imjc.checkToRemoveLeftActive()) {
// Add to active, end point index and buffer.
TuplePointer tp = new TuplePointer();
@@ -340,8 +341,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
// Add Results
if (!buffer.isEmpty()) {
- processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer,
- memoryAccessor[LEFT_PARTITION], true, writer);
+ processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, memoryAccessor[LEFT_PARTITION],
+ true, writer);
}
}
@@ -370,8 +371,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
// Add Results
if (!buffer.isEmpty()) {
- processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer,
- memoryAccessor[RIGHT_PARTITION], false, writer);
+ processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, memoryAccessor[RIGHT_PARTITION],
+ false, writer);
}
}
@@ -417,6 +418,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
+ bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: "
+ bufferManager.getNumTuples(RIGHT_PARTITION) + ")");
}
+ rightSpillCount++;
} else {
runFileStream[LEFT_PARTITION].startRunFile();
if (LOGGER.isLoggable(Level.FINE)) {
@@ -424,7 +426,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
+ bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: "
+ bufferManager.getNumTuples(RIGHT_PARTITION) + ")");
}
+ leftSpillCount++;
}
+ spillCount++;
}
private void continueStream(int diskPartition, ITupleAccessor accessor) throws HyracksDataException {
@@ -434,13 +438,27 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Continue with stream (" + diskPartition + ").");
}
- spillCount++;
- spillReadCount += runFileStream[diskPartition].getReadCount();
- spillWriteCount += runFileStream[diskPartition].getWriteCount();
}
- private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor, int flushPartition)
- throws HyracksDataException {
+ private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor) throws HyracksDataException {
+ int flushPartition = frozenPartition == LEFT_PARTITION ? RIGHT_PARTITION : LEFT_PARTITION;
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("snapshot(" + frozenPartition + "): " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
+ + " left, left[" + leftSpillCount + " spills, "
+ + (runFileStream[LEFT_PARTITION].getFileCount() - spillFileCount[LEFT_PARTITION]) + " files, "
+ + (runFileStream[LEFT_PARTITION].getWriteCount() - spillWriteCount[LEFT_PARTITION]) + " written, "
+ + (runFileStream[LEFT_PARTITION].getReadCount() - spillReadCount[LEFT_PARTITION]) + " read]. right[" + rightSpillCount
+ + " spills, " + (runFileStream[RIGHT_PARTITION].getFileCount() - spillFileCount[RIGHT_PARTITION]) + " files, "
+ + (runFileStream[RIGHT_PARTITION].getWriteCount() - spillWriteCount[RIGHT_PARTITION]) + " written, "
+ + (runFileStream[RIGHT_PARTITION].getReadCount() - spillReadCount[RIGHT_PARTITION]) + " read].");
+ spillFileCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getFileCount();
+ spillReadCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getReadCount();
+ spillWriteCount[LEFT_PARTITION] = runFileStream[LEFT_PARTITION].getWriteCount();
+ spillFileCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getFileCount();
+ spillReadCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getReadCount();
+ spillWriteCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getWriteCount();
+ }
+
runFileStream[frozenPartition].flushAndStopRunFile(accessor);
flushMemory(flushPartition);
if ((LEFT_PARTITION == frozenPartition && !status.branch[LEFT_PARTITION].isRunFileReading())
@@ -453,4 +471,10 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
}
}
+ @Override
+ public void closeInput(int partition) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
index 8006790..aa065cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
@@ -60,6 +60,7 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner {
private final int partition;
private final MergeJoinLocks locks;
+ protected long[] frameCounts = { 0, 0 };
public AbstractMergeJoiner(IHyracksTaskContext ctx, int partition, MergeStatus status, MergeJoinLocks locks,
RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException {
@@ -98,12 +99,13 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner {
return TupleStatus.LOADED;
}
- protected TupleStatus loadMemoryTuple(int joinId) {
+ protected TupleStatus loadMemoryTuple(int branch) {
TupleStatus loaded;
- if (inputAccessor[joinId] != null && inputAccessor[joinId].exists()) {
+ if (inputAccessor[branch] != null && inputAccessor[branch].exists()) {
// Still processing frame.
+ int test = inputAccessor[branch].getTupleCount();
loaded = TupleStatus.LOADED;
- } else if (status.branch[joinId].hasMore()) {
+ } else if (status.branch[branch].hasMore()) {
loaded = TupleStatus.UNKNOWN;
} else {
// No more frames or tuples to process.
@@ -113,14 +115,14 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner {
}
@Override
- public void setFrame(int partition, ByteBuffer buffer) {
- inputBuffer[partition].clear();
- if (inputBuffer[partition].capacity() < buffer.capacity()) {
- inputBuffer[partition].limit(buffer.capacity());
+ public void setFrame(int branch, ByteBuffer buffer) {
+ inputBuffer[branch].clear();
+ if (inputBuffer[branch].capacity() < buffer.capacity()) {
+ inputBuffer[branch].limit(buffer.capacity());
}
- inputBuffer[partition].put(buffer.array(), 0, buffer.capacity());
- inputAccessor[partition].reset(inputBuffer[partition]);
- inputAccessor[partition].next();
+ inputBuffer[branch].put(buffer.array(), 0, buffer.capacity());
+ inputAccessor[branch].reset(inputBuffer[branch]);
+ inputAccessor[branch].next();
+ frameCounts[branch]++;
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
index 61ddde1..4268ec9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
@@ -31,4 +31,6 @@ public interface IMergeJoiner {
void setFrame(int partition, ByteBuffer buffer);
+ void closeInput(int partition) throws HyracksDataException;
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
index 5624bb5..cbe1a66 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
@@ -34,6 +34,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
/**
* The merge join is made up of two operators: left and right.
@@ -105,6 +106,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private final RecordDescriptor leftRd;
private MergeJoinTaskState state;
private boolean first = true;
+ int count = 0;
public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
this.ctx = ctx;
@@ -141,6 +143,8 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
locks.getLock(partition).lock();
+
+ count++;
if (first) {
state.status.branch[LEFT_ACTIVITY_ID].setStageData();
first = false;
@@ -171,6 +175,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
if (state.failed) {
writer.fail();
} else {
+ state.joiner.closeInput(LEFT_ACTIVITY_ID);
state.joiner.processMergeUsingLeftTuple(writer);
state.joiner.closeResult(writer);
writer.close();
@@ -180,6 +185,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
} finally {
locks.getLock(partition).unlock();
}
+// System.err.println("Left next calls: " + count);
}
}
}
@@ -188,7 +194,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final long serialVersionUID = 1L;
private final ActivityId joinAid;
- private MergeJoinLocks locks;
+ private final MergeJoinLocks locks;
public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
super(id);
@@ -202,8 +208,8 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
throws HyracksDataException {
locks.setPartitions(nPartitions);
RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys,
- partition, ctx);
+ final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys, partition,
+ ctx);
return new RightDataOperator(ctx, partition, inRecordDesc, mjc);
}
@@ -215,6 +221,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private final IMergeJoinChecker mjc;
private MergeJoinTaskState state;
private boolean first = true;
+ int count = 0;
public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc,
IMergeJoinChecker mjc) {
@@ -250,12 +257,14 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
locks.getLock(partition).lock();
+ count++;
if (first) {
state.status.branch[RIGHT_ACTIVITY_ID].setStageData();
first = false;
}
try {
- while (!state.status.continueRightLoad && state.status.branch[LEFT_ACTIVITY_ID].hasMore()) {
+ while (!state.status.continueRightLoad
+ && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
// Wait for the state to request right frame unless left has finished.
locks.getRight(partition).await();
}
@@ -289,6 +298,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
} finally {
locks.getLock(partition).unlock();
}
+// System.err.println("Right next calls: " + count);
}
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index 03283d3..c1a828f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -60,8 +60,9 @@ public class MergeJoiner extends AbstractMergeJoiner {
private long joinComparisonCount = 0;
private long joinResultCount = 0;
- private long spillWriteCount = 0;
- private long spillReadCount = 0;
+// private long spillFileCount = 0;
+// private long spillWriteCount = 0;
+// private long spillReadCount = 0;
private long spillCount = 0;
public MergeJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks,
@@ -114,12 +115,14 @@ public class MergeJoiner extends AbstractMergeJoiner {
resultAppender.write(writer, true);
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
- + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, "
- + spillReadCount + " spill frames read.");
+ + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, "
+ + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount()
+ + " spill frames read.");
}
}
private void flushMemory() throws HyracksDataException {
+ memoryBuffer.clear();
bufferManager.reset();
}
@@ -151,7 +154,11 @@ public class MergeJoiner extends AbstractMergeJoiner {
if (status.branch[LEFT_PARTITION].isRunFileReading()) {
loaded = loadSpilledTuple(LEFT_PARTITION);
if (loaded.isEmpty()) {
- continueStream(inputAccessor[LEFT_PARTITION]);
+ if (status.branch[LEFT_PARTITION].isRunFileWriting() && !status.branch[LEFT_PARTITION].hasMore()) {
+ unfreezeAndContinue(inputAccessor[LEFT_PARTITION]);
+ } else {
+ continueStream(inputAccessor[LEFT_PARTITION]);
+ }
loaded = loadLeftTuple();
}
} else {
@@ -169,6 +176,13 @@ public class MergeJoiner extends AbstractMergeJoiner {
return TupleStatus.LOADED;
}
+ @Override
+ public void closeInput(int partition) throws HyracksDataException {
+ if (status.branch[partition].isRunFileWriting()) {
+ unfreezeAndContinue(inputAccessor[partition]);
+ }
+ }
+
/**
* Left
*
@@ -176,48 +190,54 @@ public class MergeJoiner extends AbstractMergeJoiner {
*/
@Override
public void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException {
- TupleStatus ts = loadLeftTuple();
- while (ts.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() || memoryHasTuples())) {
+ TupleStatus leftTs = loadLeftTuple();
+ TupleStatus rightTs = loadRightTuple();
+ while (leftTs.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() || memoryHasTuples())) {
if (status.branch[LEFT_PARTITION].isRunFileWriting()) {
// Left side from disk
- processLeftTupleSpill(writer);
- ts = loadLeftTuple();
- } else if (loadRightTuple().isLoaded()
- && mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) {
+ leftTs = processLeftTupleSpill(writer);
+ } else if (rightTs.isLoaded() && mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) {
// Right side from stream
processRightTuple();
+ rightTs = loadRightTuple();
} else {
// Left side from stream
processLeftTuple(writer);
- ts = loadLeftTuple();
+ leftTs = loadLeftTuple();
}
}
}
- private void processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException {
+ private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException {
+ // System.err.print("Spill ");
+
runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]);
processLeftTuple(writer);
+
// Memory is empty and we can start processing the run file.
if (!memoryHasTuples() && status.branch[LEFT_PARTITION].isRunFileWriting()) {
unfreezeAndContinue(inputAccessor[LEFT_PARTITION]);
}
+ return loadLeftTuple();
}
private void processLeftTuple(IFrameWriter writer) throws HyracksDataException {
+ // TuplePrinterUtil.printTuple("Left", inputAccessor[LEFT]);
// Check against memory (right)
if (memoryHasTuples()) {
for (int i = memoryBuffer.size() - 1; i > -1; --i) {
memoryAccessor.reset(memoryBuffer.get(i));
- if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
- memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) {
+ if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
+ memoryBuffer.get(i).getTupleIndex(), false)) {
// add to result
- addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
- memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer);
+ addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
+ memoryBuffer.get(i).getTupleIndex(), writer);
}
joinComparisonCount++;
- if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
- memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
+ if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
+ memoryBuffer.get(i).getTupleIndex())) {
// remove from memory
+ // TuplePrinterUtil.printTuple("Remove Memory", memoryAccessor, memoryBuffer.get(i).getTupleIndex());
removeFromMemory(memoryBuffer.get(i));
}
}
@@ -234,30 +254,45 @@ public class MergeJoiner extends AbstractMergeJoiner {
return;
}
}
+ // TuplePrinterUtil.printTuple("Memory", inputAccessor[RIGHT]);
inputAccessor[RIGHT_PARTITION].next();
}
private void freezeAndSpill() throws HyracksDataException {
+// System.err.println("freezeAndSpill");
+
runFileStream.startRunFile();
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine(
"Memory is full. Freezing the right branch. (memory tuples: " + bufferManager.getNumTuples() + ")");
}
+ spillCount++;
}
private void continueStream(ITupleAccessor accessor) throws HyracksDataException {
+// System.err.println("continueStream");
+
runFileStream.closeRunFile();
accessor.reset(inputBuffer[LEFT_PARTITION]);
accessor.setTupleId(leftStreamIndex);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Continue with left stream.");
}
- spillCount++;
- spillReadCount += runFileStream.getReadCount();
- spillWriteCount += runFileStream.getWriteCount();
}
private void unfreezeAndContinue(ITupleAccessor accessor) throws HyracksDataException {
+// System.err.println("unfreezeAndContinue");
+// if (LOGGER.isLoggable(Level.WARNING)) {
+// LOGGER.warning("snapshot: " + frameCounts[RIGHT] + " right, " + frameCounts[LEFT] + " left, "
+// + joinComparisonCount + " comparisons, " + joinResultCount + " results, " + spillCount + " spills, "
+// + (runFileStream.getFileCount() - spillFileCount) + " files, "
+// + (runFileStream.getWriteCount() - spillWriteCount) + " spill frames written, "
+// + (runFileStream.getReadCount() - spillReadCount) + " spill frames read.");
+// spillFileCount = runFileStream.getFileCount();
+// spillReadCount = runFileStream.getReadCount();
+// spillWriteCount = runFileStream.getWriteCount();
+// }
+
runFileStream.flushAndStopRunFile(accessor);
flushMemory();
if (!status.branch[LEFT_PARTITION].isRunFileReading()) {
@@ -267,6 +302,7 @@ public class MergeJoiner extends AbstractMergeJoiner {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Unfreezing right partition.");
}
+
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 3d99d6c..5cc36ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -62,6 +62,7 @@ public class NestedLoopJoin {
private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
private BufferInfo tempInfo = new BufferInfo(null, -1, -1);
+ private final int partition;
private long joinComparisonCount = 0;
private long joinResultCount = 0;
private long spillWriteCount = 0;
@@ -104,6 +105,8 @@ public class NestedLoopJoin {
.createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
runFileWriter = new RunFileWriter(file, ctx.getIOManager());
runFileWriter.open();
+
+ partition = ctx.getTaskAttemptId().getTaskId().getPartition();
}
public void cache(ByteBuffer buffer) throws HyracksDataException {
@@ -202,8 +205,9 @@ public class NestedLoopJoin {
appender.write(writer, true);
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("NestedLoopJoin statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
- + " results, " + spillWriteCount + " frames written, " + spillReadCount + " frames read.");
+ LOGGER.warning("NestedLoopJoin statitics: " + partition + " partition, " + joinComparisonCount
+ + " comparisons, " + joinResultCount + " results, " + spillWriteCount + " frames written, "
+ + spillReadCount + " frames read.");
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index aaaaaf4..042b85e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -48,6 +48,7 @@ public class RunFileStream {
private long runFileCounter = 0;
private long readCount = 0;
private long writeCount = 0;
+ private long tupleCount = 0;
public RunFileStream(IHyracksTaskContext ctx, String key, IRunFileStreamStatus status) throws HyracksDataException {
this.ctx = ctx;
@@ -58,6 +59,10 @@ public class RunFileStream {
runFileAppender = new FrameTupleAppender(new VSizeFrame(ctx));
}
+ public long getFileCount() {
+ return runFileCounter;
+ }
+
public long getReadCount() {
return readCount;
}
@@ -67,8 +72,6 @@ public class RunFileStream {
}
public void startRunFile() throws HyracksDataException {
- readCount = 0;
- writeCount = 0;
runFileCounter++;
status.setRunFileWriting(true);
@@ -89,7 +92,9 @@ public class RunFileStream {
runFileAppender.write(runFileWriter, true);
writeCount++;
runFileAppender.append(accessor, idx);
+ tupleCount = 0;
}
+ tupleCount++;
}
public void openRunFile(ITupleAccessor accessor) throws HyracksDataException {