You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:38 UTC

[48/50] [abbrv] asterixdb git commit: snapshot - preparing for write-once-read-many process on sort-merge join

snapshot - preparing for write-once-read-many process on sort-merge join


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b4a3fd56
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b4a3fd56
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b4a3fd56

Branch: refs/heads/ecarm002/interval_join_merge
Commit: b4a3fd56b637b20ec5be667261002772d8d76603
Parents: e256e63
Author: Preston Carman <pr...@apache.org>
Authored: Fri Oct 14 17:50:23 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Fri Oct 14 17:50:23 2016 -0700

----------------------------------------------------------------------
 .../intervalindex/IntervalIndexJoiner.java      |  8 +--
 .../IntervalPartitionJoiner.java                | 20 +++++++-
 .../hyracks/dataflow/std/join/MergeJoiner.java  | 49 +++++++++++++-----
 .../dataflow/std/join/RunFileStream.java        | 52 ++++++++++++++------
 4 files changed, 96 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index 8b7a12e..d3303b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -435,7 +435,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
                     + bufferManager.getNumTuples(RIGHT_PARTITION) + " memory].");
         }
         if (bufferManager.getNumTuples(LEFT_PARTITION) > bufferManager.getNumTuples(RIGHT_PARTITION)) {
-            runFileStream[RIGHT_PARTITION].startRunFile();
+            runFileStream[RIGHT_PARTITION].startRunFileWriting();
             if (LOGGER.isLoggable(Level.FINE)) {
                 LOGGER.fine("Memory is full. Freezing the left branch. (Left memory tuples: "
                         + bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: "
@@ -444,7 +444,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
             bufferManager.printStats("memory details");
             rightSpillCount++;
         } else {
-            runFileStream[LEFT_PARTITION].startRunFile();
+            runFileStream[LEFT_PARTITION].startRunFileWriting();
             if (LOGGER.isLoggable(Level.FINE)) {
                 LOGGER.fine("Memory is full. Freezing the right branch. (Left memory tuples: "
                         + bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: "
@@ -456,7 +456,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
     }
 
     private void continueStream(int diskPartition, ITupleAccessor accessor) throws HyracksDataException {
-        runFileStream[diskPartition].closeRunFile();
+        runFileStream[diskPartition].closeRunFileReading();
         accessor.reset(inputBuffer[diskPartition]);
         accessor.setTupleId(streamIndex[diskPartition]);
         if (LOGGER.isLoggable(Level.FINE)) {
@@ -490,7 +490,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
                 || (RIGHT_PARTITION == frozenPartition && !status.branch[RIGHT_PARTITION].isRunFileReading())) {
             streamIndex[frozenPartition] = accessor.getTupleId();
         }
-        runFileStream[frozenPartition].openRunFile(accessor);
+        runFileStream[frozenPartition].startReadingRunFile(accessor);
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Unfreezing (" + frozenPartition + ").");
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index 984db20..f57d205 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -64,6 +64,8 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
     private final int numOfPartitions;
     private long buildSize = 0;
     private long probeSize = 0;
+    private long[] buildPartitionSizes;
+    private long[] probePartitionSizes;
     private final TreeMap<RunFilePointer, Integer> probeRunFilePointers;
 
     private final VPartitionTupleBufferManager buildBufferManager;
@@ -92,8 +94,10 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
         this.accessorProbe = new FrameTupleAccessor(leftRd);
         reloadBuffer = new VSizeFrame(ctx);
 
-        this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);;
+        this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);
         this.imjc = imjc;
+        buildPartitionSizes = new long[numOfPartitions];
+        probePartitionSizes = new long[numOfPartitions];
 
         // TODO fix available memory size
         this.buildMemory = memorySize;
@@ -135,6 +139,7 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
             }
             inputAccessor[LEFT_PARTITION].next();
             probeSize++;
+            probePartitionSizes[pid]++;
         }
         inputBuffer[LEFT_PARTITION].rewind();
         probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]);
@@ -151,6 +156,16 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
             LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, "
                     + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
         }
+        System.err.print("build: [");
+        for (int i = 0; i < buildPartitionSizes.length; i++) {
+            System.err.print(buildPartitionSizes[i] + ", ");
+        }
+        System.err.println("]");
+        System.err.print("probe: [");
+        for (int i = 0; i < probePartitionSizes.length; i++) {
+            System.err.print(probePartitionSizes[i] + ", ");
+        }
+        System.err.println("]");
     }
 
     private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException {
@@ -181,6 +196,8 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
                 Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k);
                 if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) {
                     fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId));
