You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:30 UTC
[40/50] [abbrv] asterixdb git commit: updated partition algorithm
with build only side memory
updated partition algorithm with build only side memory
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/19f0997f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/19f0997f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/19f0997f
Branch: refs/heads/ecarm002/interval_join_merge
Commit: 19f0997ff0a758e5193e531b220ea7c7e3424a43
Parents: f6dba46
Author: Preston Carman <pr...@apache.org>
Authored: Wed Sep 28 16:16:05 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Wed Sep 28 16:16:05 2016 -0700
----------------------------------------------------------------------
...IntervalPartitionJoinOperatorDescriptor.java | 12 +-
.../IntervalPartitionJoiner.java | 273 +++++++++++--------
.../dataflow/common/io/RunFileReader.java | 13 +
.../dataflow/std/join/RunFileStream.java | 8 +
4 files changed, 184 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19f0997f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index c7986e6..60a4697 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -159,7 +159,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
- state.ipj.initBuild();
+ state.ipj.buildInit();
LOGGER.setLevel(Level.FINE);
System.out.println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel());
if (LOGGER.isLoggable(Level.FINE)) {
@@ -171,13 +171,13 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.ipj.build(buffer);
+ state.ipj.buildStep(buffer);
}
@Override
public void close() throws HyracksDataException {
if (!failure) {
- state.ipj.closeBuild();
+ state.ipj.buildClose();
ctx.setStateObject(state);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("IntervalPartitionJoin closed its build phase");
@@ -216,7 +216,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
writer.open();
- state.ipj.initProbe();
+ state.ipj.probeInit();
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("IntervalPartitionJoin is starting the probe phase.");
@@ -225,7 +225,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.ipj.probe(buffer, writer);
+ state.ipj.probeStep(buffer, writer);
}
@Override
@@ -235,7 +235,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
@Override
public void close() throws HyracksDataException {
- state.ipj.closeProbe(writer);
+ state.ipj.probeClose(writer);
state.ipj.joinSpilledPartitions(writer);
state.ipj.closeAndDeleteRunFiles();
writer.close();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19f0997f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index 31e200b..9c5a872 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -31,6 +31,8 @@ import java.util.logging.Logger;
import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -47,8 +49,10 @@ import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManage
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
/**
- * This class mainly applies one level of HHJ on a pair of
- * relations. It is always called by the descriptor.
+ * The Interval Partition Join runs in three stages: build, probe-in-memory, probe-spill.
+ * build: Saves all build partitions either to memory or disk.
+ * probe-in-memory: Joins all in memory partitions and saves the necessary partitions to disk.
+ * probe-spill: Spilled build partitions are loaded into memory and joined with all remaining probe partitions.
*/
public class IntervalPartitionJoiner {
@@ -59,9 +63,6 @@ public class IntervalPartitionJoiner {
PROBE
}
- // Used for special probe BigObject which can not be held into the Join memory
- private FrameTupleAppender bigProbeFrameAppender;
-
private IHyracksTaskContext ctx;
private final String buildRelName;
@@ -76,13 +77,12 @@ public class IntervalPartitionJoiner {
private RunFileWriter[] buildRFWriters; //writing spilled build partitions
private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
- private final int memForJoin;
+ private final int buildMemory;
private final int k;
private final int numOfPartitions;
private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions
private VPartitionTupleBufferManager buildBufferManager;
- private VPartitionTupleBufferManager probeBufferManager;
private final FrameTupleAccessor accessorBuild;
private final FrameTupleAccessor accessorProbe;
@@ -97,17 +97,23 @@ public class IntervalPartitionJoiner {
private long joinComparisonCount = 0;
private long joinResultCount = 0;
- private long spillCount = 0;
private long spillReadCount = 0;
private long spillWriteCount = 0;
private long buildSize;
+ private long probeSize;
private int tmp = -1;
+ private RunFileWriter probeRunFileWriter = null;
+ private final IFrameTupleAppender probeRunFileAppender;
+ private int probeRunFilePid = -1;
+
public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
- RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) {
+ RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc)
+ throws HyracksDataException {
this.ctx = ctx;
- this.memForJoin = memForJoin;
+ // TODO fix available memory size
+ this.buildMemory = memForJoin;
this.k = k;
this.buildRd = buildRd;
this.probeRd = probeRd;
@@ -125,21 +131,19 @@ public class IntervalPartitionJoiner {
this.accessorBuild = new FrameTupleAccessor(buildRd);
this.accessorProbe = new FrameTupleAccessor(probeRd);
+ reloadBuffer = new VSizeFrame(ctx);
+ probeRunFileAppender = new FrameTupleAppender(new VSizeFrame(ctx));
ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions);
}
- public void initBuild() throws HyracksDataException {
- buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
- memForJoin * ctx.getInitialFrameSize());
+ public void buildInit() throws HyracksDataException {
+ buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN,
+ numOfPartitions, buildMemory * ctx.getInitialFrameSize());
System.err.println("k: " + k);
buildSize = 0;
}
- private IPartitionedMemoryConstrain getPartitionMemoryConstrain() {
- return VPartitionTupleBufferManager.NO_CONSTRAIN;
- }
-
- public void build(ByteBuffer buffer) throws HyracksDataException {
+ public void buildStep(ByteBuffer buffer) throws HyracksDataException {
accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
@@ -148,7 +152,8 @@ public class IntervalPartitionJoiner {
pid = buildHpc.partition(accessorBuild, i, k);
if (tmp != pid) {
- System.err.println("buildSize: " + buildSize + " pid: " + pid + " k: " + k + " pair: " + IntervalPartitionUtil.getIntervalPartition(pid, k));
+ System.err.println("buildSize: " + buildSize + " pid: " + pid + " k: " + k + " pair: "
+ + IntervalPartitionUtil.getIntervalPartition(pid, k));
tmp = pid;
}
processTuple(i, pid);
@@ -157,7 +162,7 @@ public class IntervalPartitionJoiner {
}
}
- public void closeBuild() throws HyracksDataException {
+ public void buildClose() throws HyracksDataException {
System.err.println("buildSize: " + buildSize);
int inMemoryPartitions = 0;
@@ -165,7 +170,7 @@ public class IntervalPartitionJoiner {
flushAndClearBuildSpilledPartition();
// Trying to bring back as many spilled partitions as possible, making them resident
- bringBackSpilledPartitionIfHasMoreMemory();
+ bringBackSpilledPartitionIfHasMoreMemory(false);
// Update build partition join map based on partitions with actual data.
for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) {
@@ -239,7 +244,6 @@ public class IntervalPartitionJoiner {
private void spillPartition(int pid) throws HyracksDataException {
RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
- spillCount++;
spillWriteCount += buildBufferManager.getNumFrames(pid);
buildBufferManager.flushPartition(pid, writer);
buildBufferManager.clearPartition(pid);
@@ -274,9 +278,9 @@ public class IntervalPartitionJoiner {
for (int pid = 0; pid < numOfPartitions; ++pid) {
if (buildBufferManager.getNumTuples(pid) > 0) {
buildBufferManager.clearPartition(pid);
- ipjd.buildRemoveFromJoin(pid);
}
}
+ ipjd.buildClearMemory();
}
private void flushAndClearBuildSpilledPartition() throws HyracksDataException {
@@ -291,63 +295,88 @@ public class IntervalPartitionJoiner {
}
}
- private void flushAndClearProbeSpilledPartition() throws HyracksDataException {
- for (int pid = 0; pid < numOfPartitions; ++pid) {
- if (probeBufferManager.getNumTuples(pid) > 0) {
- spillWriteCount += probeBufferManager.getNumFrames(pid);
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
- probeBufferManager.flushPartition(pid, runFileWriter);
- probeBufferManager.clearPartition(pid);
- probeRFWriters[pid].close();
- }
+ private void flushProbeSpilledPartition() throws HyracksDataException {
+ if (probeRunFileWriter != null) {
+ // flush previous runFile
+ probeRunFileAppender.write(probeRunFileWriter, true);
+ probeRunFileWriter.close();
+ spillWriteCount++;
}
}
- private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException {
- // we need number of |spilledPartitions| buffers to store the probe data
- int freeSpace = (memForJoin - ipjd.buildGetSpilledCount()) * ctx.getInitialFrameSize();
+ private void bringBackSpilledPartitionIfHasMoreMemory(boolean partitalLoad) throws HyracksDataException {
+ int freeFrames = buildMemory;
for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
- freeSpace -= buildBufferManager.getPhysicalSize(i);
+ freeFrames -= buildBufferManager.getNumFrames(i);
}
int pid = 0;
- while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) {
+ while ((pid = selectPartitionsToReload(freeFrames, pid, partitalLoad)) >= 0 && freeFrames > 0) {
+ if (pid == 225) {
+ int i = 0;
+ }
if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
return;
}
- freeSpace -= buildBufferManager.getPhysicalSize(pid);
+ freeFrames -= buildBufferManager.getNumFrames(pid);
}
}
+ int buildParitialLoadPid = -1;
+ int buildParitialNextTid = -1;
+ long buildParitialResetReader = -1;
+
private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
- RunFileReader r = wr.createDeleteOnCloseReader();
+ if (pid == 225) {
+ int i = 0;
+ }
+ RunFileReader r = wr.createReader();
r.open();
- if (reloadBuffer == null) {
- reloadBuffer = new VSizeFrame(ctx);
+ if (buildParitialLoadPid == pid && buildParitialResetReader > 0) {
+ r.reset(buildParitialResetReader);
}
+ int framesLoaded = 0;
while (r.nextFrame(reloadBuffer)) {
+ framesLoaded++;
accessorBuild.reset(reloadBuffer.getBuffer());
- for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
+ spillReadCount++;
+ for (int tid = buildParitialNextTid > 0 ? buildParitialNextTid : 0; tid < accessorBuild
+ .getTupleCount(); tid++) {
if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- // for some reason (e.g. due to fragmentation) if the inserting failed, we need to clear the occupied frames
- buildBufferManager.clearPartition(pid);
+ // If the insert failed for some reason (e.g. due to fragmentation),
+ // we need to resume this partition from this location on the next round.
+ buildParitialLoadPid = pid;
+ buildParitialNextTid = tid;
+ buildParitialResetReader = r.getReadPointer();
+ ipjd.buildLoad(pid);
+ createInMemoryJoiner(pid);
r.close();
return false;
}
}
- spillReadCount++;
+ }
+ if (framesLoaded == 0) {
+ int t = 0;
}
- r.close();
ipjd.buildLoad(pid);
+ createInMemoryJoiner(pid);
+ r.close();
buildRFWriters[pid] = null;
+ buildParitialLoadPid = -1;
+ buildParitialNextTid = -1;
+ buildParitialResetReader = -1;
return true;
}
- private int selectPartitionsToReload(int freeSpace, int pid) {
- for (int id = ipjd.buildNextSpilled(0); id >= 0; id = ipjd.buildNextSpilled(id + 1)) {
+ private int selectPartitionsToReload(int freeFrames, int pid, boolean partitalLoad) {
+ int freeSpace = freeFrames * ctx.getInitialFrameSize();
+ if (freeSpace > 0 && buildParitialLoadPid > 0 && buildParitialResetReader > 0) {
+ return buildParitialLoadPid;
+ }
+ for (int id = ipjd.buildNextSpilled(pid); id >= 0; id = ipjd.buildNextSpilled(id + 1)) {
assert buildRFWriters[id].getFileSize() > 0 : "How come a spilled partition have size 0?";
- if (freeSpace >= buildRFWriters[id].getFileSize()) {
+ if (partitalLoad || freeSpace >= buildRFWriters[id].getFileSize()) {
return id;
}
}
@@ -366,20 +395,24 @@ public class IntervalPartitionJoiner {
inMemJoiner[pid] = null;
}
- public void initProbe() throws HyracksDataException {
- int probeMemory = numOfPartitions > memForJoin ? memForJoin : numOfPartitions;
- probeBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
- (probeMemory) * ctx.getInitialFrameSize());
-
+ public void probeInit() throws HyracksDataException {
probeRFWriters = new RunFileWriter[numOfPartitions];
+ probeSize = 0;
}
- public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+ public void probeStep(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
accessorProbe.reset(buffer);
int tupleCount = accessorProbe.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
int pid = probeHpc.partition(accessorProbe, i, k);
+
+ if (tmp != pid) {
+ System.err.println("probeSize: " + probeSize + " pid: " + pid + " k: " + k + " pair: "
+ + IntervalPartitionUtil.getIntervalPartition(pid, k));
+ tmp = pid;
+ }
+
if (!ipjd.hasProbeJoinMap(pid)) {
// Set probe join map
ipjd.setProbeJoinMap(pid,
@@ -390,23 +423,7 @@ public class IntervalPartitionJoiner {
if (!ipjd.isProbeJoinMapEmpty(pid)) {
if (ipjd.probeHasSpilled(pid)) {
// pid is Spilled
- while (!probeBufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) {
- int victim = pid;
- if (probeBufferManager.getNumTuples(pid) == 0) {
- // current pid is empty, choose the biggest one
- victim = selectLargestSpilledPartition();
- }
- if (victim < 0) {
- // current tuple is too big for all the free space
- flushBigProbeObjectToDisk(pid, accessorProbe, i);
- break;
- }
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
- spillCount++;
- spillWriteCount += probeBufferManager.getNumFrames(pid);
- probeBufferManager.flushPartition(victim, runFileWriter);
- probeBufferManager.clearPartition(victim);
- }
+ probeSpillTuple(accessorProbe, i, pid);
}
for (Iterator<Integer> pidIterator = ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) {
// pid has join partitions that are Resident
@@ -417,33 +434,43 @@ public class IntervalPartitionJoiner {
}
}
ipjd.probeIncrementCount(pid);
+ probeSize++;
}
}
- public void closeProbe(IFrameWriter writer) throws HyracksDataException {
- // We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+ /**
+ * Closes the probe process.
+ * We do NOT join the spilled partitions here, use {@link #joinSpilledPartitions}.
+ *
+ * @param writer
+ * @throws HyracksDataException
+ */
+ public void probeClose(IFrameWriter writer) throws HyracksDataException {
+ System.err.println("probeSize: " + probeSize);
+
for (int i = 0; i < inMemJoiner.length; ++i) {
if (inMemJoiner[i] != null) {
closeInMemoryJoiner(i, writer);
ipjd.buildLogJoined(i);
+ ipjd.buildRemoveFromJoin(i);
}
}
clearBuildMemory();
- flushAndClearProbeSpilledPartition();
- probeBufferManager.close();
- probeBufferManager = null;
+ flushProbeSpilledPartition();
}
- private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
+ private void probeSpillTuple(IFrameTupleAccessor accessorProbe, int probeTupleIndex, int pid)
throws HyracksDataException {
- if (bigProbeFrameAppender == null) {
- bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+ if (pid != probeRunFilePid) {
+ flushProbeSpilledPartition();
+ probeRunFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+ probeRunFilePid = pid;
}
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
- if (!bigProbeFrameAppender.append(accessorProbe, i)) {
- throw new HyracksDataException("The given tuple is too big");
+ if (!probeRunFileAppender.append(accessorProbe, probeTupleIndex)) {
+ probeRunFileAppender.write(probeRunFileWriter, true);
+ probeRunFileAppender.append(accessorProbe, probeTupleIndex);
+ spillWriteCount++;
}
- bigProbeFrameAppender.write(runFileWriter, true);
}
public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
@@ -456,42 +483,28 @@ public class IntervalPartitionJoiner {
public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException {
LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap;
- if (reloadBuffer == null) {
- reloadBuffer = new VSizeFrame(ctx);
- }
- HashSet<Integer> inMemory = new HashSet<>();
while (ipjd.buildGetSpilledCount() > 0) {
// Load back spilled build partitions.
// TODO only load partition required for spill join. Consider both sides.
- bringBackSpilledPartitionIfHasMoreMemory();
-
- probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
+ bringBackSpilledPartitionIfHasMoreMemory(true);
// Create in memory joiners.
- for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
- .buildNextInMemoryWithResults(pid + 1)) {
- createInMemoryJoiner(pid);
- inMemory.add(pid);
- }
+ // for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
+ // .buildNextInMemoryWithResults(pid + 1)) {
+ // createInMemoryJoiner(pid);
+ // }
+
+ probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
// Join all build partitions with disk probe partitions.
for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) {
- if (ipjd.probeGetCount(entry.getKey()) > 0 && probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) {
- RunFileReader pReader = getProbeRFReader(entry.getKey());
- pReader.open();
- while (pReader.nextFrame(reloadBuffer)) {
- accessorProbe.reset(reloadBuffer.getBuffer());
- for (int i = 0; i < accessorProbe.getTupleCount(); ++i) {
- // Tuple has potential match from build phase
- for (Integer j : probeInMemoryJoinMap.get(entry.getKey())) {
- // j has join partitions that are Resident
- if (inMemJoiner[j] != null) {
- inMemJoiner[j].join(accessorProbe, i, writer);
- }
- }
- }
- }
- pReader.close();
+ if (entry.getKey() == 221) {
+ int t = 0;
+ }
+ System.err.println(" join pid: " + entry.getKey() + " with : " + probeInMemoryJoinMap);
+
+ if (ipjd.probeGetCount(entry.getKey()) > 0 && !probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) {
+ joinSpilledProbeWithBuildMemory(writer, probeInMemoryJoinMap, entry.getKey());
}
}
@@ -499,13 +512,38 @@ public class IntervalPartitionJoiner {
for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
.buildNextInMemoryWithResults(pid + 1)) {
closeInMemoryJoiner(pid, writer);
- ipjd.buildLogJoined(pid);
+ if (pid != buildParitialLoadPid) {
+ ipjd.buildLogJoined(pid);
+ ipjd.buildRemoveFromJoin(pid);
+ } else {
+ int t = 0;
+ }
}
- inMemory.clear();
clearBuildMemory();
}
}
+ private void joinSpilledProbeWithBuildMemory(IFrameWriter writer,
+ LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap, int probePid)
+ throws HyracksDataException {
+ RunFileReader pReader = getProbeRFReader(probePid);
+ pReader.open();
+ while (pReader.nextFrame(reloadBuffer)) {
+ accessorProbe.reset(reloadBuffer.getBuffer());
+ spillReadCount++;
+ for (int i = 0; i < accessorProbe.getTupleCount(); ++i) {
+ // Tuple has potential match from build phase
+ for (Integer j : probeInMemoryJoinMap.get(probePid)) {
+ // j has join partitions that are Resident
+ if (inMemJoiner[j] != null) {
+ inMemJoiner[j].join(accessorProbe, i, writer);
+ }
+ }
+ }
+ }
+ pReader.close();
+ }
+
class IntervalPartitionJoinData {
private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap;
@@ -555,6 +593,10 @@ public class IntervalPartitionJoiner {
}
}
+ public void buildClearMemory() {
+ buildInMemoryStatus.clear();
+ }
+
public void buildIncrementCount(int pid) {
buildInMemoryStatus.set(pid);
buildPSizeInTups[pid]++;
@@ -676,8 +718,7 @@ public class IntervalPartitionJoiner {
}
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("IntervalPartitionJoiner statitics: " + joinComparisonCount + " comparisons, "
- + joinResultCount + " results, " + spillCount + " spills, " + spillWriteCount
- + " spill frames written, " + spillReadCount + " spill frames read.");
+ + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19f0997f/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index f68a49c..ae2f0b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -32,6 +32,7 @@ public class RunFileReader implements IFrameReader {
private IFileHandle handle;
private final IIOManager ioManager;
private final long size;
+ private long readPreviousPtr;
private long readPtr;
private boolean deleteAfterClose;
@@ -46,6 +47,7 @@ public class RunFileReader implements IFrameReader {
public void open() throws HyracksDataException {
handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_ONLY, null);
readPtr = 0;
+ readPreviousPtr = 0;
}
@Override
@@ -53,6 +55,7 @@ public class RunFileReader implements IFrameReader {
if (readPtr >= size) {
return false;
}
+ readPreviousPtr = readPtr;
frame.reset();
int readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
@@ -79,6 +82,12 @@ public class RunFileReader implements IFrameReader {
public void reset() throws HyracksDataException {
readPtr = 0;
+ readPreviousPtr = readPtr;
+ }
+
+ public void reset(long pointer) throws HyracksDataException {
+ readPtr = pointer;
+ readPreviousPtr = readPtr;
}
@Override
@@ -92,4 +101,8 @@ public class RunFileReader implements IFrameReader {
public long getFileSize() {
return size;
}
+
+ public long getReadPointer() {
+ return readPreviousPtr;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19f0997f/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index 2513b1b..7e8a8d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -50,6 +50,14 @@ public class RunFileStream {
private long writeCount = 0;
private long tupleCount = 0;
+ /**
+ * The RunFileStream uses two frames to buffer read and write operations.
+ *
+ * @param ctx
+ * @param key
+ * @param status
+ * @throws HyracksDataException
+ */
public RunFileStream(IHyracksTaskContext ctx, String key, IRunFileStreamStatus status) throws HyracksDataException {
this.ctx = ctx;
this.key = key;