Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/10 20:48:51 UTC
[incubator-iceberg] branch master updated: ORC: Make converters reusable (#195) (#209)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 864d790 ORC: Make converters reusable (#195) (#209)
864d790 is described below
commit 864d79041f09d44fa1e50a7aa3237344a2880e5a
Author: Ratandeep Ratti <rr...@linkedin.com>
AuthorDate: Mon Jun 10 13:48:47 2019 -0700
ORC: Make converters reusable (#195) (#209)
---
.../apache/iceberg/spark/data/SparkOrcReader.java | 74 +++++++++++-----------
1 file changed, 36 insertions(+), 38 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index 301a550..aa0397d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -60,16 +60,18 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
private final static int INITIAL_SIZE = 128 * 1024;
private final int numFields;
private final TypeDescription readSchema;
+ private final Converter[] converters;
public SparkOrcReader(Schema readSchema) {
this.readSchema = TypeConversion.toOrc(readSchema, new ColumnIdMap());
numFields = readSchema.columns().size();
+ converters = buildConverters();
}
- private Converter[] buildConverters(final UnsafeRowWriter writer) {
+ private Converter[] buildConverters() {
final Converter[] converters = new Converter[numFields];
for(int c = 0; c < numFields; ++c) {
- converters[c] = buildConverter(writer, readSchema.getChildren().get(c));
+ converters[c] = buildConverter(readSchema.getChildren().get(c));
}
return converters;
}
@@ -77,8 +79,6 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
@Override
public InternalRow read(VectorizedRowBatch batch, int row) {
final UnsafeRowWriter rowWriter = new UnsafeRowWriter(numFields, INITIAL_SIZE);
- final Converter[] converters = buildConverters(rowWriter);
-
rowWriter.reset();
rowWriter.zeroOutNullBytes();
for(int c=0; c < batch.cols.length; ++c) {
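The gist of the change is in these first two hunks: each converter used to
capture the per-row UnsafeRowWriter at construction time, so read() had to
rebuild the whole converter tree for every row. After the patch the converters
are built once in the constructor and the writer becomes a per-call argument.
A minimal, self-contained sketch of that lifecycle change (Converter,
RowWriter, and the long[][] batch are simplified stand-ins here, not the real
Iceberg or Spark types):

    interface RowWriter {
        void writeLong(int column, long value);
    }

    interface Converter {
        // The writer is a call-site argument, so one Converter instance can
        // serve any number of rows and writers.
        void convert(RowWriter writer, int column, long[] vector, int row);
    }

    class Reader {
        private final Converter[] converters;

        Reader(int numFields) {
            // Built once, as the patched SparkOrcReader constructor now does.
            converters = new Converter[numFields];
            for (int c = 0; c < numFields; c++) {
                converters[c] = (writer, column, vector, row) ->
                    writer.writeLong(column, vector[row]);
            }
        }

        void read(long[][] batch, int row, RowWriter writer) {
            // Only the writer is per-call state; the converters carry none.
            for (int c = 0; c < converters.length; c++) {
                converters[c].convert(writer, c, batch[c], row);
            }
        }
    }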
@@ -565,17 +565,17 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
private static class StructConverter implements Converter {
private final Converter[] children;
- private final UnsafeRowWriter childWriter;
- StructConverter(final UnsafeWriter parentWriter, final TypeDescription schema) {
+ StructConverter(final TypeDescription schema) {
children = new Converter[schema.getChildren().size()];
for(int c=0; c < children.length; ++c) {
- children[c] = buildConverter(parentWriter, schema.getChildren().get(c));
+ children[c] = buildConverter(schema.getChildren().get(c));
}
- childWriter = new UnsafeRowWriter(parentWriter, children.length);
+
}
- int writeStruct(StructColumnVector vector, int row) {
+ int writeStruct(UnsafeWriter parentWriter, StructColumnVector vector, int row) {
+ UnsafeRowWriter childWriter = new UnsafeRowWriter(parentWriter, children.length);
int start = childWriter.cursor();
childWriter.resetRowWriter();
for(int c=0; c < children.length; ++c) {
@@ -592,7 +592,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
if (!vector.noNulls && vector.isNull[row]) {
writer.setNullAt(column);
} else {
- int start = writeStruct((StructColumnVector) vector, row);
+ int start = writeStruct(writer, (StructColumnVector) vector, row);
writer.setOffsetAndSizeFromPreviousCursor(column, start);
}
}
@@ -606,30 +606,30 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
if (!vector.noNulls && vector.isNull[row]) {
writer.setNull(element);
} else {
- int start = writeStruct((StructColumnVector) vector, row);
+ int start = writeStruct(writer, (StructColumnVector) vector, row);
writer.setOffsetAndSizeFromPreviousCursor(element, start);
}
}
}
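StructConverter gets the same inversion: the nested UnsafeRowWriter was a
field tied to one parentWriter for the converter's lifetime, and is now a
local created from whichever parent writer the caller passes in. A hedged
sketch of that per-call nesting, with hypothetical Writer/ChildWriter
stand-ins in place of Spark's unsafe writers:

    interface Writer {
        int cursor();   // current position in the shared output buffer
    }

    class ChildWriter implements Writer {
        private final Writer parent;
        private int written = 0;

        ChildWriter(Writer parent) {
            this.parent = parent;
        }

        @Override
        public int cursor() {
            return parent.cursor() + written;
        }

        void writeLongField(int column, long value) {
            written += 8;   // pretend we appended 8 bytes to the shared buffer
        }
    }

    class StructConverterSketch {
        private final int numChildren;   // schema-derived state only

        StructConverterSketch(int numChildren) {
            this.numChildren = numChildren;   // no writer captured any more
        }

        int writeStruct(Writer parentWriter, long[] fieldValues) {
            // Per-call child writer; before the patch it was a field, which
            // pinned the converter to the single writer passed at build time.
            ChildWriter childWriter = new ChildWriter(parentWriter);
            int start = childWriter.cursor();
            for (int c = 0; c < numChildren; c++) {
                childWriter.writeLongField(c, fieldValues[c]);
            }
            return start;   // the caller records offset/size from this cursor
        }
    }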
private static class ListConverter implements Converter {
- private final Converter children;
- private final UnsafeArrayWriter childWriter;
-
- ListConverter(final UnsafeWriter parentWriter, TypeDescription schema) {
- TypeDescription child = schema.getChildren().get(0);
- children = buildConverter(parentWriter, child);
- childWriter = new UnsafeArrayWriter(parentWriter, getArrayElementSize(child));
+ private final Converter childConverter;
+ private final TypeDescription child;
+ ListConverter(TypeDescription schema) {
+ child = schema.getChildren().get(0);
+ childConverter = buildConverter(child);
}
- int writeList(ListColumnVector v, int row) {
+ int writeList(UnsafeWriter parentWriter, ListColumnVector v, int row) {
int offset = (int) v.offsets[row];
int length = (int) v.lengths[row];
+
+ UnsafeArrayWriter childWriter = new UnsafeArrayWriter(parentWriter, getArrayElementSize(child));
int start = childWriter.cursor();
childWriter.initialize(length);
for(int c = 0; c < length; ++c) {
- children.convert(childWriter, c, v.child, offset + c);
+ childConverter.convert(childWriter, c, v.child, offset + c);
}
return start;
}
@@ -643,7 +643,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
if (!vector.noNulls && vector.isNull[row]) {
writer.setNullAt(column);
} else {
- int start = writeList((ListColumnVector) vector, row);
+ int start = writeList(writer, (ListColumnVector) vector, row);
writer.setOffsetAndSizeFromPreviousCursor(column, start);
}
}
@@ -657,7 +657,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
if (!vector.noNulls && vector.isNull[row]) {
writer.setNull(element);
} else {
- int start = writeList((ListColumnVector) vector, row);
+ int start = writeList(writer, (ListColumnVector) vector, row);
writer.setOffsetAndSizeFromPreviousCursor(element, start);
}
}
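ListConverter follows the same pattern, and the hunk also shows how ORC's
ListColumnVector is laid out: offsets[row] and lengths[row] locate this row's
slice of the flattened child vector. A small stand-alone illustration of that
indexing, using plain arrays instead of the real ColumnVector types:

    class ListSliceSketch {
        // ORC flattens every list in a batch into one child array; each row
        // owns the slice [offsets[row], offsets[row] + lengths[row]).
        static long[] elementsFor(long[] child, long[] offsets, long[] lengths, int row) {
            int offset = (int) offsets[row];
            int length = (int) lengths[row];
            long[] out = new long[length];
            for (int c = 0; c < length; c++) {
                out[c] = child[offset + c];   // same indexing as writeList above
            }
            return out;
        }
    }

For example, with child = {1, 2, 3, 4, 5}, offsets = {0, 2} and
lengths = {2, 3}, row 1 yields {3, 4, 5}.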
@@ -667,30 +667,27 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
private final Converter keyConvert;
private final Converter valueConvert;
- private final UnsafeArrayWriter keyWriter;
- private final UnsafeArrayWriter valueWriter;
-
private final int keySize;
private final int valueSize;
private final int KEY_SIZE_BYTES = 8;
- MapConverter(final UnsafeWriter parentWriter, TypeDescription schema) {
+ MapConverter(TypeDescription schema) {
final TypeDescription keyType = schema.getChildren().get(0);
final TypeDescription valueType = schema.getChildren().get(1);
- keyConvert = buildConverter(parentWriter, keyType);
+ keyConvert = buildConverter(keyType);
keySize = getArrayElementSize(keyType);
- keyWriter = new UnsafeArrayWriter(parentWriter, keySize);
- valueConvert = buildConverter(parentWriter, valueType);
+
+ valueConvert = buildConverter(valueType);
valueSize = getArrayElementSize(valueType);
- valueWriter = new UnsafeArrayWriter(parentWriter, valueSize);
}
- int writeMap(MapColumnVector v, int row) {
+ int writeMap(UnsafeWriter parentWriter, MapColumnVector v, int row) {
final int offset = (int) v.offsets[row];
final int length = (int) v.lengths[row];
- final int start = keyWriter.cursor();
+ UnsafeArrayWriter keyWriter = new UnsafeArrayWriter(parentWriter, keySize);
+ final int start = keyWriter.cursor();
// save room for the key size
keyWriter.grow(KEY_SIZE_BYTES);
keyWriter.increaseCursor(KEY_SIZE_BYTES);
@@ -705,6 +702,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
keyWriter.cursor() - start - KEY_SIZE_BYTES);
// serialize the values
+ UnsafeArrayWriter valueWriter = new UnsafeArrayWriter(parentWriter, valueSize);
valueWriter.initialize(length);
for(int c = 0; c < length; ++c) {
valueConvert.convert(valueWriter, c, v.values, offset + c);
@@ -720,7 +718,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
if (!vector.noNulls && vector.isNull[row]) {
writer.setNullAt(column);
} else {
- int start = writeMap((MapColumnVector) vector, row);
+ int start = writeMap(writer, (MapColumnVector) vector, row);
writer.setOffsetAndSizeFromPreviousCursor(column, start);
}
}
@@ -733,13 +731,13 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
if (!vector.noNulls && vector.isNull[row]) {
writer.setNull(element);
} else {
- int start = writeMap((MapColumnVector) vector, row);
+ int start = writeMap(writer, (MapColumnVector) vector, row);
writer.setOffsetAndSizeFromPreviousCursor(element, start);
}
}
}
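writeMap also documents the framing the converter targets: it reserves an
8-byte slot (KEY_SIZE_BYTES), serializes the keys as one array, backfills the
slot with the key section's size, then serializes the values as a second
array. A simplified byte-level sketch of that framing, using a plain
ByteBuffer rather than Spark's UnsafeArrayWriter:

    import java.nio.ByteBuffer;

    class MapFramingSketch {
        static ByteBuffer writeMap(long[] keys, long[] values) {
            ByteBuffer buf = ByteBuffer.allocate(8 + keys.length * 8 + values.length * 8);
            int start = buf.position();
            buf.position(start + 8);          // save room for the key size
            for (long k : keys) {
                buf.putLong(k);               // serialize the keys
            }
            int keyBytes = buf.position() - start - 8;
            buf.putLong(start, keyBytes);     // backfill the key-section length
            for (long v : values) {
                buf.putLong(v);               // serialize the values
            }
            return buf;
        }
    }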
- static Converter buildConverter(final UnsafeWriter writer, final TypeDescription schema) {
+ static Converter buildConverter(final TypeDescription schema) {
switch (schema.getCategory()) {
case BOOLEAN:
return new BooleanConverter();
@@ -770,11 +768,11 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
case VARCHAR:
return new BinaryConverter();
case STRUCT:
- return new StructConverter(writer, schema);
+ return new StructConverter(schema);
case LIST:
- return new ListConverter(writer, schema);
+ return new ListConverter(schema);
case MAP:
- return new MapConverter(writer, schema);
+ return new MapConverter(schema);
default:
throw new IllegalArgumentException("Unhandled type " + schema);
}
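Finally, buildConverter is the factory that recurses over the ORC
TypeDescription: leaf categories map to primitive converters, while
STRUCT/LIST/MAP build converters that call back into the factory for their
children. With the writers gone from every constructor, the whole tree can be
built once per reader. A compact stand-alone version of that recursive
dispatch, with a toy schema type standing in for TypeDescription:

    import java.util.ArrayList;
    import java.util.List;

    class ConverterFactorySketch {
        // Toy schema node standing in for org.apache.orc.TypeDescription.
        record Type(String category, List<Type> children) {}

        interface Conv {}
        record LongConv() implements Conv {}
        record StructConv(List<Conv> fields) implements Conv {}
        record ListConv(Conv element) implements Conv {}

        static Conv build(Type schema) {
            switch (schema.category()) {
                case "LONG":
                    return new LongConv();
                case "STRUCT": {
                    // Recurse once per child; no writer is needed anywhere,
                    // so the resulting tree is safe to build once and reuse.
                    List<Conv> fields = new ArrayList<>();
                    for (Type child : schema.children()) {
                        fields.add(build(child));
                    }
                    return new StructConv(fields);
                }
                case "LIST":
                    return new ListConv(build(schema.children().get(0)));
                default:
                    throw new IllegalArgumentException("Unhandled type " + schema);
            }
        }
    }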