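You are viewing a plain-text version of this content; its canonical version is the original post in the commits@drill.apache.org mailing-list archive (the hyperlink was lost in this copy).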
Posted to commits@drill.apache.org by ve...@apache.org on 2015/02/13 08:23:09 UTC

drill git commit: DRILL-1597: refactor empty population logic out & re-use across RepeatedList/Map vectors & add unit test

Repository: drill
Updated Branches:
  refs/heads/master aa2caa8af -> e46d68bee


DRILL-1597: refactor empty population logic out & re-use across RepeatedList/Map vectors & add unit test


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e46d68be
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e46d68be
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e46d68be

Branch: refs/heads/master
Commit: e46d68beed9da0cb7b5fad7c7cd76ad42a9e1a01
Parents: aa2caa8
Author: Hanifi Gunes <hg...@maprtech.com>
Authored: Fri Feb 6 11:00:23 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Thu Feb 12 14:06:12 2015 -0800

----------------------------------------------------------------------
 .../vector/complex/EmptyValuePopulator.java     | 53 +++++++++++
 .../exec/vector/complex/RepeatedListVector.java | 62 +++++--------
 .../exec/vector/complex/RepeatedMapVector.java  | 40 ++++-----
 .../exec/vector/complex/TestEmptyPopulator.java | 94 ++++++++++++++++++++
 4 files changed, 182 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e46d68be/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/EmptyValuePopulator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/EmptyValuePopulator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/EmptyValuePopulator.java
new file mode 100644
index 0000000..8c61a60
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/EmptyValuePopulator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.vector.UInt4Vector;
+
+/**
+ * Back-fills empty groups in the offsets vector of a repeated value vector: an empty group repeats the previous end offset.
+ */
+public class EmptyValuePopulator {
+  private final UInt4Vector offsets; // offsets vector being populated; rejected if null by the constructor
+
+  public EmptyValuePopulator(UInt4Vector offsets) {
+    this.offsets = Preconditions.checkNotNull(offsets, "offsets cannot be null");
+  }
+
+  /**
+   * Marks every offset after the last set one, up to and including {@code lastIndex}, as empty, then sets the value count to {@code lastIndex + 1}. The last set index is derived from the offsets vector's current value count.
+   *
+   * @param lastIndex  the last index (inclusive) in the offsets vector until which empty population takes place
+   * @throws java.lang.IndexOutOfBoundsException  if lastIndex is negative (writes past the current capacity go through {@code setSafe})
+   */
+  public void populate(int lastIndex) {
+    if (lastIndex < 0) {
+      throw new IndexOutOfBoundsException("index cannot be negative");
+    }
+    final UInt4Vector.Accessor accessor = offsets.getAccessor();
+    final UInt4Vector.Mutator mutator = offsets.getMutator();
+    final int lastSet = Math.max(accessor.getValueCount() - 1, 0); // 0 when the offsets vector has no values yet
+    final int previousEnd = accessor.get(lastSet);
+    for (int i = lastSet; i < lastIndex; i++) {
+      mutator.setSafe(i + 1, previousEnd); // equal consecutive offsets encode an empty group
+    }
+    mutator.setValueCount(lastIndex+1); // also shrinks the count when lastIndex is below lastSet
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e46d68be/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index 3078f4e..8a79e50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -33,7 +33,6 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.ComplexHolder;
 import org.apache.drill.exec.expr.holders.RepeatedListHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
@@ -48,21 +47,23 @@ import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 
 public class RepeatedListVector extends AbstractContainerVector implements RepeatedFixedWidthVector{
 
+  public final static MajorType TYPE = Types.repeated(MinorType.LIST);
+
   private final UInt4Vector offsets;   // offsets to start of each record
   private final Mutator mutator = new Mutator();
   private final RepeatedListAccessor accessor = new RepeatedListAccessor();
   private ValueVector vector;
   private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
-  private int lastSet = 0;
+  private final EmptyValuePopulator emptyPopulator;
 
-  private int valueCount;
 
-  public static MajorType TYPE = Types.repeated(MinorType.LIST);
+  public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack){
+    this(MaterializedField.create(path, TYPE), allocator, callBack);
+  }
 
   public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
     super(field, allocator, callBack);
@@ -77,10 +78,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     }
 
     this.offsets = new UInt4Vector(null, allocator);
-  }
-
-  public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack){
-    this(MaterializedField.create(path, TYPE), allocator, callBack);
+    this.emptyPopulator = new EmptyValuePopulator(offsets);
   }
 
   @Override
@@ -123,23 +121,18 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   public class Mutator implements ValueVector.Mutator, RepeatedMutator{
 
     public void startNewGroup(int index) {
+      emptyPopulator.populate(index+1);
       offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
     }
 
     public int add(int index) {
-      int endOffset = index+1;
-      int currentChildOffset = offsets.getAccessor().get(endOffset);
-      int newChildOffset = currentChildOffset + 1;
-      offsets.getMutator().setSafe(endOffset, newChildOffset);
-      lastSet = index;
-      // this is done at beginning so return the currentChildOffset, not the new offset.
-      return currentChildOffset;
-
+      final int prevEnd = offsets.getAccessor().get(index+1);
+      offsets.getMutator().setSafe(index+1, prevEnd+1);
+      return prevEnd;
     }
 
-    @Override
     public void setValueCount(int groupCount) {
-      populateEmpties(groupCount);
+      emptyPopulator.populate(groupCount);
       offsets.getMutator().setValueCount(groupCount+1);
 
       if (vector != null) {
@@ -149,9 +142,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     }
 
     @Override
-    public void reset() {
-      lastSet = 0;
-    }
+    public void reset() { }
 
     @Override
     public void generateTestData(int values) {
@@ -196,11 +187,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
 
     @Override
     public int getValueCount() {
-//      if (offsets.getAccessor().getValueCount() == 0 ) {
-//        return 0;
-//      } else {
-        return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1);
-//      }
+      return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1);
     }
 
     public void get(int index, RepeatedListHolder holder) {
@@ -263,7 +250,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
 
   @Override
   public void clear() {
-    lastSet = 0;
+    getMutator().reset();
     offsets.clear();
     if (vector != null) {
       vector.clear();
@@ -299,7 +286,6 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     public void transfer() {
       offsets.transferTo(to.offsets);
       vectorTransfer.transfer();
-      to.valueCount = valueCount;
       clear();
     }
 
@@ -314,16 +300,16 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     }
 
     @Override
-    public void copyValueSafe(int from, int to) {
+    public void copyValueSafe(int srcIndex, int destIndex) {
       RepeatedListHolder holder = new RepeatedListHolder();
-      accessor.get(from, holder);
-      int newIndex = this.to.offsets.getAccessor().get(to);
+      accessor.get(srcIndex, holder);
+      to.emptyPopulator.populate(destIndex+1);
+      int newIndex = to.offsets.getAccessor().get(destIndex);
       //todo: make this a bulk copy.
       for (int i = holder.start; i < holder.end; i++, newIndex++) {
         vectorTransfer.copyValueSafe(i, newIndex);
       }
-      this.to.offsets.getMutator().setSafe(to + 1, newIndex);
-      this.to.lastSet++;
+      to.offsets.getMutator().setSafe(destIndex + 1, newIndex);
     }
 
   }
@@ -392,14 +378,6 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
         .build();
   }
 
-  private void populateEmpties(int groupCount) {
-    int previousEnd = offsets.getAccessor().get(lastSet + 1);
-    for (int i = lastSet + 2; i <= groupCount; i++) {
-      offsets.getMutator().setSafe(i, previousEnd);
-    }
-    lastSet = groupCount - 1;
-  }
-
   @Override
   public Iterator<ValueVector> iterator() {
     return Collections.singleton(vector).iterator();

http://git-wip-us.apache.org/repos/asf/drill/blob/e46d68be/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 6dce363..ad8c66f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -61,20 +61,21 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
   private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this);
   private final RepeatedMapAccessor accessor = new RepeatedMapAccessor();
   private final Mutator mutator = new Mutator();
-  private int lastPopulatedValueIndex = -1;
+  private final EmptyValuePopulator emptyPopulator;
 
   public RepeatedMapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
     super(field, allocator, callBack);
     this.offsets = new UInt4Vector(null, allocator);
+    this.emptyPopulator = new EmptyValuePopulator(offsets);
   }
 
   @Override
-  public void allocateNew(int topLevelValueCount, int childValueCount) {
+  public void allocateNew(int groupCount, int valueCount) {
     clear();
-    offsets.allocateNew(topLevelValueCount+1);
+    offsets.allocateNew(groupCount+1);
     offsets.zeroVector();
     for (ValueVector v : getChildren()) {
-      AllocationHelper.allocatePrecomputedChildCount(v, topLevelValueCount, 50, childValueCount);
+      AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, valueCount);
     }
     mutator.reset();
     accessor.reset();
@@ -296,7 +297,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
     public void copyValueSafe(int srcIndex, int destIndex) {
       RepeatedMapHolder holder = new RepeatedMapHolder();
       from.getAccessor().get(srcIndex, holder);
-      to.populateEmpties(destIndex+1);
+      to.emptyPopulator.populate(destIndex + 1);
       int newIndex = to.offsets.getAccessor().get(destIndex);
       //todo: make these bulk copies
       for (int i = holder.start; i < holder.end; i++, newIndex++) {
@@ -318,15 +319,15 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
 
       to.offsets.clear();
       to.offsets.allocateNew(groups + 1);
-      int normalizedPos = 0;
 
+      int normalizedPos;
       for (int i=0; i < groups+1; i++) {
         normalizedPos = a.get(groupStart+i) - startPos;
         m.set(i, normalizedPos);
       }
 
       m.setValueCount(groups + 1);
-      to.lastPopulatedValueIndex = groups - 1;
+      to.emptyPopulator.populate(groups);
 
       for (TransferPair p : pairs) {
         p.splitAndTransfer(startPos, valuesToCopy);
@@ -434,7 +435,6 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
     @Override
     public int getValueCount() {
       return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1);
-//      return offsets.getAccessor().getValueCount() - 1;
     }
 
     public int getGroupSizeAtIndex(int index) {
@@ -492,29 +492,22 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
     }
   }
 
-  private void populateEmpties(int topLevelValueCount) {
-    int previousEnd = offsets.getAccessor().get(lastPopulatedValueIndex + 1);
-    for (int i = lastPopulatedValueIndex + 1; i < topLevelValueCount; i++) {
-      offsets.getMutator().setSafe(i+1, previousEnd);
-    }
-    lastPopulatedValueIndex = topLevelValueCount - 1;
-  }
 
   public class Mutator implements ValueVector.Mutator, RepeatedMutator {
 
     public void startNewGroup(int index) {
-      populateEmpties(index+1);
+      emptyPopulator.populate(index+1);
       offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
     }
 
     public int add(int index) {
-      int prevEnd = offsets.getAccessor().get(index+1);
-      offsets.getMutator().setSafe(index+1, prevEnd+1);
+      final int prevEnd = offsets.getAccessor().get(index+1);
+      offsets.getMutator().setSafe(index + 1, prevEnd + 1);
       return prevEnd;
     }
 
     public void setValueCount(int topLevelValueCount) {
-      populateEmpties(topLevelValueCount);
+      emptyPopulator.populate(topLevelValueCount);
       offsets.getMutator().setValueCount(topLevelValueCount == 0 ? 0 : topLevelValueCount+1);
       int childValueCount = offsets.getAccessor().get(topLevelValueCount);
       for (ValueVector v : getChildren()) {
@@ -523,10 +516,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
     }
 
     @Override
-    public void reset() {
-      // the last non empty element index starts from -1
-      lastPopulatedValueIndex = -1;
-    }
+    public void reset() { }
 
     @Override
     public void generateTestData(int values) {
@@ -553,8 +543,8 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
     getMutator().reset();
 
     offsets.clear();
-    for(ValueVector v : getChildren()) {
-      v.clear();;
+    for(ValueVector vector:getChildren()) {
+      vector.clear();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e46d68be/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/TestEmptyPopulator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/TestEmptyPopulator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/TestEmptyPopulator.java
new file mode 100644
index 0000000..8426a6a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/TestEmptyPopulator.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/** Unit tests for {@link EmptyValuePopulator}, exercised on a {@link UInt4Vector} backed by a mocked allocator. */
+@RunWith(MockitoJUnitRunner.class)
+public class TestEmptyPopulator extends ExecTest {
+  private static final int BUF_SIZE = 10000;
+
+  @Mock
+  private BufferAllocator allocator;
+  private UInt4Vector offsets;
+  private UInt4Vector.Accessor accessor;
+  private UInt4Vector.Mutator mutator;
+  private EmptyValuePopulator populator;
+
+  private final ByteBuffer buffer = ByteBuffer.allocateDirect(BUF_SIZE); // wrapped and returned for every allocator.buffer(int) call
+
+  @Before
+  public void initialize() {
+    Mockito.when(allocator.buffer(Mockito.anyInt())).thenReturn(DrillBuf.wrapByteBuffer(buffer));
+    offsets = new UInt4Vector(null, allocator);
+    offsets.allocateNewSafe();
+    accessor = offsets.getAccessor();
+    mutator = offsets.getMutator();
+    mutator.set(0, 0);
+    mutator.setValueCount(1); // every test starts from the single-entry offsets vector [0]
+    Assert.assertEquals("offsets must have one value", 1, accessor.getValueCount());
+    populator = new EmptyValuePopulator(offsets);
+  }
+
+  @Test(expected = IndexOutOfBoundsException.class)
+  public void testNegativeValuesThrowException() {
+    populator.populate(-1);
+  }
+
+  @Test
+  public void testZeroHasNoEffect() {
+    populator.populate(0);
+    Assert.assertEquals("offset must have one value", 1, accessor.getValueCount());
+  }
+
+  @Test
+  public void testEmptyPopulationWorks() {
+    populator.populate(1); // back-fills index 1 from offsets[0]
+    Assert.assertEquals("offset must have valid size", 2, accessor.getValueCount());
+    Assert.assertEquals("value must match", 0, accessor.get(1));
+
+    mutator.set(1, 10);
+    populator.populate(2); // back-fills index 2 from the manually set offsets[1]
+    Assert.assertEquals("offset must have valid size", 3, accessor.getValueCount());
+    Assert.assertEquals("value must match", 10, accessor.get(2));
+
+    mutator.set(2, 20);
+    populator.populate(5); // back-fills indices 3..5 from offsets[2]
+    Assert.assertEquals("offset must have valid size", 6, accessor.getValueCount());
+    for (int i = 2; i <= 5; i++) {
+      Assert.assertEquals(String.format("value at index[%s] must match", i), 20, accessor.get(i));
+    }
+
+    populator.populate(0); // an index below the last set one only shrinks the value count
+    Assert.assertEquals("offset must have valid size", 1, accessor.getValueCount());
+    Assert.assertEquals("value must match", 0, accessor.get(0));
+  }
+}