Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:16 UTC

[26/50] [abbrv] asterixdb git commit: Add stats tracking for interval joins.

Add stats tracking for interval joins.


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/a9729514
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/a9729514
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/a9729514

Branch: refs/heads/ecarm002/interval_join_merge
Commit: a9729514d7d517a24df2d290d15e854af6d65a51
Parents: 1487f2b
Author: Preston Carman <pr...@apache.org>
Authored: Thu Aug 25 11:43:57 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Aug 25 11:43:57 2016 -0700

----------------------------------------------------------------------
 .../intervalindex/IntervalIndexJoiner.java      | 54 ++++++++++++--------
 .../InMemoryIntervalPartitionJoin.java          | 18 ++++++-
 .../IntervalPartitionJoiner.java                | 44 ++++++++++++----
 .../IPartitionedTupleBufferManager.java         |  2 +
 .../VPartitionTupleBufferManager.java           |  5 ++
 .../dataflow/std/join/AbstractMergeJoiner.java  | 10 ++--
 .../hyracks/dataflow/std/join/MergeJoiner.java  | 34 +++++++-----
 .../dataflow/std/join/NestedLoopJoin.java       | 23 ++++++++-
 .../join/NestedLoopJoinOperatorDescriptor.java  | 28 ++--------
 .../dataflow/std/join/RunFileStream.java        | 27 ++++++++--
 10 files changed, 160 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
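Every joiner touched below follows the same pattern: a few plain long counters are incremented on the hot path (one per comparison, result, and spill) and reported in a single WARNING-level log line when the join closes. The following is a minimal, self-contained sketch of that pattern for readers who do not want to trace each hunk; the JoinStatsSketch class is hypothetical and not part of the commit, while the counter names and log wording mirror the patch.

    import java.util.logging.Level;
    import java.util.logging.Logger;

    // Illustrative sketch only (not in the commit): the stats pattern the joiners below inline directly.
    public class JoinStatsSketch {
        private static final Logger LOGGER = Logger.getLogger(JoinStatsSketch.class.getName());

        // Plain (non-atomic) longs, matching the patch; the counters belong to a single joiner instance.
        private long joinComparisonCount = 0;
        private long joinResultCount = 0;
        private long spillCount = 0;
        private long spillReadCount = 0;
        private long spillWriteCount = 0;

        // Called once per candidate tuple pair on the hot path.
        public void onComparison(boolean savedInResult) {
            joinComparisonCount++;
            if (savedInResult) {
                joinResultCount++;
            }
        }

        // Called once per partition or stream spill, with the frame counts observed for that spill.
        public void onSpill(long framesWritten, long framesRead) {
            spillCount++;
            spillWriteCount += framesWritten;
            spillReadCount += framesRead;
        }

        // Called when the join closes; mirrors the summary lines added in the patch.
        public void logSummary(String joinerName) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.warning(joinerName + " statistics: " + joinComparisonCount + " comparisons, "
                        + joinResultCount + " results, " + spillCount + " spills, " + spillWriteCount
                        + " spill frames written, " + spillReadCount + " spill frames read.");
            }
        }
    }

The patch itself does not introduce a helper like this; each joiner increments its own fields in place, as shown in the hunks that follow.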


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index 6f04cad..e4c4cbe 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -29,11 +29,9 @@ import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFacto
 import org.apache.asterix.runtime.operators.joins.IntervalJoinUtil;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedDeletableTupleBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
@@ -57,23 +55,27 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
 
     private static final Logger LOGGER = Logger.getLogger(IntervalIndexJoiner.class.getName());
 
-    private IPartitionedDeletableTupleBufferManager bufferManager;
+    private final IPartitionedDeletableTupleBufferManager bufferManager;
 
-    private ActiveSweepManager[] activeManager;
-    private ITuplePointerAccessor[] memoryAccessor;
-    private int[] streamIndex;
-    private RunFileStream[] runFileStream;
+    private final ActiveSweepManager[] activeManager;
+    private final ITuplePointerAccessor[] memoryAccessor;
+    private final int[] streamIndex;
+    private final RunFileStream[] runFileStream;
 
-    private LinkedList<TuplePointer> buffer = new LinkedList<>();
+    private final LinkedList<TuplePointer> buffer = new LinkedList<>();
 
-    private IIntervalMergeJoinChecker imjc;
+    private final IIntervalMergeJoinChecker imjc;
 
-    protected byte point;
+    private final byte point;
 
-    private MergeStatus status;
+    private final int leftKey;
+    private final int rightKey;
 
-    private int leftKey;
-    private int rightKey;
+    private long joinComparisonCount = 0;
+    private long joinResultCount = 0;
+    private long spillCount = 0;
+    private long spillReadCount = 0;
+    private long spillWriteCount = 0;
 
     public IntervalIndexJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status,
             MergeJoinLocks locks, Comparator<EndPointIndexItem> endPointComparator,
@@ -87,8 +89,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         this.leftKey = leftKeys[0];
         this.rightKey = rightKeys[0];
 
-        this.status = status;
-
         RecordDescriptor[] recordDescriptors = new RecordDescriptor[JOIN_PARTITIONS];
         recordDescriptors[LEFT_PARTITION] = leftRd;
         recordDescriptors[RIGHT_PARTITION] = rightRd;
@@ -119,8 +119,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         runFileStream[LEFT_PARTITION] = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
         runFileStream[RIGHT_PARTITION] = new RunFileStream(ctx, "right", status.branch[RIGHT_PARTITION]);
 
-        // Result
-        resultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize
                     + " frames of memory.");
@@ -134,6 +132,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         } else {
             FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, index1, accessor2, index2);
         }
+        joinResultCount++;
     }
 
     @Override
@@ -143,6 +142,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         activeManager[RIGHT_PARTITION].clear();
         runFileStream[LEFT_PARTITION].close();
         runFileStream[RIGHT_PARTITION].close();
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("IntervalIndexJoiner statistics: " + joinComparisonCount + " comparisons, " + joinResultCount
+                    + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, "
+                    + spillReadCount + " spill frames read.");
+        }
     }
 
     private void flushMemory(int partition) throws HyracksDataException {
@@ -378,10 +382,12 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
             outerAccessor.reset(outerTp);
             for (TuplePointer innerTp : inner) {
                 innerAccessor.reset(innerTp);
-                if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(),
-                        reversed)) {
-                    addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(), reversed, writer);
+                if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor,
+                        innerTp.getTupleIndex(), reversed)) {
+                    addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(),
+                            reversed, writer);
                 }
+                joinComparisonCount++;
             }
         }
         if (LOGGER.isLoggable(Level.FINE)) {
@@ -394,11 +400,12 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
             ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter writer) throws HyracksDataException {
         for (TuplePointer outerTp : outer) {
             outerAccessor.reset(outerTp);
-            if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor, tupleAccessor.getTupleId(),
-                    reversed)) {
+            if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor,
+                    tupleAccessor.getTupleId(), reversed)) {
                 addToResult(outerAccessor, outerTp.getTupleIndex(), tupleAccessor, tupleAccessor.getTupleId(), reversed,
                         writer);
             }
+            joinComparisonCount++;
         }
     }
 
@@ -427,6 +434,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Continue with stream (" + diskPartition + ").");
         }
+        spillCount++;
+        spillReadCount += runFileStream[diskPartition].getReadCount();
+        spillWriteCount += runFileStream[diskPartition].getWriteCount();
     }
 
     private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor, int flushPartition)

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
index b5256b5..88ff727 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
@@ -35,13 +35,16 @@ import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
 
 public class InMemoryIntervalPartitionJoin {
 
+    private static final Logger LOGGER = Logger.getLogger(InMemoryIntervalPartitionJoin.class.getName());
+
     private final FrameTupleAccessor accessorBuild;
     private final FrameTupleAppender appender;
     private final IFrameBufferManager fbm;
     private BufferInfo bufferInfo;
     private final IIntervalMergeJoinChecker imjc;
 
-    private static final Logger LOGGER = Logger.getLogger(InMemoryIntervalPartitionJoin.class.getName());
+    private long joinComparisonCount = 0;
+    private long joinResultCount = 0;
 
     public InMemoryIntervalPartitionJoin(IHyracksTaskContext ctx, IFrameBufferManager fbm,
             IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, RecordDescriptor probeRd)
@@ -55,6 +58,14 @@ public class InMemoryIntervalPartitionJoin {
                 "InMemoryIntervalPartitionJoin has been created for Thread ID " + Thread.currentThread().getId() + ".");
     }
 
+    public long getComparisonCount() {
+        return joinComparisonCount;
+    }
+
+    public long getResultCount() {
+        return joinResultCount;
+    }
+
     public void join(IFrameTupleAccessor accessorProbe, int probeTupleIndex, IFrameWriter writer)
             throws HyracksDataException {
         if (fbm.getNumFrames() != 0) {
@@ -62,9 +73,11 @@ public class InMemoryIntervalPartitionJoin {
                 fbm.getFrame(frameIndex, bufferInfo);
                 accessorBuild.reset(bufferInfo.getBuffer());
                 for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
-                    if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) {
+                    if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex,
+                            false)) {
                         appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer);
                     }
+                    joinComparisonCount++;
                 }
             }
         }
@@ -77,5 +90,6 @@ public class InMemoryIntervalPartitionJoin {
     private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
             int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
         FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe, probeSidetIx);
+        joinResultCount++;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index fe49d2f..e943a48 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -52,14 +52,16 @@ import org.apache.hyracks.dataflow.std.structures.TuplePointer;
  */
 public class IntervalPartitionJoiner {
 
-    // Used for special probe BigObject which can not be held into the Join memory
-    private FrameTupleAppender bigProbeFrameAppender;
+    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
 
-    enum SIDE {
+    private enum SIDE {
         BUILD,
         PROBE
     }
 
+    // Used for a special probe BigObject that cannot be held in the join memory
+    private FrameTupleAppender bigProbeFrameAppender;
+
     private IHyracksTaskContext ctx;
 
     private final String buildRelName;
@@ -85,8 +87,6 @@ public class IntervalPartitionJoiner {
     private final FrameTupleAccessor accessorBuild;
     private final FrameTupleAccessor accessorProbe;
 
-    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
-
     // stats information
     private IntervalPartitionJoinData ipjd;
 
@@ -95,6 +95,12 @@ public class IntervalPartitionJoiner {
 
     private IIntervalMergeJoinChecker imjc;
 
+    private long joinComparisonCount = 0;
+    private long joinResultCount = 0;
+    private long spillCount = 0;
+    private long spillReadCount = 0;
+    private long spillWriteCount = 0;
+
     public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
             String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
             RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) {
@@ -220,6 +226,8 @@ public class IntervalPartitionJoiner {
 
     private void spillPartition(int pid) throws HyracksDataException {
         RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
+        spillCount++;
+        spillWriteCount += buildBufferManager.getNumFrames(pid);
         buildBufferManager.flushPartition(pid, writer);
         buildBufferManager.clearPartition(pid);
         ipjd.buildSpill(pid);
@@ -261,7 +269,9 @@ public class IntervalPartitionJoiner {
     private void flushAndClearBuildSpilledPartition() throws HyracksDataException {
         for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) {
             if (buildBufferManager.getNumTuples(pid) > 0) {
-                buildBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD));
+                spillWriteCount += buildBufferManager.getNumFrames(pid);
+                RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
+                buildBufferManager.flushPartition(pid, runFileWriter);
                 buildBufferManager.clearPartition(pid);
                 buildRFWriters[pid].close();
             }
@@ -271,7 +281,9 @@ public class IntervalPartitionJoiner {
     private void flushAndClearProbeSpilledPartition() throws HyracksDataException {
         for (int pid = 0; pid < numOfPartitions; ++pid) {
             if (probeBufferManager.getNumTuples(pid) > 0) {
-                probeBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE));
+                spillWriteCount += probeBufferManager.getNumFrames(pid);
+                RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+                probeBufferManager.flushPartition(pid, runFileWriter);
                 probeBufferManager.clearPartition(pid);
                 probeRFWriters[pid].close();
             }
@@ -310,6 +322,7 @@ public class IntervalPartitionJoiner {
                     return false;
                 }
             }
+            spillReadCount++;
         }
 
         r.close();
@@ -329,13 +342,15 @@ public class IntervalPartitionJoiner {
     }
 
     private void createInMemoryJoiner(int pid) throws HyracksDataException {
-        this.inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx,
+        inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx,
                 buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd);
     }
 
     private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException {
-        this.inMemJoiner[pid].closeJoin(writer);
-        this.inMemJoiner[pid] = null;
+        joinComparisonCount += inMemJoiner[pid].getComparisonCount();
+        joinResultCount += inMemJoiner[pid].getResultCount();
+        inMemJoiner[pid].closeJoin(writer);
+        inMemJoiner[pid] = null;
     }
 
     public void initProbe() throws HyracksDataException {
@@ -374,6 +389,8 @@ public class IntervalPartitionJoiner {
                             break;
                         }
                         RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
+                        spillCount++;
+                        spillWriteCount += probeBufferManager.getNumFrames(victim);
                         probeBufferManager.flushPartition(victim, runFileWriter);
                         probeBufferManager.clearPartition(victim);
                     }
@@ -582,7 +599,7 @@ public class IntervalPartitionJoiner {
         }
 
         public int buildNextInMemory(int pid) {
-            int nextPid =  buildSpilledStatus.nextClearBit(pid);
+            int nextPid = buildSpilledStatus.nextClearBit(pid);
             if (nextPid >= numOfPartitions) {
                 return -1;
             }
@@ -644,6 +661,11 @@ public class IntervalPartitionJoiner {
                 FileUtils.deleteQuietly(rfw.getFileReference().getFile());
             }
         }
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("IntervalPartitionJoiner statistics: " + joinComparisonCount + " comparisons, "
+                    + joinResultCount + " results, " + spillCount + " spills, " + spillWriteCount
+                    + " spill frames written, " + spillReadCount + " spill frames read.");
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index d844e7d..c0c96b3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -31,6 +31,8 @@ public interface IPartitionedTupleBufferManager {
 
     int getNumTuples(int partition);
 
+    int getNumFrames(int partition);
+
     int getPhysicalSize(int partition);
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 104f1ce..4a4cb5d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -90,6 +90,11 @@ public class VPartitionTupleBufferManager implements IPartitionedTupleBufferMana
     }
 
     @Override
+    public int getNumFrames(int partition) {
+        return partitionArray[partition].getNumFrames();
+    }
+
+    @Override
     public int getPhysicalSize(int partitionId) {
         int size = 0;
         IFrameBufferManager partition = partitionArray[partitionId];

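A note on the two buffer-manager files above: the new getNumFrames(partition) accessor lets a joiner record how many frames a spill writes before flushPartition() and clearPartition() release them, which is how IntervalPartitionJoiner uses it. A hedged sketch of that calling pattern, written as a helper method inside a joiner; the method name is illustrative and not in the commit, and spillCount/spillWriteCount are the joiner's own counter fields:

    // Illustrative helper (not in the commit): capture spill stats before the frames are released.
    private void spillWithStats(IPartitionedTupleBufferManager bufferManager, int pid, RunFileWriter writer)
            throws HyracksDataException {
        spillCount++;
        // Count frames before flushing; clearPartition() drops them afterwards.
        spillWriteCount += bufferManager.getNumFrames(pid);
        bufferManager.flushPartition(pid, writer);
        bufferManager.clearPartition(pid);
    }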
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
index f8d328b..8006790 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
@@ -53,15 +53,13 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner {
     protected static final int LEFT_PARTITION = 0;
     protected static final int RIGHT_PARTITION = 1;
 
+    protected final ByteBuffer[] inputBuffer;
+    protected final FrameTupleAppender resultAppender;
     protected final ITupleAccessor[] inputAccessor;
-    protected ByteBuffer[] inputBuffer;
-
-    private MergeJoinLocks locks;
-    private MergeStatus status;
+    protected final MergeStatus status;
 
     private final int partition;
-
-    protected FrameTupleAppender resultAppender;
+    private final MergeJoinLocks locks;
 
     public AbstractMergeJoiner(IHyracksTaskContext ctx, int partition, MergeStatus status, MergeJoinLocks locks,
             RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index d94a63e..03283d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -24,11 +24,9 @@ import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
@@ -48,24 +46,27 @@ import org.apache.hyracks.dataflow.std.structures.TuplePointer;
  */
 public class MergeJoiner extends AbstractMergeJoiner {
 
-    private MergeStatus status;
+    private static final Logger LOGGER = Logger.getLogger(MergeJoiner.class.getName());
 
     private final IDeallocatableFramePool framePool;
-    private IDeletableTupleBufferManager bufferManager;
-    private ITuplePointerAccessor memoryAccessor;
-    private LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
+    private final IDeletableTupleBufferManager bufferManager;
+    private final ITuplePointerAccessor memoryAccessor;
+    private final LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
 
     private int leftStreamIndex;
-    private RunFileStream runFileStream;
+    private final RunFileStream runFileStream;
 
     private final IMergeJoinChecker mjc;
 
-    private static final Logger LOGGER = Logger.getLogger(MergeJoiner.class.getName());
+    private long joinComparisonCount = 0;
+    private long joinResultCount = 0;
+    private long spillWriteCount = 0;
+    private long spillReadCount = 0;
+    private long spillCount = 0;
 
     public MergeJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks,
             IMergeJoinChecker mjc, RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException {
         super(ctx, partition, status, locks, leftRd, rightRd);
-        this.status = status;
         this.mjc = mjc;
 
         // Memory (right buffer)
@@ -81,8 +82,6 @@ public class MergeJoiner extends AbstractMergeJoiner {
         leftStreamIndex = TupleAccessor.UNSET;
         runFileStream = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
 
-        // Result
-        resultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine(
                     "MergeJoiner has started partition " + partition + " with " + memorySize + " frames of memory.");
@@ -107,11 +106,17 @@ public class MergeJoiner extends AbstractMergeJoiner {
             int rightTupleIndex, IFrameWriter writer) throws HyracksDataException {
         FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, leftTupleIndex, accessorRight,
                 rightTupleIndex);
+        joinResultCount++;
     }
 
     @Override
     public void closeResult(IFrameWriter writer) throws HyracksDataException {
         resultAppender.write(writer, true);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("MergeJoiner statistics: " + joinComparisonCount + " comparisons, " + joinResultCount
+                    + " results, " + spillCount + " spills, " + spillWriteCount + " spill frames written, "
+                    + spillReadCount + " spill frames read.");
+        }
     }
 
     private void flushMemory() throws HyracksDataException {
@@ -203,15 +208,13 @@ public class MergeJoiner extends AbstractMergeJoiner {
         if (memoryHasTuples()) {
             for (int i = memoryBuffer.size() - 1; i > -1; --i) {
                 memoryAccessor.reset(memoryBuffer.get(i));
-                //                TuplePrinterUtil.printTuple("     --- A outer", inputAccessor[LEFT_PARTITION]);
-                //                TuplePrinterUtil.printTuple("     --- A inner", memoryAccessor);
                 if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
                         memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) {
                     // add to result
-                    //                    System.err.println("  -- Matched --");
                     addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
                             memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer);
                 }
+                joinComparisonCount++;
                 if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
                         memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
                     // remove from memory
@@ -249,6 +252,9 @@ public class MergeJoiner extends AbstractMergeJoiner {
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Continue with left stream.");
         }
+        spillCount++;
+        spillReadCount += runFileStream.getReadCount();
+        spillWriteCount += runFileStream.getWriteCount();
     }
 
     private void unfreezeAndContinue(ITupleAccessor accessor) throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 202aac6..3d99d6c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.dataflow.std.join;
 
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -43,6 +45,8 @@ import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager;
 import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
 
 public class NestedLoopJoin {
+    private static final Logger LOGGER = Logger.getLogger(NestedLoopJoin.class.getName());
+
     private final FrameTupleAccessor accessorInner;
     private final FrameTupleAccessor accessorOuter;
     private final FrameTupleAppender appender;
@@ -58,6 +62,11 @@ public class NestedLoopJoin {
     private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
     private BufferInfo tempInfo = new BufferInfo(null, -1, -1);
 
+    private long joinComparisonCount = 0;
+    private long joinResultCount = 0;
+    private long spillWriteCount = 0;
+    private long spillReadCount = 0;
+
     public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor accessorInner,
             ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
             IMissingWriter[] missingWriters) throws HyracksDataException {
@@ -99,6 +108,7 @@ public class NestedLoopJoin {
 
     public void cache(ByteBuffer buffer) throws HyracksDataException {
         runFileWriter.nextFrame(buffer);
+        spillWriteCount++;
     }
 
     public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
@@ -109,6 +119,7 @@ public class NestedLoopJoin {
                 for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
                     blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
                 }
+                spillReadCount++;
             }
             runFileReader.close();
             outerBufferMngr.reset();
@@ -135,6 +146,7 @@ public class NestedLoopJoin {
                     matchFound = true;
                     appendToResults(i, j, writer);
                 }
+                joinComparisonCount++;
             }
 
             if (!matchFound && isLeftOuter) {
@@ -149,9 +161,9 @@ public class NestedLoopJoin {
 
     private boolean evaluatePredicate(int tIx1, int tIx2) {
         if (isReversed) { //Role Reversal Optimization is triggered
-            return ((predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1));
+            return (predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1);
         } else {
-            return ((predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2));
+            return (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2);
         }
     }
 
@@ -166,6 +178,7 @@ public class NestedLoopJoin {
     private void appendResultToFrame(FrameTupleAccessor accessor1, int tupleId1, FrameTupleAccessor accessor2,
             int tupleId2, IFrameWriter writer) throws HyracksDataException {
         FrameUtils.appendConcatToWriter(writer, appender, accessor1, tupleId1, accessor2, tupleId2);
+        joinResultCount++;
     }
 
     public void closeCache() throws HyracksDataException {
@@ -181,11 +194,17 @@ public class NestedLoopJoin {
             for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
                 blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
             }
+            spillReadCount++;
         }
         runFileReader.close();
         outerBufferMngr.reset();
 
         appender.write(writer, true);
+
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("NestedLoopJoin statistics: " + joinComparisonCount + " comparisons, " + joinResultCount
+                    + " results, " + spillWriteCount + " frames written, " + spillReadCount + " frames read.");
+        }
     }
 
     private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 09207b9..4fa1498 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -19,9 +19,6 @@
 
 package org.apache.hyracks.dataflow.std.join;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -98,22 +95,9 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
     public static class JoinCacheTaskState extends AbstractStateObject {
         private NestedLoopJoin joiner;
 
-        public JoinCacheTaskState() {
-        }
-
         private JoinCacheTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
         }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
     }
 
     private class JoinCacheActivityNode extends AbstractActivityNode {
@@ -132,8 +116,8 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null)
-                    ? predEvaluatorFactory.createPredicateEvaluator() : null);
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory != null)
+                    ? predEvaluatorFactory.createPredicateEvaluator() : null;
 
             final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
             if (isLeftOuter) {
@@ -142,7 +126,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
                 }
             }
 
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+            return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private JoinCacheTaskState state;
 
                 @Override
@@ -170,9 +154,9 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    // No variables to update.
                 }
             };
-            return op;
         }
     }
 
@@ -186,8 +170,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-
-            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private JoinCacheTaskState state;
 
                 @Override
@@ -216,7 +199,6 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor
                     writer.fail();
                 }
             };
-            return op;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a9729514/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index afeed07..aaaaaf4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -37,7 +37,6 @@ public class RunFileStream {
     private static final Logger LOGGER = Logger.getLogger(RunFileStream.class.getName());
 
     private final String key;
-    private int runFileCounter;
     private final IFrame runFileBuffer;
     private final IFrameTupleAppender runFileAppender;
     private RunFileWriter runFileWriter;
@@ -46,26 +45,41 @@ public class RunFileStream {
 
     private final IHyracksTaskContext ctx;
 
+    private long runFileCounter = 0;
+    private long readCount = 0;
+    private long writeCount = 0;
+
     public RunFileStream(IHyracksTaskContext ctx, String key, IRunFileStreamStatus status) throws HyracksDataException {
         this.ctx = ctx;
         this.key = key;
         this.status = status;
 
-        runFileCounter = 0;
         runFileBuffer = new VSizeFrame(ctx);
         runFileAppender = new FrameTupleAppender(new VSizeFrame(ctx));
     }
 
+    public long getReadCount() {
+        return readCount;
+    }
+
+    public long getWriteCount() {
+        return writeCount;
+    }
+
     public void startRunFile() throws HyracksDataException {
+        readCount = 0;
+        writeCount = 0;
+        runFileCounter++;
+
         status.setRunFileWriting(true);
-        String prefix = this.getClass().getSimpleName() + '-' + key + '-' + Integer.toString(runFileCounter) + '-'
+        String prefix = this.getClass().getSimpleName() + '-' + key + '-' + Long.toString(runFileCounter) + '-'
                 + this.toString();
         FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(prefix);
         runFileWriter = new RunFileWriter(file, ctx.getIOManager());
         runFileWriter.open();
-        ++runFileCounter;
         if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("A new run file has been started (key: " + key + ", number: " + runFileCounter + ", file: " + file + ").");
+            LOGGER.fine("A new run file has been started (key: " + key + ", number: " + runFileCounter + ", file: "
+                    + file + ").");
         }
     }
 
@@ -73,6 +87,7 @@ public class RunFileStream {
         int idx = accessor.getTupleId();
         if (!runFileAppender.append(accessor, idx)) {
             runFileAppender.write(runFileWriter, true);
+            writeCount++;
             runFileAppender.append(accessor, idx);
         }
     }
@@ -104,6 +119,7 @@ public class RunFileStream {
         if (runFileReader.nextFrame(runFileBuffer)) {
             accessor.reset(runFileBuffer.getBuffer());
             accessor.next();
+            readCount++;
             return true;
         }
         return false;
@@ -129,6 +145,7 @@ public class RunFileStream {
         // Flush buffer.
         if (runFileAppender.getTupleCount() > 0) {
             runFileAppender.write(runFileWriter, true);
+            writeCount++;
         }
     }