You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:31 UTC

[41/50] [abbrv] asterixdb git commit: Interface clean up.

Interface clean up.


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ce593414
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ce593414
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ce593414

Branch: refs/heads/ecarm002/interval_join_merge
Commit: ce5934148227e57eeebb369d57ae64d87b908959
Parents: 19f0997
Author: Preston Carman <pr...@apache.org>
Authored: Thu Sep 29 16:05:18 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Sep 29 16:05:18 2016 -0700

----------------------------------------------------------------------
 .../IntervalIndexJoinOperatorDescriptor.java    |  5 +--
 .../intervalindex/IntervalIndexJoiner.java      | 46 ++++++++++----------
 ...IntervalPartitionJoinOperatorDescriptor.java |  9 ++--
 .../dataflow/std/join/AbstractMergeJoiner.java  | 38 +++++++++-------
 .../hyracks/dataflow/std/join/IMergeJoiner.java |  6 +--
 .../std/join/MergeJoinOperatorDescriptor.java   |  6 +--
 .../hyracks/dataflow/std/join/MergeJoiner.java  | 35 +++++++--------
 7 files changed, 72 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
index d84fabc..fbddfef 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
@@ -151,7 +151,7 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip
                 }
                 try {
                     state.indexJoiner.setFrame(LEFT_ACTIVITY_ID, buffer);
-                    state.indexJoiner.processMergeUsingLeftTuple(writer);
+                    state.indexJoiner.processLeftFrame(writer);
                 } finally {
                     locks.getLock(partition).unlock();
                 }
@@ -175,8 +175,7 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip
                     if (state.failed) {
                         writer.fail();
                     } else {
-                        state.indexJoiner.processMergeUsingLeftTuple(writer);
-                        state.indexJoiner.closeResult(writer);
+                        state.indexJoiner.processLeftClose(writer);
                         writer.close();
                     }
                     state.status.branch[LEFT_ACTIVITY_ID].setStageClose();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index a4ad666..bac2f45 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -141,24 +141,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         joinResultCount++;
     }
 
-    @Override
-    public void closeResult(IFrameWriter writer) throws HyracksDataException {
-        resultAppender.write(writer, true);
-        activeManager[LEFT_PARTITION].clear();
-        activeManager[RIGHT_PARTITION].clear();
-        runFileStream[LEFT_PARTITION].close();
-        runFileStream[RIGHT_PARTITION].close();
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
-                    + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount()
-                    + " files, " + runFileStream[LEFT_PARTITION].getWriteCount() + " written, "
-                    + runFileStream[LEFT_PARTITION].getReadCount() + " read]. right[" + rightSpillCount + " spills, "
-                    + runFileStream[RIGHT_PARTITION].getFileCount() + " files, "
-                    + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, "
-                    + runFileStream[RIGHT_PARTITION].getReadCount() + " read].");
-        }
-    }
-
     private void flushMemory(int partition) throws HyracksDataException {
         activeManager[partition].clear();
     }
@@ -209,7 +191,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
     }
 
     @Override
-    public void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException {
+    public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
         TupleStatus leftTs = loadLeftTuple();
         TupleStatus rightTs = loadRightTuple();
         while (leftTs.isKnown() && checkHasMoreProcessing(leftTs, LEFT_PARTITION, RIGHT_PARTITION)
@@ -234,6 +216,27 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         }
     }
 
+    @Override
+    public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
+        processLeftFrame(writer);
+
+        resultAppender.write(writer, true);
+        activeManager[LEFT_PARTITION].clear();
+        activeManager[RIGHT_PARTITION].clear();
+        runFileStream[LEFT_PARTITION].close();
+        runFileStream[RIGHT_PARTITION].close();
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
+                    + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount()
+                    + " files, " + runFileStream[LEFT_PARTITION].getWriteCount() + " written, "
+                    + runFileStream[LEFT_PARTITION].getReadCount() + " read]. right[" + rightSpillCount + " spills, "
+                    + runFileStream[RIGHT_PARTITION].getFileCount() + " files, "
+                    + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, "
+                    + runFileStream[RIGHT_PARTITION].getReadCount() + " read].");
+        }
+
+    }
+
     private boolean checkHasMoreProcessing(TupleStatus ts, int partition, int joinPartition) {
         return ts.isLoaded() || status.branch[partition].isRunFileWriting()
                 || (checkHasMoreTuples(joinPartition) && activeManager[partition].hasRecords());
@@ -493,9 +496,4 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         }
     }
 
