Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:27 UTC

[37/50] [abbrv] asterixdb git commit: Snapshot after working frame tuple appender.

Snapshot after working frame tuple appender.


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/23eab43d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/23eab43d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/23eab43d

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 23eab43dda9860030c2065343208a05dcc437486
Parents: fd514a0
Author: Preston Carman <pr...@apache.org>
Authored: Sun Sep 25 09:37:33 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Sun Sep 25 09:37:33 2016 -0700

----------------------------------------------------------------------
 .../intervalindex/IntervalIndexJoiner.java      | 77 +++++++++++---------
 ...IntervalPartitionJoinOperatorDescriptor.java |  2 +
 .../IntervalPartitionJoiner.java                | 15 +++-
 .../sort/util/DeletableFrameTupleAppender.java  |  1 +
 .../util/DeletableFrameTupleAppenderTest.java   | 50 +++++++++----
 5 files changed, 93 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index d3aaa65..a4ad666 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.runtime.operators.joins.intervalindex;
 
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -62,7 +61,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
     private final int[] streamIndex;
     private final RunFileStream[] runFileStream;
 
-    private final LinkedList<TuplePointer> buffer = new LinkedList<>();
+//    private final LinkedList<TuplePointer> buffer = new LinkedList<>();
 
     private final IIntervalMergeJoinChecker imjc;
 
@@ -124,6 +123,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         runFileStream[LEFT_PARTITION] = new RunFileStream(ctx, "left", status.branch[LEFT_PARTITION]);
         runFileStream[RIGHT_PARTITION] = new RunFileStream(ctx, "right", status.branch[RIGHT_PARTITION]);
 
+        LOGGER.setLevel(Level.FINE);
+        System.out.println("IntervalIndexJoiner: Logging level is: " + LOGGER.getLevel());
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize
                     + " frames of memory.");
@@ -246,9 +247,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         long leftStart = IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey);
         long rightStart = IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey);
         if (leftStart < rightStart) {
+            // Left stream has next tuple, check if right active must be updated first.
             return activeManager[RIGHT_PARTITION].hasRecords()
                     && activeManager[RIGHT_PARTITION].getTopPoint() < leftStart;
         } else {
+            // Right stream has next tuple, check if left active must be updated first.
             return !(activeManager[LEFT_PARTITION].hasRecords()
                     && activeManager[LEFT_PARTITION].getTopPoint() < rightStart);
         }
@@ -334,7 +337,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
                 // Add to active, end point index and buffer.
                 TuplePointer tp = new TuplePointer();
                 if (activeManager[LEFT_PARTITION].addTuple(inputAccessor[LEFT_PARTITION], tp)) {
-                    buffer.add(tp);
+                    processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION],
+                            inputAccessor[LEFT_PARTITION], true, writer);
+//                    buffer.add(tp);
                 } else {
                     // Spill case
                     freezeAndSpill();
@@ -348,10 +353,10 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         } while (loadLeftTuple().isLoaded() && loadRightTuple().isLoaded() && !checkToProcessRightTuple());
 
         // Add Results
-        if (!buffer.isEmpty()) {
-            processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer,
-                    memoryAccessor[LEFT_PARTITION], true, writer);
-        }
+//        if (!buffer.isEmpty()) {
+//            processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer,
+//                    memoryAccessor[LEFT_PARTITION], true, writer);
+//        }
     }
 
     private void processRightTuple(IFrameWriter writer) throws HyracksDataException {
@@ -364,7 +369,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
                 // Add to active, end point index and buffer.
                 TuplePointer tp = new TuplePointer();
                 if (activeManager[RIGHT_PARTITION].addTuple(inputAccessor[RIGHT_PARTITION], tp)) {
-                    buffer.add(tp);
+                    processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION],
+                            inputAccessor[RIGHT_PARTITION], false, writer);
+//                    buffer.add(tp);
                 } else {
                     // Spill case
                     freezeAndSpill();
@@ -378,32 +385,32 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         } while (loadRightTuple().isLoaded() && checkToProcessRightTuple());
 
         // Add Results
-        if (!buffer.isEmpty()) {
-            processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer,
-                    memoryAccessor[RIGHT_PARTITION], false, writer);
-        }
+//        if (!buffer.isEmpty()) {
+//            processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer,
+//                    memoryAccessor[RIGHT_PARTITION], false, writer);
+//        }
     }
 
-    private void processActiveJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor,
-            List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, boolean reversed, IFrameWriter writer)
-            throws HyracksDataException {
-        for (TuplePointer outerTp : outer) {
-            outerAccessor.reset(outerTp);
-            for (TuplePointer innerTp : inner) {
-                innerAccessor.reset(innerTp);
-                if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor,
-                        innerTp.getTupleIndex(), reversed)) {
-                    addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(),
-                            reversed, writer);
-                }
-                joinComparisonCount++;
-            }
-        }
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Sweep for " + buffer.size() + " tuples");
-        }
-        buffer.clear();
-    }
+//    private void processActiveJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor,
+//            List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, boolean reversed, IFrameWriter writer)
+//            throws HyracksDataException {
+//        for (TuplePointer outerTp : outer) {
+//            outerAccessor.reset(outerTp);
+//            for (TuplePointer innerTp : inner) {
+//                innerAccessor.reset(innerTp);
+//                if (imjc.checkToSaveInResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor,
+//                        innerTp.getTupleIndex(), reversed)) {
+//                    addToResult(outerAccessor, outerTp.getTupleIndex(), innerAccessor, innerTp.getTupleIndex(),
+//                            reversed, writer);
+//                }
+//                joinComparisonCount++;
+//            }
+//        }
+//        if (LOGGER.isLoggable(Level.FINE)) {
+//            LOGGER.fine("Sweep for " + buffer.size() + " tuples");
+//        }
+//        buffer.clear();
+//    }
 
     private void processTupleJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor,
             ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter writer) throws HyracksDataException {
@@ -456,6 +463,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
 
     private void unfreezeAndContinue(int frozenPartition, ITupleAccessor accessor) throws HyracksDataException {
         int flushPartition = frozenPartition == LEFT_PARTITION ? RIGHT_PARTITION : LEFT_PARTITION;
+        runFileStream[frozenPartition].flushAndStopRunFile(accessor);
         if (LOGGER.isLoggable(Level.WARNING)) {
             LOGGER.warning("snapshot(" + frozenPartition + "): " + frameCounts[RIGHT_PARTITION] + " right, "
                     + frameCounts[LEFT_PARTITION] + " left, left[" + bufferManager.getNumTuples(LEFT_PARTITION)
@@ -474,8 +482,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
             spillReadCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getReadCount();
             spillWriteCount[RIGHT_PARTITION] = runFileStream[RIGHT_PARTITION].getWriteCount();
         }
-
-        runFileStream[frozenPartition].flushAndStopRunFile(accessor);
         flushMemory(flushPartition);
         if ((LEFT_PARTITION == frozenPartition && !status.branch[LEFT_PARTITION].isRunFileReading())
                 || (RIGHT_PARTITION == frozenPartition && !status.branch[RIGHT_PARTITION].isRunFileReading())) {
@@ -489,8 +495,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
 
     @Override
     public void closeInput(int partition) throws HyracksDataException {
-        // TODO Auto-generated method stub
-
+        // No changes are required.
     }
 
 }
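
The net effect of the IntervalIndexJoiner hunks above, condensed into a side-by-side sketch of the left-stream case (the right-stream case is symmetric). This is an illustration assembled from the patch, not additional source code:

    // Before this commit: remember each surviving tuple and sweep the batch later.
    if (activeManager[LEFT_PARTITION].addTuple(inputAccessor[LEFT_PARTITION], tp)) {
        buffer.add(tp);
    }
    // ... after the processing loop ...
    processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION],
            buffer, memoryAccessor[LEFT_PARTITION], true, writer);

    // After this commit: join the incoming tuple immediately against the opposite
    // side's active list, so the LinkedList<TuplePointer> buffer (commented out
    // above) is no longer used.
    if (activeManager[LEFT_PARTITION].addTuple(inputAccessor[LEFT_PARTITION], tp)) {
        processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION],
                inputAccessor[LEFT_PARTITION], true, writer);
    }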

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 6ea1e6f..c7986e6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -160,6 +160,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
                             BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
 
                     state.ipj.initBuild();
+                    LOGGER.setLevel(Level.FINE);
+                    System.out.println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel());
                     if (LOGGER.isLoggable(Level.FINE)) {
                         LOGGER.fine("IntervalPartitionJoin is starting the build phase with " + state.k
                                 + " granules repesenting " + state.intervalPartitions + " interval partitions using "

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index e943a48..31e200b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -100,6 +100,8 @@ public class IntervalPartitionJoiner {
     private long spillCount = 0;
     private long spillReadCount = 0;
     private long spillWriteCount = 0;
+    private long buildSize;
+    private int tmp = -1;
 
     public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
             String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
@@ -129,6 +131,8 @@ public class IntervalPartitionJoiner {
     public void initBuild() throws HyracksDataException {
         buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
                 memForJoin * ctx.getInitialFrameSize());
+        System.err.println("k: " + k);
+        buildSize = 0;
     }
 
     private IPartitionedMemoryConstrain getPartitionMemoryConstrain() {
@@ -139,14 +143,23 @@ public class IntervalPartitionJoiner {
         accessorBuild.reset(buffer);
         int tupleCount = accessorBuild.getTupleCount();
 
+        int pid;
         for (int i = 0; i < tupleCount; ++i) {
-            int pid = buildHpc.partition(accessorBuild, i, k);
+            pid = buildHpc.partition(accessorBuild, i, k);
+
+            if (tmp != pid) {
+                System.err.println("buildSize: " + buildSize + " pid: " + pid + " k: " + k + " pair: " + IntervalPartitionUtil.getIntervalPartition(pid, k));
+                tmp = pid;
+            }
             processTuple(i, pid);
             ipjd.buildIncrementCount(pid);
+            buildSize++;
         }
     }
 
     public void closeBuild() throws HyracksDataException {
+        System.err.println("buildSize: " + buildSize);
+
         int inMemoryPartitions = 0;
         int totalBuildPartitions = 0;
         flushAndClearBuildSpilledPartition();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
index 8cae721..d242daa 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
@@ -181,6 +181,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
         this.array = buffer.array();
         setIndexCount(0);
         setDeletedSpace(0);
+        setNextIndex(0);
         setTupleAppend(0);
         resetCounts();
     }
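
A minimal, runnable sketch of the frame tail whose next-index field the added setNextIndex(0) call resets. The layout and field names are assumptions inferred from the test's makeAFrame further below, not taken from DeletableFrameTupleAppender itself:

    import java.nio.ByteBuffer;

    public class FrameTailSketch {
        public static void main(String[] args) {
            int capacity = 256;                    // frame size used by the tests
            ByteBuffer buffer = ByteBuffer.allocate(capacity);
            int metaOffset = capacity - 4;
            buffer.putInt(metaOffset, 0);          // tuple count
            metaOffset -= 4;
            buffer.putInt(metaOffset, 0);          // deleted space
            metaOffset -= 4;
            buffer.putInt(metaOffset, 0);          // next index -- the field the patched method now zeroes
            metaOffset -= 4;
            buffer.putInt(metaOffset, 0);          // tuple-append offset
            // Per-tuple slots (a start and an end offset, 4 bytes each) grow downward
            // from here, which is why the test defines META_DATA_SIZE = 4 + 4 + 4 + 4
            // and SLOT_SIZE = 4 + 4.
        }
    }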

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
index 7686540..af3cdfc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
@@ -37,15 +37,19 @@ import org.apache.hyracks.util.string.UTF8StringUtil;
 import org.junit.Before;
 import org.junit.Test;
 
+/**
+ * @see org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender
+ */
 public class DeletableFrameTupleAppenderTest {
+    private static final int META_DATA_SIZE = 4 + 4 + 4 + 4;
+    private static final int SLOT_SIZE = 4 + 4;
+    private static final char TEST_CH = 'x';
+
     DeletableFrameTupleAppender appender;
-    ISerializerDeserializer[] fields = new ISerializerDeserializer[] {
-            IntegerSerializerDeserializer.INSTANCE,
-            new UTF8StringSerializerDeserializer(),
-    };
+    ISerializerDeserializer[] fields = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer(), };
     RecordDescriptor recordDescriptor = new RecordDescriptor(fields);
     ArrayTupleBuilder builder = new ArrayTupleBuilder(recordDescriptor.getFieldCount());
-    static final char TEST_CH = 'x';
 
     int cap = 256;
 
@@ -60,26 +64,42 @@ public class DeletableFrameTupleAppenderTest {
         appender.clear(buffer);
         assertTrue(appender.getBuffer() == buffer);
         assertTrue(appender.getTupleCount() == 0);
-        assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4);
+        assertTrue(appender.getTotalFreeSpace() == cap - META_DATA_SIZE);
+        assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE);
     }
 
     ByteBuffer makeAFrame(int capacity, int count, int deletedBytes) throws HyracksDataException {
         ByteBuffer buffer = ByteBuffer.allocate(capacity);
         int metaOffset = capacity - 4;
+        buffer.putInt(metaOffset, count);
+        metaOffset -= 4;
         buffer.putInt(metaOffset, deletedBytes);
+        // next index
         metaOffset -= 4;
         buffer.putInt(metaOffset, count);
+        // append slot
+        metaOffset -= 4;
+        int appendOffset = metaOffset;
+        buffer.putInt(metaOffset, 0);
         metaOffset -= 4;
+
+        int start = 0;
         for (int i = 0; i < count; i++, metaOffset -= 4) {
             makeARecord(builder, i);
             for (int x = 0; x < builder.getFieldEndOffsets().length; x++) {
                 buffer.putInt(builder.getFieldEndOffsets()[x]);
             }
             buffer.put(builder.getByteArray(), 0, builder.getSize());
-            assert (metaOffset > buffer.position());
+
+            // Add slot information
+            buffer.putInt(metaOffset, start);
+            metaOffset -= 4;
             buffer.putInt(metaOffset, buffer.position());
 
+            start = buffer.position();
+            assert (metaOffset > buffer.position());
         }
+        buffer.putInt(appendOffset, start);
         return buffer;
     }
 
@@ -110,16 +130,16 @@ public class DeletableFrameTupleAppenderTest {
         appender.reset(buffer);
         assertTrue(appender.getBuffer() == buffer);
         assertTrue(appender.getTupleCount() == 0);
-        assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4);
+        assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE);
 
-        int count = 10;
+        int count = 8;
         int deleted = 7;
         buffer = makeAFrame(cap, count, deleted);
         int pos = buffer.position();
         appender.reset(buffer);
         assertTrue(appender.getBuffer() == buffer);
         assertTrue(appender.getTupleCount() == count);
-        assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4 - count * 4 - pos);
+        assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE - count * SLOT_SIZE - pos);
         assertTrue(appender.getTotalFreeSpace() == appender.getContiguousFreeSpace() + deleted);
 
         int dataOffset = 0;
@@ -130,7 +150,7 @@ public class DeletableFrameTupleAppenderTest {
 
     @Test
     public void testAppend() throws Exception {
-        int count = 10;
+        int count = 8;
         ByteBuffer bufferRead = makeAFrame(cap, count, 0);
         DeletableFrameTupleAppender accessor = new DeletableFrameTupleAppender(recordDescriptor);
         accessor.reset(bufferRead);
@@ -146,7 +166,7 @@ public class DeletableFrameTupleAppenderTest {
 
     @Test
     public void testDelete() throws Exception {
-        int count = 10;
+        int count = 8;
         int deleteSpace = 0;
         ByteBuffer buffer = makeAFrame(cap, count, deleteSpace);
         appender.reset(buffer);
@@ -165,7 +185,7 @@ public class DeletableFrameTupleAppenderTest {
     public void testResetAfterDelete() throws Exception {
         testDelete();
         appender.reset(appender.getBuffer());
-        assertEquals(cap - appender.getTupleCount() * 4 - 4 - 4, appender.getTotalFreeSpace());
+        assertEquals(cap - appender.getTupleCount() * SLOT_SIZE - META_DATA_SIZE, appender.getTotalFreeSpace());
 
     }
 
@@ -187,7 +207,7 @@ public class DeletableFrameTupleAppenderTest {
     @Test
     public void testAppendAndDelete() throws Exception {
         int cap = 1024;
-        int count = 10;
+        int count = 8;
         int deleteSpace = 0;
         ByteBuffer buffer = makeAFrame(cap, count, deleteSpace);
         int dataOffset = buffer.position();
@@ -221,7 +241,7 @@ public class DeletableFrameTupleAppenderTest {
 
     @Test
     public void testReOrganizeBuffer() throws Exception {
-        int count = 10;
+        int count = 8;
         testDelete();
         appender.reOrganizeBuffer();
         ByteBuffer bufferRead = makeAFrame(cap, count, 0);
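
The updated assertions in this test reduce to a small piece of arithmetic. A runnable sketch using the same constants the test defines; the concrete dataBytes value below is hypothetical:

    public class FrameSpaceArithmetic {
        public static void main(String[] args) {
            final int cap = 256;                     // frame capacity in the tests
            final int metaDataSize = 4 + 4 + 4 + 4;  // META_DATA_SIZE
            final int slotSize = 4 + 4;              // SLOT_SIZE
            int count = 8;                           // tuples written by makeAFrame
            int deleted = 7;                         // deleted space passed to makeAFrame
            int dataBytes = 160;                     // hypothetical buffer.position() after the tuple data

            int contiguousFree = cap - metaDataSize - count * slotSize - dataBytes;
            int totalFree = contiguousFree + deleted;
            // Mirrors: getContiguousFreeSpace() == cap - META_DATA_SIZE - count * SLOT_SIZE - pos
            //          getTotalFreeSpace()      == getContiguousFreeSpace() + deleted
            System.out.println("contiguous=" + contiguousFree + ", total=" + totalFree);
        }
    }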