Posted to commits@hive.apache.org by se...@apache.org on 2015/10/02 04:37:47 UTC
[12/22] hive git commit: HIVE-4243. Fix column names in ORC metadata.
HIVE-4243. Fix column names in ORC metadata.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7b1ed3d3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7b1ed3d3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7b1ed3d3
Branch: refs/heads/llap
Commit: 7b1ed3d3037860e2b7fc24b760a993f5e928b816
Parents: 99fa337
Author: Owen O'Malley <om...@apache.org>
Authored: Fri Sep 4 16:11:13 2015 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Thu Oct 1 13:07:03 2015 +0200
----------------------------------------------------------------------
.../hive/ql/io/orc/ColumnStatisticsImpl.java | 55 +-
.../apache/hadoop/hive/ql/io/orc/OrcFile.java | 33 +-
.../hadoop/hive/ql/io/orc/OrcOutputFormat.java | 145 ++++-
.../apache/hadoop/hive/ql/io/orc/OrcUtils.java | 177 +-----
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 2 +-
.../hadoop/hive/ql/io/orc/TypeDescription.java | 466 ++++++++++++++++
.../apache/hadoop/hive/ql/io/orc/Writer.java | 9 +
.../hadoop/hive/ql/io/orc/WriterImpl.java | 550 +++++++++----------
.../hadoop/hive/ql/io/orc/orc_proto.proto | 1 +
.../hive/ql/io/orc/TestColumnStatistics.java | 43 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 15 +-
.../hadoop/hive/ql/io/orc/TestOrcFile.java | 41 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 2 +-
.../hadoop/hive/ql/io/orc/TestOrcWideTable.java | 224 +-------
.../hive/ql/io/orc/TestTypeDescription.java | 67 +++
.../resources/orc-file-dump-bloomfilter.out | 2 +-
.../resources/orc-file-dump-bloomfilter2.out | 2 +-
.../orc-file-dump-dictionary-threshold.out | 2 +-
ql/src/test/resources/orc-file-dump.json | 2 +-
ql/src/test/resources/orc-file-dump.out | 2 +-
ql/src/test/resources/orc-file-has-null.out | 2 +-
.../clientpositive/annotate_stats_part.q.out | 6 +-
.../clientpositive/annotate_stats_table.q.out | 4 +-
.../dynpart_sort_opt_vectorization.q.out | 16 +-
.../dynpart_sort_optimization2.q.out | 8 +-
.../extrapolate_part_stats_full.q.out | 24 +-
.../extrapolate_part_stats_partial.q.out | 76 +--
.../extrapolate_part_stats_partial_ndv.q.out | 38 +-
.../results/clientpositive/orc_analyze.q.out | 46 +-
.../results/clientpositive/orc_file_dump.q.out | 18 +-
.../clientpositive/orc_int_type_promotion.q.out | 6 +-
.../clientpositive/spark/vectorized_ptf.q.out | 108 ++--
.../tez/dynpart_sort_opt_vectorization.q.out | 16 +-
.../tez/dynpart_sort_optimization2.q.out | 8 +-
.../clientpositive/tez/orc_analyze.q.out | 46 +-
.../clientpositive/tez/union_fast_stats.q.out | 16 +-
.../clientpositive/tez/vector_outer_join1.q.out | 48 +-
.../clientpositive/tez/vector_outer_join4.q.out | 48 +-
.../clientpositive/tez/vectorized_ptf.q.out | 108 ++--
.../clientpositive/union_fast_stats.q.out | 16 +-
.../results/clientpositive/vectorized_ptf.q.out | 104 ++--
41 files changed, 1468 insertions(+), 1134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
index 15a3e2c..f39d3e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
@@ -22,8 +22,6 @@ import java.sql.Timestamp;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
@@ -964,35 +962,30 @@ class ColumnStatisticsImpl implements ColumnStatistics {
return builder;
}
- static ColumnStatisticsImpl create(ObjectInspector inspector) {
- switch (inspector.getCategory()) {
- case PRIMITIVE:
- switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
- case BOOLEAN:
- return new BooleanStatisticsImpl();
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- return new IntegerStatisticsImpl();
- case FLOAT:
- case DOUBLE:
- return new DoubleStatisticsImpl();
- case STRING:
- case CHAR:
- case VARCHAR:
- return new StringStatisticsImpl();
- case DECIMAL:
- return new DecimalStatisticsImpl();
- case DATE:
- return new DateStatisticsImpl();
- case TIMESTAMP:
- return new TimestampStatisticsImpl();
- case BINARY:
- return new BinaryStatisticsImpl();
- default:
- return new ColumnStatisticsImpl();
- }
+ static ColumnStatisticsImpl create(TypeDescription schema) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return new BooleanStatisticsImpl();
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ return new IntegerStatisticsImpl();
+ case FLOAT:
+ case DOUBLE:
+ return new DoubleStatisticsImpl();
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return new StringStatisticsImpl();
+ case DECIMAL:
+ return new DecimalStatisticsImpl();
+ case DATE:
+ return new DateStatisticsImpl();
+ case TIMESTAMP:
+ return new TimestampStatisticsImpl();
+ case BINARY:
+ return new BinaryStatisticsImpl();
default:
return new ColumnStatisticsImpl();
}
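
The factory is now keyed off the schema's category instead of an ObjectInspector. A minimal sketch of the new entry point (illustrative only: ColumnStatisticsImpl and create() are package-private, so this would have to live in org.apache.hadoop.hive.ql.io.orc, and the variable names are invented):

    // INT maps to IntegerStatisticsImpl per the switch above.
    ColumnStatisticsImpl intStats =
        ColumnStatisticsImpl.create(TypeDescription.createInt());
    // Non-primitive categories such as LIST fall through to the
    // plain ColumnStatisticsImpl default.
    ColumnStatisticsImpl listStats = ColumnStatisticsImpl.create(
        TypeDescription.createList(TypeDescription.createString()));
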
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index a60ebb4..23dec4a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
/**
* Contains factory methods to read or write ORC files.
@@ -102,7 +103,9 @@ public final class OrcFile {
*/
public enum WriterVersion {
ORIGINAL(0),
- HIVE_8732(1); // corrupted stripe/file maximum column statistics
+ HIVE_8732(1), // corrupted stripe/file maximum column statistics
+ HIVE_4243(2), // use real column names from Hive tables
+ FUTURE(Integer.MAX_VALUE); // a version from a future writer
private final int id;
@@ -205,7 +208,9 @@ public final class OrcFile {
public static class WriterOptions {
private final Configuration configuration;
private FileSystem fileSystemValue = null;
- private ObjectInspector inspectorValue = null;
+ private boolean explicitSchema = false;
+ private TypeDescription schema = null;
+ private ObjectInspector inspector = null;
private long stripeSizeValue;
private long blockSizeValue;
private int rowIndexStrideValue;
@@ -355,11 +360,26 @@ public final class OrcFile {
}
/**
- * A required option that sets the object inspector for the rows. Used
- * to determine the schema for the file.
+ * A required option that sets the object inspector for the rows. If
+ * setSchema is not called, it also defines the schema.
*/
public WriterOptions inspector(ObjectInspector value) {
- inspectorValue = value;
+ this.inspector = value;
+ if (!explicitSchema) {
+ schema = OrcOutputFormat.convertTypeInfo(
+ TypeInfoUtils.getTypeInfoFromObjectInspector(value));
+ }
+ return this;
+ }
+
+ /**
+ * Set the schema for the file. If this is not called, the schema
+ * is derived from the object inspector passed to inspector().
+ * @param schema the schema for the file.
+ * @return this
+ */
+ public WriterOptions setSchema(TypeDescription schema) {
+ this.explicitSchema = true;
+ this.schema = schema;
return this;
}
@@ -426,7 +446,8 @@ public final class OrcFile {
FileSystem fs = opts.fileSystemValue == null ?
path.getFileSystem(opts.configuration) : opts.fileSystemValue;
- return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue,
+ return new WriterImpl(fs, path, opts.configuration, opts.inspector,
+ opts.schema,
opts.stripeSizeValue, opts.compressValue,
opts.bufferSizeValue, opts.rowIndexStrideValue,
opts.memoryManagerValue, opts.blockPaddingValue,
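
A hedged usage sketch of the reworked options (assuming a Configuration conf and a row ObjectInspector oi are in scope; the path is made up). Calling inspector() alone derives the schema from the inspector, while setSchema() pins an explicit one that inspector() will not overwrite:

    TypeDescription schema = TypeDescription.createStruct()
        .addField("name", TypeDescription.createString())
        .addField("age", TypeDescription.createInt());
    Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(conf)
            .setSchema(schema)   // sets explicitSchema
            .inspector(oi));     // still needed to serialize row objects
    TypeDescription fileSchema = writer.getSchema();  // the schema above
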
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index ea4ebb4..ad24c58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -20,12 +20,17 @@ package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
@@ -36,6 +41,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
@@ -52,6 +66,90 @@ import org.apache.hadoop.util.Progressable;
public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
implements AcidOutputFormat<NullWritable, OrcSerdeRow> {
+ private static final Log LOG = LogFactory.getLog(OrcOutputFormat.class);
+
+ static TypeDescription convertTypeInfo(TypeInfo info) {
+ switch (info.getCategory()) {
+ case PRIMITIVE: {
+ PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
+ switch (pinfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ return TypeDescription.createBoolean();
+ case BYTE:
+ return TypeDescription.createByte();
+ case SHORT:
+ return TypeDescription.createShort();
+ case INT:
+ return TypeDescription.createInt();
+ case LONG:
+ return TypeDescription.createLong();
+ case FLOAT:
+ return TypeDescription.createFloat();
+ case DOUBLE:
+ return TypeDescription.createDouble();
+ case STRING:
+ return TypeDescription.createString();
+ case DATE:
+ return TypeDescription.createDate();
+ case TIMESTAMP:
+ return TypeDescription.createTimestamp();
+ case BINARY:
+ return TypeDescription.createBinary();
+ case DECIMAL: {
+ DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
+ return TypeDescription.createDecimal()
+ .withScale(dinfo.getScale())
+ .withPrecision(dinfo.getPrecision());
+ }
+ case VARCHAR: {
+ BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+ return TypeDescription.createVarchar()
+ .withMaxLength(cinfo.getLength());
+ }
+ case CHAR: {
+ BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+ return TypeDescription.createChar()
+ .withMaxLength(cinfo.getLength());
+ }
+ default:
+ throw new IllegalArgumentException("ORC doesn't handle primitive" +
+ " category " + pinfo.getPrimitiveCategory());
+ }
+ }
+ case LIST: {
+ ListTypeInfo linfo = (ListTypeInfo) info;
+ return TypeDescription.createList
+ (convertTypeInfo(linfo.getListElementTypeInfo()));
+ }
+ case MAP: {
+ MapTypeInfo minfo = (MapTypeInfo) info;
+ return TypeDescription.createMap
+ (convertTypeInfo(minfo.getMapKeyTypeInfo()),
+ convertTypeInfo(minfo.getMapValueTypeInfo()));
+ }
+ case UNION: {
+ UnionTypeInfo minfo = (UnionTypeInfo) info;
+ TypeDescription result = TypeDescription.createUnion();
+ for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
+ result.addUnionChild(convertTypeInfo(child));
+ }
+ return result;
+ }
+ case STRUCT: {
+ StructTypeInfo sinfo = (StructTypeInfo) info;
+ TypeDescription result = TypeDescription.createStruct();
+ for(String fieldName: sinfo.getAllStructFieldNames()) {
+ result.addField(fieldName,
+ convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
+ }
+ return result;
+ }
+ default:
+ throw new IllegalArgumentException("ORC doesn't handle " +
+ info.getCategory());
+ }
+ }
+
private static class OrcRecordWriter
implements RecordWriter<NullWritable, OrcSerdeRow>,
StatsProvidingRecordWriter {
@@ -115,7 +213,44 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
}
private OrcFile.WriterOptions getOptions(JobConf conf, Properties props) {
- return OrcFile.writerOptions(props, conf);
+ OrcFile.WriterOptions result = OrcFile.writerOptions(props, conf);
+ if (props != null) {
+ final String columnNameProperty =
+ props.getProperty(IOConstants.COLUMNS);
+ final String columnTypeProperty =
+ props.getProperty(IOConstants.COLUMNS_TYPES);
+ if (columnNameProperty != null &&
+ !columnNameProperty.isEmpty() &&
+ columnTypeProperty != null &&
+ !columnTypeProperty.isEmpty()) {
+ List<String> columnNames;
+ List<TypeInfo> columnTypes;
+
+ if (columnNameProperty.length() == 0) {
+ columnNames = new ArrayList<String>();
+ } else {
+ columnNames = Arrays.asList(columnNameProperty.split(","));
+ }
+
+ if (columnTypeProperty.length() == 0) {
+ columnTypes = new ArrayList<TypeInfo>();
+ } else {
+ columnTypes =
+ TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ }
+
+ TypeDescription schema = TypeDescription.createStruct();
+ for (int i = 0; i < columnNames.size(); ++i) {
+ schema.addField(columnNames.get(i),
+ convertTypeInfo(columnTypes.get(i)));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ORC schema = " + schema);
+ }
+ result.setSchema(schema);
+ }
+ }
+ return result;
}
@Override
@@ -123,7 +258,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
Progressable reporter) throws IOException {
return new
- OrcRecordWriter(new Path(name), getOptions(conf,null));
+ OrcRecordWriter(new Path(name), getOptions(conf, null));
}
@@ -135,7 +270,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
boolean isCompressed,
Properties tableProperties,
Progressable reporter) throws IOException {
- return new OrcRecordWriter(path, getOptions(conf,tableProperties));
+ return new OrcRecordWriter(path, getOptions(conf, tableProperties));
}
private class DummyOrcRecordUpdater implements RecordUpdater {
@@ -229,8 +364,8 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
}
@Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecordWriter(Path path,
- Options options) throws IOException {
+ public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter
+ getRawRecordWriter(Path path, Options options) throws IOException {
final Path filename = AcidUtils.createFilename(path, options);
final OrcFile.WriterOptions opts =
OrcFile.writerOptions(options.getConfiguration());
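
To make the new getOptions() path concrete, a sketch of how a table's COLUMNS/COLUMNS_TYPES properties become an ORC struct schema (property values are made up; convertTypeInfo is package-private, so this only compiles inside the orc package):

    Properties props = new Properties();
    props.setProperty(IOConstants.COLUMNS, "id,name,score");
    props.setProperty(IOConstants.COLUMNS_TYPES, "bigint:string:double");
    List<String> names =
        Arrays.asList(props.getProperty(IOConstants.COLUMNS).split(","));
    List<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString(
        props.getProperty(IOConstants.COLUMNS_TYPES));
    TypeDescription schema = TypeDescription.createStruct();
    for (int i = 0; i < names.size(); ++i) {
      schema.addField(names.get(i), convertTypeInfo(types.get(i)));
    }
    // schema.toString() -> struct<id:bigint,name:string,score:double>
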
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index db2ca15..3e2af23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -18,20 +18,10 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-
-import com.google.common.collect.Lists;
public class OrcUtils {
private static final Log LOG = LogFactory.getLog(OrcUtils.class);
@@ -49,159 +39,44 @@ public class OrcUtils {
* index 5 corresponds to column d. After flattening, list<string> gets 2 columns.
*
* @param selectedColumns - comma separated list of selected column names
- * @param allColumns - comma separated list of all column names
- * @param inspector - object inspector
+ * @param schema - object schema
* @return - boolean array with true value set for the specified column names
*/
- public static boolean[] includeColumns(String selectedColumns, String allColumns,
- ObjectInspector inspector) {
- int numFlattenedCols = getFlattenedColumnsCount(inspector);
- boolean[] results = new boolean[numFlattenedCols];
+ public static boolean[] includeColumns(String selectedColumns,
+ TypeDescription schema) {
+ int numFlattenedCols = schema.getMaximumId();
+ boolean[] results = new boolean[numFlattenedCols + 1];
if ("*".equals(selectedColumns)) {
Arrays.fill(results, true);
return results;
}
- if (selectedColumns != null && !selectedColumns.isEmpty()) {
- includeColumnsImpl(results, selectedColumns.toLowerCase(), allColumns, inspector);
- }
- return results;
- }
-
- private static void includeColumnsImpl(boolean[] includeColumns, String selectedColumns,
- String allColumns,
- ObjectInspector inspector) {
- Map<String, List<Integer>> columnSpanMap = getColumnSpan(allColumns, inspector);
- LOG.info("columnSpanMap: " + columnSpanMap);
-
- String[] selCols = selectedColumns.split(",");
- for (String sc : selCols) {
- if (columnSpanMap.containsKey(sc)) {
- List<Integer> colSpan = columnSpanMap.get(sc);
- int start = colSpan.get(0);
- int end = colSpan.get(1);
- for (int i = start; i <= end; i++) {
- includeColumns[i] = true;
+ if (selectedColumns != null &&
+ schema.getCategory() == TypeDescription.Category.STRUCT) {
+ List<String> fieldNames = schema.getFieldNames();
+ List<TypeDescription> fields = schema.getChildren();
+ for (String column: selectedColumns.split(",")) {
+ TypeDescription col = findColumn(column, fieldNames, fields);
+ if (col != null) {
+ for(int i=col.getId(); i <= col.getMaximumId(); ++i) {
+ results[i] = true;
}
}
}
-
- LOG.info("includeColumns: " + Arrays.toString(includeColumns));
}
-
- private static Map<String, List<Integer>> getColumnSpan(String allColumns,
- ObjectInspector inspector) {
- // map that contains the column span for each column. Column span is the number of columns
- // required after flattening. For a given object inspector this map contains the start column
- // id and end column id (both inclusive) after flattening.
- // EXAMPLE:
- // schema: struct<a:int, b:float, c:map<string,int>>
- // column span map for the above struct will be
- // a => [1,1], b => [2,2], c => [3,5]
- Map<String, List<Integer>> columnSpanMap = new HashMap<String, List<Integer>>();
- if (allColumns != null) {
- String[] columns = allColumns.split(",");
- int startIdx = 0;
- int endIdx = 0;
- if (inspector instanceof StructObjectInspector) {
- StructObjectInspector soi = (StructObjectInspector) inspector;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
- for (int i = 0; i < fields.size(); i++) {
- StructField sf = fields.get(i);
-
- // we get the type (category) from object inspector but column name from the argument.
- // The reason for this is hive (FileSinkOperator) does not pass the actual column names,
- // instead it passes the internal column names (_col1,_col2).
- ObjectInspector sfOI = sf.getFieldObjectInspector();
- String colName = columns[i];
-
- startIdx = endIdx + 1;
- switch (sfOI.getCategory()) {
- case PRIMITIVE:
- endIdx += 1;
- break;
- case STRUCT:
- endIdx += 1;
- StructObjectInspector structInsp = (StructObjectInspector) sfOI;
- List<? extends StructField> structFields = structInsp.getAllStructFieldRefs();
- for (int j = 0; j < structFields.size(); ++j) {
- endIdx += getFlattenedColumnsCount(structFields.get(j).getFieldObjectInspector());
- }
- break;
- case MAP:
- endIdx += 1;
- MapObjectInspector mapInsp = (MapObjectInspector) sfOI;
- endIdx += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
- endIdx += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
- break;
- case LIST:
- endIdx += 1;
- ListObjectInspector listInsp = (ListObjectInspector) sfOI;
- endIdx += getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
- break;
- case UNION:
- endIdx += 1;
- UnionObjectInspector unionInsp = (UnionObjectInspector) sfOI;
- List<ObjectInspector> choices = unionInsp.getObjectInspectors();
- for (int j = 0; j < choices.size(); ++j) {
- endIdx += getFlattenedColumnsCount(choices.get(j));
- }
- break;
- default:
- throw new IllegalArgumentException("Bad category: " +
- inspector.getCategory());
- }
-
- columnSpanMap.put(colName, Lists.newArrayList(startIdx, endIdx));
- }
- }
- }
- return columnSpanMap;
+ return results;
}
- /**
- * Returns the number of columns after flatting complex types.
- *
- * @param inspector - object inspector
- * @return
- */
- public static int getFlattenedColumnsCount(ObjectInspector inspector) {
- int numWriters = 0;
- switch (inspector.getCategory()) {
- case PRIMITIVE:
- numWriters += 1;
- break;
- case STRUCT:
- numWriters += 1;
- StructObjectInspector structInsp = (StructObjectInspector) inspector;
- List<? extends StructField> fields = structInsp.getAllStructFieldRefs();
- for (int i = 0; i < fields.size(); ++i) {
- numWriters += getFlattenedColumnsCount(fields.get(i).getFieldObjectInspector());
- }
- break;
- case MAP:
- numWriters += 1;
- MapObjectInspector mapInsp = (MapObjectInspector) inspector;
- numWriters += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
- numWriters += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
- break;
- case LIST:
- numWriters += 1;
- ListObjectInspector listInsp = (ListObjectInspector) inspector;
- numWriters += getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
- break;
- case UNION:
- numWriters += 1;
- UnionObjectInspector unionInsp = (UnionObjectInspector) inspector;
- List<ObjectInspector> choices = unionInsp.getObjectInspectors();
- for (int i = 0; i < choices.size(); ++i) {
- numWriters += getFlattenedColumnsCount(choices.get(i));
- }
- break;
- default:
- throw new IllegalArgumentException("Bad category: " +
- inspector.getCategory());
+ private static TypeDescription findColumn(String columnName,
+ List<String> fieldNames,
+ List<TypeDescription> fields) {
+ int i = 0;
+ for(String fieldName: fieldNames) {
+ if (fieldName.equalsIgnoreCase(columnName)) {
+ return fields.get(i);
+ } else {
+ i += 1;
+ }
}
- return numWriters;
+ return null;
}
-
}
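
A sketch of the new includeColumns() against a flattened schema. Ids are assigned depth-first, so for struct<a:int,b:map<string,int>> the root struct is id 0, a is 1, b is 2, and the map's key and value are 3 and 4:

    TypeDescription schema = TypeDescription.createStruct()
        .addField("a", TypeDescription.createInt())
        .addField("b", TypeDescription.createMap(
            TypeDescription.createString(), TypeDescription.createInt()));
    boolean[] include = OrcUtils.includeColumns("b", schema);
    // include.length == schema.getMaximumId() + 1 == 5, and ids 2..4 are
    // true: selecting a complex column selects its whole subtree.
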
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 23b3b55..36fb858 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -347,7 +347,7 @@ public class ReaderImpl implements Reader {
return version;
}
}
- return OrcFile.WriterVersion.ORIGINAL;
+ return OrcFile.WriterVersion.FUTURE;
}
/** Extracts the necessary metadata from an externally stored buffer (fullFooterBuffer). */
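
Returning FUTURE instead of ORIGINAL for an unrecognized version id means a file written by a newer Hive is no longer mistaken for a pre-HIVE-8732 file. A hedged sketch of acting on it (assuming the Reader interface exposes getWriterVersion() on this branch):

    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    if (reader.getWriterVersion() == OrcFile.WriterVersion.FUTURE) {
      // the file comes from a writer newer than this reader understands
    }
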
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
new file mode 100644
index 0000000..3481bb3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This is the description of the types in an ORC file.
+ */
+public class TypeDescription {
+ private static final int MAX_PRECISION = 38;
+ private static final int MAX_SCALE = 38;
+ private static final int DEFAULT_PRECISION = 38;
+ private static final int DEFAULT_SCALE = 10;
+ private static final int DEFAULT_LENGTH = 256;
+ public enum Category {
+ BOOLEAN("boolean", true),
+ BYTE("tinyint", true),
+ SHORT("smallint", true),
+ INT("int", true),
+ LONG("bigint", true),
+ FLOAT("float", true),
+ DOUBLE("double", true),
+ STRING("string", true),
+ DATE("date", true),
+ TIMESTAMP("timestamp", true),
+ BINARY("binary", true),
+ DECIMAL("decimal", true),
+ VARCHAR("varchar", true),
+ CHAR("char", true),
+ LIST("array", false),
+ MAP("map", false),
+ STRUCT("struct", false),
+ UNION("union", false);
+
+ Category(String name, boolean isPrimitive) {
+ this.name = name;
+ this.isPrimitive = isPrimitive;
+ }
+
+ final boolean isPrimitive;
+ final String name;
+
+ public boolean isPrimitive() {
+ return isPrimitive;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
+
+ public static TypeDescription createBoolean() {
+ return new TypeDescription(Category.BOOLEAN);
+ }
+
+ public static TypeDescription createByte() {
+ return new TypeDescription(Category.BYTE);
+ }
+
+ public static TypeDescription createShort() {
+ return new TypeDescription(Category.SHORT);
+ }
+
+ public static TypeDescription createInt() {
+ return new TypeDescription(Category.INT);
+ }
+
+ public static TypeDescription createLong() {
+ return new TypeDescription(Category.LONG);
+ }
+
+ public static TypeDescription createFloat() {
+ return new TypeDescription(Category.FLOAT);
+ }
+
+ public static TypeDescription createDouble() {
+ return new TypeDescription(Category.DOUBLE);
+ }
+
+ public static TypeDescription createString() {
+ return new TypeDescription(Category.STRING);
+ }
+
+ public static TypeDescription createDate() {
+ return new TypeDescription(Category.DATE);
+ }
+
+ public static TypeDescription createTimestamp() {
+ return new TypeDescription(Category.TIMESTAMP);
+ }
+
+ public static TypeDescription createBinary() {
+ return new TypeDescription(Category.BINARY);
+ }
+
+ public static TypeDescription createDecimal() {
+ return new TypeDescription(Category.DECIMAL);
+ }
+
+ /**
+ * For decimal types, set the precision.
+ * @param precision the new precision
+ * @return this
+ */
+ public TypeDescription withPrecision(int precision) {
+ if (category != Category.DECIMAL) {
+ throw new IllegalArgumentException("precision is only allowed on decimal"+
+ " and not " + category.name);
+ } else if (precision < 1 || precision > MAX_PRECISION || scale > precision){
+ throw new IllegalArgumentException("precision " + precision +
+ " is out of range 1 .. " + scale);
+ }
+ this.precision = precision;
+ return this;
+ }
+
+ /**
+ * For decimal types, set the scale.
+ * @param scale the new scale
+ * @return this
+ */
+ public TypeDescription withScale(int scale) {
+ if (category != Category.DECIMAL) {
+ throw new IllegalArgumentException("scale is only allowed on decimal"+
+ " and not " + category.name);
+ } else if (scale < 0 || scale > MAX_SCALE || scale > precision) {
+ throw new IllegalArgumentException("scale is out of range at " + scale);
+ }
+ this.scale = scale;
+ return this;
+ }
+
+ public static TypeDescription createVarchar() {
+ return new TypeDescription(Category.VARCHAR);
+ }
+
+ public static TypeDescription createChar() {
+ return new TypeDescription(Category.CHAR);
+ }
+
+ /**
+ * Set the maximum length for char and varchar types.
+ * @param maxLength the maximum value
+ * @return this
+ */
+ public TypeDescription withMaxLength(int maxLength) {
+ if (category != Category.VARCHAR && category != Category.CHAR) {
+ throw new IllegalArgumentException("maxLength is only allowed on char" +
+ " and varchar and not " + category.name);
+ }
+ this.maxLength = maxLength;
+ return this;
+ }
+
+ public static TypeDescription createList(TypeDescription childType) {
+ TypeDescription result = new TypeDescription(Category.LIST);
+ result.children.add(childType);
+ childType.parent = result;
+ return result;
+ }
+
+ public static TypeDescription createMap(TypeDescription keyType,
+ TypeDescription valueType) {
+ TypeDescription result = new TypeDescription(Category.MAP);
+ result.children.add(keyType);
+ result.children.add(valueType);
+ keyType.parent = result;
+ valueType.parent = result;
+ return result;
+ }
+
+ public static TypeDescription createUnion() {
+ return new TypeDescription(Category.UNION);
+ }
+
+ public static TypeDescription createStruct() {
+ return new TypeDescription(Category.STRUCT);
+ }
+
+ /**
+ * Add a child to a union type.
+ * @param child a new child type to add
+ * @return the union type.
+ */
+ public TypeDescription addUnionChild(TypeDescription child) {
+ if (category != Category.UNION) {
+ throw new IllegalArgumentException("Can only add types to union type" +
+ " and not " + category);
+ }
+ children.add(child);
+ child.parent = this;
+ return this;
+ }
+
+ /**
+ * Add a field to a struct type as it is built.
+ * @param field the field name
+ * @param fieldType the type of the field
+ * @return the struct type
+ */
+ public TypeDescription addField(String field, TypeDescription fieldType) {
+ if (category != Category.STRUCT) {
+ throw new IllegalArgumentException("Can only add fields to struct type" +
+ " and not " + category);
+ }
+ fieldNames.add(field);
+ children.add(fieldType);
+ fieldType.parent = this;
+ return this;
+ }
+
+ /**
+ * Get the id for this type.
+ * The first call will cause all of the ids in the tree to be assigned, so
+ * it should not be called before the type is completely built.
+ * @return the sequential id
+ */
+ public int getId() {
+ // if the id hasn't been assigned, assign all of the ids from the root
+ if (id == -1) {
+ TypeDescription root = this;
+ while (root.parent != null) {
+ root = root.parent;
+ }
+ root.assignIds(0);
+ }
+ return id;
+ }
+
+ /**
+ * Get the maximum id assigned to this type or its children.
+ * The first call will cause all of the ids in the tree to be assigned, so
+ * it should not be called before the type is completely built.
+ * @return the maximum id assigned under this type
+ */
+ public int getMaximumId() {
+ // if the id hasn't been assigned, assign all of the ids from the root
+ if (maxId == -1) {
+ TypeDescription root = this;
+ while (root.parent != null) {
+ root = root.parent;
+ }
+ root.assignIds(0);
+ }
+ return maxId;
+ }
+
+ /**
+ * Get the kind of this type.
+ * @return the category for this type.
+ */
+ public Category getCategory() {
+ return category;
+ }
+
+ /**
+ * Get the maximum length of the type. Only used for char and varchar types.
+ * @return the maximum length of the string type
+ */
+ public int getMaxLength() {
+ return maxLength;
+ }
+
+ /**
+ * Get the precision of the decimal type.
+ * @return the number of digits for the precision.
+ */
+ public int getPrecision() {
+ return precision;
+ }
+
+ /**
+ * Get the scale of the decimal type.
+ * @return the number of digits for the scale.
+ */
+ public int getScale() {
+ return scale;
+ }
+
+ /**
+ * For struct types, get the list of field names.
+ * @return the list of field names.
+ */
+ public List<String> getFieldNames() {
+ return Collections.unmodifiableList(fieldNames);
+ }
+
+ /**
+ * Get the subtypes of this type.
+ * @return the list of children types
+ */
+ public List<TypeDescription> getChildren() {
+ return children == null ? null : Collections.unmodifiableList(children);
+ }
+
+ /**
+ * Assign ids to all of the nodes under this one.
+ * @param startId the lowest id to assign
+ * @return the next available id
+ */
+ private int assignIds(int startId) {
+ id = startId++;
+ if (children != null) {
+ for (TypeDescription child : children) {
+ startId = child.assignIds(startId);
+ }
+ }
+ maxId = startId - 1;
+ return startId;
+ }
+
+ private TypeDescription(Category category) {
+ this.category = category;
+ if (category.isPrimitive) {
+ children = null;
+ } else {
+ children = new ArrayList<>();
+ }
+ if (category == Category.STRUCT) {
+ fieldNames = new ArrayList<>();
+ } else {
+ fieldNames = null;
+ }
+ }
+
+ private int id = -1;
+ private int maxId = -1;
+ private TypeDescription parent;
+ private final Category category;
+ private final List<TypeDescription> children;
+ private final List<String> fieldNames;
+ private int maxLength = DEFAULT_LENGTH;
+ private int precision = DEFAULT_PRECISION;
+ private int scale = DEFAULT_SCALE;
+
+ public void printToBuffer(StringBuilder buffer) {
+ buffer.append(category.name);
+ switch (category) {
+ case DECIMAL:
+ buffer.append('(');
+ buffer.append(precision);
+ buffer.append(',');
+ buffer.append(scale);
+ buffer.append(')');
+ break;
+ case CHAR:
+ case VARCHAR:
+ buffer.append('(');
+ buffer.append(maxLength);
+ buffer.append(')');
+ break;
+ case LIST:
+ case MAP:
+ case UNION:
+ buffer.append('<');
+ for(int i=0; i < children.size(); ++i) {
+ if (i != 0) {
+ buffer.append(',');
+ }
+ children.get(i).printToBuffer(buffer);
+ }
+ buffer.append('>');
+ break;
+ case STRUCT:
+ buffer.append('<');
+ for(int i=0; i < children.size(); ++i) {
+ if (i != 0) {
+ buffer.append(',');
+ }
+ buffer.append(fieldNames.get(i));
+ buffer.append(':');
+ children.get(i).printToBuffer(buffer);
+ }
+ buffer.append('>');
+ break;
+ default:
+ break;
+ }
+ }
+
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ printToBuffer(buffer);
+ return buffer.toString();
+ }
+
+ private void printJsonToBuffer(String prefix, StringBuilder buffer,
+ int indent) {
+ for(int i=0; i < indent; ++i) {
+ buffer.append(' ');
+ }
+ buffer.append(prefix);
+ buffer.append("{\"category\": \"");
+ buffer.append(category.name);
+ buffer.append("\", \"id\": ");
+ buffer.append(getId());
+ buffer.append(", \"max\": ");
+ buffer.append(maxId);
+ switch (category) {
+ case DECIMAL:
+ buffer.append(", \"precision\": ");
+ buffer.append(precision);
+ buffer.append(", \"scale\": ");
+ buffer.append(scale);
+ break;
+ case CHAR:
+ case VARCHAR:
+ buffer.append(", \"length\": ");
+ buffer.append(maxLength);
+ break;
+ case LIST:
+ case MAP:
+ case UNION:
+ buffer.append(", \"children\": [");
+ for(int i=0; i < children.size(); ++i) {
+ buffer.append('\n');
+ children.get(i).printJsonToBuffer("", buffer, indent + 2);
+ if (i != children.size() - 1) {
+ buffer.append(',');
+ }
+ }
+ buffer.append("]");
+ break;
+ case STRUCT:
+ buffer.append(", \"fields\": [");
+ for(int i=0; i < children.size(); ++i) {
+ buffer.append('\n');
+ children.get(i).printJsonToBuffer("\"" + fieldNames.get(i) + "\": ",
+ buffer, indent + 2);
+ if (i != children.size() - 1) {
+ buffer.append(',');
+ }
+ }
+ buffer.append(']');
+ break;
+ default:
+ break;
+ }
+ buffer.append('}');
+ }
+
+ public String toJson() {
+ StringBuilder buffer = new StringBuilder();
+ printJsonToBuffer("", buffer, 0);
+ return buffer.toString();
+ }
+}
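
A usage sketch of the builder-style API above. getId() and getMaximumId() lazily assign ids depth-first from the root on first call, which is why the javadoc warns against calling them before the type is fully built:

    TypeDescription schema = TypeDescription.createStruct()
        .addField("id", TypeDescription.createLong())
        .addField("tags",
            TypeDescription.createList(TypeDescription.createString()))
        .addField("amount",
            TypeDescription.createDecimal().withPrecision(20).withScale(2));
    // toString() renders the Hive-style signature:
    //   struct<id:bigint,tags:array<string>,amount:decimal(20,2)>
    // Depth-first ids: root=0, id=1, tags=2, list element=3, amount=4,
    // so schema.getMaximumId() == 4.
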
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
index 6411e3f..8991f2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -26,6 +28,13 @@ import java.util.List;
* The interface for writing ORC files.
*/
public interface Writer {
+
+ /**
+ * Get the schema for this writer.
+ * @return the file schema
+ */
+ TypeDescription getSchema();
+
/**
* Add arbitrary meta-data to the ORC file. This may be called at any point
* until the Writer is closed. If the same key is passed a second time, the
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 7aa8d65..767d3f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy;
@@ -54,7 +53,6 @@ import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
@@ -72,9 +70,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
@@ -127,6 +122,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private final int bufferSize;
private final long blockSize;
private final double paddingTolerance;
+ private final TypeDescription schema;
+
// the streams that make up the current stripe
private final Map<StreamName, BufferedStream> streams =
new TreeMap<StreamName, BufferedStream>();
@@ -165,6 +162,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
Path path,
Configuration conf,
ObjectInspector inspector,
+ TypeDescription schema,
long stripeSize,
CompressionKind compress,
int bufferSize,
@@ -183,6 +181,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.path = path;
this.conf = conf;
this.callback = callback;
+ this.schema = schema;
if (callback != null) {
callbackContext = new OrcFile.WriterContext(){
@@ -207,21 +206,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.memoryManager = memoryManager;
buildIndex = rowIndexStride > 0;
codec = createCodec(compress);
- String allColumns = conf.get(IOConstants.COLUMNS);
- if (allColumns == null) {
- allColumns = getColumnNamesFromInspector(inspector);
- }
- this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize);
+ int numColumns = schema.getMaximumId() + 1;
+ this.bufferSize = getEstimatedBufferSize(getMemoryAvailableForORC(),
+ codec != null, numColumns, bufferSize);
if (version == OrcFile.Version.V_0_11) {
/* do not write bloom filters for ORC v11 */
- this.bloomFilterColumns =
- OrcUtils.includeColumns(null, allColumns, inspector);
+ this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
} else {
this.bloomFilterColumns =
- OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector);
+ OrcUtils.includeColumns(bloomFilterColumnNames, schema);
}
this.bloomFilterFpp = bloomFilterFpp;
- treeWriter = createTreeWriter(inspector, streamFactory, false);
+ treeWriter = createTreeWriter(inspector, schema, streamFactory, false);
if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
throw new IllegalArgumentException("Row stride must be at least " +
MIN_ROW_INDEX_STRIDE);
@@ -231,62 +227,42 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
memoryManager.addWriter(path, stripeSize, this);
}
- private String getColumnNamesFromInspector(ObjectInspector inspector) {
- List<String> fieldNames = Lists.newArrayList();
- Joiner joiner = Joiner.on(",");
- if (inspector instanceof StructObjectInspector) {
- StructObjectInspector soi = (StructObjectInspector) inspector;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
- for(StructField sf : fields) {
- fieldNames.add(sf.getFieldName());
- }
- }
- return joiner.join(fieldNames);
- }
+ static int getEstimatedBufferSize(long availableMem,
+ boolean isCompressed,
+ int columnCount, int bs) {
+ if (columnCount > COLUMN_COUNT_THRESHOLD) {
+ // In BufferedStream, there are 3 outstream buffers (compressed,
+ // uncompressed and overflow) and list of previously compressed buffers.
+ // Since the overflow buffer is rarely used, let's consider only 2 allocations.
+ // Also, initially, the list of compression buffers will be empty.
+ final int outStreamBuffers = isCompressed ? 2 : 1;
- @VisibleForTesting
- int getEstimatedBufferSize(int bs) {
- return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs);
- }
+ // max possible streams per column is 5. For string columns, there are
+ // ROW_INDEX, PRESENT, DATA, LENGTH, and DICTIONARY_DATA streams.
+ final int maxStreams = 5;
- int getEstimatedBufferSize(String colNames, int bs) {
- long availableMem = getMemoryAvailableForORC();
- if (colNames != null) {
- final int numCols = colNames.split(",").length;
- if (numCols > COLUMN_COUNT_THRESHOLD) {
- // In BufferedStream, there are 3 outstream buffers (compressed,
- // uncompressed and overflow) and list of previously compressed buffers.
- // Since overflow buffer is rarely used, lets consider only 2 allocation.
- // Also, initially, the list of compression buffers will be empty.
- final int outStreamBuffers = codec == null ? 1 : 2;
-
- // max possible streams per column is 5. For string columns, there is
- // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams.
- final int maxStreams = 5;
-
- // Lets assume 10% memory for holding dictionary in memory and other
- // object allocations
- final long miscAllocation = (long) (0.1f * availableMem);
-
- // compute the available memory
- final long remainingMem = availableMem - miscAllocation;
-
- int estBufferSize = (int) (remainingMem /
- (maxStreams * outStreamBuffers * numCols));
- estBufferSize = getClosestBufferSize(estBufferSize, bs);
- if (estBufferSize > bs) {
- estBufferSize = bs;
- }
+ // Let's assume 10% of memory for holding dictionaries and other
+ // object allocations
+ final long miscAllocation = (long) (0.1f * availableMem);
- LOG.info("WIDE TABLE - Number of columns: " + numCols +
- " Chosen compression buffer size: " + estBufferSize);
- return estBufferSize;
+ // compute the available memory
+ final long remainingMem = availableMem - miscAllocation;
+
+ int estBufferSize = (int) (remainingMem /
+ (maxStreams * outStreamBuffers * columnCount));
+ estBufferSize = getClosestBufferSize(estBufferSize);
+ if (estBufferSize > bs) {
+ estBufferSize = bs;
}
+
+ LOG.info("WIDE TABLE - Number of columns: " + columnCount +
+ " Chosen compression buffer size: " + estBufferSize);
+ return estBufferSize;
}
return bs;
}
- private int getClosestBufferSize(int estBufferSize, int bs) {
+ private static int getClosestBufferSize(int estBufferSize) {
final int kb4 = 4 * 1024;
final int kb8 = 8 * 1024;
final int kb16 = 16 * 1024;
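
A worked example of the estimate with made-up numbers (1 GB available to ORC, compression on, 2,000 columns), matching the formula above:

    long availableMem = 1L << 30;                       // 1 GB, assumed
    long remainingMem =
        availableMem - (long) (0.1f * availableMem);    // ~966 MB after misc
    // 5 streams/column * 2 outstream buffers * 2000 columns = 20000 buffers
    int estBufferSize = (int) (remainingMem / (5 * 2 * 2000));  // ~48 KB
    // getClosestBufferSize() then snaps this to a nearby power-of-two size,
    // and the result is capped at the caller's requested buffer size bs.
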
@@ -546,15 +522,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
/**
- * Get the current column id. After creating all tree writers this count should tell how many
- * columns (including columns within nested complex objects) are created in total.
- * @return current column id
- */
- public int getCurrentColumnId() {
- return columnCount;
- }
-
- /**
* Get the stride rate of the row index.
*/
public int getRowIndexStride() {
@@ -666,11 +633,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* Create a tree writer.
* @param columnId the column id of the column to write
* @param inspector the object inspector to use
+ * @param schema the row schema
* @param streamFactory limited access to the Writer's data.
* @param nullable can the value be null?
* @throws IOException
*/
TreeWriter(int columnId, ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory streamFactory,
boolean nullable) throws IOException {
this.streamFactory = streamFactory;
@@ -686,9 +655,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
this.foundNulls = false;
createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
- indexStatistics = ColumnStatisticsImpl.create(inspector);
- stripeColStatistics = ColumnStatisticsImpl.create(inspector);
- fileStatistics = ColumnStatisticsImpl.create(inspector);
+ indexStatistics = ColumnStatisticsImpl.create(schema);
+ stripeColStatistics = ColumnStatisticsImpl.create(schema);
+ fileStatistics = ColumnStatisticsImpl.create(schema);
childrenWriters = new TreeWriter[0];
rowIndex = OrcProto.RowIndex.newBuilder();
rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
@@ -749,7 +718,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
/**
* Add a new value to the column.
- * @param obj
+ * @param obj the object to write
* @throws IOException
*/
void write(Object obj) throws IOException {
@@ -919,9 +888,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
BooleanTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.writer = new BitFieldWriter(out, 1);
@@ -958,9 +928,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
ByteTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
this.writer = new RunLengthByteWriter(writer.createStream(id,
OrcProto.Stream.Kind.DATA));
recordPosition(rowIndexPosition);
@@ -1003,9 +974,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
IntegerTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
OutStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.isDirectV2 = isNewWriteFormat(writer);
@@ -1079,9 +1051,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
FloatTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.utils = new SerializationUtils();
@@ -1123,9 +1096,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
DoubleTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.utils = new SerializationUtils();
@@ -1184,9 +1158,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
StringTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
stringOutput = writer.createStream(id,
OrcProto.Stream.Kind.DICTIONARY_DATA);
@@ -1423,9 +1398,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
CharTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
}
/**
@@ -1445,9 +1421,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
VarcharTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
}
/**
@@ -1467,9 +1444,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
BinaryTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.isDirectV2 = isNewWriteFormat(writer);
@@ -1531,9 +1509,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
TimestampTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
this.seconds = createIntegerWriter(writer.createStream(id,
OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
@@ -1610,9 +1589,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
DateTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
OutStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.isDirectV2 = isNewWriteFormat(writer);
@@ -1666,9 +1646,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
DecimalTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
this.scaleStream = createIntegerWriter(writer.createStream(id,
@@ -1726,16 +1707,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private final List<? extends StructField> fields;
StructTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
+ List<TypeDescription> children = schema.getChildren();
StructObjectInspector structObjectInspector =
(StructObjectInspector) inspector;
fields = structObjectInspector.getAllStructFieldRefs();
- childrenWriters = new TreeWriter[fields.size()];
+ childrenWriters = new TreeWriter[children.size()];
for(int i=0; i < childrenWriters.length; ++i) {
+ ObjectInspector childOI = i < fields.size() ?
+ fields.get(i).getFieldObjectInspector() : null;
childrenWriters[i] = createTreeWriter(
- fields.get(i).getFieldObjectInspector(), writer, true);
+ childOI, children.get(i), writer,
+ true);
}
recordPosition(rowIndexPosition);
}
@@ -1770,15 +1756,16 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
ListTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
- ListObjectInspector listObjectInspector = (ListObjectInspector) inspector;
+ ObjectInspector childOI =
+ ((ListObjectInspector) inspector).getListElementObjectInspector();
childrenWriters = new TreeWriter[1];
childrenWriters[0] =
- createTreeWriter(listObjectInspector.getListElementObjectInspector(),
- writer, true);
+ createTreeWriter(childOI, schema.getChildren().get(0), writer, true);
lengths = createIntegerWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
recordPosition(rowIndexPosition);
@@ -1834,16 +1821,20 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
MapTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
MapObjectInspector insp = (MapObjectInspector) inspector;
childrenWriters = new TreeWriter[2];
+ List<TypeDescription> children = schema.getChildren();
childrenWriters[0] =
- createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
+ createTreeWriter(insp.getMapKeyObjectInspector(), children.get(0),
+ writer, true);
childrenWriters[1] =
- createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
+ createTreeWriter(insp.getMapValueObjectInspector(), children.get(1),
+ writer, true);
lengths = createIntegerWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
recordPosition(rowIndexPosition);
@@ -1901,14 +1892,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
UnionTreeWriter(int columnId,
ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
- super(columnId, inspector, writer, nullable);
+ super(columnId, inspector, schema, writer, nullable);
UnionObjectInspector insp = (UnionObjectInspector) inspector;
List<ObjectInspector> choices = insp.getObjectInspectors();
- childrenWriters = new TreeWriter[choices.size()];
+ List<TypeDescription> children = schema.getChildren();
+ childrenWriters = new TreeWriter[children.size()];
for(int i=0; i < childrenWriters.length; ++i) {
- childrenWriters[i] = createTreeWriter(choices.get(i), writer, true);
+ childrenWriters[i] = createTreeWriter(choices.get(i),
+ children.get(i), writer, true);
}
tags =
new RunLengthByteWriter(writer.createStream(columnId,
@@ -1949,168 +1943,151 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
private static TreeWriter createTreeWriter(ObjectInspector inspector,
+ TypeDescription schema,
StreamFactory streamFactory,
boolean nullable) throws IOException {
- switch (inspector.getCategory()) {
- case PRIMITIVE:
- switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
- case BOOLEAN:
- return new BooleanTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case BYTE:
- return new ByteTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case SHORT:
- case INT:
- case LONG:
- return new IntegerTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case FLOAT:
- return new FloatTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case DOUBLE:
- return new DoubleTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case STRING:
- return new StringTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case CHAR:
- return new CharTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case VARCHAR:
- return new VarcharTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case BINARY:
- return new BinaryTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case TIMESTAMP:
- return new TimestampTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case DATE:
- return new DateTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- case DECIMAL:
- return new DecimalTreeWriter(streamFactory.getNextColumnId(),
- inspector, streamFactory, nullable);
- default:
- throw new IllegalArgumentException("Bad primitive category " +
- ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
- }
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case BYTE:
+ return new ByteTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case SHORT:
+ case INT:
+ case LONG:
+ return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case FLOAT:
+ return new FloatTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case DOUBLE:
+ return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case STRING:
+ return new StringTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case CHAR:
+ return new CharTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case VARCHAR:
+ return new VarcharTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case BINARY:
+ return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case TIMESTAMP:
+ return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case DATE:
+ return new DateTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
+ case DECIMAL:
+ return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
case STRUCT:
- return new StructTreeWriter(streamFactory.getNextColumnId(), inspector,
- streamFactory, nullable);
+ return new StructTreeWriter(streamFactory.getNextColumnId(),
+ inspector, schema, streamFactory, nullable);
case MAP:
return new MapTreeWriter(streamFactory.getNextColumnId(), inspector,
- streamFactory, nullable);
+ schema, streamFactory, nullable);
case LIST:
return new ListTreeWriter(streamFactory.getNextColumnId(), inspector,
- streamFactory, nullable);
+ schema, streamFactory, nullable);
case UNION:
return new UnionTreeWriter(streamFactory.getNextColumnId(), inspector,
- streamFactory, nullable);
+ schema, streamFactory, nullable);
default:
throw new IllegalArgumentException("Bad category: " +
- inspector.getCategory());
+ schema.getCategory());
}
}
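createTreeWriter now dispatches on schema.getCategory() alone, collapsing the old two-level PRIMITIVE switch; parameterized types carry their own length, precision, and scale. A small sketch (withPrecision()/withScale() appear in the test changes further down; createVarchar()/withMaxLength() are assumed from the same API):

TypeDescription dec = TypeDescription.createDecimal()
    .withPrecision(38).withScale(16);
TypeDescription vc = TypeDescription.createVarchar().withMaxLength(100);
// Both feed straight into createTreeWriter() without the old
// CharTypeInfo/VarcharTypeInfo casts on the inspector side.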
private static void writeTypes(OrcProto.Footer.Builder builder,
- TreeWriter treeWriter) {
+ TypeDescription schema) {
OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
- switch (treeWriter.inspector.getCategory()) {
- case PRIMITIVE:
- switch (((PrimitiveObjectInspector) treeWriter.inspector).
- getPrimitiveCategory()) {
- case BOOLEAN:
- type.setKind(OrcProto.Type.Kind.BOOLEAN);
- break;
- case BYTE:
- type.setKind(OrcProto.Type.Kind.BYTE);
- break;
- case SHORT:
- type.setKind(OrcProto.Type.Kind.SHORT);
- break;
- case INT:
- type.setKind(OrcProto.Type.Kind.INT);
- break;
- case LONG:
- type.setKind(OrcProto.Type.Kind.LONG);
- break;
- case FLOAT:
- type.setKind(OrcProto.Type.Kind.FLOAT);
- break;
- case DOUBLE:
- type.setKind(OrcProto.Type.Kind.DOUBLE);
- break;
- case STRING:
- type.setKind(OrcProto.Type.Kind.STRING);
- break;
- case CHAR:
- // The char length needs to be written to file and should be available
- // from the object inspector
- CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo();
- type.setKind(Type.Kind.CHAR);
- type.setMaximumLength(charTypeInfo.getLength());
- break;
- case VARCHAR:
- // The varchar length needs to be written to file and should be available
- // from the object inspector
- VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo();
- type.setKind(Type.Kind.VARCHAR);
- type.setMaximumLength(typeInfo.getLength());
- break;
- case BINARY:
- type.setKind(OrcProto.Type.Kind.BINARY);
- break;
- case TIMESTAMP:
- type.setKind(OrcProto.Type.Kind.TIMESTAMP);
- break;
- case DATE:
- type.setKind(OrcProto.Type.Kind.DATE);
- break;
- case DECIMAL:
- DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)((PrimitiveObjectInspector)treeWriter.inspector).getTypeInfo();
- type.setKind(OrcProto.Type.Kind.DECIMAL);
- type.setPrecision(decTypeInfo.precision());
- type.setScale(decTypeInfo.scale());
- break;
- default:
- throw new IllegalArgumentException("Unknown primitive category: " +
- ((PrimitiveObjectInspector) treeWriter.inspector).
- getPrimitiveCategory());
- }
+ List<TypeDescription> children = schema.getChildren();
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ type.setKind(OrcProto.Type.Kind.CHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case VARCHAR:
+ type.setKind(OrcProto.Type.Kind.VARCHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ type.setPrecision(schema.getPrecision());
+ type.setScale(schema.getScale());
break;
case LIST:
type.setKind(OrcProto.Type.Kind.LIST);
- type.addSubtypes(treeWriter.childrenWriters[0].id);
+ type.addSubtypes(children.get(0).getId());
break;
case MAP:
type.setKind(OrcProto.Type.Kind.MAP);
- type.addSubtypes(treeWriter.childrenWriters[0].id);
- type.addSubtypes(treeWriter.childrenWriters[1].id);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
break;
case STRUCT:
type.setKind(OrcProto.Type.Kind.STRUCT);
- for(TreeWriter child: treeWriter.childrenWriters) {
- type.addSubtypes(child.id);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
}
- for(StructField field: ((StructTreeWriter) treeWriter).fields) {
- type.addFieldNames(field.getFieldName());
+ for(String field: schema.getFieldNames()) {
+ type.addFieldNames(field);
}
break;
case UNION:
type.setKind(OrcProto.Type.Kind.UNION);
- for(TreeWriter child: treeWriter.childrenWriters) {
- type.addSubtypes(child.id);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
}
break;
default:
throw new IllegalArgumentException("Unknown category: " +
- treeWriter.inspector.getCategory());
+ schema.getCategory());
}
builder.addTypes(type);
- for(TreeWriter child: treeWriter.childrenWriters) {
- writeTypes(builder, child);
+ if (children != null) {
+ for(TypeDescription child: children) {
+ writeTypes(builder, child);
+ }
}
}
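writeTypes now recurses over the TypeDescription tree, and each child's getId() supplies the subtype ids, so the flat footer type list mirrors a pre-order walk of the schema. A sketch of the expected numbering, assuming getId() reflects pre-order assignment:

TypeDescription s = TypeDescription.createStruct()
    .addField("x", TypeDescription.createInt())
    .addField("y", TypeDescription.createString());
// s.getId() == 0, x == 1, y == 2; the STRUCT entry is emitted with
// subtypes [1, 2] and fieldNames [x, y] before its children, matching
// the flat type list ORC stores in the footer.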
@@ -2243,73 +2220,58 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
private long computeRawDataSize() {
- long result = 0;
- for (TreeWriter child : treeWriter.getChildrenWriters()) {
- result += getRawDataSizeFromInspectors(child, child.inspector);
- }
- return result;
+ return getRawDataSize(treeWriter, schema);
}
- private long getRawDataSizeFromInspectors(TreeWriter child, ObjectInspector oi) {
+ private long getRawDataSize(TreeWriter child,
+ TypeDescription schema) {
long total = 0;
- switch (oi.getCategory()) {
- case PRIMITIVE:
- total += getRawDataSizeFromPrimitives(child, oi);
- break;
- case LIST:
- case MAP:
- case UNION:
- case STRUCT:
- for (TreeWriter tw : child.childrenWriters) {
- total += getRawDataSizeFromInspectors(tw, tw.inspector);
- }
- break;
- default:
- LOG.debug("Unknown object inspector category.");
- break;
- }
- return total;
- }
-
- private long getRawDataSizeFromPrimitives(TreeWriter child, ObjectInspector oi) {
- long result = 0;
long numVals = child.fileStatistics.getNumberOfValues();
- switch (((PrimitiveObjectInspector) oi).getPrimitiveCategory()) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case FLOAT:
- return numVals * JavaDataModel.get().primitive1();
- case LONG:
- case DOUBLE:
- return numVals * JavaDataModel.get().primitive2();
- case STRING:
- case VARCHAR:
- case CHAR:
- // ORC strings are converted to java Strings. so use JavaDataModel to
- // compute the overall size of strings
- child = (StringTreeWriter) child;
- StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
- numVals = numVals == 0 ? 1 : numVals;
- int avgStringLen = (int) (scs.getSum() / numVals);
- return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
- case DECIMAL:
- return numVals * JavaDataModel.get().lengthOfDecimal();
- case DATE:
- return numVals * JavaDataModel.get().lengthOfDate();
- case BINARY:
- // get total length of binary blob
- BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
- return bcs.getSum();
- case TIMESTAMP:
- return numVals * JavaDataModel.get().lengthOfTimestamp();
- default:
- LOG.debug("Unknown primitive category.");
- break;
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case FLOAT:
+ return numVals * JavaDataModel.get().primitive1();
+ case LONG:
+ case DOUBLE:
+ return numVals * JavaDataModel.get().primitive2();
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ // ORC strings are converted to Java Strings, so use JavaDataModel to
+ // compute the overall size of the strings
+ StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
+ numVals = numVals == 0 ? 1 : numVals;
+ int avgStringLen = (int) (scs.getSum() / numVals);
+ return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
+ case DECIMAL:
+ return numVals * JavaDataModel.get().lengthOfDecimal();
+ case DATE:
+ return numVals * JavaDataModel.get().lengthOfDate();
+ case BINARY:
+ // get total length of binary blob
+ BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
+ return bcs.getSum();
+ case TIMESTAMP:
+ return numVals * JavaDataModel.get().lengthOfTimestamp();
+ case LIST:
+ case MAP:
+ case UNION:
+ case STRUCT: {
+ TreeWriter[] childWriters = child.getChildrenWriters();
+ List<TypeDescription> childTypes = schema.getChildren();
+ for (int i=0; i < childWriters.length; ++i) {
+ total += getRawDataSize(childWriters[i], childTypes.get(i));
+ }
+ break;
+ }
+ default:
+ LOG.debug("Unknown object inspector category.");
+ break;
}
-
- return result;
+ return total;
}
private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
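The string branch above estimates per-value cost from the column statistics: the stats sum is the total character count, so dividing by the value count gives an average length to feed JavaDataModel. A worked sketch with made-up numbers:

long numVals = 1000;
long sum = 12000;                          // total chars across all values
int avgStringLen = (int) (sum / numVals);  // 12
long bytes = numVals *
    JavaDataModel.get().lengthForStringOfLength(avgStringLen);
// i.e. per-value Java object overhead plus the average payload.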
@@ -2356,7 +2318,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// populate raw data size
rawDataSize = computeRawDataSize();
// serialize the types
- writeTypes(builder, treeWriter);
+ writeTypes(builder, schema);
// add the stripe information
for(OrcProto.StripeInformation stripe: stripes) {
builder.addStripes(stripe);
@@ -2385,7 +2347,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
.setMagic(OrcFile.MAGIC)
.addVersion(version.getMajor())
.addVersion(version.getMinor())
- .setWriterVersion(OrcFile.WriterVersion.HIVE_8732.getId());
+ .setWriterVersion(OrcFile.WriterVersion.HIVE_4243.getId());
if (compress != CompressionKind.NONE) {
builder.setCompressionBlockSize(bufferSize);
}
@@ -2410,6 +2372,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
+ @Override
public void addUserMetadata(String name, ByteBuffer value) {
userMetadata.put(name, ByteString.copyFrom(value));
}
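The new getSchema() accessor lets callers size per-column structures straight from the writer. A hedged usage sketch; getMaximumId() is assumed from the new TypeDescription API:

TypeDescription schema = writer.getSchema();  // writer: any ORC Writer
boolean[] include = new boolean[schema.getMaximumId() + 1];
// one flag per column id, id 0 being the root of the schema tree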
@@ -2493,12 +2460,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
getStream();
long start = rawWriter.getPos();
- long stripeLen = length;
long availBlockSpace = blockSize - (start % blockSize);
// see if stripe can fit in the current hdfs block, else pad the remaining
// space in the block
- if (stripeLen < blockSize && stripeLen > availBlockSpace &&
+ if (length < blockSize && length > availBlockSpace &&
addBlockPadding) {
byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
LOG.info(String.format("Padding ORC by %d bytes while merging..",
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
----------------------------------------------------------------------
diff --git a/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto b/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
index 3b7a9b3..acadef9 100644
--- a/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
+++ b/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
@@ -213,6 +213,7 @@ message PostScript {
// Version of the writer:
// 0 (or missing) = original
// 1 = HIVE-8732 fixed
+ // 2 = HIVE-4243 fixed
optional uint32 writerVersion = 6;
// Leave this last in the record
optional string magic = 8000;
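Read back, the id is what lets a reader distinguish files whose metadata predates this fix. A minimal mapping sketch (enum names assumed to mirror the comment above):

// 0 (or missing) -> ORIGINAL, 1 -> HIVE_8732, 2 -> HIVE_4243
boolean columnNamesFixed = writerVersionId >= 2;  // writerVersionId:
                                                  // value of the field above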
http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
index 4d30377..4e3bc90 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
@@ -48,11 +48,10 @@ public class TestColumnStatistics {
@Test
public void testLongMerge() throws Exception {
- ObjectInspector inspector =
- PrimitiveObjectInspectorFactory.javaIntObjectInspector;
+ TypeDescription schema = TypeDescription.createInt();
- ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
- ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+ ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+ ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
stats1.updateInteger(10);
stats1.updateInteger(10);
stats2.updateInteger(1);
@@ -71,11 +70,10 @@ public class TestColumnStatistics {
@Test
public void testDoubleMerge() throws Exception {
- ObjectInspector inspector =
- PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
+ TypeDescription schema = TypeDescription.createDouble();
- ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
- ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+ ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+ ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
stats1.updateDouble(10.0);
stats1.updateDouble(100.0);
stats2.updateDouble(1.0);
@@ -95,11 +93,10 @@ public class TestColumnStatistics {
@Test
public void testStringMerge() throws Exception {
- ObjectInspector inspector =
- PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+ TypeDescription schema = TypeDescription.createString();
- ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
- ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+ ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+ ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
stats1.updateString(new Text("bob"));
stats1.updateString(new Text("david"));
stats1.updateString(new Text("charles"));
@@ -119,11 +116,10 @@ public class TestColumnStatistics {
@Test
public void testDateMerge() throws Exception {
- ObjectInspector inspector =
- PrimitiveObjectInspectorFactory.javaDateObjectInspector;
+ TypeDescription schema = TypeDescription.createDate();
- ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
- ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+ ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+ ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
stats1.updateDate(new DateWritable(1000));
stats1.updateDate(new DateWritable(100));
stats2.updateDate(new DateWritable(10));
@@ -142,11 +138,10 @@ public class TestColumnStatistics {
@Test
public void testTimestampMerge() throws Exception {
- ObjectInspector inspector =
- PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
+ TypeDescription schema = TypeDescription.createTimestamp();
- ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
- ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+ ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+ ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
stats1.updateTimestamp(new Timestamp(10));
stats1.updateTimestamp(new Timestamp(100));
stats2.updateTimestamp(new Timestamp(1));
@@ -165,11 +160,11 @@ public class TestColumnStatistics {
@Test
public void testDecimalMerge() throws Exception {
- ObjectInspector inspector =
- PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector;
+ TypeDescription schema = TypeDescription.createDecimal()
+ .withPrecision(38).withScale(16);
- ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
- ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+ ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+ ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
stats1.updateDecimal(HiveDecimal.create(10));
stats1.updateDecimal(HiveDecimal.create(100));
stats2.updateDecimal(HiveDecimal.create(1));
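The pattern these tests converge on, as a standalone sketch; merge() and the IntegerColumnStatistics accessors are assumed from the existing statistics classes:

TypeDescription schema = TypeDescription.createInt();
ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
stats1.updateInteger(10);
stats2.updateInteger(1);
stats1.merge(stats2);
IntegerColumnStatistics merged = (IntegerColumnStatistics) stats1;
// merged.getMinimum() == 1, merged.getMaximum() == 10; the schema,
// not an ObjectInspector, now picks the statistics implementation.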