You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:00 UTC

[10/50] [abbrv] asterixdb git commit: switch merge memory tracking method.

switch merge memory tracking method.


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4f9e6a82
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4f9e6a82
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4f9e6a82

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 4f9e6a82eaff790a50f86848e770e4b1dedf4069
Parents: 1f7ac98
Author: Preston Carman <pr...@apache.org>
Authored: Thu Jul 14 09:11:57 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Jul 14 09:11:57 2016 -0700

----------------------------------------------------------------------
 .../joins/AbstractIntervalMergeJoinChecker.java | 14 +++++++
 .../IntervalPartitionJoiner.java                |  2 +-
 .../dataflow/std/join/IMergeJoinChecker.java    |  3 ++
 .../hyracks/dataflow/std/join/MergeJoiner.java  | 41 +++++++++++---------
 4 files changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
index 0a25c25..cf0bf6a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
@@ -105,6 +105,20 @@ public abstract class AbstractIntervalMergeJoinChecker implements IIntervalMerge
     }
 
     @Override
+    public boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
+            IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException {
+        try {
+            IntervalJoinUtil.getIntervalPointable(accessorLeft, leftTupleIndex, idLeft, tvp, ipLeft);
+            IntervalJoinUtil.getIntervalPointable(accessorRight, rightTupleIndex, idRight, tvp, ipRight);
+            ipLeft.getStart(startLeft);
+            ipRight.getEnd(endRight);
+            return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), startLeft, endRight) <= 0);
+        } catch (AsterixException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
     public boolean checkToSaveInResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
             IFrameTupleAccessor accessorRight, int rightTupleIndex, boolean reversed) throws HyracksDataException {
         try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index 5df7b0a..fe49d2f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -320,7 +320,7 @@ public class IntervalPartitionJoiner {
 
     private int selectPartitionsToReload(int freeSpace, int pid) {
         for (int id = ipjd.buildNextSpilled(0); id >= 0; id = ipjd.buildNextSpilled(id + 1)) {
-            assert buildRFWriters[id].getFileSize() > 0 : "How comes a spilled partition have size 0?";
+            assert buildRFWriters[id].getFileSize() > 0 : "How come a spilled partition have size 0?";
             if (freeSpace >= buildRFWriters[id].getFileSize()) {
                 return id;
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
index ddf04f3..49a3763 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinChecker.java
@@ -51,6 +51,9 @@ public interface IMergeJoinChecker extends Serializable {
     boolean checkToRemoveInMemory(ITupleAccessor accessorLeft, ITupleAccessor accessorRight)
             throws HyracksDataException;
 
+    boolean checkToRemoveInMemory(IFrameTupleAccessor accessorLeft, int leftTupleIndex,
+            IFrameTupleAccessor accessorRight, int rightTupleIndex) throws HyracksDataException;
+
     /**
      * Check to see if the next right tuple should be loaded during the merge join.
      * The check is true if the left tuple could match with the next right tuple.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f9e6a82/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index 625a24c..d94a63e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hyracks.dataflow.std.join;
 
+import java.util.LinkedList;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +34,7 @@ import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
@@ -47,10 +50,10 @@ public class MergeJoiner extends AbstractMergeJoiner {
 
     private MergeStatus status;
 
-    private final TuplePointer tp;
     private final IDeallocatableFramePool framePool;
     private IDeletableTupleBufferManager bufferManager;
-    private ITupleAccessor memoryAccessor;
+    private ITuplePointerAccessor memoryAccessor;
+    private LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
 
     private int leftStreamIndex;
     private RunFileStream runFileStream;
@@ -71,9 +74,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
                     "MergeJoiner does not have enough memory (needs > 0, got " + memorySize + ").");
         }
         framePool = new DeallocatableFramePool(ctx, (memorySize) * ctx.getInitialFrameSize());
-        tp = new TuplePointer();
         bufferManager = new VariableDeletableTupleMemoryManager(framePool, rightRd);
-        memoryAccessor = bufferManager.createTupleAccessor();
+        memoryAccessor = bufferManager.createTuplePointerAccessor();
 
         // Run File and frame cache (left buffer)
         leftStreamIndex = TupleAccessor.UNSET;
@@ -88,21 +90,23 @@ public class MergeJoiner extends AbstractMergeJoiner {
     }
 
     private boolean addToMemory(ITupleAccessor accessor) throws HyracksDataException {
+        TuplePointer tp = new TuplePointer();
         if (bufferManager.insertTuple(accessor, accessor.getTupleId(), tp)) {
+            memoryBuffer.add(tp);
             return true;
         }
         return false;
     }
 
-    private void removeFromMemory() throws HyracksDataException {
-        memoryAccessor.getTuplePointer(tp);
+    private void removeFromMemory(TuplePointer tp) throws HyracksDataException {
+        memoryBuffer.remove(tp);
         bufferManager.deleteTuple(tp);
     }
 
-    private void addToResult(ITupleAccessor accessor1, ITupleAccessor accessor2, IFrameWriter writer)
-            throws HyracksDataException {
-        FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, accessor1.getTupleId(), accessor2,
-                accessor2.getTupleId());
+    private void addToResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight,
+            int rightTupleIndex, IFrameWriter writer) throws HyracksDataException {
+        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, leftTupleIndex, accessorRight,
+                rightTupleIndex);
     }
 
     @Override
@@ -197,21 +201,22 @@ public class MergeJoiner extends AbstractMergeJoiner {
     private void processLeftTuple(IFrameWriter writer) throws HyracksDataException {
         // Check against memory (right)
         if (memoryHasTuples()) {
-            memoryAccessor.reset();
-            memoryAccessor.next();
-            while (memoryAccessor.exists()) {
+            for (int i = memoryBuffer.size() - 1; i > -1; --i) {
+                memoryAccessor.reset(memoryBuffer.get(i));
                 //                TuplePrinterUtil.printTuple("     --- A outer", inputAccessor[LEFT_PARTITION]);
                 //                TuplePrinterUtil.printTuple("     --- A inner", memoryAccessor);
-                if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], memoryAccessor)) {
+                if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+                        memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) {
                     // add to result
                     //                    System.err.println("  -- Matched --");
-                    addToResult(inputAccessor[LEFT_PARTITION], memoryAccessor, writer);
+                    addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+                            memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer);
                 }
-                if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], memoryAccessor)) {
+                if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+                        memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
                     // remove from memory
-                    removeFromMemory();
+                    removeFromMemory(memoryBuffer.get(i));
                 }
-                memoryAccessor.next();
             }
         }
         inputAccessor[LEFT_PARTITION].next();