You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/09/02 23:05:43 UTC
spark git commit: Revert "[SPARK-16334] Reusing same dictionary column for decoding consecutive row groups shouldn't throw an error"
Repository: spark
Updated Branches:
refs/heads/branch-2.0 b8f65dad7 -> c0ea77071
Revert "[SPARK-16334] Reusing same dictionary column for decoding consecutive row groups shouldn't throw an error"
This reverts commit a3930c3b9afa9f7eba2a5c8b8f279ca38e348e9b.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0ea7707
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0ea7707
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0ea7707
Branch: refs/heads/branch-2.0
Commit: c0ea7707127c92ecb51794b96ea40d7cdb28b168
Parents: b8f65da
Author: Davies Liu <da...@gmail.com>
Authored: Fri Sep 2 16:05:37 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Fri Sep 2 16:05:37 2016 -0700
----------------------------------------------------------------------
.../parquet/VectorizedColumnReader.java | 54 ++++++--------------
1 file changed, 16 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c0ea7707/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index cb51cb4..6c47dc0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -221,21 +221,15 @@ public class VectorizedColumnReader {
if (column.dataType() == DataTypes.IntegerType ||
DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
- }
+ column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else if (column.dataType() == DataTypes.ByteType) {
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
- }
+ column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else if (column.dataType() == DataTypes.ShortType) {
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
- }
+ column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -246,9 +240,7 @@ public class VectorizedColumnReader {
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
- }
+ column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -257,27 +249,21 @@ public class VectorizedColumnReader {
case FLOAT:
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
- }
+ column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i)));
}
break;
case DOUBLE:
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
- }
+ column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i)));
}
break;
case INT96:
if (column.dataType() == DataTypes.TimestampType) {
for (int i = rowId; i < rowId + num; ++i) {
// TODO: Convert dictionary of Binaries to dictionary of Longs
- if (!column.isNullAt(i)) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
- }
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
}
} else {
throw new UnsupportedOperationException();
@@ -289,34 +275,26 @@ public class VectorizedColumnReader {
// and reuse it across batches. This should mean adding a ByteArray would just update
// the length and offset.
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putByteArray(i, v.getBytes());
- }
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putByteArray(i, v.getBytes());
}
break;
case FIXED_LEN_BYTE_ARRAY:
// DecimalType written in the legacy mode
if (DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
- }
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
}
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
- }
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
}
} else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putByteArray(i, v.getBytes());
- }
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putByteArray(i, v.getBytes());
}
} else {
throw new UnsupportedOperationException();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org