You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/08/02 18:13:12 UTC

[arrow] branch master updated: ARROW-17269: [Java] implemented TransferPair methods in MapVector to get correct valuevector as mapvector instead of listvector (#13776)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a9dcaff869 ARROW-17269: [Java] implemented TransferPair methods in MapVector to get correct valuevector as mapvector instead of listvector (#13776)
a9dcaff869 is described below

commit a9dcaff86960f3424c25d38c55fc6f8dcf0cdb9f
Author: Ankit Gehlot <46...@users.noreply.github.com>
AuthorDate: Tue Aug 2 23:43:03 2022 +0530

    ARROW-17269: [Java] implemented TransferPair methods in MapVector to get correct valuevector as mapvector instead of listvector (#13776)
    
    Implemented the TransferPair methods in MapVector so that transfers produce a MapVector instead of a ListVector.
    
    Authored-by: ankitgehlot <an...@dremio.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 .../apache/arrow/vector/complex/ListVector.java    |  10 +-
 .../org/apache/arrow/vector/complex/MapVector.java | 152 +++++++++++++++++++++
 .../org/apache/arrow/vector/TestMapVector.java     |  23 ++++
 .../apache/arrow/vector/TestSplitAndTransfer.java  |  22 +++
 4 files changed, 202 insertions(+), 5 deletions(-)

diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index d8fe72a707..0fa091fb0c 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -76,13 +76,13 @@ public class ListVector extends BaseRepeatedValueVector implements PromotableVec
   protected ArrowBuf validityBuffer;
   protected UnionListReader reader;
   private CallBack callBack;
-  private final FieldType fieldType;
-  private int validityAllocationSizeInBytes;
+  protected final FieldType fieldType;
+  protected int validityAllocationSizeInBytes;
 
   /**
    * The maximum index that is actually set.
    */
-  private int lastSet;
+  protected int lastSet;
 
   /**
    * Constructs a new instance.
@@ -276,7 +276,7 @@ public class ListVector extends BaseRepeatedValueVector implements PromotableVec
     return true;
   }
 
-  private void allocateValidityBuffer(final long size) {
+  protected void allocateValidityBuffer(final long size) {
     final int curSize = (int) size;
     validityBuffer = allocator.buffer(curSize);
     validityBuffer.readerIndex(0);
@@ -296,7 +296,7 @@ public class ListVector extends BaseRepeatedValueVector implements PromotableVec
     super.reAlloc();
   }
 
-  private void reallocValidityAndOffsetBuffers() {
+  protected void reallocValidityAndOffsetBuffers() {
     reallocOffsetBuffer();
     reallocValidityBuffer();
   }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
index 14cba0926e..b8f3f32a73 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
@@ -22,8 +22,12 @@ import static org.apache.arrow.util.Preconditions.checkArgument;
 import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.AddOrGetResult;
+import org.apache.arrow.vector.BitVectorHelper;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.ZeroVector;
 import org.apache.arrow.vector.complex.impl.UnionMapReader;
 import org.apache.arrow.vector.complex.impl.UnionMapWriter;
 import org.apache.arrow.vector.types.Types;
@@ -32,6 +36,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Map;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.util.TransferPair;
 
 /**
  * A MapVector is used to store entries of key/value pairs. It is a container vector that is
@@ -119,4 +124,191 @@ public class MapVector extends ListVector {
   public MinorType getMinorType() {
     return MinorType.MAP;
   }
+
+  /**
+   * Creates a transfer pair whose target is a new {@link MapVector} named {@code ref}
+   * and created without a callback.
+   *
+   * @param ref name of the target vector
+   * @param allocator allocator for the target vector
+   * @return a transfer pair whose target is a MapVector
+   */
+  @Override
+  public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
+    return getTransferPair(ref, allocator, null);
+  }
+
+  /**
+   * Creates a transfer pair whose target is a new {@link MapVector} named {@code ref}
+   * with the same field type as this vector.
+   *
+   * @param ref name of the target vector
+   * @param allocator allocator for the target vector
+   * @param callBack callback handed to the target vector; may be null
+   * @return a transfer pair whose target is a MapVector
+   */
+  @Override
+  public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
+    return new TransferImpl(ref, allocator, callBack);
+  }
+
+  /**
+   * Creates a transfer pair from this vector to an existing vector.
+   *
+   * @param target the destination vector; it is cast to {@link MapVector}, so passing any
+   *     other vector type throws {@link ClassCastException}
+   * @return a transfer pair whose target is {@code target}
+   */
+  @Override
+  public TransferPair makeTransferPair(ValueVector target) {
+    return new MapVector.TransferImpl((MapVector) target);
+  }
+
+  /**
+   * Moves (or splits) this vector's offset buffer, validity buffer and data vector
+   * into a target {@link MapVector}, so that transfers keep the MAP minor type.
+   */
+  private class TransferImpl implements TransferPair {
+
+    private final MapVector to;
+    private final TransferPair dataTransferPair;
+
+    public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
+      this(new MapVector(name, allocator, fieldType, callBack));
+    }
+
+    public TransferImpl(MapVector to) {
+      this.to = to;
+      // Request a data vector with our data vector's field type on the target,
+      // then pair the two data vectors.
+      to.addOrGetVector(vector.getField().getFieldType());
+      dataTransferPair = getDataVector().makeTransferPair(to.getDataVector());
+    }
+
+    /**
+     * Transfers this vector's data to the target vector. The memory associated
+     * with this vector is transferred to the allocator of the target vector
+     * for accounting and management purposes, and this vector is cleared afterwards.
+     */
+    @Override
+    public void transfer() {
+      to.clear();
+      dataTransferPair.transfer();
+      to.validityBuffer = transferBuffer(validityBuffer, to.allocator);
+      to.offsetBuffer = transferBuffer(offsetBuffer, to.allocator);
+      to.lastSet = lastSet;
+      if (valueCount > 0) {
+        to.setValueCount(valueCount);
+      }
+      clear();
+    }
+
+    /**
+     * Slices this vector at the desired index and length and transfers the
+     * corresponding data to the target vector. The target's offsets are rebased
+     * to start at zero.
+     *
+     * @param startIndex start position of the split in the source vector.
+     * @param length length of the split.
+     * @throws IllegalArgumentException if startIndex or length is negative, or if
+     *     startIndex + length exceeds the value count.
+     */
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+      // Compare length against (valueCount - startIndex) instead of computing
+      // startIndex + length, which can overflow int and slip past the check.
+      Preconditions.checkArgument(startIndex >= 0 && length >= 0 && length <= valueCount - startIndex,
+              "Invalid parameters startIndex: %s, length: %s for valueCount: %s", startIndex, length, valueCount);
+      final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
+      final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
+      to.clear();
+      to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
+      /* splitAndTransfer offset buffer */
+      for (int i = 0; i < length + 1; i++) {
+        final int relativeOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - startPoint;
+        to.offsetBuffer.setInt(i * OFFSET_WIDTH, relativeOffset);
+      }
+      /* splitAndTransfer validity buffer */
+      splitAndTransferValidityBuffer(startIndex, length, to);
+      /* splitAndTransfer data buffer */
+      dataTransferPair.splitAndTransfer(startPoint, sliceLength);
+      to.lastSet = length - 1;
+      to.setValueCount(length);
+    }
+
+    /**
+     * Transfers the validity bits for {@code [startIndex, startIndex + length)} into
+     * {@code target}. When {@code startIndex} is a multiple of 8 the source validity
+     * buffer is sliced and the slice retained; otherwise the bits are shifted byte by
+     * byte into a newly allocated target buffer. Does nothing when {@code length <= 0}.
+     */
+    private void splitAndTransferValidityBuffer(int startIndex, int length, MapVector target) {
+      if (length <= 0) {
+        return;
+      }
+      int firstByteSource = BitVectorHelper.byteIndex(startIndex);
+      int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
+      int byteSizeTarget = getValidityBufferSizeFromCount(length);
+      int offset = startIndex % 8;
+
+      if (offset == 0) {
+        // Byte-aligned start: share the source memory through a retained slice.
+        if (target.validityBuffer != null) {
+          target.validityBuffer.getReferenceManager().release();
+        }
+        target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
+        target.validityBuffer.getReferenceManager().retain(1);
+        return;
+      }
+
+      /* Copy data
+       * When the first bit starts from the middle of a byte (offset != 0),
+       * copy data from src BitVector.
+       * Each byte in the target is composed by a part in i-th byte,
+       * another part in (i+1)-th byte.
+       */
+      target.allocateValidityBuffer(byteSizeTarget);
+
+      for (int i = 0; i < byteSizeTarget - 1; i++) {
+        byte b1 = BitVectorHelper.getBitsFromCurrentByte(validityBuffer, firstByteSource + i, offset);
+        byte b2 = BitVectorHelper.getBitsFromNextByte(validityBuffer, firstByteSource + i + 1, offset);
+
+        target.validityBuffer.setByte(i, (b1 + b2));
+      }
+
+      /* Copying the last piece is done in the following manner:
+       * if the source vector has 1 or more bytes remaining, we copy
+       * the last piece as a byte formed by shifting data
+       * from the current byte and the next byte.
+       *
+       * if the source vector has no more bytes remaining
+       * (we are at the last byte), we copy the last piece as a byte
+       * by shifting data from the current byte.
+       */
+      if ((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
+        byte b1 = BitVectorHelper.getBitsFromCurrentByte(validityBuffer,
+                firstByteSource + byteSizeTarget - 1, offset);
+        byte b2 = BitVectorHelper.getBitsFromNextByte(validityBuffer,
+                firstByteSource + byteSizeTarget, offset);
+
+        target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
+      } else {
+        byte b1 = BitVectorHelper.getBitsFromCurrentByte(validityBuffer,
+                firstByteSource + byteSizeTarget - 1, offset);
+        target.validityBuffer.setByte(byteSizeTarget - 1, b1);
+      }
+    }
+
+    /** Returns the target {@link MapVector}. */
+    @Override
+    public ValueVector getTo() {
+      return to;
+    }
+
+    /** Copies index {@code from} of this vector into index {@code to} of the target vector. */
+    @Override
+    public void copyValueSafe(int from, int to) {
+      this.to.copyFrom(from, to, MapVector.this);
+    }
+  }
 }
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestMapVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestMapVector.java
index 9637021dbd..d60d5611a5 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestMapVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestMapVector.java
@@ -20,6 +20,7 @@ package org.apache.arrow.vector;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
@@ -1110,4 +1111,31 @@ public class TestMapVector {
       assertEquals(55, getResultValue(resultStruct));
     }
   }