+                    System.err.println("join " + probe + "(" + probePartitionSizes[probeRunFilePointers.get(probeId)]
+                            + ") with " + build + "(" + buildPartitionSizes[buildId] + ")");
                 }
             }
             if (!fbms.isEmpty()) {
@@ -262,6 +279,7 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
                     inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) {
                 return;
             }
+            buildPartitionSizes[pid]++;
 
             if (buildPid != pid) {
                 // Track new partitions in memory.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index d4195ec..efb3756 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -35,6 +35,7 @@ import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
+import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 
 /**
@@ -54,7 +55,10 @@ public class MergeJoiner extends AbstractMergeJoiner {
     private final LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
 
     private int leftStreamIndex;
+    private final RunFileStream runFileStreamOld;
     private final RunFileStream runFileStream;
+    private ITupleAccessor tmpAccessor;
+    private final RunFilePointer runFilePointer;
 
     private final IMergeJoinChecker mjc;
 
@@ -69,6 +73,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
             IMergeJoinChecker mjc, RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException {
         super(ctx, partition, status, locks, leftRd, rightRd);
         this.mjc = mjc;
+        tmpAccessor = new TupleAccessor(leftRd);
+        runFilePointer = new RunFilePointer();
 
         // Memory (right buffer)
         if (memorySize < 1) {
@@ -81,6 +87,7 @@ public class MergeJoiner extends AbstractMergeJoiner {
 
         // Run File and frame cache (left buffer)
         leftStreamIndex = TupleAccessor.UNSET;
+        runFileStreamOld = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
         runFileStream = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
 
         if (LOGGER.isLoggable(Level.FINE)) {
@@ -158,7 +165,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
 
     private TupleStatus loadSpilledTuple(int partition) throws HyracksDataException {
         if (!inputAccessor[partition].exists()) {
-            if (!runFileStream.loadNextBuffer(inputAccessor[partition])) {
+            runFileStream.loadNextBuffer(tmpAccessor);
+            if (!runFileStreamOld.loadNextBuffer(inputAccessor[partition])) {
                 return TupleStatus.EMPTY;
             }
         }
@@ -200,8 +208,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
         resultAppender.write(writer, true);
         if (LOGGER.isLoggable(Level.WARNING)) {
             LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
-                    + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, "
-                    + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount()
+                    + " results, " + spillCount + " spills, " + runFileStreamOld.getFileCount() + " files, "
+                    + runFileStreamOld.getWriteCount() + " spill frames written, " + runFileStreamOld.getReadCount()
                     + " spill frames read.");
         }
     }
@@ -209,7 +217,11 @@ public class MergeJoiner extends AbstractMergeJoiner {
     private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException {
         //        System.err.print("Spill ");
 
-        runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]);
+        runFileStreamOld.addToRunFile(inputAccessor[LEFT_PARTITION]);
+        if (true) {
+            runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]);
+        }
+
         processLeftTuple(writer);
 
         // Memory is empty and we can start processing the run file.
@@ -264,7 +276,13 @@ public class MergeJoiner extends AbstractMergeJoiner {
                     + bufferManager.getNumTuples() + " tuples memory].");
         }
 
-        runFileStream.startRunFile();
+        if (runFilePointer.getFileOffset() > 0) {
+
+        } else {
+            runFilePointer.reset(0, 0);
+            runFileStream.startRunFileWriting();
+        }
+        runFileStreamOld.startRunFileWriting();
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine(
                     "Memory is full. Freezing the right branch. (memory tuples: " + bufferManager.getNumTuples() + ")");
@@ -275,7 +293,7 @@ public class MergeJoiner extends AbstractMergeJoiner {
     private void continueStream(ITupleAccessor accessor) throws HyracksDataException {
         //        System.err.println("continueStream");
 
-        runFileStream.closeRunFile();
+        runFileStreamOld.closeRunFileReading();
         accessor.reset(inputBuffer[LEFT_PARTITION]);
         accessor.setTupleId(leftStreamIndex);
         if (LOGGER.isLoggable(Level.FINE)) {
@@ -289,20 +307,25 @@ public class MergeJoiner extends AbstractMergeJoiner {
             LOGGER.warning("snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
                     + " left, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, ["
                     + bufferManager.getNumTuples() + " tuples memory, " + spillCount + " spills, "
-                    + (runFileStream.getFileCount() - spillFileCount) + " files, "
-                    + (runFileStream.getWriteCount() - spillWriteCount) + " written, "
-                    + (runFileStream.getReadCount() - spillReadCount) + " read].");
-            spillFileCount = runFileStream.getFileCount();
-            spillReadCount = runFileStream.getReadCount();
-            spillWriteCount = runFileStream.getWriteCount();
+                    + (runFileStreamOld.getFileCount() - spillFileCount) + " files, "
+                    + (runFileStreamOld.getWriteCount() - spillWriteCount) + " written, "
+                    + (runFileStreamOld.getReadCount() - spillReadCount) + " read].");
+            spillFileCount = runFileStreamOld.getFileCount();
+            spillReadCount = runFileStreamOld.getReadCount();
+            spillWriteCount = runFileStreamOld.getWriteCount();
         }
 
+        runFileStreamOld.flushAndStopRunFile(accessor);
         runFileStream.flushAndStopRunFile(accessor);
         flushMemory();
         if (!status.branch[LEFT_PARTITION].isRunFileReading()) {
             leftStreamIndex = accessor.getTupleId();
         }
-        runFileStream.openRunFile(accessor);
+        runFileStreamOld.startReadingRunFile(accessor);
+
+        runFileStream.resetReadPointer(runFilePointer.getFileOffset());
+        accessor.setTupleId(runFilePointer.getTupleIndex());
+        runFileStream.startReadingRunFile(accessor);
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Unfreezing right partition.");
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index 7e8a8d1..f50c34a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -31,6 +31,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
 
 public class RunFileStream {
 
@@ -79,7 +80,7 @@ public class RunFileStream {
         return writeCount;
     }
 
-    public void startRunFile() throws HyracksDataException {
+    public void startRunFileWriting() throws HyracksDataException {
         runFileCounter++;
 
         status.setRunFileWriting(true);
@@ -94,6 +95,19 @@ public class RunFileStream {
         }
     }
 
+    public void resumeRunFileWriting() throws HyracksDataException {
+        status.setRunFileWriting(true);
+        String prefix = this.getClass().getSimpleName() + '-' + key + '-' + Long.toString(runFileCounter) + '-'
+                + this.toString();
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(prefix);
+        runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+        runFileWriter.open();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("A new run file has been started (key: " + key + ", number: " + runFileCounter + ", file: "
+                    + file + ").");
+        }
+    }
+
     public void addToRunFile(ITupleAccessor accessor) throws HyracksDataException {
         int idx = accessor.getTupleId();
         if (!runFileAppender.append(accessor, idx)) {
@@ -105,28 +119,28 @@ public class RunFileStream {
         tupleCount++;
     }
 
-    public void openRunFile(ITupleAccessor accessor) throws HyracksDataException {
+    public void startReadingRunFile(ITupleAccessor accessor) throws HyracksDataException {
         status.setRunFileReading(true);
 
         // Create reader
-        runFileReader = runFileWriter.createDeleteOnCloseReader();
+        runFileReader = runFileWriter.createReader();
         runFileReader.open();
 
         // Load first frame
         loadNextBuffer(accessor);
     }
 
-    public void resetReader(ITupleAccessor accessor) throws HyracksDataException {
-        if (status.isRunFileWriting()) {
-            flushAndStopRunFile(accessor);
-            openRunFile(accessor);
-        } else {
-            runFileReader.reset();
-
-            // Load first frame
-            loadNextBuffer(accessor);
-        }
-    }
+//    public void resetReader(ITupleAccessor accessor) throws HyracksDataException {
+//        if (status.isRunFileWriting()) {
+//            flushAndStopRunFile(accessor);
+//            startReadingRunFile(accessor);
+//        } else {
+//            runFileReader.reset();
+//
+//            // Load first frame
+//            loadNextBuffer(accessor);
+//        }
+//    }
 
     public boolean loadNextBuffer(ITupleAccessor accessor) throws HyracksDataException {
         if (runFileReader.nextFrame(runFileBuffer)) {
@@ -138,6 +152,14 @@ public class RunFileStream {
         return false;
     }
 
+    public long getReadPointer() throws HyracksDataException {
+        return runFileReader.getReadPointer();
+    }
+
+    public void resetReadPointer(long fileOffset) throws HyracksDataException {
+        runFileReader.reset(fileOffset);
+    }
+
     public void flushAndStopRunFile(ITupleAccessor accessor) throws HyracksDataException {
         status.setRunFileWriting(false);
 
@@ -163,7 +185,7 @@ public class RunFileStream {
         }
     }
 
-    public void closeRunFile() throws HyracksDataException {
+    public void closeRunFileReading() throws HyracksDataException {
         status.setRunFileReading(false);
         runFileReader.close();
     }