You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:38 UTC
[48/50] [abbrv] asterixdb git commit: snapshot - preparing for write
once read many process on sort merge join
snapshot - preparing for a write-once-read-many process on the sort-merge join
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b4a3fd56
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b4a3fd56
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b4a3fd56
Branch: refs/heads/ecarm002/interval_join_merge
Commit: b4a3fd56b637b20ec5be667261002772d8d76603
Parents: e256e63
Author: Preston Carman <pr...@apache.org>
Authored: Fri Oct 14 17:50:23 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Fri Oct 14 17:50:23 2016 -0700
----------------------------------------------------------------------
.../intervalindex/IntervalIndexJoiner.java | 8 +--
.../IntervalPartitionJoiner.java | 20 +++++++-
.../hyracks/dataflow/std/join/MergeJoiner.java | 49 +++++++++++++-----
.../dataflow/std/join/RunFileStream.java | 52 ++++++++++++++------
4 files changed, 96 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index 8b7a12e..d3303b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -435,7 +435,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
+ bufferManager.getNumTuples(RIGHT_PARTITION) + " memory].");
}
if (bufferManager.getNumTuples(LEFT_PARTITION) > bufferManager.getNumTuples(RIGHT_PARTITION)) {
- runFileStream[RIGHT_PARTITION].startRunFile();
+ runFileStream[RIGHT_PARTITION].startRunFileWriting();
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Memory is full. Freezing the left branch. (Left memory tuples: "
+ bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: "
@@ -444,7 +444,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
bufferManager.printStats("memory details");
rightSpillCount++;
} else {
- runFileStream[LEFT_PARTITION].startRunFile();
+ runFileStream[LEFT_PARTITION].startRunFileWriting();
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Memory is full. Freezing the right branch. (Left memory tuples: "
+ bufferManager.getNumTuples(LEFT_PARTITION) + ", Right memory tuples: "
@@ -456,7 +456,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
}
private void continueStream(int diskPartition, ITupleAccessor accessor) throws HyracksDataException {
- runFileStream[diskPartition].closeRunFile();
+ runFileStream[diskPartition].closeRunFileReading();
accessor.reset(inputBuffer[diskPartition]);
accessor.setTupleId(streamIndex[diskPartition]);
if (LOGGER.isLoggable(Level.FINE)) {
@@ -490,7 +490,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
|| (RIGHT_PARTITION == frozenPartition && !status.branch[RIGHT_PARTITION].isRunFileReading())) {
streamIndex[frozenPartition] = accessor.getTupleId();
}
- runFileStream[frozenPartition].openRunFile(accessor);
+ runFileStream[frozenPartition].startReadingRunFile(accessor);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Unfreezing (" + frozenPartition + ").");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index 984db20..f57d205 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -64,6 +64,8 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
private final int numOfPartitions;
private long buildSize = 0;
private long probeSize = 0;
+ private long[] buildPartitionSizes;
+ private long[] probePartitionSizes;
private final TreeMap<RunFilePointer, Integer> probeRunFilePointers;
private final VPartitionTupleBufferManager buildBufferManager;
@@ -92,8 +94,10 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
this.accessorProbe = new FrameTupleAccessor(leftRd);
reloadBuffer = new VSizeFrame(ctx);
- this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);;
+ this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);
this.imjc = imjc;
+ buildPartitionSizes = new long[numOfPartitions];
+ probePartitionSizes = new long[numOfPartitions];
// TODO fix available memory size
this.buildMemory = memorySize;
@@ -135,6 +139,7 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
}
inputAccessor[LEFT_PARTITION].next();
probeSize++;
+ probePartitionSizes[pid]++;
}
inputBuffer[LEFT_PARTITION].rewind();
probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]);
@@ -151,6 +156,16 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, "
+ joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
}
+ System.err.print("build: [");
+ for (int i = 0; i < buildPartitionSizes.length; i++) {
+ System.err.print(buildPartitionSizes[i] + ", ");
+ }
+ System.err.println("]");
+ System.err.print("probe: [");
+ for (int i = 0; i < probePartitionSizes.length; i++) {
+ System.err.print(probePartitionSizes[i] + ", ");
+ }
+ System.err.println("]");
}
private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException {
@@ -181,6 +196,8 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k);
if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) {
fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId));
+ System.err.println("join " + probe + "(" + probePartitionSizes[probeRunFilePointers.get(probeId)]
+ + ") with " + build + "(" + buildPartitionSizes[buildId] + ")");
}
}
if (!fbms.isEmpty()) {
@@ -262,6 +279,7 @@ public class IntervalPartitionJoiner extends AbstractMergeJoiner {
inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) {
return;
}
+ buildPartitionSizes[pid]++;
if (buildPid != pid) {
// Track new partitions in memory.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index d4195ec..efb3756 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -35,6 +35,7 @@ import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor;
import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
+import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
/**
@@ -54,7 +55,10 @@ public class MergeJoiner extends AbstractMergeJoiner {
private final LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
private int leftStreamIndex;
+ private final RunFileStream runFileStreamOld;
private final RunFileStream runFileStream;
+ private ITupleAccessor tmpAccessor;
+ private final RunFilePointer runFilePointer;
private final IMergeJoinChecker mjc;
@@ -69,6 +73,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
IMergeJoinChecker mjc, RecordDescriptor leftRd, RecordDescriptor rightRd) throws HyracksDataException {
super(ctx, partition, status, locks, leftRd, rightRd);
this.mjc = mjc;
+ tmpAccessor = new TupleAccessor(leftRd);
+ runFilePointer = new RunFilePointer();
// Memory (right buffer)
if (memorySize < 1) {
@@ -81,6 +87,7 @@ public class MergeJoiner extends AbstractMergeJoiner {
// Run File and frame cache (left buffer)
leftStreamIndex = TupleAccessor.UNSET;
+ runFileStreamOld = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
runFileStream = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
if (LOGGER.isLoggable(Level.FINE)) {
@@ -158,7 +165,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
private TupleStatus loadSpilledTuple(int partition) throws HyracksDataException {
if (!inputAccessor[partition].exists()) {
- if (!runFileStream.loadNextBuffer(inputAccessor[partition])) {
+ runFileStream.loadNextBuffer(tmpAccessor);
+ if (!runFileStreamOld.loadNextBuffer(inputAccessor[partition])) {
return TupleStatus.EMPTY;
}
}
@@ -200,8 +208,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
resultAppender.write(writer, true);
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
- + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, "
- + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount()
+ + " results, " + spillCount + " spills, " + runFileStreamOld.getFileCount() + " files, "
+ + runFileStreamOld.getWriteCount() + " spill frames written, " + runFileStreamOld.getReadCount()
+ " spill frames read.");
}
}
@@ -209,7 +217,11 @@ public class MergeJoiner extends AbstractMergeJoiner {
private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException {
// System.err.print("Spill ");
- runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]);
+ runFileStreamOld.addToRunFile(inputAccessor[LEFT_PARTITION]);
+ if (true) {
+ runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]);
+ }
+
processLeftTuple(writer);
// Memory is empty and we can start processing the run file.
@@ -264,7 +276,13 @@ public class MergeJoiner extends AbstractMergeJoiner {
+ bufferManager.getNumTuples() + " tuples memory].");
}
- runFileStream.startRunFile();
+ if (runFilePointer.getFileOffset() > 0) {
+
+ } else {
+ runFilePointer.reset(0, 0);
+ runFileStream.startRunFileWriting();
+ }
+ runFileStreamOld.startRunFileWriting();
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine(
"Memory is full. Freezing the right branch. (memory tuples: " + bufferManager.getNumTuples() + ")");
@@ -275,7 +293,7 @@ public class MergeJoiner extends AbstractMergeJoiner {
private void continueStream(ITupleAccessor accessor) throws HyracksDataException {
// System.err.println("continueStream");
- runFileStream.closeRunFile();
+ runFileStreamOld.closeRunFileReading();
accessor.reset(inputBuffer[LEFT_PARTITION]);
accessor.setTupleId(leftStreamIndex);
if (LOGGER.isLoggable(Level.FINE)) {
@@ -289,20 +307,25 @@ public class MergeJoiner extends AbstractMergeJoiner {
LOGGER.warning("snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
+ " left, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, ["
+ bufferManager.getNumTuples() + " tuples memory, " + spillCount + " spills, "
- + (runFileStream.getFileCount() - spillFileCount) + " files, "
- + (runFileStream.getWriteCount() - spillWriteCount) + " written, "
- + (runFileStream.getReadCount() - spillReadCount) + " read].");
- spillFileCount = runFileStream.getFileCount();
- spillReadCount = runFileStream.getReadCount();
- spillWriteCount = runFileStream.getWriteCount();
+ + (runFileStreamOld.getFileCount() - spillFileCount) + " files, "
+ + (runFileStreamOld.getWriteCount() - spillWriteCount) + " written, "
+ + (runFileStreamOld.getReadCount() - spillReadCount) + " read].");
+ spillFileCount = runFileStreamOld.getFileCount();
+ spillReadCount = runFileStreamOld.getReadCount();
+ spillWriteCount = runFileStreamOld.getWriteCount();
}
+ runFileStreamOld.flushAndStopRunFile(accessor);
runFileStream.flushAndStopRunFile(accessor);
flushMemory();
if (!status.branch[LEFT_PARTITION].isRunFileReading()) {
leftStreamIndex = accessor.getTupleId();
}
- runFileStream.openRunFile(accessor);
+ runFileStreamOld.startReadingRunFile(accessor);
+
+ runFileStream.resetReadPointer(runFilePointer.getFileOffset());
+ accessor.setTupleId(runFilePointer.getTupleIndex());
+ runFileStream.startReadingRunFile(accessor);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Unfreezing right partition.");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4a3fd56/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index 7e8a8d1..f50c34a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -31,6 +31,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
public class RunFileStream {
@@ -79,7 +80,7 @@ public class RunFileStream {
return writeCount;
}
- public void startRunFile() throws HyracksDataException {
+ public void startRunFileWriting() throws HyracksDataException {
runFileCounter++;
status.setRunFileWriting(true);
@@ -94,6 +95,19 @@ public class RunFileStream {
}
}
+ public void resumeRunFileWriting() throws HyracksDataException {
+ status.setRunFileWriting(true);
+ String prefix = this.getClass().getSimpleName() + '-' + key + '-' + Long.toString(runFileCounter) + '-'
+ + this.toString();
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(prefix);
+ runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+ runFileWriter.open();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("A new run file has been started (key: " + key + ", number: " + runFileCounter + ", file: "
+ + file + ").");
+ }
+ }
+
public void addToRunFile(ITupleAccessor accessor) throws HyracksDataException {
int idx = accessor.getTupleId();
if (!runFileAppender.append(accessor, idx)) {
@@ -105,28 +119,28 @@ public class RunFileStream {
tupleCount++;
}
- public void openRunFile(ITupleAccessor accessor) throws HyracksDataException {
+ public void startReadingRunFile(ITupleAccessor accessor) throws HyracksDataException {
status.setRunFileReading(true);
// Create reader
- runFileReader = runFileWriter.createDeleteOnCloseReader();
+ runFileReader = runFileWriter.createReader();
runFileReader.open();
// Load first frame
loadNextBuffer(accessor);
}
- public void resetReader(ITupleAccessor accessor) throws HyracksDataException {
- if (status.isRunFileWriting()) {
- flushAndStopRunFile(accessor);
- openRunFile(accessor);
- } else {
- runFileReader.reset();
-
- // Load first frame
- loadNextBuffer(accessor);
- }
- }
+// public void resetReader(ITupleAccessor accessor) throws HyracksDataException {
+// if (status.isRunFileWriting()) {
+// flushAndStopRunFile(accessor);
+// startReadingRunFile(accessor);
+// } else {
+// runFileReader.reset();
+//
+// // Load first frame
+// loadNextBuffer(accessor);
+// }
+// }
public boolean loadNextBuffer(ITupleAccessor accessor) throws HyracksDataException {
if (runFileReader.nextFrame(runFileBuffer)) {
@@ -138,6 +152,14 @@ public class RunFileStream {
return false;
}
+ public long getReadPointer() throws HyracksDataException {
+ return runFileReader.getReadPointer();
+ }
+
+ public void resetReadPointer(long fileOffset) throws HyracksDataException {
+ runFileReader.reset(fileOffset);
+ }
+
public void flushAndStopRunFile(ITupleAccessor accessor) throws HyracksDataException {
status.setRunFileWriting(false);
@@ -163,7 +185,7 @@ public class RunFileStream {
}
}
- public void closeRunFile() throws HyracksDataException {
+ public void closeRunFileReading() throws HyracksDataException {
status.setRunFileReading(false);
runFileReader.close();
}