+
+  /**
+   * Verifies that {@link MapVector#getTransferPair} produces a {@link MapVector}
+   * rather than a plain ListVector.
+   */
+  @Test
+  public void testGetTransferPair() {
+    try (MapVector mapVector = MapVector.empty("mapVector", allocator, false)) {
+
+      FieldType type = new FieldType(false, ArrowType.Struct.INSTANCE, null, null);
+      AddOrGetResult<StructVector> addResult = mapVector.addOrGetVector(type);
+      FieldType keyType = new FieldType(false, MinorType.BIGINT.getType(), null, null);
+      FieldType valueType = FieldType.nullable(MinorType.FLOAT8.getType());
+      addResult.getVector().addOrGet(MapVector.KEY_NAME, keyType, BigIntVector.class);
+      addResult.getVector().addOrGet(MapVector.VALUE_NAME, valueType, Float8Vector.class);
+      mapVector.allocateNew();
+      mapVector.setValueCount(0);
+
+      assertEquals(-1, mapVector.getLastSet());
+      TransferPair tp = mapVector.getTransferPair(mapVector.getName(), allocator, null);
+      tp.transfer();
+      try (ValueVector vector = tp.getTo()) {
+        assertSame(mapVector.getClass(), vector.getClass());
+        assertEquals(MinorType.MAP, vector.getMinorType());
+      }
+    }
+  }
 }
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java b/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java
index e60b87e601..716fa0bde4 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java
@@ -29,8 +29,10 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.complex.FixedSizeListVector;
 import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.ArrowType.Struct;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.util.TransferPair;
@@ -406,5 +408,28 @@ public class TestSplitAndTransfer {
     }
   }
 
+  /**
+   * Splitting an empty MapVector with startIndex 0 and length 0 must succeed and
+   * leave the target with zero values.
+   */
+  @Test
+  public void testMapVectorZeroStartIndexAndLength() {
+    final Map<String, String> fieldMetadata = new HashMap<>();
+    fieldMetadata.put("k1", "v1");
+    final FieldType mapType = new FieldType(true, new ArrowType.Map(false), null, fieldMetadata);
+    try (final MapVector source = new MapVector("mapVec", allocator, mapType, null);
+         final MapVector target = new MapVector("newMapVec", allocator, mapType, null)) {
+      final int expectedCount = 0;
+
+      source.allocateNew();
+      source.setValueCount(expectedCount);
+
+      final TransferPair transferPair = source.makeTransferPair(target);
+      transferPair.splitAndTransfer(0, 0);
+
+      assertEquals(expectedCount, target.getValueCount());
+      target.clear();
+    }
+  }
 
 }