Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/03 18:33:39 UTC

[GitHub] [spark] parthchandra commented on a change in pull request #35262: [SPARK-37974][SQL] Implement vectorized DELTA_BYTE_ARRAY and DELTA_LENGTH_BYTE_ARRAY encodings for Parquet V2 support

parthchandra commented on a change in pull request #35262:
URL: https://github.com/apache/spark/pull/35262#discussion_r798135449



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
##########
@@ -66,20 +57,18 @@ public void readBinary(int total, WritableColumnVector c, int rowId) {
     }
     ByteBuffer buffer;
     ByteBufferOutputWriter outputWriter;
-    if (memoryMode == MemoryMode.OFF_HEAP) {
-      outputWriter = ByteBufferOutputWriter::copyWriteByteBuffer;
-    } else {
-      outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
-    }
+    outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;

Review comment:
       done
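
For readers following the hunk above: the simplification replaces a MemoryMode branch with a single method reference bound to a functional interface. Below is a minimal, self-contained sketch of that pattern; the interface and method names are simplified stand-ins, not Spark's actual ByteBufferOutputWriter API.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class MethodRefSketch {
      // Hypothetical stand-in for a functional interface like
      // ByteBufferOutputWriter in the hunk above.
      @FunctionalInterface
      interface OutputWriter {
        void write(int rowId, ByteBuffer buffer, int length);
      }

      // A static method whose signature matches the interface, so it can be
      // assigned via a method reference.
      static void writeArray(int rowId, ByteBuffer buffer, int length) {
        byte[] dst = new byte[length];
        buffer.get(dst, 0, length);
        System.out.println("row " + rowId + " -> "
            + new String(dst, StandardCharsets.UTF_8));
      }

      public static void main(String[] args) {
        // Before the change a MemoryMode branch picked one of two method
        // references; after it, a single reference suffices.
        OutputWriter writer = MethodRefSketch::writeArray;
        writer.write(0, ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), 5);
      }
    }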

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
##########
@@ -66,20 +57,18 @@ public void readBinary(int total, WritableColumnVector c, int rowId) {
     }
     ByteBuffer buffer;
     ByteBufferOutputWriter outputWriter;
-    if (memoryMode == MemoryMode.OFF_HEAP) {
-      outputWriter = ByteBufferOutputWriter::copyWriteByteBuffer;
-    } else {
-      outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
-    }
+    outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
     int length;
     for (int i = 0; i < total; i++) {
       length = lengthsVector.getInt(rowId + i);
-      try {
-        buffer = in.slice(length);
-      } catch (EOFException e) {
-        throw new ParquetDecodingException("Failed to read " + length + " bytes");
+      if (length > 0) {
+        try {
+          buffer = in.slice(length);
+        } catch (EOFException e) {
+          throw new ParquetDecodingException("Failed to read " + length + " bytes");
+        }
+        outputWriter.write(c, rowId + i, buffer, length);
       }
-      outputWriter.write(c, rowId + i, buffer, length);
       currentRow++;

Review comment:
       Done
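
The guard in this hunk slices the input only when the decoded length is positive, so zero-length values consume nothing from the stream and the corresponding vector entry is simply left empty. A small illustrative sketch of that guard, using a hypothetical Input class rather than Parquet's actual ByteBufferInputStream:

    import java.io.EOFException;
    import java.nio.ByteBuffer;

    public class ZeroLengthGuardSketch {
      // Hypothetical stand-in for the stream the reader slices from.
      static class Input {
        private final ByteBuffer data;
        Input(byte[] bytes) { this.data = ByteBuffer.wrap(bytes); }
        ByteBuffer slice(int length) throws EOFException {
          if (data.remaining() < length) {
            throw new EOFException("Failed to read " + length + " bytes");
          }
          ByteBuffer out = data.slice();
          out.limit(length);
          data.position(data.position() + length);
          return out;
        }
      }

      public static void main(String[] args) throws EOFException {
        Input in = new Input("abc".getBytes());
        int[] lengths = {3, 0, 0};  // zero-length entries need no bytes
        for (int length : lengths) {
          if (length > 0) {  // the guard: slice and write only when needed
            ByteBuffer buf = in.slice(length);
            byte[] dst = new byte[length];
            buf.get(dst);
            System.out.println("read: " + new String(dst));
          } else {
            System.out.println("read: <empty>");  // stream untouched
          }
        }
      }
    }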

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
##########
@@ -16,51 +16,121 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
- * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized interface.
+ * An implementation of the Parquet DELTA_BYTE_ARRAY decoder that supports the vectorized
+ * interface.
  */
-public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase {
-  private final DeltaByteArrayReader deltaByteArrayReader = new DeltaByteArrayReader();
+public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
+    implements VectorizedValuesReader, RequiresPreviousReader {
+
+  private final VectorizedDeltaBinaryPackedReader prefixLengthReader =
+      new VectorizedDeltaBinaryPackedReader();
+  private final VectorizedDeltaLengthByteArrayReader suffixReader;
+  private WritableColumnVector prefixLengthVector;
+  private WritableColumnVector suffixVector;
+  private byte[] previous = new byte[0];
+  private int currentRow = 0;
+
+  //temporary variable used by getBinary
+  private final WritableColumnVector binaryValVector;
+
+  VectorizedDeltaByteArrayReader() {
+    this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
+    binaryValVector = new OnHeapColumnVector(1, BinaryType);
+  }
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
-    deltaByteArrayReader.initFromPage(valueCount, in);
+    prefixLengthVector = new OnHeapColumnVector(valueCount, IntegerType);
+    suffixVector = new OnHeapColumnVector(valueCount, BinaryType);
+    prefixLengthReader.initFromPage(valueCount, in);
+    prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
+        prefixLengthVector, 0);
+    suffixReader.initFromPage(valueCount, in);
+    suffixReader.readBinary(valueCount, suffixVector, 0);
   }
 
   @Override
   public Binary readBinary(int len) {
-    return deltaByteArrayReader.readBytes();
+    readValues(1, binaryValVector, 0, ByteBufferOutputWriter::writeArrayByteBuffer);
+    return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
   }
 
-  @Override
-  public void readBinary(int total, WritableColumnVector c, int rowId) {
+  public void readValues(int total, WritableColumnVector c, int rowId,

Review comment:
       changed to private
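
For context on what the rewritten reader decodes: per the Parquet format spec, DELTA_BYTE_ARRAY stores a delta-binary-packed column of shared-prefix lengths followed by a DELTA_LENGTH_BYTE_ARRAY column of suffixes, and each value is rebuilt from the previous one. A minimal sketch of that reconstruction, using plain arrays in place of the reader's column vectors:

    import java.nio.charset.StandardCharsets;

    public class DeltaByteArraySketch {
      // Reconstructs DELTA_BYTE_ARRAY values per the Parquet spec:
      // value[i] = previous[0 : prefixLength[i]] ++ suffix[i]
      static byte[][] decode(int[] prefixLengths, byte[][] suffixes) {
        byte[][] out = new byte[suffixes.length][];
        byte[] previous = new byte[0];
        for (int i = 0; i < suffixes.length; i++) {
          int prefixLen = prefixLengths[i];
          byte[] value = new byte[prefixLen + suffixes[i].length];
          System.arraycopy(previous, 0, value, 0, prefixLen);
          System.arraycopy(suffixes[i], 0, value, prefixLen, suffixes[i].length);
          out[i] = value;
          previous = value;  // the next value's prefix is taken from this one
        }
        return out;
      }

      public static void main(String[] args) {
        // "Hello", "Helicopter", "Help" share prefixes with their predecessors.
        int[] prefixLengths = {0, 3, 3};
        byte[][] suffixes = {
            "Hello".getBytes(StandardCharsets.UTF_8),
            "icopter".getBytes(StandardCharsets.UTF_8),
            "p".getBytes(StandardCharsets.UTF_8)
        };
        for (byte[] v : decode(prefixLengths, suffixes)) {
          System.out.println(new String(v, StandardCharsets.UTF_8));
        }
      }
    }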

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
##########
@@ -66,20 +57,18 @@ public void readBinary(int total, WritableColumnVector c, int rowId) {
     }
     ByteBuffer buffer;
     ByteBufferOutputWriter outputWriter;
-    if (memoryMode == MemoryMode.OFF_HEAP) {
-      outputWriter = ByteBufferOutputWriter::copyWriteByteBuffer;
-    } else {
-      outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
-    }
+    outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
     int length;
     for (int i = 0; i < total; i++) {
       length = lengthsVector.getInt(rowId + i);
-      try {
-        buffer = in.slice(length);
-      } catch (EOFException e) {
-        throw new ParquetDecodingException("Failed to read " + length + " bytes");
+      if (length > 0) {
+        try {
+          buffer = in.slice(length);
+        } catch (EOFException e) {
+          throw new ParquetDecodingException("Failed to read " + length + " bytes");
+        }
+        outputWriter.write(c, rowId + i, buffer, length);
       }
-      outputWriter.write(c, rowId + i, buffer, length);
       currentRow++;

Review comment:
       Note that there is a similar line in `VectorizedDeltaByteArrayReader` that cannot be moved out of the loop, because the incremented value (`currentRow`) is accessed in each iteration.
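
To make the distinction concrete: the writer selection in the hunk is loop-invariant and can be hoisted, while a counter like `currentRow` is both read and incremented on every iteration, so its update has to stay inside the loop. A tiny sketch (variable names chosen for illustration only):

    public class LoopHoistSketch {
      public static void main(String[] args) {
        int[] lengths = {2, 0, 3, 1};

        // Hoistable: the writer choice does not depend on the loop variable,
        // so it can be computed once before the loop, as in the hunk above.
        String writer = "arrayWriter";

        // Not hoistable: currentRow is read and incremented every iteration,
        // so this statement must stay inside the loop.
        int currentRow = 0;
        for (int length : lengths) {
          System.out.println(writer + " row=" + currentRow + " len=" + length);
          currentRow++;  // consumed by the next iteration; cannot move out
        }
      }
    }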




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org