You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:31 UTC
[41/50] [abbrv] asterixdb git commit: Interface clean up.
Interface clean up.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ce593414
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ce593414
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ce593414
Branch: refs/heads/ecarm002/interval_join_merge
Commit: ce5934148227e57eeebb369d57ae64d87b908959
Parents: 19f0997
Author: Preston Carman <pr...@apache.org>
Authored: Thu Sep 29 16:05:18 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Sep 29 16:05:18 2016 -0700
----------------------------------------------------------------------
.../IntervalIndexJoinOperatorDescriptor.java | 5 +--
.../intervalindex/IntervalIndexJoiner.java | 46 ++++++++++----------
...IntervalPartitionJoinOperatorDescriptor.java | 9 ++--
.../dataflow/std/join/AbstractMergeJoiner.java | 38 +++++++++-------
.../hyracks/dataflow/std/join/IMergeJoiner.java | 6 +--
.../std/join/MergeJoinOperatorDescriptor.java | 6 +--
.../hyracks/dataflow/std/join/MergeJoiner.java | 35 +++++++--------
7 files changed, 72 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
index d84fabc..fbddfef 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
@@ -151,7 +151,7 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip
}
try {
state.indexJoiner.setFrame(LEFT_ACTIVITY_ID, buffer);
- state.indexJoiner.processMergeUsingLeftTuple(writer);
+ state.indexJoiner.processLeftFrame(writer);
} finally {
locks.getLock(partition).unlock();
}
@@ -175,8 +175,7 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip
if (state.failed) {
writer.fail();
} else {
- state.indexJoiner.processMergeUsingLeftTuple(writer);
- state.indexJoiner.closeResult(writer);
+ state.indexJoiner.processLeftClose(writer);
writer.close();
}
state.status.branch[LEFT_ACTIVITY_ID].setStageClose();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index a4ad666..bac2f45 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -141,24 +141,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
joinResultCount++;
}
- @Override
- public void closeResult(IFrameWriter writer) throws HyracksDataException {
- resultAppender.write(writer, true);
- activeManager[LEFT_PARTITION].clear();
- activeManager[RIGHT_PARTITION].clear();
- runFileStream[LEFT_PARTITION].close();
- runFileStream[RIGHT_PARTITION].close();
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
- + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount()
- + " files, " + runFileStream[LEFT_PARTITION].getWriteCount() + " written, "
- + runFileStream[LEFT_PARTITION].getReadCount() + " read]. right[" + rightSpillCount + " spills, "
- + runFileStream[RIGHT_PARTITION].getFileCount() + " files, "
- + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, "
- + runFileStream[RIGHT_PARTITION].getReadCount() + " read].");
- }
- }
-
private void flushMemory(int partition) throws HyracksDataException {
activeManager[partition].clear();
}
@@ -209,7 +191,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
}
@Override
- public void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException {
+ public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
TupleStatus leftTs = loadLeftTuple();
TupleStatus rightTs = loadRightTuple();
while (leftTs.isKnown() && checkHasMoreProcessing(leftTs, LEFT_PARTITION, RIGHT_PARTITION)
@@ -234,6 +216,27 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
}
}
+ @Override
+ public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
+ processLeftFrame(writer);
+
+ resultAppender.write(writer, true);
+ activeManager[LEFT_PARTITION].clear();
+ activeManager[RIGHT_PARTITION].clear();
+ runFileStream[LEFT_PARTITION].close();
+ runFileStream[RIGHT_PARTITION].close();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
+ + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount()
+ + " files, " + runFileStream[LEFT_PARTITION].getWriteCount() + " written, "
+ + runFileStream[LEFT_PARTITION].getReadCount() + " read]. right[" + rightSpillCount + " spills, "
+ + runFileStream[RIGHT_PARTITION].getFileCount() + " files, "
+ + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, "
+ + runFileStream[RIGHT_PARTITION].getReadCount() + " read].");
+ }
+
+ }
+
private boolean checkHasMoreProcessing(TupleStatus ts, int partition, int joinPartition) {
return ts.isLoaded() || status.branch[partition].isRunFileWriting()
|| (checkHasMoreTuples(joinPartition) && activeManager[partition].hasRecords());
@@ -493,9 +496,4 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
}
}
- @Override
- public void closeInput(int partition) throws HyracksDataException {
- // No changes are required.
- }
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 60a4697..b4965ef 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -67,11 +67,11 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
- public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int k, int[] leftKeys,
- int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
+ public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin, int k,
+ int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
RangeId rangeId) {
super(spec, 2, 1);
- this.memsize = memsize;
+ this.memsize = memoryForJoin;
this.buildKey = leftKeys[0];
this.probeKey = rightKeys[0];
this.k = k;
@@ -161,7 +161,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
state.ipj.buildInit();
LOGGER.setLevel(Level.FINE);
- System.out.println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel());
+ System.out
+ .println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel());
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("IntervalPartitionJoin is starting the build phase with " + state.k
+ " granules repesenting " + state.intervalPartitions + " interval partitions using "
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
index aa065cd..b1b4075 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
@@ -80,6 +80,29 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner {
resultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
}
+ public void setLeftFrame(ByteBuffer buffer) {
+ setFrame(LEFT_PARTITION, buffer);
+ }
+
+ public void setRightFrame(ByteBuffer buffer) {
+ setFrame(RIGHT_PARTITION, buffer);
+ }
+
+ protected TupleStatus loadMemoryTuple(int branch) {
+ TupleStatus loaded;
+ if (inputAccessor[branch] != null && inputAccessor[branch].exists()) {
+ // Still processing frame.
+ int test = inputAccessor[branch].getTupleCount();
+ loaded = TupleStatus.LOADED;
+ } else if (status.branch[branch].hasMore()) {
+ loaded = TupleStatus.UNKNOWN;
+ } else {
+ // No more frames or tuples to process.
+ loaded = TupleStatus.EMPTY;
+ }
+ return loaded;
+ }
+
protected TupleStatus pauseAndLoadRightTuple() {
status.continueRightLoad = true;
locks.getRight(partition).signal();
@@ -99,21 +122,6 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner {
return TupleStatus.LOADED;
}
- protected TupleStatus loadMemoryTuple(int branch) {
- TupleStatus loaded;
- if (inputAccessor[branch] != null && inputAccessor[branch].exists()) {
- // Still processing frame.
- int test = inputAccessor[branch].getTupleCount();
- loaded = TupleStatus.LOADED;
- } else if (status.branch[branch].hasMore()) {
- loaded = TupleStatus.UNKNOWN;
- } else {
- // No more frames or tuples to process.
- loaded = TupleStatus.EMPTY;
- }
- return loaded;
- }
-
@Override
public void setFrame(int branch, ByteBuffer buffer) {
inputBuffer[branch].clear();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
index 4268ec9..051d3e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
@@ -25,12 +25,10 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IMergeJoiner {
- void closeResult(IFrameWriter writer) throws HyracksDataException;
+ void processLeftFrame(IFrameWriter writer) throws HyracksDataException;
- void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException;
+ void processLeftClose(IFrameWriter writer) throws HyracksDataException;
void setFrame(int partition, ByteBuffer buffer);
- void closeInput(int partition) throws HyracksDataException;
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
index cbe1a66..c5f612f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
@@ -151,7 +151,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
}
try {
state.joiner.setFrame(LEFT_ACTIVITY_ID, buffer);
- state.joiner.processMergeUsingLeftTuple(writer);
+ state.joiner.processLeftFrame(writer);
} finally {
locks.getLock(partition).unlock();
}
@@ -175,9 +175,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
if (state.failed) {
writer.fail();
} else {
- state.joiner.closeInput(LEFT_ACTIVITY_ID);
- state.joiner.processMergeUsingLeftTuple(writer);
- state.joiner.closeResult(writer);
+ state.joiner.processLeftClose(writer);
writer.close();
}
state.status.branch[LEFT_ACTIVITY_ID].setStageClose();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index 9c93828..d4195ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -110,17 +110,6 @@ public class MergeJoiner extends AbstractMergeJoiner {
joinResultCount++;
}
- @Override
- public void closeResult(IFrameWriter writer) throws HyracksDataException {
- resultAppender.write(writer, true);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
- + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, "
- + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount()
- + " spill frames read.");
- }
- }
-
private void flushMemory() throws HyracksDataException {
memoryBuffer.clear();
bufferManager.reset();
@@ -176,20 +165,13 @@ public class MergeJoiner extends AbstractMergeJoiner {
return TupleStatus.LOADED;
}
- @Override
- public void closeInput(int partition) throws HyracksDataException {
- if (status.branch[partition].isRunFileWriting()) {
- unfreezeAndContinue(inputAccessor[partition]);
- }
- }
-
/**
* Left
*
* @throws HyracksDataException
*/
@Override
- public void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException {
+ public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
TupleStatus leftTs = loadLeftTuple();
TupleStatus rightTs = loadRightTuple();
while (leftTs.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() || memoryHasTuples())) {
@@ -209,6 +191,21 @@ public class MergeJoiner extends AbstractMergeJoiner {
}
}
+ @Override
+ public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
+ if (status.branch[LEFT_PARTITION].isRunFileWriting()) {
+ unfreezeAndContinue(inputAccessor[LEFT_PARTITION]);
+ }
+ processLeftFrame(writer);
+ resultAppender.write(writer, true);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount
+ + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, "
+ + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount()
+ + " spill frames read.");
+ }
+ }
+
private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException {
// System.err.print("Spill ");