You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:16 UTC
[26/50] [abbrv] asterixdb git commit: Add stats tracking for interval joins.
Add stats tracking for interval joins.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a9729514
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a9729514
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a9729514
Branch: refs/heads/ecarm002/interval_join_merge
Commit: a9729514d7d517a24df2d290d15e854af6d65a51
Parents: 1487f2b
Author: Preston Carman <pr...@apache.org>
Authored: Thu Aug 25 11:43:57 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Aug 25 11:43:57 2016 -0700
----------------------------------------------------------------------
.../intervalindex/IntervalIndexJoiner.java | 54 ++++++++++++--------
.../InMemoryIntervalPartitionJoin.java | 18 ++++++-
.../IntervalPartitionJoiner.java | 44 ++++++++++++----
.../IPartitionedTupleBufferManager.java | 2 +
.../VPartitionTupleBufferManager.java | 5 ++
.../dataflow/std/join/AbstractMergeJoiner.java | 10 ++--
.../hyracks/dataflow/std/join/MergeJoiner.java | 34 +++++++-----
.../dataflow/std/join/NestedLoopJoin.java | 23 ++++++++-
.../join/NestedLoopJoinOperatorDescriptor.java | 28 ++--------
.../dataflow/std/join/RunFileStream.java | 27 ++++++++--
10 files changed, 160 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index 6f04cad..e4c4cbe 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -29,11 +29,9 @@ import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFacto
import org.apache.asterix.runtime.operators.joins.IntervalJoinUtil;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedDeletableTupleBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
@@ -57,23 +55,27 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
private static final Logger LOGGER = Logger.getLogger(IntervalIndexJoiner.class.getName());
- private IPartitionedDeletableTupleBufferManager bufferManager;
+ private final IPartitionedDeletableTupleBufferManager bufferManager;
- private ActiveSweepManager[] activeManager;
- private ITuplePointerAccessor[] memoryAccessor;
- private int[] streamIndex;
- private RunFileStream[] runFileStream;
+ private final ActiveSweepManager[] activeManager;
+ private final ITuplePointerAccessor[] memoryAccessor;
+ private final int[] streamIndex;
+ private final RunFileStream[] runFileStream;
- private LinkedList<TuplePointer> buffer = new LinkedList<>();
+ private final LinkedList<TuplePointer> buffer = new LinkedList<>();
- private IIntervalMergeJoinChecker imjc;
+ private final IIntervalMergeJoinChecker imjc;
- protected byte point;
+ private final byte point;
- private MergeStatus status;
+ private final int leftKey;
+ private final int rightKey;
- private int leftKey;
- private int rightKey;
+ private long joinComparisonCount = 0;
+ private long joinResultCount = 0;
+ private long spillCount = 0;
+ private long spillReadCount = 0;
+ private long spillWriteCount = 0;
public IntervalIndexJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status,
MergeJoinLocks locks, Comparator<EndPointIndexItem> endPointComparator,
@@ -87,8 +89,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
this.leftKey = leftKeys[0];
this.rightKey = rightKeys[0];
- this.status = status;
-
RecordDescriptor[] recordDescriptors = new RecordDescriptor[JOIN_PARTITIONS];
recordDescriptors[LEFT_PARTITION] = leftRd;
recordDescriptors[RIGHT_PARTITION] = rightRd;
@@ -119,8 +119,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
runFileStream[LEFT_PARTITION] = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
runFileStream[RIGHT_PARTITION] = new RunFileStream(ctx, "right", status.branch[RIGHT_PARTITION]);
- // Result
- resultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize
+ " frames of memory.");
@@ -134,6 +132,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
} else {
FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, index1, accessor2, index2);
}
+ joinResultCount++;
}
@Override
@@ -143,6 +142,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
activeManager[RIGHT_PARTITION].clear();
runFileStream[LEFT_PARTITION].close();
runFileStream[RIGHT_PARTITION].close();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
+ + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, "
+ + spillReadCount + " spill frames read.");
+ }
}
private void flushMemory(int partition) throws HyracksDataException {
@@ -378,10 +382,12 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
outerAccessor.reset(outerTp);
for (TuplePointer innerTp : inner) {
innerAccessor.reset(innerTp);
- if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(),
- reversed)) {
- addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), reversed, writer);
+ if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor,
+ innerTp.getTupleIndex(), reversed)) {
+ addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(),
+ reversed, writer);
}
+ joinComparisonCount++;
}
}
if (LOGGER.isLoggable(Level.FINE)) {
@@ -394,11 +400,12 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter writer) throws HyracksDataException {
for (TuplePointer outerTp : outer) {
outerAccessor.reset(outerTp);
- if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor, tupleAccessor.getTupleId(),
- reversed)) {
+ if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor,
+ tupleAccessor.getTupleId(), reversed)) {
addToResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor, tupleAccessor.getTupleId(), reversed,
writer);
}
+ joinComparisonCount++;
}
}
@@ -427,6 +434,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Continue with stream (" + diskPartition + ").");
}
+ spillCount++;
+ spillReadCount += runFileStream[diskPartition].getReadCount();
+ spillWriteCount += runFileStream[diskPartition].getWriteCount();
}
private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor, int flushPartition)
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
index b5256b5..88ff727 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
@@ -35,13 +35,16 @@ import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
public class InMemoryIntervalPartitionJoin {
+ private static final Logger LOGGER = Logger.getLogger(InMemoryIntervalPartitionJoin.class.getName());
+
private final FrameTupleAccessor accessorBuild;
private final FrameTupleAppender appender;
private final IFrameBufferManager fbm;
private BufferInfo bufferInfo;
private final IIntervalMergeJoinChecker imjc;
- private static final Logger LOGGER = Logger.getLogger(InMemoryIntervalPartitionJoin.class.getName());
+ private long joinComparisonCount = 0;
+ private long joinResultCount = 0;
public InMemoryIntervalPartitionJoin(IHyracksTaskContext ctx, IFrameBufferManager fbm,
IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, RecordDescriptor probeRd)
@@ -55,6 +58,14 @@ public class InMemoryIntervalPartitionJoin {
"InMemoryIntervalPartitionJoin has been created for Thread ID " + Thread.currentThread().getId() + ".");
}
+ public long getComparisonCount() {
+ return joinComparisonCount;
+ }
+
+ public long getResultCount() {
+ return joinResultCount;
+ }
+
public void join(IFrameTupleAccessor accessorProbe, int probeTupleIndex, IFrameWriter writer)
throws HyracksDataException {
if (fbm.getNumFrames() != 0) {
@@ -62,9 +73,11 @@ public class InMemoryIntervalPartitionJoin {
fbm.getFrame(frameIndex, bufferInfo);
accessorBuild.reset(bufferInfo.getBuffer());
for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
- if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) {
+ if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex,
+ false)) {
appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer);
}
+ joinComparisonCount++;
}
}
}
@@ -77,5 +90,6 @@ public class InMemoryIntervalPartitionJoin {
private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe, probeSidetIx);
+ joinResultCount++;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index fe49d2f..e943a48 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -52,14 +52,16 @@ import org.apache.hyracks.dataflow.std.structures.TuplePointer;
*/
public class IntervalPartitionJoiner {
- // Used for special probe BigObject which can not be held into the Join memory
- private FrameTupleAppender bigProbeFrameAppender;
+ private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
- enum SIDE {
+ private enum SIDE {
BUILD,
PROBE
}
+ // Used for special probe BigObject which can not be held into the Join memory
+ private FrameTupleAppender bigProbeFrameAppender;
+
private IHyracksTaskContext ctx;
private final String buildRelName;
@@ -85,8 +87,6 @@ public class IntervalPartitionJoiner {
private final FrameTupleAccessor accessorBuild;
private final FrameTupleAccessor accessorProbe;
- private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
-
// stats information
private IntervalPartitionJoinData ipjd;
@@ -95,6 +95,12 @@ public class IntervalPartitionJoiner {
private IIntervalMergeJoinChecker imjc;
+ private long joinComparisonCount = 0;
+ private long joinResultCount = 0;
+ private long spillCount = 0;
+ private long spillReadCount = 0;
+ private long spillWriteCount = 0;
+
public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) {
@@ -220,6 +226,8 @@ public class IntervalPartitionJoiner {
private void spillPartition(int pid) throws HyracksDataException {
RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
+ spillCount++;
+ spillWriteCount += buildBufferManager.getNumFrames(pid);
buildBufferManager.flushPartition(pid, writer);
buildBufferManager.clearPartition(pid);
ipjd.buildSpill(pid);
@@ -261,7 +269,9 @@ public class IntervalPartitionJoiner {
private void flushAndClearBuildSpilledPartition() throws HyracksDataException {
for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) {
if (buildBufferManager.getNumTuples(pid) > 0) {
- buildBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD));
+ spillWriteCount += buildBufferManager.getNumFrames(pid);
+ RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
+ buildBufferManager.flushPartition(pid, runFileWriter);
buildBufferManager.clearPartition(pid);
buildRFWriters[pid].close();
}
@@ -271,7 +281,9 @@ public class IntervalPartitionJoiner {
private void flushAndClearProbeSpilledPartition() throws HyracksDataException {
for (int pid = 0; pid < numOfPartitions; ++pid) {
if (probeBufferManager.getNumTuples(pid) > 0) {
- probeBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE));
+ spillWriteCount += probeBufferManager.getNumFrames(pid);
+ RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+ probeBufferManager.flushPartition(pid, runFileWriter);
probeBufferManager.clearPartition(pid);
probeRFWriters[pid].close();
}
@@ -310,6 +322,7 @@ public class IntervalPartitionJoiner {
return false;
}
}
+ spillReadCount++;
}
r.close();
@@ -329,13 +342,15 @@ public class IntervalPartitionJoiner {
}
private void createInMemoryJoiner(int pid) throws HyracksDataException {
- this.inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx,
+ inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx,
buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd);
}
private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException {
- this.inMemJoiner[pid].closeJoin(writer);
- this.inMemJoiner[pid] = null;
+ joinComparisonCount += inMemJoiner[pid].getComparisonCount();
+ joinResultCount += inMemJoiner[pid].getResultCount();
+ inMemJoiner[pid].closeJoin(writer);
+ inMemJoiner[pid] = null;
}
public void initProbe() throws HyracksDataException {
@@ -374,6 +389,8 @@ public class IntervalPartitionJoiner {
break;
}
RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
+ spillCount++;
+ spillWriteCount += probeBufferManager.getNumFrames(pid);
probeBufferManager.flushPartition(victim, runFileWriter);
probeBufferManager.clearPartition(victim);
}
@@ -582,7 +599,7 @@ public class IntervalPartitionJoiner {
}
public int buildNextInMemory(int pid) {
- int nextPid = buildSpilledStatus.nextClearBit(pid);
+ int nextPid = buildSpilledStatus.nextClearBit(pid);
if (nextPid >= numOfPartitions) {
return -1;
}
@@ -644,6 +661,11 @@ public class IntervalPartitionJoiner {
FileUtils.deleteQuietly(rfw.getFileReference().getFile());
}
}
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("IntervalPartitionJoiner statitics: " + joinComparisonCount + " comparisons, "
+ + joinResultCount + " results, " + spillCount + " spills, " + spillWriteCount
+ + " spill frames written, " + spillReadCount + " spill frames read.");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index d844e7d..c0c96b3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -31,6 +31,8 @@ public interface IPartitionedTupleBufferManager {
int getNumTuples(int partition);
+ int getNumFrames(int partition);
+
int getPhysicalSize(int partition);
/**
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 104f1ce..4a4cb5d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -90,6 +90,11 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
}
@Override
+ public int getNumFrames(int partition) {
+ return partitionArray[partition].getNumFrames();
+ }
+
+ @Override
public int getPhysicalSize(int partitionId) {
int size = 0;
IFrameBufferManager partition = partitionArray[partitionId];
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
index f8d328b..8006790 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
@@ -53,15 +53,13 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner {
protected static final int LEFT_PARTITION = 0;
protected static final int RIGHT_PARTITION = 1;
+ protected final ByteBuffer[] inputBuffer;
+ protected final FrameTupleAppender resultAppender;
protected final ITupleAccessor[] inputAccessor;
- protected ByteBuffer[] inputBuffer;
-
- private MergeJoinLocks locks;
- private MergeStatus status;
+ protected final MergeStatus status;
private final int partition;
-
- protected FrameTupleAppender resultAppender;
+ private final MergeJoinLocks locks;
public AbstractMergeJoiner(IHyracksTaskContext ctx, int partition, MergeStatus status, MergeJoinLocks locks,
RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index d94a63e..03283d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -24,11 +24,9 @@ import java.util.logging.Logger;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
@@ -48,24 +46,27 @@ import org.apache.hyracks.dataflow.std.structures.TuplePointer;
*/
public class MergeJoiner extends AbstractMergeJoiner {
- private MergeStatus status;
+ private static final Logger LOGGER = Logger.getLogger(MergeJoiner.class.getName());
private final IDeallocatableFramePool framePool;
- private IDeletableTupleBufferManager bufferManager;
- private ITuplePointerAccessor memoryAccessor;
- private LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
+ private final IDeletableTupleBufferManager bufferManager;
+ private final ITuplePointerAccessor memoryAccessor;
+ private final LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
private int leftStreamIndex;
- private RunFileStream runFileStream;
+ private final RunFileStream runFileStream;
private final IMergeJoinChecker mjc;
- private static final Logger LOGGER = Logger.getLogger(MergeJoiner.class.getName());
+ private long joinComparisonCount = 0;
+ private long joinResultCount = 0;
+ private long spillWriteCount = 0;
+ private long spillReadCount = 0;
+ private long spillCount = 0;
public MergeJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks,
IMergeJoinChecker mjc, RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException {
super(ctx, partition, status, locks, leftRd, rightRd);
- this.status = status;
this.mjc = mjc;
// Memory (right buffer)
@@ -81,8 +82,6 @@ public class MergeJoiner extends AbstractMergeJoiner {
leftStreamIndex = TupleAccessor.UNSET;
runFileStream = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
- // Result
- resultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine(
"MergeJoiner has started partition " + partition + " with " + memorySize + " frames of memory.");
@@ -107,11 +106,17 @@ public class MergeJoiner extends AbstractMergeJoiner {
int rightTupleIndex, IFrameWriter writer) throws HyracksDataException {
FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, leftTupleIndex, accessorRight,
rightTupleIndex);
+ joinResultCount++;
}
@Override
public void closeResult(IFrameWriter writer) throws HyracksDataException {
resultAppender.write(writer, true);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
+ + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, "
+ + spillReadCount + " spill frames read.");
+ }
}
private void flushMemory() throws HyracksDataException {
@@ -203,15 +208,13 @@ public class MergeJoiner extends AbstractMergeJoiner {
if (memoryHasTuples()) {
for (int i = memoryBuffer.size() - 1; i > -1; --i) {
memoryAccessor.reset(memoryBuffer.get(i));
- // TuplePrinterUtil.printTuple(" --- A outer", inputAccessor[LEFT_PARTITION]);
- // TuplePrinterUtil.printTuple(" --- A inner", memoryAccessor);
if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) {
// add to result
- // System.err.println(" -- Matched --");
addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer);
}
+ joinComparisonCount++;
if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
// remove from memory
@@ -249,6 +252,9 @@ public class MergeJoiner extends AbstractMergeJoiner {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Continue with left stream.");
}
+ spillCount++;
+ spillReadCount += runFileStream.getReadCount();
+ spillWriteCount += runFileStream.getWriteCount();
}
private void unfreezeAndContinue(ITupleAccessor accessor) throws HyracksDataException {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 202aac6..3d99d6c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.dataflow.std.join;
import java.io.DataOutput;
import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -43,6 +45,8 @@ import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager;
import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
public class NestedLoopJoin {
+ private static final Logger LOGGER = Logger.getLogger(NestedLoopJoin.class.getName());
+
private final FrameTupleAccessor accessorInner;
private final FrameTupleAccessor accessorOuter;
private final FrameTupleAppender appender;
@@ -58,6 +62,11 @@ public class NestedLoopJoin {
private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
private BufferInfo tempInfo = new BufferInfo(null, -1, -1);
+ private long joinComparisonCount = 0;
+ private long joinResultCount = 0;
+ private long spillWriteCount = 0;
+ private long spillReadCount = 0;
+
public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor accessorInner,
ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
IMissingWriter[] missingWriters) throws HyracksDataException {
@@ -99,6 +108,7 @@ public class NestedLoopJoin {
public void cache(ByteBuffer buffer) throws HyracksDataException {
runFileWriter.nextFrame(buffer);
+ spillWriteCount++;
}
public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
@@ -109,6 +119,7 @@ public class NestedLoopJoin {
for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
}
+ spillReadCount++;
}
runFileReader.close();
outerBufferMngr.reset();
@@ -135,6 +146,7 @@ public class NestedLoopJoin {
matchFound = true;
appendToResults(i, j, writer);
}
+ joinComparisonCount++;
}
if (!matchFound && isLeftOuter) {
@@ -149,9 +161,9 @@ public class NestedLoopJoin {
private boolean evaluatePredicate(int tIx1, int tIx2) {
if (isReversed) { //Role Reversal Optimization is triggered
- return ((predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1));
+ return (predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1);
} else {
- return ((predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2));
+ return (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2);
}
}
@@ -166,6 +178,7 @@ public class NestedLoopJoin {
private void appendResultToFrame(FrameTupleAccessor accessor1, int tupleId1, FrameTupleAccessor accessor2,
int tupleId2, IFrameWriter writer) throws HyracksDataException {
FrameUtils.appendConcatToWriter(writer, appender, accessor1, tupleId1, accessor2, tupleId2);
+ joinResultCount++;
}
public void closeCache() throws HyracksDataException {
@@ -181,11 +194,17 @@ public class NestedLoopJoin {
for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
}
+ spillReadCount++;
}
runFileReader.close();
outerBufferMngr.reset();
appender.write(writer, true);
+
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("NestedLoopJoin statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
+ + " results, " + spillWriteCount + " frames written, " + spillReadCount + " frames read.");
+ }
}
private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 09207b9..4fa1498 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -19,9 +19,6 @@
package org.apache.hyracks.dataflow.std.join;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -98,22 +95,9 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
public static class JoinCacheTaskState extends AbstractStateObject {
private NestedLoopJoin joiner;
- public JoinCacheTaskState() {
- }
-
private JoinCacheTaskState(JobId jobId, TaskId taskId) {
super(jobId, taskId);
}
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
-
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
-
- }
}
private class JoinCacheActivityNode extends AbstractActivityNode {
@@ -132,8 +116,8 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
- final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null)
- ? predEvaluatorFactory.createPredicateEvaluator() : null);
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory != null)
+ ? predEvaluatorFactory.createPredicateEvaluator() : null;
final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
@@ -142,7 +126,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
}
}
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
private JoinCacheTaskState state;
@Override
@@ -170,9 +154,9 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
@Override
public void fail() throws HyracksDataException {
+ // No variables to update.
}
};
- return op;
}
}
@@ -186,8 +170,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-
- IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private JoinCacheTaskState state;
@Override
@@ -216,7 +199,6 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
writer.fail();
}
};
- return op;
}
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index afeed07..aaaaaf4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -37,7 +37,6 @@ public class RunFileStream {
private static final Logger LOGGER = Logger.getLogger(RunFileStream.class.getName());
private final String key;
- private int runFileCounter;
private final IFrame runFileBuffer;
private final IFrameTupleAppender runFileAppender;
private RunFileWriter runFileWriter;
@@ -46,26 +45,41 @@ public class RunFileStream {
private final IHyracksTaskContext ctx;
+ private long runFileCounter = 0;
+ private long readCount = 0;
+ private long writeCount = 0;
+
public RunFileStream(IHyracksTaskContext ctx, String key, IRunFileStreamStatus status) throws HyracksDataException {
this.ctx = ctx;
this.key = key;
this.status = status;
- runFileCounter = 0;
runFileBuffer = new VSizeFrame(ctx);
runFileAppender = new FrameTupleAppender(new VSizeFrame(ctx));
}
+ public long getReadCount() {
+ return readCount;
+ }
+
+ public long getWriteCount() {
+ return writeCount;
+ }
+
public void startRunFile() throws HyracksDataException {
+ readCount = 0;
+ writeCount = 0;
+ runFileCounter++;
+
status.setRunFileWriting(true);
- String prefix = this.getClass().getSimpleName() + '-' + key + '-' + Integer.toString(runFileCounter) + '-'
+ String prefix = this.getClass().getSimpleName() + '-' + key + '-' + Long.toString(runFileCounter) + '-'
+ this.toString();
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(prefix);
runFileWriter = new RunFileWriter(file, ctx.getIOManager());
runFileWriter.open();
- ++runFileCounter;
if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("A new run file has been started (key: " + key + ", number: " + runFileCounter + ", file: " + file + ").");
+ LOGGER.fine("A new run file has been started (key: " + key + ", number: " + runFileCounter + ", file: "
+ + file + ").");
}
}
@@ -73,6 +87,7 @@ public class RunFileStream {
int idx = accessor.getTupleId();
if (!runFileAppender.append(accessor, idx)) {
runFileAppender.write(runFileWriter, true);
+ writeCount++;
runFileAppender.append(accessor, idx);
}
}
@@ -104,6 +119,7 @@ public class RunFileStream {
if (runFileReader.nextFrame(runFileBuffer)) {
accessor.reset(runFileBuffer.getBuffer());
accessor.next();
+ readCount++;
return true;
}
return false;
@@ -129,6 +145,7 @@ public class RunFileStream {
// Flush buffer.
if (runFileAppender.getTupleCount() > 0) {
runFileAppender.write(runFileWriter, true);
+ writeCount++;
}
}