You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:27 UTC
[37/50] [abbrv] asterixdb git commit: Snapshot after working frame
tuple appender.
Snapshot after working frame tuple appender.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/23eab43d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/23eab43d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/23eab43d
Branch: refs/heads/ecarm002/interval_join_merge
Commit: 23eab43dda9860030c2065343208a05dcc437486
Parents: fd514a0
Author: Preston Carman <pr...@apache.org>
Authored: Sun Sep 25 09:37:33 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Sun Sep 25 09:37:33 2016 -0700
----------------------------------------------------------------------
.../intervalindex/IntervalIndexJoiner.java | 77 +++++++++++---------
...IntervalPartitionJoinOperatorDescriptor.java | 2 +
.../IntervalPartitionJoiner.java | 15 +++-
.../sort/util/DeletableFrameTupleAppender.java | 1 +
.../util/DeletableFrameTupleAppenderTest.java | 50 +++++++++----
5 files changed, 93 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index d3aaa65..a4ad666 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -19,7 +19,6 @@
package org.apache.asterix.runtime.operators.joins.intervalindex;
import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -62,7 +61,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
private final int[] streamIndex;
private final RunFileStream[] runFileStream;
- private final LinkedList<TuplePointer> buffer = new LinkedList<>();
+// private final LinkedList<TuplePointer> buffer = new LinkedList<>();
private final IIntervalMergeJoinChecker imjc;
@@ -124,6 +123,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
runFileStream[LEFT_PARTITION] = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
runFileStream[RIGHT_PARTITION] = new RunFileStream(ctx, "right", status.branch[RIGHT_PARTITION]);
+ LOGGER.setLevel(Level.FINE);
+ System.out.println("IntervalIndexJoiner: Logging level is: " + LOGGER.getLevel());
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize
+ " frames of memory.");
@@ -246,9 +247,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
long leftStart = IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey);
long rightStart = IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey);
if (leftStart < rightStart) {
+ // Left stream has next tuple, check if right active must be updated first.
return activeManager[RIGHT_PARTITION].hasRecords()
&& activeManager[RIGHT_PARTITION].getTopPoint() < leftStart;
} else {
+ // Right stream has next tuple, check if left active must be update first.
return !(activeManager[LEFT_PARTITION].hasRecords()
&& activeManager[LEFT_PARTITION].getTopPoint() < rightStart);
}
@@ -334,7 +337,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
// Add to active, end point index and buffer.
TuplePointer tp = new TuplePointer();
if (activeManager[LEFT_PARTITION].addTuple(inputAccessor[LEFT_PARTITION], tp)) {
- buffer.add(tp);
+ processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION],
+ inputAccessor[LEFT_PARTITION], true, writer);
+// buffer.add(tp);
} else {
// Spill case
freezeAndSpill();
@@ -348,10 +353,10 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
} while (loadLeftTuple().isLoaded() && loadRightTuple().isLoaded() && !checkToProcessRightTuple());
// Add Results
- if (!buffer.isEmpty()) {
- processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer,
- memoryAccessor[LEFT_PARTITION], true, writer);
- }
+// if (!buffer.isEmpty()) {
+// processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer,
+// memoryAccessor[LEFT_PARTITION], true, writer);
+// }
}
private void processRightTuple(IFrameWriter writer) throws HyracksDataException {
@@ -364,7 +369,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
// Add to active, end point index and buffer.
TuplePointer tp = new TuplePointer();
if (activeManager[RIGHT_PARTITION].addTuple(inputAccessor[RIGHT_PARTITION], tp)) {
- buffer.add(tp);
+ processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION],
+ inputAccessor[RIGHT_PARTITION], false, writer);
+// buffer.add(tp);
} else {
// Spill case
freezeAndSpill();
@@ -378,32 +385,32 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
} while (loadRightTuple().isLoaded() && checkToProcessRightTuple());
// Add Results
- if (!buffer.isEmpty()) {
- processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer,
- memoryAccessor[RIGHT_PARTITION], false, writer);
- }
+// if (!buffer.isEmpty()) {
+// processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer,
+// memoryAccessor[RIGHT_PARTITION], false, writer);
+// }
}
- private void processActiveJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor,
- List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, boolean reversed, IFrameWriter writer)
- throws HyracksDataException {
- for (TuplePointer outerTp : outer) {
- outerAccessor.reset(outerTp);
- for (TuplePointer innerTp : inner) {
- innerAccessor.reset(innerTp);
- if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor,
- innerTp.getTupleIndex(), reversed)) {
- addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(),
- reversed, writer);
- }
- joinComparisonCount++;
- }
- }
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Sweep for " + buffer.size() + " tuples");
- }
- buffer.clear();
- }
+// private void processActiveJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor,
+// List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, boolean reversed, IFrameWriter writer)
+// throws HyracksDataException {
+// for (TuplePointer outerTp : outer) {
+// outerAccessor.reset(outerTp);
+// for (TuplePointer innerTp : inner) {
+// innerAccessor.reset(innerTp);
+// if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor,
+// innerTp.getTupleIndex(), reversed)) {
+// addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(),
+// reversed, writer);
+// }
+// joinComparisonCount++;
+// }
+// }
+// if (LOGGER.isLoggable(Level.FINE)) {
+// LOGGER.fine("Sweep for " + buffer.size() + " tuples");
+// }
+// buffer.clear();
+// }
private void processTupleJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor,
ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter writer) throws HyracksDataException {
@@ -456,6 +463,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor) throws HyracksDataException {
int flushPartition = frozenPartition == LEFT_PARTITION ? RIGHT_PARTITION : LEFT_PARTITION;
+ runFileStream[frozenPartition].flushAndStopRunFile(accessor);
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("snapshot(" + frozenPartition + "): " + frameCounts[RIGHT_PARTITION] + " right, "
+ frameCounts[LEFT_PARTITION] + " left, left[" + bufferManager.getNumTuples(LEFT_PARTITION)
@@ -474,8 +482,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
spillReadCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getReadCount();
spillWriteCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getWriteCount();
}
-
- runFileStream[frozenPartition].flushAndStopRunFile(accessor);
flushMemory(flushPartition);
if ((LEFT_PARTITION == frozenPartition && !status.branch[LEFT_PARTITION].isRunFileReading())
|| (RIGHT_PARTITION == frozenPartition && !status.branch[RIGHT_PARTITION].isRunFileReading())) {
@@ -489,8 +495,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
@Override
public void closeInput(int partition) throws HyracksDataException {
- // TODO Auto-generated method stub
-
+ // No changes are required.
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 6ea1e6f..c7986e6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -160,6 +160,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
state.ipj.initBuild();
+ LOGGER.setLevel(Level.FINE);
+ System.out.println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel());
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("IntervalPartitionJoin is starting the build phase with " + state.k
+ " granules repesenting " + state.intervalPartitions + " interval partitions using "
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index e943a48..31e200b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -100,6 +100,8 @@ public class IntervalPartitionJoiner {
private long spillCount = 0;
private long spillReadCount = 0;
private long spillWriteCount = 0;
+ private long buildSize;
+ private int tmp = -1;
public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
@@ -129,6 +131,8 @@ public class IntervalPartitionJoiner {
public void initBuild() throws HyracksDataException {
buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
memForJoin * ctx.getInitialFrameSize());
+ System.err.println("k: " + k);
+ buildSize = 0;
}
private IPartitionedMemoryConstrain getPartitionMemoryConstrain() {
@@ -139,14 +143,23 @@ public class IntervalPartitionJoiner {
accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
+ int pid;
for (int i = 0; i < tupleCount; ++i) {
- int pid = buildHpc.partition(accessorBuild, i, k);
+ pid = buildHpc.partition(accessorBuild, i, k);
+
+ if (tmp != pid) {
+ System.err.println("buildSize: " + buildSize + " pid: " + pid + " k: " + k + " pair: " + IntervalPartitionUtil.getIntervalPartition(pid, k));
+ tmp = pid;
+ }
processTuple(i, pid);
ipjd.buildIncrementCount(pid);
+ buildSize++;
}
}
public void closeBuild() throws HyracksDataException {
+ System.err.println("buildSize: " + buildSize);
+
int inMemoryPartitions = 0;
int totalBuildPartitions = 0;
flushAndClearBuildSpilledPartition();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
index 8cae721..d242daa 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
@@ -181,6 +181,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
this.array = buffer.array();
setIndexCount(0);
setDeletedSpace(0);
+ setNextIndex(0);
setTupleAppend(0);
resetCounts();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
index 7686540..af3cdfc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
@@ -37,15 +37,19 @@ import org.apache.hyracks.util.string.UTF8StringUtil;
import org.junit.Before;
import org.junit.Test;
+/**
+ * @see org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender
+ */
public class DeletableFrameTupleAppenderTest {
+ private static final int META_DATA_SIZE = 4 + 4 + 4 + 4;
+ private static final int SLOT_SIZE = 4 + 4;
+ private static final char TEST_CH = 'x';
+
DeletableFrameTupleAppender appender;
- ISerializerDeserializer[] fields = new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE,
- new UTF8StringSerializerDeserializer(),
- };
+ ISerializerDeserializer[] fields = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+ new UTF8StringSerializerDeserializer(), };
RecordDescriptor recordDescriptor = new RecordDescriptor(fields);
ArrayTupleBuilder builder = new ArrayTupleBuilder(recordDescriptor.getFieldCount());
- static final char TEST_CH = 'x';
int cap = 256;
@@ -60,26 +64,42 @@ public class DeletableFrameTupleAppenderTest {
appender.clear(buffer);
assertTrue(appender.getBuffer() == buffer);
assertTrue(appender.getTupleCount() == 0);
- assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4);
+ assertTrue(appender.getTotalFreeSpace() == cap - META_DATA_SIZE);
+ assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE);
}
ByteBuffer makeAFrame(int capacity, int count, int deletedBytes) throws HyracksDataException {
ByteBuffer buffer = ByteBuffer.allocate(capacity);
int metaOffset = capacity - 4;
+ buffer.putInt(metaOffset, count);
+ metaOffset -= 4;
buffer.putInt(metaOffset, deletedBytes);
+ // next index
metaOffset -= 4;
buffer.putInt(metaOffset, count);
+ // append slot
+ metaOffset -= 4;
+ int appendOffset = metaOffset;
+ buffer.putInt(metaOffset, 0);
metaOffset -= 4;
+
+ int start = 0;
for (int i = 0; i < count; i++, metaOffset -= 4) {
makeARecord(builder, i);
for (int x = 0; x < builder.getFieldEndOffsets().length; x++) {
buffer.putInt(builder.getFieldEndOffsets()[x]);
}
buffer.put(builder.getByteArray(), 0, builder.getSize());
- assert (metaOffset > buffer.position());
+
+ // Add slot information
+ buffer.putInt(metaOffset, start);
+ metaOffset -= 4;
buffer.putInt(metaOffset, buffer.position());
+ start = buffer.position();
+ assert (metaOffset > buffer.position());
}
+ buffer.putInt(appendOffset, start);
return buffer;
}
@@ -110,16 +130,16 @@ public class DeletableFrameTupleAppenderTest {
appender.reset(buffer);
assertTrue(appender.getBuffer() == buffer);
assertTrue(appender.getTupleCount() == 0);
- assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4);
+ assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE);
- int count = 10;
+ int count = 8;
int deleted = 7;
buffer = makeAFrame(cap, count, deleted);
int pos = buffer.position();
appender.reset(buffer);
assertTrue(appender.getBuffer() == buffer);
assertTrue(appender.getTupleCount() == count);
- assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4 - count * 4 - pos);
+ assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE - count * SLOT_SIZE - pos);
assertTrue(appender.getTotalFreeSpace() == appender.getContiguousFreeSpace() + deleted);
int dataOffset = 0;
@@ -130,7 +150,7 @@ public class DeletableFrameTupleAppenderTest {
@Test
public void testAppend() throws Exception {
- int count = 10;
+ int count = 8;
ByteBuffer bufferRead = makeAFrame(cap, count, 0);
DeletableFrameTupleAppender accessor = new DeletableFrameTupleAppender(recordDescriptor);
accessor.reset(bufferRead);
@@ -146,7 +166,7 @@ public class DeletableFrameTupleAppenderTest {
@Test
public void testDelete() throws Exception {
- int count = 10;
+ int count = 8;
int deleteSpace = 0;
ByteBuffer buffer = makeAFrame(cap, count, deleteSpace);
appender.reset(buffer);
@@ -165,7 +185,7 @@ public class DeletableFrameTupleAppenderTest {
public void testResetAfterDelete() throws Exception {
testDelete();
appender.reset(appender.getBuffer());
- assertEquals(cap - appender.getTupleCount() * 4 - 4 - 4, appender.getTotalFreeSpace());
+ assertEquals(cap - appender.getTupleCount() * SLOT_SIZE - META_DATA_SIZE, appender.getTotalFreeSpace());
}
@@ -187,7 +207,7 @@ public class DeletableFrameTupleAppenderTest {
@Test
public void testAppendAndDelete() throws Exception {
int cap = 1024;
- int count = 10;
+ int count = 8;
int deleteSpace = 0;
ByteBuffer buffer = makeAFrame(cap, count, deleteSpace);
int dataOffset = buffer.position();
@@ -221,7 +241,7 @@ public class DeletableFrameTupleAppenderTest {
@Test
public void testReOrganizeBuffer() throws Exception {
- int count = 10;
+ int count = 8;
testDelete();
appender.reOrganizeBuffer();
ByteBuffer bufferRead = makeAFrame(cap, count, 0);