Posted to commits@hive.apache.org by mm...@apache.org on 2016/01/12 18:56:47 UTC
[16/18] hive git commit: HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J) HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachandran)
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 0adbea1..e1c2f31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
/**
@@ -73,7 +75,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
private static final Log LOG = LogFactory.getLog(VectorMapJoinGenerateResultOperator.class.getName());
private static final String CLASS_NAME = VectorMapJoinGenerateResultOperator.class.getName();
- private transient PrimitiveTypeInfo[] bigTablePrimitiveTypeInfos;
+ //------------------------------------------------------------------------------------------------
+
+ private transient TypeInfo[] bigTableTypeInfos;
private transient VectorSerializeRow bigTableVectorSerializeRow;
@@ -394,14 +398,14 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
private void setupSpillSerDe(VectorizedRowBatch batch) throws HiveException {
- PrimitiveTypeInfo[] inputObjInspectorsTypeInfos =
- VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+ TypeInfo[] inputObjInspectorsTypeInfos =
+ VectorizedBatchUtil.typeInfosFromStructObjectInspector(
(StructObjectInspector) inputObjInspectors[posBigTable]);
List<Integer> projectedColumns = vContext.getProjectedColumns();
int projectionSize = vContext.getProjectedColumns().size();
- List<PrimitiveTypeInfo> typeInfoList = new ArrayList<PrimitiveTypeInfo>();
+ List<TypeInfo> typeInfoList = new ArrayList<TypeInfo>();
List<Integer> noNullsProjectionList = new ArrayList<Integer>();
for (int i = 0; i < projectionSize; i++) {
int projectedColumn = projectedColumns.get(i);
@@ -413,17 +417,19 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
int[] noNullsProjection = ArrayUtils.toPrimitive(noNullsProjectionList.toArray(new Integer[0]));
int noNullsProjectionSize = noNullsProjection.length;
- bigTablePrimitiveTypeInfos = typeInfoList.toArray(new PrimitiveTypeInfo[0]);
+ bigTableTypeInfos = typeInfoList.toArray(new TypeInfo[0]);
bigTableVectorSerializeRow =
- new VectorSerializeRow(new LazyBinarySerializeWrite(noNullsProjectionSize));
+ new VectorSerializeRow(
+ new LazyBinarySerializeWrite(noNullsProjectionSize));
bigTableVectorSerializeRow.init(
- bigTablePrimitiveTypeInfos,
- noNullsProjectionList);
+ bigTableTypeInfos,
+ noNullsProjection);
- bigTableVectorDeserializeRow = new VectorDeserializeRow(
- new LazyBinaryDeserializeRead(bigTablePrimitiveTypeInfos));
+ bigTableVectorDeserializeRow =
+ new VectorDeserializeRow(
+ new LazyBinaryDeserializeRead(bigTableTypeInfos));
bigTableVectorDeserializeRow.init(noNullsProjection);
}
@@ -833,4 +839,4 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
sb.append("]");
return sb.toString();
}
-}
\ No newline at end of file
+}
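
For context, a minimal sketch of the TypeInfo-based spill SerDe wiring this hunk switches to. The column types and projection are hypothetical; the constructor and init signatures mirror the patched setupSpillSerDe above:

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

// Hypothetical big-table columns: a bigint key and a string payload.
TypeInfo[] bigTableTypeInfos = new TypeInfo[] {
    TypeInfoFactory.longTypeInfo, TypeInfoFactory.stringTypeInfo };
int[] noNullsProjection = new int[] { 0, 1 };

VectorSerializeRow serializeRow =
    new VectorSerializeRow(new LazyBinarySerializeWrite(noNullsProjection.length));
serializeRow.init(bigTableTypeInfos, noNullsProjection);

VectorDeserializeRow deserializeRow =
    new VectorDeserializeRow(new LazyBinaryDeserializeRead(bigTableTypeInfos));
deserializeRow.init(noNullsProjection);
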
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 8f60e9d..6d4c198 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -33,6 +34,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.regex.Pattern;
/**
@@ -572,4 +575,20 @@ public class AcidUtils {
original.add(stat);
}
}
+
+ public static boolean isTablePropertyTransactional(Properties props) {
+ String resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ if (resultStr == null) {
+ resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+ }
+ return resultStr != null && resultStr.equalsIgnoreCase("true");
+ }
+
+ public static boolean isTablePropertyTransactional(Map<String, String> parameters) {
+ String resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ if (resultStr == null) {
+ resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+ }
+ return resultStr != null && resultStr.equalsIgnoreCase("true");
+ }
}
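
For reference, a minimal sketch of the new helpers in use. Per the code above, the key is looked up under its canonical and upper-cased spelling, and the value is compared case-insensitively; the example Properties object is the only assumption:

import java.util.Properties;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.AcidUtils;

Properties props = new Properties();
props.setProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");

// true: the property is present under its canonical key and equals "true".
boolean transactional = AcidUtils.isTablePropertyTransactional(props);
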
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 2d6e752..181ce84 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -490,11 +490,16 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
// ensure filters are not set from previous pushFilters
jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
+
+ Utilities.unsetSchemaEvolution(jobConf);
+
TableScanDesc scanDesc = tableScan.getConf();
if (scanDesc == null) {
return;
}
+ Utilities.addTableSchemaToConf(jobConf, tableScan);
+
// construct column name list and types for reference by filter push down
Utilities.setColumnNameList(jobConf, tableScan);
Utilities.setColumnTypeList(jobConf, tableScan);
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
index 9879dfe..8d94da8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
@@ -36,6 +36,17 @@ public final class IOConstants {
public static final String AVRO = "AVRO";
public static final String AVROFILE = "AVROFILE";
+ /**
+ * The desired TABLE column names and types for input format schema evolution.
+ * This is different from COLUMNS and COLUMNS_TYPES, which are based on individual partition
+ * metadata.
+ *
+ * Virtual columns and partition columns are not included.
+ */
+ public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns";
+ public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types";
+
@VisibleForTesting
public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde";
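
A short sketch of how these keys might be populated. The column names and types are hypothetical; the colon-separated type string follows the same convention as columns.types:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.IOConstants;

Configuration conf = new Configuration();
// Table-level (not partition-level) schema the reader should evolve to.
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,name,amount");
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string:decimal(10,2)");
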
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
new file mode 100644
index 0000000..6c455bd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+/**
+ * Marker interface to indicate a given input format is self-describing and
+ * can perform schema evolution itself.
+ */
+public interface SelfDescribingInputFormatInterface {
+
+}
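
Being a marker interface, it is consumed with a plain instanceof test. A hedged sketch of how a caller might branch on it (the helper name is hypothetical):

import org.apache.hadoop.mapred.InputFormat;

// Hypothetical helper: formats that describe their own schema can evolve it
// themselves, so callers may skip pushing partition-level types to them.
static boolean isSelfDescribing(InputFormat<?, ?> inputFormat) {
  return inputFormat instanceof SelfDescribingInputFormatInterface;
}
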
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
deleted file mode 100644
index e9e1d5a..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * A MapReduce/Hive Vectorized input format for RC files.
- */
-public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
- implements InputFormatChecker {
-
- public VectorizedRCFileInputFormat() {
- setMinSplitSize(SequenceFile.SYNC_INTERVAL);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
-
- reporter.setStatus(split.toString());
-
- return new VectorizedRCFileRecordReader(job, (FileSplit) split);
- }
-
- @Override
- public boolean validateInput(FileSystem fs, HiveConf conf,
- List<FileStatus> files) throws IOException {
- if (files.size() <= 0) {
- return false;
- }
- for (int fileId = 0; fileId < files.size(); fileId++) {
- RCFile.Reader reader = null;
- try {
- reader = new RCFile.Reader(fs, files.get(fileId)
- .getPath(), conf);
- reader.close();
- reader = null;
- } catch (IOException e) {
- return false;
- } finally {
- if (null != reader) {
- reader.close();
- }
- }
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
deleted file mode 100644
index 4cc1c2f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.WeakHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-
-/**
- * RCFileRecordReader.
- */
-public class VectorizedRCFileRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> {
-
- private final Reader in;
- private final long start;
- private final long end;
- private boolean more = true;
- protected Configuration conf;
- private final FileSplit split;
- private final boolean useCache;
- private VectorizedRowBatchCtx rbCtx;
- private final LongWritable keyCache = new LongWritable();
- private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
- private boolean addPartitionCols = true;
- private final DataOutputBuffer buffer = new DataOutputBuffer();
-
- private static RCFileSyncCache syncCache = new RCFileSyncCache();
-
- private static final class RCFileSyncEntry {
- long end;
- long endSync;
- }
-
- private static final class RCFileSyncCache {
-
- private final Map<String, RCFileSyncEntry> cache;
-
- public RCFileSyncCache() {
- cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
- }
-
- public void put(FileSplit split, long endSync) {
- Path path = split.getPath();
- long end = split.getStart() + split.getLength();
- String key = path.toString() + "+" + String.format("%d", end);
-
- RCFileSyncEntry entry = new RCFileSyncEntry();
- entry.end = end;
- entry.endSync = endSync;
- if (entry.endSync >= entry.end) {
- cache.put(key, entry);
- }
- }
-
- public long get(FileSplit split) {
- Path path = split.getPath();
- long start = split.getStart();
- String key = path.toString() + "+" + String.format("%d", start);
- RCFileSyncEntry entry = cache.get(key);
- if (entry != null) {
- return entry.endSync;
- }
- return -1;
- }
- }
-
- public VectorizedRCFileRecordReader(Configuration conf, FileSplit split)
- throws IOException {
-
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(conf);
- this.in = new RCFile.Reader(fs, path, conf);
- this.end = split.getStart() + split.getLength();
- this.conf = conf;
- this.split = split;
-
- useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE);
-
- if (split.getStart() > in.getPosition()) {
- long oldSync = useCache ? syncCache.get(split) : -1;
- if (oldSync == -1) {
- in.sync(split.getStart()); // sync to start
- } else {
- in.seek(oldSync);
- }
- }
-
- this.start = in.getPosition();
-
- more = start < end;
- try {
- rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(conf, split);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public Class<?> getKeyClass() {
- return LongWritable.class;
- }
-
- public Class<?> getValueClass() {
- return BytesRefArrayWritable.class;
- }
-
- @Override
- public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override
- public VectorizedRowBatch createValue() {
- VectorizedRowBatch result;
- try {
- result = rbCtx.createVectorizedRowBatch();
- } catch (HiveException e) {
- throw new RuntimeException("Error creating a batch", e);
- }
- return result;
- }
-
- public boolean nextBlock() throws IOException {
- return in.nextBlock();
- }
-
- @Override
- public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
-
- // Reset column fields noNull values to true
- VectorizedBatchUtil.setNoNullFields(value);
- buffer.reset();
- value.selectedInUse = false;
- for (int i = 0; i < value.numCols; i++) {
- value.cols[i].isRepeating = false;
- }
-
- int i = 0;
- try {
-
- for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
- more = next(keyCache);
- if (more) {
- // Check and update partition cols if necessary. Ideally this should be done
- // in CreateValue() as the partition is constant per split. But since Hive uses
- // CombineHiveRecordReader and as this does not call CreateValue() for
- // each new RecordReader it creates, this check is required in next()
- if (addPartitionCols) {
- rbCtx.addPartitionColsToBatch(value);
- addPartitionCols = false;
- }
- in.getCurrentRow(colsCache);
- // Currently RCFile reader does not support reading vectorized
- // data. Populating the batch by adding one row at a time.
- rbCtx.addRowToBatch(i, (Writable) colsCache, value, buffer);
- } else {
- break;
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("Error while getting next row", e);
- }
- value.size = i;
- return more;
- }
-
- protected boolean next(LongWritable key) throws IOException {
- if (!more) {
- return false;
- }
-
- more = in.next(key);
-
- long lastSeenSyncPos = in.lastSeenSyncPos();
-
- if (lastSeenSyncPos >= end) {
- if (useCache) {
- syncCache.put(split, lastSeenSyncPos);
- }
- more = false;
- return more;
- }
- return more;
- }
-
- /**
- * Return the progress within the input split.
- *
- * @return 0.0 to 1.0 of the input byte range
- */
- public float getProgress() throws IOException {
- if (end == start) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
- }
- }
-
- public long getPos() throws IOException {
- return in.getPosition();
- }
-
- public KeyBuffer getKeyBuffer() {
- return in.getCurrentKeyBufferObj();
- }
-
- protected void seek(long pos) throws IOException {
- in.seek(pos);
- }
-
- public void sync(long pos) throws IOException {
- in.sync(pos);
- }
-
- public void resetBuffer() {
- in.resetBuffer();
- }
-
- public long getStart() {
- return start;
- }
-
- public void close() throws IOException {
- in.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
deleted file mode 100644
index aaf4eb4..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Factory for creating ORC tree readers. These tree readers can handle type promotions and type
- * conversions.
- */
-public class ConversionTreeReaderFactory extends TreeReaderFactory {
-
- // TODO: This is currently only a place holder for type conversions.
-
- public static TreeReader createTreeReader(int columnId,
- List<OrcProto.Type> types,
- boolean[] included,
- boolean skipCorrupt
- ) throws IOException {
- return TreeReaderFactory.createTreeReader(columnId, types, included, skipCorrupt);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index e3e6893..d81a12d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -53,7 +54,9 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -103,7 +106,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
*/
public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
InputFormatChecker, VectorizedInputFormatInterface,
- AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
+ SelfDescribingInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>,
+ CombineHiveInputFormat.AvoidSplitCombination {
static enum SplitStrategyKind{
HYBRID,
@@ -222,7 +226,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Configuration conf,
long offset, long length
) throws IOException {
+
+ /**
+ * Do we have schema on read in the configuration variables?
+ *
+ * NOTE: This code path is NOT used by ACID. OrcInputFormat.getRecordReader intercepts for
+ * ACID tables and creates a raw record merger, etc.
+ */
+ TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ false);
+
Reader.Options options = new Reader.Options().range(offset, length);
+ options.schema(schema);
boolean isOriginal = isOriginal(file);
List<OrcProto.Type> types = file.getTypes();
options.include(genIncludedColumns(types, conf, isOriginal));
@@ -1167,7 +1181,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (vectorMode) {
return (org.apache.hadoop.mapred.RecordReader)
- new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit);
+ new VectorizedOrcAcidRowReader(inner, conf,
+ Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
}
return new NullKeyRecordReader(inner, conf);
}
@@ -1218,10 +1233,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
+ // The schema type description does not include the ACID fields (i.e. it is the
+ // non-ACID original schema).
+ private static boolean SCHEMA_TYPES_IS_ORIGINAL = true;
@Override
public RowReader<OrcStruct> getReader(InputSplit inputSplit,
- Options options) throws IOException {
+ Options options)
+ throws IOException {
final OrcSplit split = (OrcSplit) inputSplit;
final Path path = split.getPath();
Path root;
@@ -1236,36 +1255,33 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas());
final Configuration conf = options.getConfiguration();
+
+
+ /**
+ * Do we have schema on read in the configuration variables?
+ */
+ TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true);
+ if (schema == null) {
+ throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
+ }
+
final Reader reader;
final int bucket;
- Reader.Options readOptions = new Reader.Options();
+ Reader.Options readOptions = new Reader.Options().schema(schema);
readOptions.range(split.getStart(), split.getLength());
+
+ // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
+ final List<Type> schemaTypes = OrcUtils.getOrcTypes(schema);
+ readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
+ setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
+
if (split.hasBase()) {
bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
.getBucket();
reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
- final List<OrcProto.Type> types = reader.getTypes();
- readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
- setSearchArgument(readOptions, types, conf, split.isOriginal());
} else {
bucket = (int) split.getStart();
reader = null;
- if(deltas != null && deltas.length > 0) {
- Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket);
- OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
- FileSystem fs = readerOptions.getFilesystem();
- if(fs == null) {
- fs = path.getFileSystem(options.getConfiguration());
- }
- if(fs.exists(bucketPath)) {
- /* w/o schema evolution (which ACID doesn't support yet) all delta
- files have the same schema, so choosing the 1st one*/
- final List<OrcProto.Type> types =
- OrcFile.createReader(bucketPath, readerOptions).getTypes();
- readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
- setSearchArgument(readOptions, types, conf, split.isOriginal());
- }
- }
}
String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
Long.MAX_VALUE + ":");
@@ -1278,9 +1294,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@Override
public ObjectInspector getObjectInspector() {
- return ((StructObjectInspector) records.getObjectInspector())
- .getAllStructFieldRefs().get(OrcRecordUpdater.ROW)
- .getFieldObjectInspector();
+ return OrcStruct.createObjectInspector(0, schemaTypes);
}
@Override
@@ -1367,5 +1381,4 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
bucket, validTxnList, new Reader.Options(), deltaDirectory);
}
-
}
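
In summary, both read paths above now consult the same helper but differ on whether the schema is mandatory. A condensed sketch, with variable names as in the patch:

// Non-ACID path (raw record reader): a null schema is tolerated and the
// file's own schema is used.
TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ false);
Reader.Options options = new Reader.Options().range(offset, length).schema(schema);

// ACID path (getReader): schema on read is required.
TypeDescription acidSchema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true);
if (acidSchema == null) {
  throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
}
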
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index ab0c364..bad2a4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -18,30 +18,25 @@
package org.apache.hadoop.hive.ql.io.orc;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -57,6 +52,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final Configuration conf;
private final boolean collapse;
private final RecordReader baseReader;
+ private final ObjectInspector objectInspector;
private final long offset;
private final long length;
private final ValidTxnList validTxnList;
@@ -443,6 +439,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
this.offset = options.getOffset();
this.length = options.getLength();
this.validTxnList = validTxnList;
+
+ TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true);
+ if (typeDescr == null) {
+ throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
+ }
+
+ objectInspector = OrcRecordUpdater.createEventSchema
+ (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
+
// modify the options to reflect the event instead of the base row
Reader.Options eventOptions = createEventOptions(options);
if (reader == null) {
@@ -672,46 +677,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
@Override
public ObjectInspector getObjectInspector() {
- // Read the configuration parameters
- String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
- // NOTE: if "columns.types" is missing, all columns will be of String type
- String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
-
- // Parse the configuration parameters
- ArrayList<String> columnNames = new ArrayList<String>();
- Deque<Integer> virtualColumns = new ArrayDeque<Integer>();
- if (columnNameProperty != null && columnNameProperty.length() > 0) {
- String[] colNames = columnNameProperty.split(",");
- for (int i = 0; i < colNames.length; i++) {
- if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) {
- virtualColumns.addLast(i);
- } else {
- columnNames.add(colNames[i]);
- }
- }
- }
- if (columnTypeProperty == null) {
- // Default type: all string
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < columnNames.size(); i++) {
- if (i > 0) {
- sb.append(":");
- }
- sb.append("string");
- }
- columnTypeProperty = sb.toString();
- }
-
- ArrayList<TypeInfo> fieldTypes =
- TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
- while (virtualColumns.size() > 0) {
- fieldTypes.remove(virtualColumns.removeLast());
- }
- StructTypeInfo rowType = new StructTypeInfo();
- rowType.setAllStructFieldNames(columnNames);
- rowType.setAllStructFieldTypeInfos(fieldTypes);
- return OrcRecordUpdater.createEventSchema
- (OrcStruct.createObjectInspector(rowType));
+ return objectInspector;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index db2ca15..ad4a9e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -30,6 +31,21 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import com.google.common.collect.Lists;
@@ -204,4 +220,550 @@ public class OrcUtils {
return numWriters;
}
+ /**
+ * Convert a Hive type property string that contains separated type names into a list of
+ * TypeDescription objects.
+ * @return the list of TypeDescription objects.
+ */
+ public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
+ String hiveTypeProperty) {
+
+ // CONSIDER: We need a type name parser for TypeDescription.
+
+ ArrayList<TypeInfo> typeInfoList = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeProperty);
+ ArrayList<TypeDescription> typeDescrList = new ArrayList<TypeDescription>(typeInfoList.size());
+ for (TypeInfo typeInfo : typeInfoList) {
+ typeDescrList.add(convertTypeInfo(typeInfo));
+ }
+ return typeDescrList;
+ }
+
+ public static TypeDescription convertTypeInfo(TypeInfo info) {
+ switch (info.getCategory()) {
+ case PRIMITIVE: {
+ PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
+ switch (pinfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ return TypeDescription.createBoolean();
+ case BYTE:
+ return TypeDescription.createByte();
+ case SHORT:
+ return TypeDescription.createShort();
+ case INT:
+ return TypeDescription.createInt();
+ case LONG:
+ return TypeDescription.createLong();
+ case FLOAT:
+ return TypeDescription.createFloat();
+ case DOUBLE:
+ return TypeDescription.createDouble();
+ case STRING:
+ return TypeDescription.createString();
+ case DATE:
+ return TypeDescription.createDate();
+ case TIMESTAMP:
+ return TypeDescription.createTimestamp();
+ case BINARY:
+ return TypeDescription.createBinary();
+ case DECIMAL: {
+ DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
+ return TypeDescription.createDecimal()
+ .withScale(dinfo.getScale())
+ .withPrecision(dinfo.getPrecision());
+ }
+ case VARCHAR: {
+ BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+ return TypeDescription.createVarchar()
+ .withMaxLength(cinfo.getLength());
+ }
+ case CHAR: {
+ BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+ return TypeDescription.createChar()
+ .withMaxLength(cinfo.getLength());
+ }
+ default:
+ throw new IllegalArgumentException("ORC doesn't handle primitive" +
+ " category " + pinfo.getPrimitiveCategory());
+ }
+ }
+ case LIST: {
+ ListTypeInfo linfo = (ListTypeInfo) info;
+ return TypeDescription.createList
+ (convertTypeInfo(linfo.getListElementTypeInfo()));
+ }
+ case MAP: {
+ MapTypeInfo minfo = (MapTypeInfo) info;
+ return TypeDescription.createMap
+ (convertTypeInfo(minfo.getMapKeyTypeInfo()),
+ convertTypeInfo(minfo.getMapValueTypeInfo()));
+ }
+ case UNION: {
+ UnionTypeInfo minfo = (UnionTypeInfo) info;
+ TypeDescription result = TypeDescription.createUnion();
+ for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
+ result.addUnionChild(convertTypeInfo(child));
+ }
+ return result;
+ }
+ case STRUCT: {
+ StructTypeInfo sinfo = (StructTypeInfo) info;
+ TypeDescription result = TypeDescription.createStruct();
+ for(String fieldName: sinfo.getAllStructFieldNames()) {
+ result.addField(fieldName,
+ convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
+ }
+ return result;
+ }
+ default:
+ throw new IllegalArgumentException("ORC doesn't handle " +
+ info.getCategory());
+ }
+ }
+
+ public static List<OrcProto.Type> getOrcTypes(TypeDescription typeDescr) {
+ List<OrcProto.Type> result = Lists.newArrayList();
+ appendOrcTypes(result, typeDescr);
+ return result;
+ }
+
+ private static void appendOrcTypes(List<OrcProto.Type> result, TypeDescription typeDescr) {
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ List<TypeDescription> children = typeDescr.getChildren();
+ switch (typeDescr.getCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ type.setKind(OrcProto.Type.Kind.CHAR);
+ type.setMaximumLength(typeDescr.getMaxLength());
+ break;
+ case VARCHAR:
+ type.setKind(Type.Kind.VARCHAR);
+ type.setMaximumLength(typeDescr.getMaxLength());
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ type.setPrecision(typeDescr.getPrecision());
+ type.setScale(typeDescr.getScale());
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(children.get(0).getId());
+ break;
+ case MAP:
+ type.setKind(OrcProto.Type.Kind.MAP);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ case STRUCT:
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ for(String field: typeDescr.getFieldNames()) {
+ type.addFieldNames(field);
+ }
+ break;
+ case UNION:
+ type.setKind(OrcProto.Type.Kind.UNION);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " +
+ typeDescr.getCategory());
+ }
+ result.add(type.build());
+ if (children != null) {
+ for(TypeDescription child: children) {
+ appendOrcTypes(result, child);
+ }
+ }
+ }
+
+ /**
+ * NOTE: This method ignores the subtype numbers in the TypeDescription and rebuilds the
+ * subtype numbers based on the length of the result list being appended.
+ *
+ * @param result
+ * @param typeDescr
+ */
+ public static void appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
+ TypeDescription typeDescr) {
+
+ int subtype = result.size();
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ boolean needsAdd = true;
+ List<TypeDescription> children = typeDescr.getChildren();
+ switch (typeDescr.getCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ type.setKind(OrcProto.Type.Kind.CHAR);
+ type.setMaximumLength(typeDescr.getMaxLength());
+ break;
+ case VARCHAR:
+ type.setKind(Type.Kind.VARCHAR);
+ type.setMaximumLength(typeDescr.getMaxLength());
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ type.setPrecision(typeDescr.getPrecision());
+ type.setScale(typeDescr.getScale());
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(++subtype);
+ result.add(type.build());
+ needsAdd = false;
+ appendOrcTypesRebuildSubtypes(result, children.get(0));
+ break;
+ case MAP:
+ {
+ // Make room for MAP type.
+ result.add(null);
+
+ // Add MAP type pair in order to determine their subtype values.
+ appendOrcTypesRebuildSubtypes(result, children.get(0));
+ int subtype2 = result.size();
+ appendOrcTypesRebuildSubtypes(result, children.get(1));
+ type.setKind(OrcProto.Type.Kind.MAP);
+ type.addSubtypes(subtype + 1);
+ type.addSubtypes(subtype2);
+ result.set(subtype, type.build());
+ needsAdd = false;
+ }
+ break;
+ case STRUCT:
+ {
+ List<String> fieldNames = typeDescr.getFieldNames();
+
+ // Make room for STRUCT type.
+ result.add(null);
+
+ List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size());
+ for(TypeDescription child: children) {
+ int fieldSubtype = result.size();
+ fieldSubtypes.add(fieldSubtype);
+ appendOrcTypesRebuildSubtypes(result, child);
+ }
+
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+
+ for (int i = 0 ; i < fieldNames.size(); i++) {
+ type.addSubtypes(fieldSubtypes.get(i));
+ type.addFieldNames(fieldNames.get(i));
+ }
+ result.set(subtype, type.build());
+ needsAdd = false;
+ }
+ break;
+ case UNION:
+ {
+ // Make room for UNION type.
+ result.add(null);
+
+ List<Integer> unionSubtypes = new ArrayList<Integer>(children.size());
+ for(TypeDescription child: children) {
+ int unionSubtype = result.size();
+ unionSubtypes.add(unionSubtype);
+ appendOrcTypesRebuildSubtypes(result, child);
+ }
+
+ type.setKind(OrcProto.Type.Kind.UNION);
+ for (int i = 0 ; i < children.size(); i++) {
+ type.addSubtypes(unionSubtypes.get(i));
+ }
+ result.set(subtype, type.build());
+ needsAdd = false;
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " + typeDescr.getCategory());
+ }
+ if (needsAdd) {
+ result.add(type.build());
+ }
+ }
+
+ /**
+ * NOTE: This method ignores the subtype numbers in the OrcProto.Type list and rebuilds the
+ * subtype numbers based on the length of the result list being appended.
+ *
+ * @param result
+ * @param types
+ * @param columnId
+ */
+ public static int appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
+ List<OrcProto.Type> types, int columnId) {
+
+ OrcProto.Type oldType = types.get(columnId++);
+
+ int subtype = result.size();
+ OrcProto.Type.Builder builder = OrcProto.Type.newBuilder();
+ boolean needsAdd = true;
+ switch (oldType.getKind()) {
+ case BOOLEAN:
+ builder.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ builder.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ builder.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ builder.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ builder.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ builder.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ builder.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ builder.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ builder.setKind(OrcProto.Type.Kind.CHAR);
+ builder.setMaximumLength(oldType.getMaximumLength());
+ break;
+ case VARCHAR:
+ builder.setKind(Type.Kind.VARCHAR);
+ builder.setMaximumLength(oldType.getMaximumLength());
+ break;
+ case BINARY:
+ builder.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ builder.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ builder.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ builder.setKind(OrcProto.Type.Kind.DECIMAL);
+ builder.setPrecision(oldType.getPrecision());
+ builder.setScale(oldType.getScale());
+ break;
+ case LIST:
+ builder.setKind(OrcProto.Type.Kind.LIST);
+ builder.addSubtypes(++subtype);
+ result.add(builder.build());
+ needsAdd = false;
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ break;
+ case MAP:
+ {
+ // Make room for MAP type.
+ result.add(null);
+
+ // Add MAP type pair in order to determine their subtype values.
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ int subtype2 = result.size();
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ builder.setKind(OrcProto.Type.Kind.MAP);
+ builder.addSubtypes(subtype + 1);
+ builder.addSubtypes(subtype2);
+ result.set(subtype, builder.build());
+ needsAdd = false;
+ }
+ break;
+ case STRUCT:
+ {
+ List<String> fieldNames = oldType.getFieldNamesList();
+
+ // Make room for STRUCT type.
+ result.add(null);
+
+ List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size());
+ for(int i = 0 ; i < fieldNames.size(); i++) {
+ int fieldSubtype = result.size();
+ fieldSubtypes.add(fieldSubtype);
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ }
+
+ builder.setKind(OrcProto.Type.Kind.STRUCT);
+
+ for (int i = 0 ; i < fieldNames.size(); i++) {
+ builder.addSubtypes(fieldSubtypes.get(i));
+ builder.addFieldNames(fieldNames.get(i));
+ }
+ result.set(subtype, builder.build());
+ needsAdd = false;
+ }
+ break;
+ case UNION:
+ {
+ int subtypeCount = oldType.getSubtypesCount();
+
+ // Make room for UNION type.
+ result.add(null);
+
+ List<Integer> unionSubtypes = new ArrayList<Integer>(subtypeCount);
+ for(int i = 0 ; i < subtypeCount; i++) {
+ int unionSubtype = result.size();
+ unionSubtypes.add(unionSubtype);
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ }
+
+ builder.setKind(OrcProto.Type.Kind.UNION);
+ for (int i = 0 ; i < subtypeCount; i++) {
+ builder.addSubtypes(unionSubtypes.get(i));
+ }
+ result.set(subtype, builder.build());
+ needsAdd = false;
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " + oldType.getKind());
+ }
+ if (needsAdd) {
+ result.add(builder.build());
+ }
+ return columnId;
+ }
+
+ public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcid) {
+
+ String columnNameProperty = null;
+ String columnTypeProperty = null;
+
+ ArrayList<String> schemaEvolutionColumnNames = null;
+ ArrayList<TypeDescription> schemaEvolutionTypeDescrs = null;
+
+ boolean haveSchemaEvolutionProperties = false;
+ if (isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) {
+
+ columnNameProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+ columnTypeProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+
+ haveSchemaEvolutionProperties =
+ (columnNameProperty != null && columnTypeProperty != null);
+
+ if (haveSchemaEvolutionProperties) {
+ schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+ if (schemaEvolutionColumnNames.size() == 0) {
+ haveSchemaEvolutionProperties = false;
+ } else {
+ schemaEvolutionTypeDescrs =
+ OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+ if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+ haveSchemaEvolutionProperties = false;
+ }
+ }
+ }
+ }
+
+ if (haveSchemaEvolutionProperties) {
+ LOG.info("Using schema evolution configuration variables " +
+ "schema.evolution.columns " +
+ schemaEvolutionColumnNames.toString() +
+ " / schema.evolution.columns.types " +
+ schemaEvolutionTypeDescrs.toString() +
+ " (isAcid " +
+ isAcid +
+ ")");
+
+ } else {
+
+ // Try regular properties.
+ columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
+ columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+ if (columnTypeProperty == null || columnNameProperty == null) {
+ return null;
+ }
+
+ schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+ if (schemaEvolutionColumnNames.size() == 0) {
+ return null;
+ }
+ schemaEvolutionTypeDescrs =
+ OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+ if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+ return null;
+ }
+ LOG.info("Using column configuration variables " +
+ "columns " +
+ schemaEvolutionColumnNames.toString() +
+ " / columns.types " +
+ schemaEvolutionTypeDescrs.toString() +
+ " (isAcid " +
+ isAcid +
+ ")");
+ }
+
+ // Desired schema does not include virtual columns or partition columns.
+ TypeDescription result = TypeDescription.createStruct();
+ for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
+ result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
+ }
+
+ return result;
+ }
}
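
A small usage sketch of the converters added here. The type string is a hypothetical example; TypeInfoUtils.getTypeInfoFromTypeString is the existing serde2 parser:

import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

TypeInfo info =
    TypeInfoUtils.getTypeInfoFromTypeString("struct<id:bigint,tags:array<string>>");
TypeDescription descr = OrcUtils.convertTypeInfo(info);        // Hive -> ORC description
List<OrcProto.Type> protoTypes = OrcUtils.getOrcTypes(descr);  // flattened proto types
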
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 8558592..6dbe461 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -152,6 +153,7 @@ public interface Reader {
private boolean[] include;
private long offset = 0;
private long length = Long.MAX_VALUE;
+ private TypeDescription schema;
private SearchArgument sarg = null;
private String[] columnNames = null;
@@ -178,6 +180,14 @@ public interface Reader {
}
/**
+ * Set the schema on read type description.
+ */
+ public Options schema(TypeDescription schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ /**
* Set search argument for predicate push down.
* @param sarg the search argument
* @param columnNames the column names for
@@ -201,6 +211,10 @@ public interface Reader {
return length;
}
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
public SearchArgument getSearchArgument() {
return sarg;
}
@@ -222,6 +236,7 @@ public interface Reader {
result.include = include;
result.offset = offset;
result.length = length;
+ result.schema = schema;
result.sarg = sarg;
result.columnNames = columnNames;
return result;
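
With the new option, schema on read travels alongside the range and search-argument settings. A minimal sketch with hypothetical columns:

TypeDescription schema = TypeDescription.createStruct()
    .addField("id", TypeDescription.createLong())
    .addField("name", TypeDescription.createString());

Reader.Options options = new Reader.Options()
    .schema(schema)
    .range(0, Long.MAX_VALUE);
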
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
deleted file mode 100644
index 8740ee6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-
-import com.google.common.collect.Lists;
-
-/**
- * Factory to create ORC tree readers. It also compares file schema with schema specified on read
- * to see if type promotions are possible.
- */
-public class RecordReaderFactory {
- static final Log LOG = LogFactory.getLog(RecordReaderFactory.class);
- private static final boolean isLogInfoEnabled = LOG.isInfoEnabled();
-
- public static TreeReaderFactory.TreeReader createTreeReader(int colId,
- Configuration conf,
- List<OrcProto.Type> fileSchema,
- boolean[] included,
- boolean skipCorrupt) throws IOException {
- final boolean isAcid = checkAcidSchema(fileSchema);
- final List<OrcProto.Type> originalFileSchema;
- if (isAcid) {
- originalFileSchema = fileSchema.subList(fileSchema.get(0).getSubtypesCount(),
- fileSchema.size());
- } else {
- originalFileSchema = fileSchema;
- }
- final int numCols = originalFileSchema.get(0).getSubtypesCount();
- List<OrcProto.Type> schemaOnRead = getSchemaOnRead(numCols, conf);
- List<OrcProto.Type> schemaUsed = getMatchingSchema(fileSchema, schemaOnRead);
- if (schemaUsed == null) {
- return TreeReaderFactory.createTreeReader(colId, fileSchema, included, skipCorrupt);
- } else {
- return ConversionTreeReaderFactory.createTreeReader(colId, schemaUsed, included, skipCorrupt);
- }
- }
-
- private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
- if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
- List<String> acidFields = OrcRecordUpdater.getAcidEventFields();
- List<String> rootFields = fileSchema.get(0).getFieldNamesList();
- if (acidFields.equals(rootFields)) {
- return true;
- }
- }
- return false;
- }
-
- private static List<OrcProto.Type> getMatchingSchema(List<OrcProto.Type> fileSchema,
- List<OrcProto.Type> schemaOnRead) {
- if (schemaOnRead == null) {
- if (isLogInfoEnabled) {
- LOG.info("Schema is not specified on read. Using file schema.");
- }
- return null;
- }
-
- if (fileSchema.size() != schemaOnRead.size()) {
- if (isLogInfoEnabled) {
- LOG.info("Schema on read column count does not match file schema's column count." +
- " Falling back to using file schema.");
- }
- return null;
- } else {
- List<OrcProto.Type> result = Lists.newArrayList(fileSchema);
- // Check type promotion. ORC only supports type promotions for integer types:
- // short -> int -> bigint, since the same integer readers are used for these types.
- boolean canPromoteType = false;
- for (int i = 0; i < fileSchema.size(); i++) {
- OrcProto.Type fColType = fileSchema.get(i);
- OrcProto.Type rColType = schemaOnRead.get(i);
- if (!fColType.getKind().equals(rColType.getKind())) {
-
- if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
-
- if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
- rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
- // type promotion possible, converting SHORT to INT/LONG requested type
- result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build());
- canPromoteType = true;
- } else {
- canPromoteType = false;
- }
-
- } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
-
- if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
- // type promotion possible, converting INT to LONG requested type
- result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build());
- canPromoteType = true;
- } else {
- canPromoteType = false;
- }
-
- } else {
- canPromoteType = false;
- }
- }
- }
-
- if (canPromoteType) {
- if (isLogInfoEnabled) {
- LOG.info("Integer type promotion happened in ORC record reader. Using promoted schema.");
- }
- return result;
- }
- }
-
- return null;
- }
-
- private static List<OrcProto.Type> getSchemaOnRead(int numCols, Configuration conf) {
- String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
- final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
- if (columnTypeProperty == null || columnNameProperty == null) {
- return null;
- }
-
- ArrayList<String> columnNames = Lists.newArrayList(columnNameProperty.split(","));
- ArrayList<TypeInfo> fieldTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
- StructTypeInfo structTypeInfo = new StructTypeInfo();
- // Column types from conf include virtual and partition columns at the end. We consider only
- // the actual columns in the file.
- structTypeInfo.setAllStructFieldNames(Lists.newArrayList(columnNames.subList(0, numCols)));
- structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(fieldTypes.subList(0, numCols)));
- ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(structTypeInfo);
- return getOrcTypes(oi);
- }
-
- private static List<OrcProto.Type> getOrcTypes(ObjectInspector inspector) {
- List<OrcProto.Type> result = Lists.newArrayList();
- getOrcTypesImpl(result, inspector);
- return result;
- }
-
- private static void getOrcTypesImpl(List<OrcProto.Type> result, ObjectInspector inspector) {
- OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
- switch (inspector.getCategory()) {
- case PRIMITIVE:
- switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
- case BOOLEAN:
- type.setKind(OrcProto.Type.Kind.BOOLEAN);
- break;
- case BYTE:
- type.setKind(OrcProto.Type.Kind.BYTE);
- break;
- case SHORT:
- type.setKind(OrcProto.Type.Kind.SHORT);
- break;
- case INT:
- type.setKind(OrcProto.Type.Kind.INT);
- break;
- case LONG:
- type.setKind(OrcProto.Type.Kind.LONG);
- break;
- case FLOAT:
- type.setKind(OrcProto.Type.Kind.FLOAT);
- break;
- case DOUBLE:
- type.setKind(OrcProto.Type.Kind.DOUBLE);
- break;
- case STRING:
- type.setKind(OrcProto.Type.Kind.STRING);
- break;
- case CHAR:
- // The char length needs to be written to file and should be available
- // from the object inspector
- CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) inspector)
- .getTypeInfo();
- type.setKind(OrcProto.Type.Kind.CHAR);
- type.setMaximumLength(charTypeInfo.getLength());
- break;
- case VARCHAR:
- // The varchar length needs to be written to file and should be available
- // from the object inspector
- VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) inspector)
- .getTypeInfo();
- type.setKind(OrcProto.Type.Kind.VARCHAR);
- type.setMaximumLength(typeInfo.getLength());
- break;
- case BINARY:
- type.setKind(OrcProto.Type.Kind.BINARY);
- break;
- case TIMESTAMP:
- type.setKind(OrcProto.Type.Kind.TIMESTAMP);
- break;
- case DATE:
- type.setKind(OrcProto.Type.Kind.DATE);
- break;
- case DECIMAL:
- DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) ((PrimitiveObjectInspector) inspector)
- .getTypeInfo();
- type.setKind(OrcProto.Type.Kind.DECIMAL);
- type.setPrecision(decTypeInfo.precision());
- type.setScale(decTypeInfo.scale());
- break;
- default:
- throw new IllegalArgumentException("Unknown primitive category: " +
- ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
- }
- result.add(type.build());
- break;
- case LIST:
- type.setKind(OrcProto.Type.Kind.LIST);
- result.add(type.build());
- getOrcTypesImpl(result, ((ListObjectInspector) inspector).getListElementObjectInspector());
- break;
- case MAP:
- type.setKind(OrcProto.Type.Kind.MAP);
- result.add(type.build());
- getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapKeyObjectInspector());
- getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapValueObjectInspector());
- break;
- case STRUCT:
- type.setKind(OrcProto.Type.Kind.STRUCT);
- result.add(type.build());
- for (StructField field : ((StructObjectInspector) inspector).getAllStructFieldRefs()) {
- getOrcTypesImpl(result, field.getFieldObjectInspector());
- }
- break;
- case UNION:
- type.setKind(OrcProto.Type.Kind.UNION);
- result.add(type.build());
- for (ObjectInspector oi : ((UnionObjectInspector) inspector).getObjectInspectors()) {
- getOrcTypesImpl(result, oi);
- }
- break;
- default:
- throw new IllegalArgumentException("Unknown category: " + inspector.getCategory());
- }
- }
-}
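
The rule getMatchingSchema() applies is the same one the new SchemaEvolution.validateAndCreate() (below) enforces; only the failure mode differs: the old code silently fell back to the file schema on an unsupported mismatch (and its canPromoteType flag could even be reset to true by a later promotable column after an earlier unsupported one), while the new code throws. A minimal sketch of the predicate, with an illustrative helper name that is not part of the patch:

import org.apache.hadoop.hive.ql.io.orc.OrcProto;

class PromotionRule {
  // True when a tree reader for the requested kind can decode the file's kind
  // directly: only widening among the integer kinds qualifies, because the
  // same integer readers are used for SHORT, INT and LONG streams.
  static boolean isImplicitPromotion(OrcProto.Type.Kind file, OrcProto.Type.Kind read) {
    if (file == read) {
      return true;
    }
    switch (file) {
      case SHORT:
        return read == OrcProto.Type.Kind.INT || read == OrcProto.Type.Kind.LONG;
      case INT:
        return read == OrcProto.Type.Kind.LONG;
      default:
        return false;
    }
  }
}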
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index c2d280d..24834a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -154,15 +156,27 @@ class RecordReaderImpl implements RecordReader {
}
protected RecordReaderImpl(List<StripeInformation> stripes,
- FileSystem fileSystem,
- Path path,
- Reader.Options options,
- List<OrcProto.Type> types,
- CompressionCodec codec,
- int bufferSize,
- long strideRate,
- Configuration conf
- ) throws IOException {
+ FileSystem fileSystem,
+ Path path,
+ Reader.Options options,
+ List<OrcProto.Type> types,
+ CompressionCodec codec,
+ int bufferSize,
+ long strideRate,
+ Configuration conf
+ ) throws IOException {
+
+ TreeReaderSchema treeReaderSchema;
+ if (options.getSchema() == null) {
+ treeReaderSchema = new TreeReaderSchema().fileTypes(types).schemaTypes(types);
+ } else {
+
+ // Now that we are creating a record reader for a file, validate that the schema to read
+ // is compatible with the file schema.
+ List<Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
+ treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
+ }
this.path = path;
this.file = fileSystem.open(path);
this.codec = codec;
@@ -200,7 +214,7 @@ class RecordReaderImpl implements RecordReader {
firstRow = skippedRows;
totalRowCount = rows;
boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
- reader = RecordReaderFactory.createTreeReader(0, conf, types, included, skipCorrupt);
+ reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
indexes = new OrcProto.RowIndex[types.size()];
bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
advanceToNextRow(reader, 0L, true);
@@ -1085,6 +1099,7 @@ class RecordReaderImpl implements RecordReader {
} else {
result = (VectorizedRowBatch) previous;
result.selectedInUse = false;
+ reader.setVectorColumnCount(result.getDataColumnCount());
reader.nextVector(result.cols, (int) batchSize);
}
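
With this change the read schema reaches the tree readers directly instead of via the RecordReaderFactory conf lookup, for both the row-mode and vectorized paths. A minimal sketch of exercising the new path, assuming fileTypes is a footer type list whose column "a" is INT; the TypeDescription values here are illustrative:

// Widen "a" from the file's INT to BIGINT and ask for a column "b" that is
// not in the file (it will be read as NULLs, per SchemaEvolution below).
List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(
    TypeDescription.createStruct()
        .addField("a", TypeDescription.createLong())
        .addField("b", TypeDescription.createString()));
// Throws IOException if a requested conversion is not an implicit promotion.
TreeReaderSchema treeReaderSchema =
    SchemaEvolution.validateAndCreate(fileTypes, schemaTypes);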
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
new file mode 100644
index 0000000..9d00eb2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
+
+/**
+ * Takes the file types and the (optional) configuration column names/types and determines
+ * whether schema evolution has occurred.
+ */
+public class SchemaEvolution {
+
+ private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
+
+ public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes,
+ List<OrcProto.Type> schemaTypes) throws IOException {
+
+ // For ACID, the row is the ROW field in the outer STRUCT.
+ final boolean isAcid = checkAcidSchema(fileTypes);
+ final List<OrcProto.Type> rowSchema;
+ int rowSubtype;
+ if (isAcid) {
+ rowSubtype = OrcRecordUpdater.ROW + 1;
+ rowSchema = fileTypes.subList(rowSubtype, fileTypes.size());
+ } else {
+ rowSubtype = 0;
+ rowSchema = fileTypes;
+ }
+
+ // Check the overlapping columns. Columns in the read schema beyond those in the file
+ // default to NULL.
+
+ int numFileColumns = rowSchema.get(0).getSubtypesCount();
+ int numDesiredColumns = schemaTypes.get(0).getSubtypesCount();
+
+ int numReadColumns = Math.min(numFileColumns, numDesiredColumns);
+
+ /**
+ * Check type promotion.
+ *
+ * Currently, we only support integer type promotions that can be done "implicitly":
+ * short -> int -> bigint. Using a bigger integer tree reader on the original smaller
+ * integer column "just works", since the same integer readers are used for all three types.
+ *
+ * In the future, other type promotions might require type conversion.
+ */
+
+ for (int i = 0; i < numReadColumns; i++) {
+ OrcProto.Type fColType = fileTypes.get(rowSubtype + i);
+ OrcProto.Type rColType = schemaTypes.get(i);
+ if (!fColType.getKind().equals(rColType.getKind())) {
+
+ boolean ok = false;
+ if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
+
+ if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
+ rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+ // type promotion possible, converting SHORT to INT/LONG requested type
+ ok = true;
+ }
+ } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
+
+ if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+ // type promotion possible, converting INT to LONG requested type
+ ok = true;
+ }
+ }
+
+ if (!ok) {
+ throw new IOException("ORC does not support type conversion from " +
+ fColType.getKind().name() + " to " + rColType.getKind().name());
+ }
+ }
+ }
+
+ List<Type> fullSchemaTypes;
+
+ if (isAcid) {
+ fullSchemaTypes = new ArrayList<OrcProto.Type>();
+
+ // Copy the ACID struct type, which is subtype = 0. It has field names
+ // "operation" through "row". Copy the types for all fields EXCEPT ROW
+ // (which must be last!).
+
+ for (int i = 0; i < rowSubtype; i++) {
+ fullSchemaTypes.add(fileTypes.get(i).toBuilder().build());
+ }
+
+ // Add the row struct type.
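+ // appendOrcTypesRebuildSubtypes renumbers the row subtree's type ids so they
+ // follow the wrapper entries just copied.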
+ OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0);
+ } else {
+ fullSchemaTypes = schemaTypes;
+ }
+
+ int innerStructSubtype = rowSubtype;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Schema evolution: (fileTypes) " + fileTypes + " (schemaTypes) " + fullSchemaTypes);
+ }
+
+ return new TreeReaderSchema().
+ fileTypes(fileTypes).
+ schemaTypes(fullSchemaTypes).
+ innerStructSubtype(innerStructSubtype);
+ }
+
+ private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
+ if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
+ List<String> rootFields = fileSchema.get(0).getFieldNamesList();
+ if (acidEventFieldNames.equals(rootFields)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param typeDescr the row's type description
+ * @return ORC types for the ACID event based on the row's type description
+ */
+ public static List<Type> createEventSchema(TypeDescription typeDescr) {
+
+ List<Type> result = new ArrayList<Type>();
+
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ type.addAllFieldNames(acidEventFieldNames);
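+ // The outer STRUCT is type 0, so its six children take subtype ids 1..6.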
+ for (int i = 0; i < acidEventFieldNames.size(); i++) {
+ type.addSubtypes(i + 1);
+ }
+ result.add(type.build());
+
+ // Automatically add all fields except the last (ROW).
+ for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i++) {
+ type.clear();
+ type.setKind(acidEventOrcTypeKinds.get(i));
+ result.add(type.build());
+ }
+
+ OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr);
+ return result;
+ }
+
+ public static final List<String> acidEventFieldNames = new ArrayList<String>();
+ static {
+ acidEventFieldNames.add("operation");
+ acidEventFieldNames.add("originalTransaction");
+ acidEventFieldNames.add("bucket");
+ acidEventFieldNames.add("rowId");
+ acidEventFieldNames.add("currentTransaction");
+ acidEventFieldNames.add("row");
+ }
+ public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds =
+ new ArrayList<OrcProto.Type.Kind>();
+ static {
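+ // One ORC kind per entry in acidEventFieldNames, in the same order.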
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT);
+ }
+}
\ No newline at end of file
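
For reference, the event wrapper createEventSchema() builds for a one-column row; a hypothetical check, where subtype ids are positions in the flattened type list:

List<OrcProto.Type> event = SchemaEvolution.createEventSchema(
    TypeDescription.createStruct().addField("a", TypeDescription.createInt()));
// event.get(0) : STRUCT with fields "operation".."row" and subtypes 1..6
// event.get(1)..get(5) : INT, LONG, INT, LONG, LONG (the ACID bookkeeping columns)
// event.get(6) : STRUCT for the row itself; its INT child "a" lands at id 7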