You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/06/30 16:51:13 UTC
parquet-mr git commit: PARQUET-642: Improve performance of ByteBuffer based read / write paths
Repository: parquet-mr
Updated Branches:
refs/heads/master 9c40a7bb3 -> 7f8e952ab
PARQUET-642: Improve performance of ByteBuffer based read / write paths
While trying out the newest Parquet version, we noticed that the changes that switched to ByteBuffers caused our jobs to slow down a bit. Those changes are https://github.com/apache/parquet-mr/commit/6b605a4ea05b66e1a6bf843353abcb4834a4ced8 and https://github.com/apache/parquet-mr/commit/6b24a1d1b5e2792a7821ad172a45e38d2b04f9b8; the latter is mostly Avro changes, but includes a couple of ByteBuffer changes.
Read overhead: 4-6% (in MB_Millis).
Write overhead: 6-10% (in MB_Millis).
This seems to be due to the encoding / decoding of Strings in the [Binary class](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java):
[toStringUsingUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L388) - for reads
[encodeUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L236) - for writes
With these changes we see around a 5% improvement in MB_Millis when running the job on our Hadoop cluster.
Added some microbenchmark details to the JIRA ticket.
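Note that I've left the behavior the same for the Avro write path: it still uses CharSequence and the Charset-based encoders. It now does so through the new `Binary.fromCharSequence(CharSequence)` method, which replaces the previous `Binary.fromString(CharSequence)` overload.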
Author: Piyush Narang <pn...@twitter.com>
Closes #347 from piyushnarang/bytebuffer-encoding-fix-pr and squashes the following commits:
43c5bdd [Piyush Narang] Keep avro on char sequence
2d50c8c [Piyush Narang] Update Binary approach
9e58237 [Piyush Narang] Proof of concept fixes
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/7f8e952a
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/7f8e952a
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/7f8e952a
Branch: refs/heads/master
Commit: 7f8e952abc4d2fc4b96c97a51aa25fcf6ed8af02
Parents: 9c40a7b
Author: Piyush Narang <pn...@twitter.com>
Authored: Thu Jun 30 09:50:59 2016 -0700
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Thu Jun 30 09:50:59 2016 -0700
----------------------------------------------------------------------
.../apache/parquet/avro/AvroWriteSupport.java | 2 +-
.../java/org/apache/parquet/io/api/Binary.java | 73 ++++++++++++++------
2 files changed, 53 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/7f8e952a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 7fcd88e..460565b 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -364,7 +364,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
Utf8 utf8 = (Utf8) value;
return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
}
- return Binary.fromString((CharSequence) value);
+ return Binary.fromCharSequence((CharSequence) value);
}
private static GenericData getDataModel(Configuration conf) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/7f8e952a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index 30787f0..50b98c2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -31,6 +31,7 @@ import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.ParquetEncodingException;
import static org.apache.parquet.bytes.BytesUtils.UTF8;
@@ -214,7 +215,28 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
private static class FromStringBinary extends ByteBufferBackedBinary {
- public FromStringBinary(CharSequence value) {
+ public FromStringBinary(String value) {
+ // reused is false, because we do not hold on to the buffer after
+ // conversion, and nobody else has a handle to it
+ super(encodeUTF8(value), false);
+ }
+
+ @Override
+ public String toString() {
+ return "Binary{\"" + toStringUsingUTF8() + "\"}";
+ }
+
+ private static ByteBuffer encodeUTF8(String value) {
+ try {
+ return ByteBuffer.wrap(value.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new ParquetEncodingException("UTF-8 not supported.", e);
+ }
+ }
+ }
+
+ private static class FromCharSequenceBinary extends ByteBufferBackedBinary {
+ public FromCharSequenceBinary(CharSequence value) {
// reused is false, because we do not hold on to the buffer after
// conversion, and nobody else has a handle to it
super(encodeUTF8(value), false);
@@ -226,12 +248,12 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
private static final ThreadLocal<CharsetEncoder> ENCODER =
- new ThreadLocal<CharsetEncoder>() {
- @Override
- protected CharsetEncoder initialValue() {
- return StandardCharsets.UTF_8.newEncoder();
- }
- };
+ new ThreadLocal<CharsetEncoder>() {
+ @Override
+ protected CharsetEncoder initialValue() {
+ return StandardCharsets.UTF_8.newEncoder();
+ }
+ };
private static ByteBuffer encodeUTF8(CharSequence value) {
try {
@@ -386,16 +408,26 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
@Override
public String toStringUsingUTF8() {
- int limit = value.limit();
- value.limit(offset+length);
- int position = value.position();
- value.position(offset);
- // no corresponding interface to read a subset of a buffer, would have to slice it
- // which creates another ByteBuffer object or do what is done here to adjust the
- // limit/offset and set them back after
- String ret = UTF8.decode(value).toString();
- value.limit(limit);
- value.position(position);
+ String ret;
+ if (value.hasArray()) {
+ try {
+ ret = new String(value.array(), value.arrayOffset() + offset, length, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new ParquetDecodingException("UTF-8 not supported");
+ }
+ } else {
+ int limit = value.limit();
+ value.limit(offset+length);
+ int position = value.position();
+ value.position(offset);
+ // no corresponding interface to read a subset of a buffer, would have to slice it
+ // which creates another ByteBuffer object or do what is done here to adjust the
+ // limit/offset and set them back after
+ ret = UTF8.decode(value).toString();
+ value.limit(limit);
+ value.position(position);
+ }
+
return ret;
}
@@ -555,12 +587,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
public static Binary fromString(String value) {
- // this method is for binary backward-compatibility
- return fromString((CharSequence) value);
+ return new FromStringBinary(value);
}
- public static Binary fromString(CharSequence value) {
- return new FromStringBinary(value);
+ public static Binary fromCharSequence(CharSequence value) {
+ return new FromCharSequenceBinary(value);
}
/**