-    @Override
-    public void closeInput(int partition) throws HyracksDataException {
-        // No changes are required.
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 60a4697..b4965ef 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -67,11 +67,11 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
 
     private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
 
-    public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int k, int[] leftKeys,
-            int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
+    public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin, int k,
+            int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
             RangeId rangeId) {
         super(spec, 2, 1);
-        this.memsize = memsize;
+        this.memsize = memoryForJoin;
         this.buildKey = leftKeys[0];
         this.probeKey = rightKeys[0];
         this.k = k;
@@ -161,7 +161,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
 
                     state.ipj.buildInit();
                     LOGGER.setLevel(Level.FINE);
-                    System.out.println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel());
+                    System.out
+                            .println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel());
                     if (LOGGER.isLoggable(Level.FINE)) {
                         LOGGER.fine("IntervalPartitionJoin is starting the build phase with " + state.k
                                 + " granules repesenting " + state.intervalPartitions + " interval partitions using "

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
index aa065cd..b1b4075 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
@@ -80,6 +80,29 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner {
         resultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
     }
 
+    public void setLeftFrame(ByteBuffer buffer) {
+        setFrame(LEFT_PARTITION, buffer);
+    }
+
+    public void setRightFrame(ByteBuffer buffer) {
+        setFrame(RIGHT_PARTITION, buffer);
+    }
+
+    protected TupleStatus loadMemoryTuple(int branch) {
+        TupleStatus loaded;
+        if (inputAccessor[branch] != null && inputAccessor[branch].exists()) {
+            // Still processing frame.
+            int test = inputAccessor[branch].getTupleCount();
+            loaded = TupleStatus.LOADED;
+        } else if (status.branch[branch].hasMore()) {
+            loaded = TupleStatus.UNKNOWN;
+        } else {
+            // No more frames or tuples to process.
+            loaded = TupleStatus.EMPTY;
+        }
+        return loaded;
+    }
+
     protected TupleStatus pauseAndLoadRightTuple() {
         status.continueRightLoad = true;
         locks.getRight(partition).signal();
@@ -99,21 +122,6 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner {
         return TupleStatus.LOADED;
     }
 
-    protected TupleStatus loadMemoryTuple(int branch) {
-        TupleStatus loaded;
-        if (inputAccessor[branch] != null && inputAccessor[branch].exists()) {
-            // Still processing frame.
-            int test = inputAccessor[branch].getTupleCount();
-            loaded = TupleStatus.LOADED;
-        } else if (status.branch[branch].hasMore()) {
-            loaded = TupleStatus.UNKNOWN;
-        } else {
-            // No more frames or tuples to process.
-            loaded = TupleStatus.EMPTY;
-        }
-        return loaded;
-    }
-
     @Override
     public void setFrame(int branch, ByteBuffer buffer) {
         inputBuffer[branch].clear();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
index 4268ec9..051d3e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
@@ -25,12 +25,10 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IMergeJoiner {
 
-    void closeResult(IFrameWriter writer) throws HyracksDataException;
+    void processLeftFrame(IFrameWriter writer) throws HyracksDataException;
 
-    void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException;
+    void processLeftClose(IFrameWriter writer) throws HyracksDataException;
 
     void setFrame(int partition, ByteBuffer buffer);
 
-    void closeInput(int partition) throws HyracksDataException;
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
index cbe1a66..c5f612f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
@@ -151,7 +151,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
                 }
                 try {
                     state.joiner.setFrame(LEFT_ACTIVITY_ID, buffer);
-                    state.joiner.processMergeUsingLeftTuple(writer);
+                    state.joiner.processLeftFrame(writer);
                 } finally {
                     locks.getLock(partition).unlock();
                 }
@@ -175,9 +175,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
                     if (state.failed) {
                         writer.fail();
                     } else {
-                        state.joiner.closeInput(LEFT_ACTIVITY_ID);
-                        state.joiner.processMergeUsingLeftTuple(writer);
-                        state.joiner.closeResult(writer);
+                        state.joiner.processLeftClose(writer);
                         writer.close();
                     }
                     state.status.branch[LEFT_ACTIVITY_ID].setStageClose();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index 9c93828..d4195ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -110,17 +110,6 @@ public class MergeJoiner extends AbstractMergeJoiner {
         joinResultCount++;
     }
 
-    @Override
-    public void closeResult(IFrameWriter writer) throws HyracksDataException {
-        resultAppender.write(writer, true);
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
-                    + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, "
-                    + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount()
-                    + " spill frames read.");
-        }
-    }
-
     private void flushMemory() throws HyracksDataException {
         memoryBuffer.clear();
         bufferManager.reset();
@@ -176,20 +165,13 @@ public class MergeJoiner extends AbstractMergeJoiner {
         return TupleStatus.LOADED;
     }
 
-    @Override
-    public void closeInput(int partition) throws HyracksDataException {
-        if (status.branch[partition].isRunFileWriting()) {
-            unfreezeAndContinue(inputAccessor[partition]);
-        }
-    }
-
     /**
      * Left
      *
      * @throws HyracksDataException
      */
     @Override
-    public void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException {
+    public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
         TupleStatus leftTs = loadLeftTuple();
         TupleStatus rightTs = loadRightTuple();
         while (leftTs.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() || memoryHasTuples())) {
@@ -209,6 +191,21 @@ public class MergeJoiner extends AbstractMergeJoiner {
         }
     }
 
+    @Override
+    public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
+        if (status.branch[LEFT_PARTITION].isRunFileWriting()) {
+            unfreezeAndContinue(inputAccessor[LEFT_PARTITION]);
+        }
+        processLeftFrame(writer);
+        resultAppender.write(writer, true);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
+                    + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, "
+                    + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount()
+                    + " spill frames read.");
+        }
+    }
+
     private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException {
         //        System.err.print("Spill ");