Posted to commits@hive.apache.org by mm...@apache.org on 2016/01/12 18:56:49 UTC
[18/18] hive git commit: HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J) HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachandran reviewed by Matt McCline and Gunther Hagleitner)
HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)
HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachandran reviewed by Matt McCline and Gunther Hagleitner)
HIVE-12799: Always use Schema Evolution for ACID (Matt McCline, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0fd9069e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0fd9069e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0fd9069e
Branch: refs/heads/branch-1
Commit: 0fd9069e9b8e6c97d2068cc449aa14c9773027ca
Parents: 9b5f1ff
Author: Matt McCline <mm...@hortonworks.com>
Authored: Tue Jan 12 09:55:57 2016 -0800
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Tue Jan 12 09:55:57 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../mapreduce/FosterStorageHandler.java | 37 +
.../hive/hcatalog/mapreduce/InputJobInfo.java | 8 +-
.../hive/hcatalog/streaming/TestStreaming.java | 2 +
.../hive/ql/txn/compactor/TestCompactor.java | 246 ++-
.../test/resources/testconfiguration.properties | 16 +
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 13 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 102 +-
.../hadoop/hive/ql/exec/FetchOperator.java | 8 +
.../apache/hadoop/hive/ql/exec/MapOperator.java | 25 +-
.../hadoop/hive/ql/exec/TableScanOperator.java | 20 +
.../apache/hadoop/hive/ql/exec/Utilities.java | 110 +-
.../ql/exec/spark/SparkReduceRecordHandler.java | 11 +-
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 2 +-
.../hive/ql/exec/tez/ReduceRecordSource.java | 15 +-
.../hive/ql/exec/vector/BytesColumnVector.java | 99 +-
.../hive/ql/exec/vector/ColumnVector.java | 92 +-
.../ql/exec/vector/DecimalColumnVector.java | 82 +-
.../hive/ql/exec/vector/DoubleColumnVector.java | 67 +-
.../hive/ql/exec/vector/LongColumnVector.java | 73 +-
.../ql/exec/vector/VectorDeserializeRow.java | 37 +-
.../hive/ql/exec/vector/VectorExtractRow.java | 11 +-
.../ql/exec/vector/VectorGroupByOperator.java | 2 +-
.../exec/vector/VectorMapJoinBaseOperator.java | 4 +-
.../exec/vector/VectorSMBMapJoinOperator.java | 2 +-
.../hive/ql/exec/vector/VectorSerializeRow.java | 12 +-
.../ql/exec/vector/VectorizationContext.java | 113 +-
.../ql/exec/vector/VectorizedBatchUtil.java | 201 +-
.../ql/exec/vector/VectorizedColumnarSerDe.java | 277 ---
.../hive/ql/exec/vector/VectorizedRowBatch.java | 20 +
.../ql/exec/vector/VectorizedRowBatchCtx.java | 556 ++----
.../mapjoin/VectorMapJoinCommonOperator.java | 65 +-
.../VectorMapJoinGenerateResultOperator.java | 28 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 19 +
.../hadoop/hive/ql/io/HiveInputFormat.java | 5 +
.../apache/hadoop/hive/ql/io/IOConstants.java | 11 +
.../io/SelfDescribingInputFormatInterface.java | 27 +
.../hive/ql/io/VectorizedRCFileInputFormat.java | 81 -
.../ql/io/VectorizedRCFileRecordReader.java | 261 ---
.../ql/io/orc/ConversionTreeReaderFactory.java | 38 -
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 67 +-
.../hive/ql/io/orc/OrcRawRecordMerger.java | 62 +-
.../apache/hadoop/hive/ql/io/orc/OrcUtils.java | 562 ++++++
.../apache/hadoop/hive/ql/io/orc/Reader.java | 15 +
.../hive/ql/io/orc/RecordReaderFactory.java | 269 ---
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 35 +-
.../hadoop/hive/ql/io/orc/SchemaEvolution.java | 185 ++
.../hive/ql/io/orc/TreeReaderFactory.java | 188 +-
.../hadoop/hive/ql/io/orc/TypeDescription.java | 514 ++++++
.../ql/io/orc/VectorizedOrcAcidRowReader.java | 45 +-
.../ql/io/orc/VectorizedOrcInputFormat.java | 53 +-
.../parquet/VectorizedParquetInputFormat.java | 26 +-
.../hive/ql/optimizer/GenMapRedUtils.java | 18 +
.../hive/ql/optimizer/SimpleFetchOptimizer.java | 1 +
.../hive/ql/optimizer/physical/Vectorizer.java | 501 +++--
.../ql/optimizer/physical/Vectorizer.java.orig | 1744 ------------------
.../apache/hadoop/hive/ql/plan/BaseWork.java | 30 +-
.../hadoop/hive/ql/plan/PartitionDesc.java | 15 +-
.../hive/ql/plan/VectorPartitionConversion.java | 166 ++
.../hive/ql/plan/VectorPartitionDesc.java | 110 ++
.../ql/exec/vector/TestVectorRowObject.java | 5 +-
.../hive/ql/exec/vector/TestVectorSerDeRow.java | 9 +-
.../exec/vector/TestVectorizedRowBatchCtx.java | 355 ----
.../hive/ql/io/orc/TestInputOutputFormat.java | 49 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 42 +
.../ql/io/orc/TestOrcRawRecordMerger.java.orig | 1150 ++++++++++++
.../hive/ql/io/orc/TestVectorizedORCReader.java | 75 +-
.../clientnegative/orc_change_fileformat.q | 3 +
.../clientnegative/orc_change_fileformat_acid.q | 3 +
.../queries/clientnegative/orc_change_serde.q | 3 +
.../clientnegative/orc_change_serde_acid.q | 3 +
.../clientnegative/orc_reorder_columns1.q | 3 +
.../clientnegative/orc_reorder_columns1_acid.q | 3 +
.../clientnegative/orc_reorder_columns2.q | 3 +
.../clientnegative/orc_reorder_columns2_acid.q | 3 +
.../clientnegative/orc_replace_columns1.q | 3 +
.../clientnegative/orc_replace_columns1_acid.q | 3 +
.../clientnegative/orc_replace_columns2.q | 3 +
.../clientnegative/orc_replace_columns2_acid.q | 3 +
.../clientnegative/orc_replace_columns3.q | 4 +
.../clientnegative/orc_replace_columns3_acid.q | 4 +
.../clientnegative/orc_type_promotion1.q | 3 +
.../clientnegative/orc_type_promotion1_acid.q | 3 +
.../clientnegative/orc_type_promotion2.q | 10 +
.../clientnegative/orc_type_promotion2_acid.q | 10 +
.../clientnegative/orc_type_promotion3.q | 3 +
.../clientnegative/orc_type_promotion3_acid.q | 3 +
.../test/queries/clientpositive/dbtxnmgr_ddl1.q | 1 -
.../test/queries/clientpositive/load_orc_part.q | 10 -
.../clientpositive/orc_int_type_promotion.q | 14 +-
.../clientpositive/orc_schema_evolution.q | 39 +
.../schema_evol_orc_acid_mapwork_part.q | 173 ++
.../schema_evol_orc_acid_mapwork_table.q | 131 ++
.../schema_evol_orc_acidvec_mapwork_part.q | 173 ++
.../schema_evol_orc_acidvec_mapwork_table.q | 131 ++
.../schema_evol_orc_nonvec_fetchwork_part.q | 97 +
.../schema_evol_orc_nonvec_fetchwork_table.q | 57 +
.../schema_evol_orc_nonvec_mapwork_part.q | 97 +
.../schema_evol_orc_nonvec_mapwork_table.q | 57 +
.../schema_evol_orc_vec_mapwork_part.q | 97 +
.../schema_evol_orc_vec_mapwork_table.q | 57 +
.../schema_evol_text_fetchwork_table.q | 57 +
.../schema_evol_text_mapwork_table.q | 57 +
.../schema_evol_text_nonvec_fetchwork_part.q | 97 +
.../schema_evol_text_nonvec_fetchwork_table.q | 67 +
.../schema_evol_text_nonvec_mapwork_part.q | 97 +
.../schema_evol_text_nonvec_mapwork_table.q | 67 +
.../clientnegative/orc_change_fileformat.q.out | 13 +
.../orc_change_fileformat_acid.q.out | 13 +
.../clientnegative/orc_change_serde.q.out | 13 +
.../clientnegative/orc_change_serde_acid.q.out | 13 +
.../clientnegative/orc_reorder_columns1.q.out | 13 +
.../orc_reorder_columns1_acid.q.out | 13 +
.../clientnegative/orc_reorder_columns2.q.out | 13 +
.../orc_reorder_columns2_acid.q.out | 13 +
.../clientnegative/orc_replace_columns1.q.out | 13 +
.../orc_replace_columns1_acid.q.out | 13 +
.../clientnegative/orc_replace_columns2.q.out | 13 +
.../orc_replace_columns2_acid.q.out | 13 +
.../clientnegative/orc_replace_columns3.q.out | 21 +
.../orc_replace_columns3_acid.q.out | 21 +
.../clientnegative/orc_type_promotion1.q.out | 13 +
.../orc_type_promotion1_acid.q.out | 13 +
.../clientnegative/orc_type_promotion2.q.out | 69 +
.../orc_type_promotion2_acid.q.out | 69 +
.../clientnegative/orc_type_promotion3.q.out | 13 +
.../orc_type_promotion3_acid.q.out | 13 +
.../results/clientpositive/dbtxnmgr_ddl1.q.out | 9 -
.../results/clientpositive/load_orc_part.q.out | 52 -
.../clientpositive/orc_int_type_promotion.q.out | 86 +-
.../clientpositive/orc_schema_evolution.q.out | 211 +++
.../schema_evol_orc_acid_mapwork_part.q.out | 1037 +++++++++++
.../schema_evol_orc_acid_mapwork_table.q.out | 651 +++++++
.../schema_evol_orc_acidvec_mapwork_part.q.out | 1037 +++++++++++
.../schema_evol_orc_acidvec_mapwork_table.q.out | 651 +++++++
.../schema_evol_orc_nonvec_fetchwork_part.q.out | 642 +++++++
...schema_evol_orc_nonvec_fetchwork_table.q.out | 298 +++
.../schema_evol_orc_nonvec_mapwork_part.q.out | 642 +++++++
.../schema_evol_orc_nonvec_mapwork_table.q.out | 298 +++
.../schema_evol_orc_vec_mapwork_part.q.out | 642 +++++++
.../schema_evol_orc_vec_mapwork_table.q.out | 298 +++
.../schema_evol_text_fetchwork_table.q.out | 298 +++
.../schema_evol_text_mapwork_table.q.out | 298 +++
...schema_evol_text_nonvec_fetchwork_part.q.out | 642 +++++++
...chema_evol_text_nonvec_fetchwork_table.q.out | 297 +++
.../schema_evol_text_nonvec_mapwork_part.q.out | 642 +++++++
.../schema_evol_text_nonvec_mapwork_table.q.out | 297 +++
.../tez/schema_evol_orc_acid_mapwork_part.q.out | 1037 +++++++++++
.../schema_evol_orc_acid_mapwork_table.q.out | 651 +++++++
.../schema_evol_orc_acidvec_mapwork_part.q.out | 1037 +++++++++++
.../schema_evol_orc_acidvec_mapwork_table.q.out | 651 +++++++
.../schema_evol_orc_nonvec_fetchwork_part.q.out | 642 +++++++
...schema_evol_orc_nonvec_fetchwork_table.q.out | 298 +++
.../schema_evol_orc_nonvec_mapwork_part.q.out | 642 +++++++
.../schema_evol_orc_nonvec_mapwork_table.q.out | 298 +++
.../tez/schema_evol_orc_vec_mapwork_part.q.out | 642 +++++++
.../tez/schema_evol_orc_vec_mapwork_table.q.out | 298 +++
.../tez/schema_evol_text_fetchwork_table.q.out | 298 +++
.../tez/schema_evol_text_mapwork_table.q.out | 298 +++
...schema_evol_text_nonvec_fetchwork_part.q.out | 642 +++++++
...chema_evol_text_nonvec_fetchwork_table.q.out | 297 +++
.../schema_evol_text_nonvec_mapwork_part.q.out | 642 +++++++
.../schema_evol_text_nonvec_mapwork_table.q.out | 297 +++
.../tez/vector_partition_diff_num_cols.q.out | 1 +
.../vector_partition_diff_num_cols.q.out | 1 +
.../fast/BinarySortableDeserializeRead.java | 23 +-
.../fast/BinarySortableSerializeWrite.java | 5 -
.../hive/serde2/fast/DeserializeRead.java | 6 +-
.../lazy/fast/LazySimpleDeserializeRead.java | 175 +-
.../lazy/fast/LazySimpleSerializeWrite.java | 11 +-
.../hive/serde2/lazybinary/LazyBinarySerDe.java | 29 +
.../fast/LazyBinaryDeserializeRead.java | 29 +-
.../fast/LazyBinarySerializeWrite.java | 3 +-
.../hive/serde2/typeinfo/TypeInfoUtils.java | 36 +
174 files changed, 24596 insertions(+), 4554 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e793174..787e25e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -910,6 +910,10 @@ public class HiveConf extends Configuration {
"The threshold for the input file size of the small tables; if the file size is smaller \n" +
"than this threshold, it will try to convert the common join into map join"),
+
+ HIVE_SCHEMA_EVOLUTION("hive.exec.schema.evolution", false,
+ "Use schema evolution to convert self-describing file format's data to the schema desired by the reader."),
+
HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0,
"A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."),
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 5a95467..bc56d77 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -35,6 +36,10 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -48,6 +53,8 @@ import java.util.Map;
*/
public class FosterStorageHandler extends DefaultStorageHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(FosterStorageHandler.class);
+
public Configuration conf;
/** The directory under which data is initially written for a non partitioned table */
protected static final String TEMP_DIR_NAME = "_TEMP";
@@ -98,6 +105,36 @@ public class FosterStorageHandler extends DefaultStorageHandler {
@Override
public void configureInputJobProperties(TableDesc tableDesc,
Map<String, String> jobProperties) {
+
+ try {
+ Map<String, String> tableProperties = tableDesc.getJobProperties();
+
+ String jobInfoProperty = tableProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ if (jobInfoProperty != null) {
+
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobInfoProperty);
+
+ HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+ HCatSchema dataColumns = tableInfo.getDataColumns();
+ List<HCatFieldSchema> dataFields = dataColumns.getFields();
+ StringBuilder columnNamesSb = new StringBuilder();
+ StringBuilder typeNamesSb = new StringBuilder();
+ for (HCatFieldSchema dataField : dataFields) {
+ if (columnNamesSb.length() > 0) {
+ columnNamesSb.append(",");
+ typeNamesSb.append(":");
+ }
+ columnNamesSb.append(dataField.getName());
+ typeNamesSb.append(dataField.getTypeString());
+ }
+ jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString());
+ jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString());
+
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to set output path", e);
+ }
+
}
@Override
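As an illustration (hypothetical HCat table with data columns id bigint and msg string), the loop above leaves the input job properties as:

    schema.evolution.columns       = id,msg
    schema.evolution.columns.types = bigint:string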
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 1f23f3f..7ec6ae3 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -182,9 +182,11 @@ public class InputJobInfo implements Serializable {
ObjectInputStream partInfoReader =
new ObjectInputStream(new InflaterInputStream(ois));
partitions = (List<PartInfo>)partInfoReader.readObject();
- for (PartInfo partInfo : partitions) {
- if (partInfo.getTableInfo() == null) {
- partInfo.setTableInfo(this.tableInfo);
+ if (partitions != null) {
+ for (PartInfo partInfo : partitions) {
+ if (partInfo.getTableInfo() == null) {
+ partInfo.setTableInfo(this.tableInfo);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 3458b65..ff2598f 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -464,6 +464,8 @@ public class TestStreaming {
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionPath.toString());
job.set("bucket_count", Integer.toString(buckets));
+ job.set("columns", "id,msg");
+ job.set("columns.types", "bigint:string");
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
InputSplit[] splits = inf.getSplits(job, buckets);
Assert.assertEquals(buckets, splits.length);
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index e2910dd..dabe434 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -128,7 +128,194 @@ public class TestCompactor {
driver.close();
}
}
-
+
+ /**
+ * Simple schema evolution add columns with partitioning.
+ * @throws Exception
+ */
+ @Test
+ public void schemaEvolutionAddColDynamicPartitioningInsert() throws Exception {
+ String tblName = "dpct";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " PARTITIONED BY(ds string)" +
+ " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+
+ // First INSERT round.
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " +
+ "'today'), (2, 'wilma', 'yesterday')", driver);
+
+ // ALTER TABLE ... ADD COLUMNS
+ executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver);
+
+ // Validate there is an added NULL for column c.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+
+ // Second INSERT round with new inserts into previously existing partition 'yesterday'.
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values " +
+ "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " +
+ "(5, 'doc', 1902, 'yesterday')",
+ driver);
+
+ // Validate the new insertions for column c.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+ Initiator initiator = new Initiator();
+ initiator.setThreadId((int)initiator.getId());
+ conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+ initiator.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean();
+ stop.set(true);
+ initiator.init(stop, new AtomicBoolean());
+ initiator.run();
+
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(4, compacts.size());
+ SortedSet<String> partNames = new TreeSet<String>();
+ for (int i = 0; i < compacts.size(); i++) {
+ Assert.assertEquals("default", compacts.get(i).getDbname());
+ Assert.assertEquals(tblName, compacts.get(i).getTablename());
+ Assert.assertEquals("initiated", compacts.get(i).getState());
+ partNames.add(compacts.get(i).getPartitionname());
+ }
+ List<String> names = new ArrayList<String>(partNames);
+ Assert.assertEquals("ds=last_century", names.get(0));
+ Assert.assertEquals("ds=soon", names.get(1));
+ Assert.assertEquals("ds=today", names.get(2));
+ Assert.assertEquals("ds=yesterday", names.get(3));
+
+ // Validate after compaction.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+ }
+
+ @Test
+ public void schemaEvolutionAddColDynamicPartitioningUpdate() throws Exception {
+ String tblName = "udpct";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " PARTITIONED BY(ds string)" +
+ " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " +
+ "'today'), (2, 'wilma', 'yesterday')", driver);
+
+ executeStatementOnDriver("update " + tblName + " set b = 'barney'", driver);
+
+ // Validate the update.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\tyesterday", valuesReadFromHiveDriver.get(1));
+
+ // ALTER TABLE ... ADD COLUMNS
+ executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver);
+
+ // Validate there is an added NULL for column c.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+
+ // Second INSERT round with new inserts into previously existing partition 'yesterday'.
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values " +
+ "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " +
+ "(5, 'doc', 1902, 'yesterday')",
+ driver);
+
+ // Validate the new insertions for column c.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+ executeStatementOnDriver("update " + tblName + " set c = 2000", driver);
+
+ // Validate the update of new column c, even in old rows.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4));
+
+ Initiator initiator = new Initiator();
+ initiator.setThreadId((int)initiator.getId());
+ // Set to 1 so insert doesn't set it off but update does
+ conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 1);
+ initiator.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean();
+ stop.set(true);
+ initiator.init(stop, new AtomicBoolean());
+ initiator.run();
+
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(4, compacts.size());
+ SortedSet<String> partNames = new TreeSet<String>();
+ for (int i = 0; i < compacts.size(); i++) {
+ Assert.assertEquals("default", compacts.get(i).getDbname());
+ Assert.assertEquals(tblName, compacts.get(i).getTablename());
+ Assert.assertEquals("initiated", compacts.get(i).getState());
+ partNames.add(compacts.get(i).getPartitionname());
+ }
+ List<String> names = new ArrayList<String>(partNames);
+ Assert.assertEquals("ds=last_century", names.get(0));
+ Assert.assertEquals("ds=soon", names.get(1));
+ Assert.assertEquals("ds=today", names.get(2));
+ Assert.assertEquals("ds=yesterday", names.get(3));
+
+ // Validate after compaction.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4));
+ }
+
/**
* After each major compaction, stats need to be updated on each column of the
* table/partition which previously had stats.
@@ -255,7 +442,9 @@ public class TestCompactor {
t.run();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
+ if (1 != compacts.size()) {
+ Assert.fail("Expecting 1 file and found " + compacts.size() + " files " + compacts.toString());
+ }
Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName,
@@ -409,6 +598,8 @@ public class TestCompactor {
String dbName = "default";
String tblName = "cws";
List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -452,9 +643,12 @@ public class TestCompactor {
}
}
Arrays.sort(names);
- Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"});
- checkExpectedTxnsPresent(null, new Path[]{resultFile}, 0, 1L, 4L);
+ String[] expected = new String[]{"delta_0000001_0000002",
+ "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+ if (!Arrays.deepEquals(expected, names)) {
+ Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+ }
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
} finally {
connection.close();
@@ -466,6 +660,8 @@ public class TestCompactor {
String dbName = "default";
String tblName = "cws";
List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -500,10 +696,12 @@ public class TestCompactor {
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat =
fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
- Assert.assertEquals(1, stat.length);
+ if (1 != stat.length) {
+ Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+ }
String name = stat[0].getPath().getName();
Assert.assertEquals(name, "base_0000004");
- checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
} finally {
connection.close();
}
@@ -514,6 +712,8 @@ public class TestCompactor {
String dbName = "default";
String tblName = "cws";
List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -561,9 +761,12 @@ public class TestCompactor {
}
}
Arrays.sort(names);
- Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
- "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"});
- checkExpectedTxnsPresent(null, new Path[]{resultDelta}, 0, 1L, 4L);
+ String[] expected = new String[]{"delta_0000001_0000002",
+ "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"};
+ if (!Arrays.deepEquals(expected, names)) {
+ Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+ }
+ checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
} finally {
connection.close();
}
@@ -574,6 +777,8 @@ public class TestCompactor {
String dbName = "default";
String tblName = "cws";
List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -613,10 +818,17 @@ public class TestCompactor {
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat =
fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
- Assert.assertEquals(1, stat.length);
+ if (1 != stat.length) {
+ Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat));
+ }
+ if (1 != stat.length) {
+ Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
+ }
String name = stat[0].getPath().getName();
- Assert.assertEquals(name, "base_0000006");
- checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+ if (!name.equals("base_0000006")) {
+ Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
+ }
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
} finally {
connection.close();
}
@@ -642,7 +854,8 @@ public class TestCompactor {
}
}
- private void checkExpectedTxnsPresent(Path base, Path[] deltas, int bucket, long min, long max)
+ private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty,
+ String columnTypesProperty, int bucket, long min, long max)
throws IOException {
ValidTxnList txnList = new ValidTxnList() {
@Override
@@ -678,8 +891,11 @@ public class TestCompactor {
OrcInputFormat aif = new OrcInputFormat();
+ Configuration conf = new Configuration();
+ conf.set("columns", columnNamesProperty);
+ conf.set("columns.types", columnTypesProperty);
AcidInputFormat.RawReader<OrcStruct> reader =
- aif.getRawReader(new Configuration(), false, bucket, txnList, base, deltas);
+ aif.getRawReader(conf, false, bucket, txnList, base, deltas);
RecordIdentifier identifier = reader.createKey();
OrcStruct value = reader.createValue();
long currentTxn = min;
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 1c8a80d..290cff2 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -156,6 +156,22 @@ minitez.query.files.shared=acid_globallimit.q,\
ptf_matchpath.q,\
ptf_streaming.q,\
sample1.q,\
+ schema_evol_text_nonvec_mapwork_table.q,\
+ schema_evol_text_nonvec_fetchwork_table.q,\
+ schema_evol_orc_nonvec_fetchwork_part.q,\
+ schema_evol_orc_nonvec_mapwork_part.q,\
+ schema_evol_text_nonvec_fetchwork_part.q,\
+ schema_evol_text_nonvec_mapwork_part.q,\
+ schema_evol_orc_acid_mapwork_part.q,\
+ schema_evol_orc_acid_mapwork_table.q,\
+ schema_evol_orc_acidvec_mapwork_table.q,\
+ schema_evol_orc_acidvec_mapwork_part.q,\
+ schema_evol_orc_vec_mapwork_part.q,\
+ schema_evol_text_fetchwork_table.q,\
+ schema_evol_text_mapwork_table.q,\
+ schema_evol_orc_vec_mapwork_table.q,\
+ schema_evol_orc_nonvec_mapwork_table.q,\
+ schema_evol_orc_nonvec_fetchwork_table.q,\
selectDistinctStar.q,\
script_env_var1.q,\
script_env_var2.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 34461ed..7acca77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -435,6 +435,12 @@ public enum ErrorMsg {
CTAS_CREATES_VOID_TYPE(10305, "CREATE-TABLE-AS-SELECT creates a VOID type, please use CAST to specify the type, near field: "),
//{2} should be lockid
LOCK_ACQUIRE_TIMEDOUT(10307, "Lock acquisition for {0} timed out after {1}ms. {2}", true),
+ CANNOT_CHANGE_SERDE(10309, "Changing SerDe (from {0}) is not supported for table {1}. File format may be incompatible", true),
+ CANNOT_CHANGE_FILEFORMAT(10310, "Changing file format (from {0}) is not supported for table {1}", true),
+ CANNOT_REORDER_COLUMNS(10311, "Reordering columns is not supported for table {0}. SerDe may be incompatible", true),
+ CANNOT_CHANGE_COLUMN_TYPE(10312, "Changing from type {0} to {1} is not supported for column {2}. SerDe may be incompatible", true),
+ REPLACE_CANNOT_DROP_COLUMNS(10313, "Replacing columns cannot drop columns for table {0}. SerDe may be incompatible", true),
+ REPLACE_UNSUPPORTED_TYPE_CONVERSION(10314, "Replacing columns with unsupported type conversion (from {0} to {1}) for column {2}. SerDe may be incompatible", true),
//========================== 20000 range starts here ========================//
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -497,8 +503,13 @@ public enum ErrorMsg {
+ "data, set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true"),
INVALID_FILE_FORMAT_IN_LOAD(30019, "The file that you are trying to load does not match the" +
- " file format of the destination table.")
+ " file format of the destination table."),
+ SCHEMA_REQUIRED_TO_READ_ACID_TABLES(30020, "Neither the configuration variables " +
+ "schema.evolution.columns / schema.evolution.columns.types " +
+ "nor the " +
+ "columns / columns.types " +
+ "are set. Table schema information is required to read ACID tables")
;
private int errorCode;
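A sketch of the reader-side requirement behind the new 30020 message, adapted from the checkExpectedTxnsPresent change in TestCompactor above (bucket, txnList, base and deltas are supplied by the surrounding test; the column values are illustrative):

    Configuration conf = new Configuration();
    conf.set("columns", "a,b");              // table column names
    conf.set("columns.types", "int:string"); // matching column types
    // Without these properties (or the schema.evolution.columns /
    // schema.evolution.columns.types equivalents), reading an ACID table
    // fails with SCHEMA_REQUIRED_TO_READ_ACID_TABLES (30020).
    OrcInputFormat aif = new OrcInputFormat();
    AcidInputFormat.RawReader<OrcStruct> reader =
        aif.getRawReader(conf, false, bucket, txnList, base, deltas);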
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 2a64da3..56a0a4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -48,6 +48,7 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.commons.lang.StringEscapeUtils;
@@ -97,9 +98,12 @@ import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
@@ -3291,6 +3295,14 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
return 0;
}
+ private boolean isSchemaEvolutionEnabled(Table tbl) {
+ boolean isAcid = AcidUtils.isTablePropertyTransactional(tbl.getMetadata());
+ if (isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) {
+ return true;
+ }
+ return false;
+ }
+
private int alterTableOrSinglePartition(AlterTableDesc alterTbl, Table tbl, Partition part)
throws HiveException {
@@ -3336,6 +3348,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
String comment = alterTbl.getNewColComment();
boolean first = alterTbl.getFirst();
String afterCol = alterTbl.getAfterCol();
+ // if orc table, restrict reordering columns as it will break schema evolution
+ boolean isOrcSchemaEvolution =
+ sd.getInputFormat().equals(OrcInputFormat.class.getName()) &&
+ isSchemaEvolutionEnabled(tbl);
+ if (isOrcSchemaEvolution && (first || (afterCol != null && !afterCol.trim().isEmpty()))) {
+ throw new HiveException(ErrorMsg.CANNOT_REORDER_COLUMNS, alterTbl.getOldName());
+ }
FieldSchema column = null;
boolean found = false;
@@ -3352,6 +3371,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
&& !oldColName.equalsIgnoreCase(oldName)) {
throw new HiveException(ErrorMsg.DUPLICATE_COLUMN_NAMES, newName);
} else if (oldColName.equalsIgnoreCase(oldName)) {
+ // if orc table, restrict changing column types. Only integer type promotion is supported.
+ // smallint -> int -> bigint
+ if (isOrcSchemaEvolution && !isSupportedTypeChange(col.getType(), type)) {
+ throw new HiveException(ErrorMsg.CANNOT_CHANGE_COLUMN_TYPE, col.getType(), type,
+ newName);
+ }
col.setName(newName);
if (type != null && !type.trim().equals("")) {
col.setType(type);
@@ -3403,9 +3428,31 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
&& !serializationLib.equals(LazySimpleSerDe.class.getName())
&& !serializationLib.equals(ColumnarSerDe.class.getName())
&& !serializationLib.equals(DynamicSerDe.class.getName())
- && !serializationLib.equals(ParquetHiveSerDe.class.getName())) {
+ && !serializationLib.equals(ParquetHiveSerDe.class.getName())
+ && !serializationLib.equals(OrcSerde.class.getName())) {
throw new HiveException(ErrorMsg.CANNOT_REPLACE_COLUMNS, alterTbl.getOldName());
}
+ final boolean isOrcSchemaEvolution =
+ serializationLib.equals(OrcSerde.class.getName()) &&
+ isSchemaEvolutionEnabled(tbl);
+ // adding columns and limited integer type promotion is supported for ORC schema evolution
+ if (isOrcSchemaEvolution) {
+ final List<FieldSchema> existingCols = sd.getCols();
+ final List<FieldSchema> replaceCols = alterTbl.getNewCols();
+
+ if (replaceCols.size() < existingCols.size()) {
+ throw new HiveException(ErrorMsg.REPLACE_CANNOT_DROP_COLUMNS, alterTbl.getOldName());
+ }
+
+ for (int i = 0; i < existingCols.size(); i++) {
+ final String currentColType = existingCols.get(i).getType().toLowerCase().trim();
+ final String newColType = replaceCols.get(i).getType().toLowerCase().trim();
+ if (!isSupportedTypeChange(currentColType, newColType)) {
+ throw new HiveException(ErrorMsg.REPLACE_UNSUPPORTED_TYPE_CONVERSION, currentColType,
+ newColType, replaceCols.get(i).getName());
+ }
+ }
+ }
sd.setCols(alterTbl.getNewCols());
} else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS) {
tbl.getTTable().getParameters().putAll(alterTbl.getProps());
@@ -3420,6 +3467,14 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
} else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSERDE) {
StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
String serdeName = alterTbl.getSerdeName();
+ String oldSerdeName = sd.getSerdeInfo().getSerializationLib();
+ // if orc table, restrict changing the serde as it can break schema evolution
+ if (isSchemaEvolutionEnabled(tbl) &&
+ oldSerdeName.equalsIgnoreCase(OrcSerde.class.getName()) &&
+ !serdeName.equalsIgnoreCase(OrcSerde.class.getName())) {
+ throw new HiveException(ErrorMsg.CANNOT_CHANGE_SERDE, OrcSerde.class.getSimpleName(),
+ alterTbl.getOldName());
+ }
sd.getSerdeInfo().setSerializationLib(serdeName);
if ((alterTbl.getProps() != null) && (alterTbl.getProps().size() > 0)) {
sd.getSerdeInfo().getParameters().putAll(alterTbl.getProps());
@@ -3434,6 +3489,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
} else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDFILEFORMAT) {
StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
+ // if orc table, restrict changing the file format as it can break schema evolution
+ if (isSchemaEvolutionEnabled(tbl) &&
+ sd.getInputFormat().equals(OrcInputFormat.class.getName())
+ && !alterTbl.getInputFormat().equals(OrcInputFormat.class.getName())) {
+ throw new HiveException(ErrorMsg.CANNOT_CHANGE_FILEFORMAT, "ORC", alterTbl.getOldName());
+ }
sd.setInputFormat(alterTbl.getInputFormat());
sd.setOutputFormat(alterTbl.getOutputFormat());
if (alterTbl.getSerdeName() != null) {
@@ -3554,6 +3615,45 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
return 0;
}
+
+ // don't change the order of enums as ordinal values are used to check for valid type promotions
+ enum PromotableTypes {
+ SMALLINT,
+ INT,
+ BIGINT;
+
+ static List<String> types() {
+ return ImmutableList.of(SMALLINT.toString().toLowerCase(),
+ INT.toString().toLowerCase(), BIGINT.toString().toLowerCase());
+ }
+ }
+
+ // for ORC, only supported type promotions are smallint -> int -> bigint. No other
+ // type promotions are supported at this point
+ private boolean isSupportedTypeChange(String currentType, String newType) {
+ if (currentType != null && newType != null) {
+ currentType = currentType.toLowerCase().trim();
+ newType = newType.toLowerCase().trim();
+ // no type change
+ if (currentType.equals(newType)) {
+ return true;
+ }
+ if (PromotableTypes.types().contains(currentType)
+ && PromotableTypes.types().contains(newType)) {
+ PromotableTypes pCurrentType = PromotableTypes.valueOf(currentType.toUpperCase());
+ PromotableTypes pNewType = PromotableTypes.valueOf(newType.toUpperCase());
+ if (pNewType.ordinal() >= pCurrentType.ordinal()) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Drop a given table or some partitions. DropTableDesc is currently used for both.
*
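Concretely, the restrictions above translate into DDL behavior like the following for a hypothetical ORC table orc_tbl with columns (a int, b string) and schema evolution enabled (ACID table, or hive.exec.schema.evolution=true); see the orc_type_promotion*.q, orc_reorder_columns*.q and orc_replace_columns*.q negative tests added by this commit:

    -- Allowed: integer widening along smallint -> int -> bigint
    ALTER TABLE orc_tbl CHANGE COLUMN a a BIGINT;
    -- Rejected (CANNOT_CHANGE_COLUMN_TYPE): narrowing int -> smallint
    ALTER TABLE orc_tbl CHANGE COLUMN a a SMALLINT;
    -- Rejected (CANNOT_REORDER_COLUMNS): FIRST/AFTER reorders columns
    ALTER TABLE orc_tbl CHANGE COLUMN b b STRING FIRST;
    -- Rejected (REPLACE_CANNOT_DROP_COLUMNS): REPLACE COLUMNS drops column b
    ALTER TABLE orc_tbl REPLACE COLUMNS (a BIGINT);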
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 258d28e..b6e5739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -133,6 +133,10 @@ public class FetchOperator implements Serializable {
this.job = job;
this.work = work;
this.operator = operator;
+ if (operator instanceof TableScanOperator) {
+ Utilities.addTableSchemaToConf(job,
+ (TableScanOperator) operator);
+ }
this.vcCols = vcCols;
this.hasVC = vcCols != null && !vcCols.isEmpty();
this.isStatReader = work.getTblDesc() == null;
@@ -598,6 +602,10 @@ public class FetchOperator implements Serializable {
}
private boolean needConversion(PartitionDesc partitionDesc) {
+ boolean isAcid = AcidUtils.isTablePropertyTransactional(partitionDesc.getTableDesc().getProperties());
+ if (Utilities.isSchemaEvolutionEnabled(job, isAcid) && Utilities.isInputFileFormatSelfDescribing(partitionDesc)) {
+ return false;
+ }
return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index f8717ae..afc03ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.MapOperator.MapOpCtx;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
@@ -200,8 +203,14 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
opCtx.partName = String.valueOf(partSpec);
opCtx.deserializer = pd.getDeserializer(hconf);
- StructObjectInspector partRawRowObjectInspector =
- (StructObjectInspector) opCtx.deserializer.getObjectInspector();
+ StructObjectInspector partRawRowObjectInspector;
+ boolean isAcid = AcidUtils.isTablePropertyTransactional(td.getProperties());
+ if (Utilities.isSchemaEvolutionEnabled(hconf, isAcid) && Utilities.isInputFileFormatSelfDescribing(pd)) {
+ partRawRowObjectInspector = tableRowOI;
+ } else {
+ partRawRowObjectInspector =
+ (StructObjectInspector) opCtx.deserializer.getObjectInspector();
+ }
opCtx.partTblObjectInspectorConverter =
ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI);
@@ -302,8 +311,16 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
TableDesc tableDesc = pd.getTableDesc();
Deserializer partDeserializer = pd.getDeserializer(hconf);
- StructObjectInspector partRawRowObjectInspector =
- (StructObjectInspector) partDeserializer.getObjectInspector();
+
+ StructObjectInspector partRawRowObjectInspector;
+ boolean isAcid = AcidUtils.isTablePropertyTransactional(tableDesc.getProperties());
+ if (Utilities.isSchemaEvolutionEnabled(hconf, isAcid) && Utilities.isInputFileFormatSelfDescribing(pd)) {
+ Deserializer tblDeserializer = tableDesc.getDeserializer(hconf);
+ partRawRowObjectInspector = (StructObjectInspector) tblDeserializer.getObjectInspector();
+ } else {
+ partRawRowObjectInspector =
+ (StructObjectInspector) partDeserializer.getObjectInspector();
+ }
StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc);
if ((tblRawRowObjectInspector == null) ||
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index cbf02e9..d98ea84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -70,6 +70,13 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
private String defaultPartitionName;
+ /**
* These values are saved during MapWork, FetchWork, etc. preparation and later added to the
+ * JobConf of each task.
+ */
+ private String schemaEvolutionColumns;
+ private String schemaEvolutionColumnsTypes;
+
public TableDesc getTableDesc() {
return tableDesc;
}
@@ -78,6 +85,19 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
this.tableDesc = tableDesc;
}
+ public void setSchemaEvolution(String schemaEvolutionColumns, String schemaEvolutionColumnsTypes) {
+ this.schemaEvolutionColumns = schemaEvolutionColumns;
+ this.schemaEvolutionColumnsTypes = schemaEvolutionColumnsTypes;
+ }
+
+ public String getSchemaEvolutionColumns() {
+ return schemaEvolutionColumns;
+ }
+
+ public String getSchemaEvolutionColumnsTypes() {
+ return schemaEvolutionColumnsTypes;
+ }
+
/**
* Other than gathering statistics for the ANALYZE command, the table scan operator
* does not do anything special other than just forwarding the row. Since the table
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3352b49..1d8e3b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -122,15 +123,19 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
@@ -175,6 +180,12 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
@@ -478,11 +489,6 @@ public final class Utilities {
}
}
- public static Map<Integer, String> getMapWorkVectorScratchColumnTypeMap(Configuration hiveConf) {
- MapWork mapWork = getMapWork(hiveConf);
- return mapWork.getVectorScratchColumnTypeMap();
- }
-
public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
try {
Graph stageGraph = plan.getQueryPlan().getStageGraph();
@@ -3901,6 +3907,27 @@ public final class Utilities {
return false;
}
+ /**
+ * @param conf
+ * @return the configured VectorizedRowBatchCtx for a MapWork task.
+ */
+ public static VectorizedRowBatchCtx getVectorizedRowBatchCtx(Configuration conf) {
+ VectorizedRowBatchCtx result = null;
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) &&
+ Utilities.getPlanPath(conf) != null) {
+ MapWork mapWork = Utilities.getMapWork(conf);
+ if (mapWork != null && mapWork.getVectorMode()) {
+ result = mapWork.getVectorizedRowBatchCtx();
+ }
+ }
+ return result;
+ }
+
+ public static boolean isVectorMode(Configuration conf, MapWork mapWork) {
+ return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
+ && mapWork.getVectorMode();
+ }
+
public static void clearWorkMapForConf(Configuration conf) {
// Remove cached query plans for the current query only
Path mapPath = getPlanPath(conf, MAP_PLAN_NAME);
@@ -4057,4 +4084,77 @@ public final class Utilities {
(loggingLevel.equalsIgnoreCase("PERFORMANCE") || loggingLevel.equalsIgnoreCase("VERBOSE"));
}
+ public static boolean isSchemaEvolutionEnabled(Configuration conf, boolean isAcid) {
+ return isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
+ }
+
+ public static boolean isInputFileFormatSelfDescribing(PartitionDesc pd) {
+ Class<?> inputFormatClass = pd.getInputFileFormatClass();
+ return SelfDescribingInputFormatInterface.class.isAssignableFrom(inputFormatClass);
+ }
+
+ public static boolean isInputFileFormatVectorized(PartitionDesc pd) {
+ Class<?> inputFormatClass = pd.getInputFileFormatClass();
+ return VectorizedInputFormatInterface.class.isAssignableFrom(inputFormatClass);
+ }
+
+ public static void addSchemaEvolutionToTableScanOperator(Table table,
+ TableScanOperator tableScanOp) {
+ String colNames = MetaStoreUtils.getColumnNamesFromFieldSchema(table.getSd().getCols());
+ String colTypes = MetaStoreUtils.getColumnTypesFromFieldSchema(table.getSd().getCols());
+ tableScanOp.setSchemaEvolution(colNames, colTypes);
+ }
+
+ public static void addSchemaEvolutionToTableScanOperator(StructObjectInspector structOI,
+ TableScanOperator tableScanOp) {
+ String colNames = ObjectInspectorUtils.getFieldNames(structOI);
+ String colTypes = ObjectInspectorUtils.getFieldTypes(structOI);
+ tableScanOp.setSchemaEvolution(colNames, colTypes);
+ }
+
+ public static void unsetSchemaEvolution(Configuration conf) {
+ conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+ conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+ }
+
+ public static void addTableSchemaToConf(Configuration conf,
+ TableScanOperator tableScanOp) {
+ String schemaEvolutionColumns = tableScanOp.getSchemaEvolutionColumns();
+ if (schemaEvolutionColumns != null) {
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, tableScanOp.getSchemaEvolutionColumns());
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, tableScanOp.getSchemaEvolutionColumnsTypes());
+ } else {
+ LOG.info("schema.evolution.columns and schema.evolution.columns.types not available");
+ }
+ }
+
+ /**
+ * Create row key and value object inspectors for reduce vectorization.
+ * The row object inspector used by ReduceWork needs to be a **standard**
+ * struct object inspector, not just any struct object inspector.
+ * @param keyInspector
+ * @param valueInspector
+ * @return OI
+ * @throws HiveException
+ */
+ public static StandardStructObjectInspector constructVectorizedReduceRowOI(
+ StructObjectInspector keyInspector, StructObjectInspector valueInspector)
+ throws HiveException {
+
+ ArrayList<String> colNames = new ArrayList<String>();
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
+ for (StructField field: fields) {
+ colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+ ois.add(field.getFieldObjectInspector());
+ }
+ fields = valueInspector.getAllStructFieldRefs();
+ for (StructField field: fields) {
+ colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+ ois.add(field.getFieldObjectInspector());
+ }
+ StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+
+ return rowObjectInspector;
+ }
}
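
For illustration only (not part of this commit): a hedged sketch of how callers might consult the new Utilities helpers added above. The enclosing class, the method names, and the particular combination of checks are assumptions, not code from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
    import org.apache.hadoop.hive.ql.plan.PartitionDesc;

    public class UtilitiesSketch {

      // Returns null unless vectorization is enabled, a plan path is set in the
      // conf, and the MapWork was compiled in vector mode.
      static VectorizedRowBatch createBatchIfVectorized(Configuration conf) {
        VectorizedRowBatchCtx ctx = Utilities.getVectorizedRowBatchCtx(conf);
        return ctx == null ? null : ctx.createVectorizedRowBatch();
      }

      // ACID tables always get schema evolution; otherwise the
      // HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION setting decides. Only
      // self-describing input formats (ORC in this patch) can honor it.
      static boolean canEvolve(Configuration conf, PartitionDesc part, boolean isAcid) {
        return Utilities.isSchemaEvolutionEnabled(conf, isAcid)
            && Utilities.isInputFileFormatSelfDescribing(part);
      }
    }
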
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index ac5e3ca..f9e10c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -153,10 +154,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
/* vectorization only works with struct object inspectors */
valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
- ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = VectorizedBatchUtil.
- constructVectorizedRowBatch(keyStructInspector,
- valueStructInspectors[tag], gWork.getVectorScratchColumnTypeMap());
- batches[tag] = pair.getFirst();
final int totalColumns = keysColumnOffset
+ valueStructInspectors[tag].getAllStructFieldRefs().size();
valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
@@ -165,7 +162,11 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
.genVectorStructExpressionWritables(valueStructInspectors[tag])));
- rowObjectInspector[tag] = pair.getSecond();
+ rowObjectInspector[tag] = Utilities.constructVectorizedReduceRowOI(keyStructInspector,
+ valueStructInspectors[tag]);
+ batches[tag] = gWork.getVectorizedRowBatchCtx().createVectorizedRowBatch();
+
+
} else {
ois.add(keyObjectInspector);
ois.add(valueObjectInspector[tag]);
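
For illustration only (not part of this commit): a minimal, self-contained sketch of the Utilities.constructVectorizedReduceRowOI() helper that the reduce handlers now call. The single bigint key column "k", the string value column "v", and the wrapping class are made up.

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class ReduceRowOISketch {
      public static void main(String[] args) throws HiveException {
        StandardStructObjectInspector keyOI =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("k"),
                Arrays.<ObjectInspector>asList(
                    PrimitiveObjectInspectorFactory.javaLongObjectInspector));
        StandardStructObjectInspector valueOI =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("v"),
                Arrays.<ObjectInspector>asList(
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector));

        // Key fields come first, then value fields, each carrying the
        // reduce-side KEY./VALUE. prefix in its field name.
        StandardStructObjectInspector rowOI =
            Utilities.constructVectorizedReduceRowOI(keyOI, valueOI);
        System.out.println(rowOI.getTypeName());
      }
    }

The batch itself now comes from the plan-time context via getVectorizedRowBatchCtx().createVectorizedRowBatch(), so the column layout is fixed at compile time rather than re-derived per task from object inspectors.
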
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index d649672..5edd587 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -237,7 +237,7 @@ public class ReduceRecordProcessor extends RecordProcessor{
boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode();
sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc,
valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
- redWork.getVectorScratchColumnTypeMap());
+ redWork.getVectorizedRowBatchCtx());
ois[tag] = sources[tag].getObjectInspector();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index b634877..b1d2f52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -51,7 +52,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -123,7 +123,7 @@ public class ReduceRecordSource implements RecordSource {
void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag,
- Map<Integer, String> vectorScratchColumnTypeMap)
+ VectorizedRowBatchCtx batchContext)
throws Exception {
ObjectInspector keyObjectInspector;
@@ -174,10 +174,9 @@ public class ReduceRecordSource implements RecordSource {
.asList(VectorExpressionWriterFactory
.genVectorStructExpressionWritables(valueStructInspectors)));
- ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
- VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap);
- rowObjectInspector = pair.getSecond();
- batch = pair.getFirst();
+ rowObjectInspector = Utilities.constructVectorizedReduceRowOI(keyStructInspector,
+ valueStructInspectors);
+ batch = batchContext.createVectorizedRowBatch();
// Setup vectorized deserialization for the key and value.
BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeyDeserializer;
@@ -185,7 +184,7 @@ public class ReduceRecordSource implements RecordSource {
keyBinarySortableDeserializeToRow =
new VectorDeserializeRow(
new BinarySortableDeserializeRead(
- VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+ VectorizedBatchUtil.typeInfosFromStructObjectInspector(
keyStructInspector),
binarySortableSerDe.getSortOrders()));
keyBinarySortableDeserializeToRow.init(0);
@@ -195,7 +194,7 @@ public class ReduceRecordSource implements RecordSource {
valueLazyBinaryDeserializeToRow =
new VectorDeserializeRow(
new LazyBinaryDeserializeRead(
- VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+ VectorizedBatchUtil.typeInfosFromStructObjectInspector(
valueStructInspectors)));
valueLazyBinaryDeserializeToRow.init(firstValueColumnOffset);
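
For illustration only (not part of this commit): a hedged sketch of the renamed VectorizedBatchUtil.typeInfosFromStructObjectInspector() helper used above. The TypeInfo[] return type and the two-column inspector are assumptions, used only to show the intent of widening from primitive-only type infos.

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

    public class TypeInfosSketch {
      public static void main(String[] args) {
        StructObjectInspector oi =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("id", "name"),
                Arrays.<ObjectInspector>asList(
                    PrimitiveObjectInspectorFactory.javaLongObjectInspector,
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector));
        // Assumed to return one TypeInfo per struct field: [bigint, string].
        TypeInfo[] typeInfos = VectorizedBatchUtil.typeInfosFromStructObjectInspector(oi);
        System.out.println(Arrays.toString(typeInfos));
      }
    }
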
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 8ec7ead..99744cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -18,11 +18,6 @@
package org.apache.hadoop.hive.ql.exec.vector;
-import java.util.Arrays;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
/**
* This class supports string and binary data by value reference -- i.e. each field is
@@ -51,9 +46,6 @@ public class BytesColumnVector extends ColumnVector {
private byte[] buffer; // optional buffer to use when actually copying in data
private int nextFree; // next free position in buffer
- // Reusable text object
- private final Text textObject = new Text();
-
// Estimate that there will be 16 bytes per entry
static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
@@ -165,6 +157,19 @@ public class BytesColumnVector extends ColumnVector {
}
/**
+ * Set a field by copying the data into a local buffer.
+ * Use this method only when it is not practical to set the data by reference
+ * with setRef(); setting by reference is typically much faster than copying
+ * the data in.
+ *
+ * @param elementNum index within column vector to set
+ * @param sourceBuf container of source data
+ */
+ public void setVal(int elementNum, byte[] sourceBuf) {
+ setVal(elementNum, sourceBuf, 0, sourceBuf.length);
+ }
+
+ /**
* Set a field to the concatenation of two string values. Result data is copied
* into the internal buffer.
*
@@ -215,22 +220,6 @@ public class BytesColumnVector extends ColumnVector {
buffer = newBuffer;
}
- @Override
- public Writable getWritableObject(int index) {
- if (this.isRepeating) {
- index = 0;
- }
- Writable result = null;
- if (!isNull[index] && vector[index] != null) {
- textObject.clear();
- textObject.append(vector[index], start[index], length[index]);
- result = textObject;
- } else {
- result = NullWritable.get();
- }
- return result;
- }
-
/** Copy the current object contents into the output. Only copy selected entries,
* as indicated by selectedInUse and the sel array.
*/
@@ -294,7 +283,7 @@ public class BytesColumnVector extends ColumnVector {
// Only copy data values if entry is not null. The string value
// at position 0 is undefined if the position 0 value is null.
- if (noNulls || (!noNulls && !isNull[0])) {
+ if (noNulls || !isNull[0]) {
// loops start at position 1 because position 0 is already set
if (selectedInUse) {
@@ -320,14 +309,70 @@ public class BytesColumnVector extends ColumnVector {
setRef(0, value, 0, value.length);
}
+ // Fill the column vector with nulls
+ public void fillWithNulls() {
+ noNulls = false;
+ isRepeating = true;
+ vector[0] = null;
+ isNull[0] = true;
+ }
+
@Override
public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
- BytesColumnVector in = (BytesColumnVector) inputVector;
- setVal(outElementNum, in.vector[inputElementNum], in.start[inputElementNum], in.length[inputElementNum]);
+ if (inputVector.isRepeating) {
+ inputElementNum = 0;
+ }
+ if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
+ isNull[outElementNum] = false;
+ BytesColumnVector in = (BytesColumnVector) inputVector;
+ setVal(outElementNum, in.vector[inputElementNum],
+ in.start[inputElementNum], in.length[inputElementNum]);
+ } else {
+ isNull[outElementNum] = true;
+ noNulls = false;
+ }
}
@Override
public void init() {
initBuffer(0);
}
+
+ @Override
+ public void stringifyValue(StringBuilder buffer, int row) {
+ if (isRepeating) {
+ row = 0;
+ }
+ if (noNulls || !isNull[row]) {
+ buffer.append('"');
+ buffer.append(new String(this.buffer, start[row], length[row]));
+ buffer.append('"');
+ } else {
+ buffer.append("null");
+ }
+ }
+
+ @Override
+ public void ensureSize(int size, boolean preserveData) {
+ if (size > vector.length) {
+ super.ensureSize(size, preserveData);
+ int[] oldStart = start;
+ start = new int[size];
+ int[] oldLength = length;
+ length = new int[size];
+ byte[][] oldVector = vector;
+ vector = new byte[size][];
+ if (preserveData) {
+ if (isRepeating) {
+ vector[0] = oldVector[0];
+ start[0] = oldStart[0];
+ length[0] = oldLength[0];
+ } else {
+ System.arraycopy(oldVector, 0, vector, 0, oldVector.length);
+ System.arraycopy(oldStart, 0, start, 0 , oldStart.length);
+ System.arraycopy(oldLength, 0, length, 0, oldLength.length);
+ }
+ }
+ }
+ }
}
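
For illustration only (not part of this commit): a small sketch of the new BytesColumnVector conveniences added above. The values are made up; initBuffer(0) is called first because setVal() copies into the internal buffer.

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    public class BytesColumnVectorSketch {
      public static void main(String[] args) {
        BytesColumnVector col = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
        col.initBuffer(0);                       // allocate the copy buffer

        byte[] name = "alice".getBytes(StandardCharsets.UTF_8);
        col.setVal(0, name);                     // new overload: copy the whole array
        col.ensureSize(2048, true);              // grow vector/start/length, keep row 0

        StringBuilder sb = new StringBuilder();
        col.stringifyValue(sb, 0);               // appends "alice" in quotes
        System.out.println(sb);
      }
    }
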
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index 6654166..fcb1ae9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -18,10 +18,9 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.io.Writable;
-
/**
* ColumnVector contains the shared structure for the sub-types,
* including NULL information, and whether this vector
@@ -38,10 +37,15 @@ public abstract class ColumnVector {
* The current kinds of column vectors.
*/
public static enum Type {
+ NONE, // Useful when the type of the column vector has not been determined yet.
LONG,
DOUBLE,
BYTES,
- DECIMAL
+ DECIMAL,
+ STRUCT,
+ LIST,
+ MAP,
+ UNION
}
/*
@@ -64,8 +68,6 @@ public abstract class ColumnVector {
private boolean preFlattenIsRepeating;
private boolean preFlattenNoNulls;
- public abstract Writable getWritableObject(int index);
-
/**
* Constructor for super-class ColumnVector. This is not called directly,
* but used to initialize inherited fields.
@@ -76,28 +78,42 @@ public abstract class ColumnVector {
isNull = new boolean[len];
noNulls = true;
isRepeating = false;
+ preFlattenNoNulls = true;
+ preFlattenIsRepeating = false;
}
/**
- * Resets the column to default state
- * - fills the isNull array with false
- * - sets noNulls to true
- * - sets isRepeating to false
- */
- public void reset() {
- if (false == noNulls) {
- Arrays.fill(isNull, false);
- }
- noNulls = true;
- isRepeating = false;
+ * Resets the column to default state
+ * - fills the isNull array with false
+ * - sets noNulls to true
+ * - sets isRepeating to false
+ */
+ public void reset() {
+ if (!noNulls) {
+ Arrays.fill(isNull, false);
}
+ noNulls = true;
+ isRepeating = false;
+ preFlattenNoNulls = true;
+ preFlattenIsRepeating = false;
+ }
+
+ /**
+ * Sets the isRepeating flag. Recurses over structs and unions so that the
+ * flags are set correctly.
+ * @param isRepeating
+ */
+ public void setRepeating(boolean isRepeating) {
+ this.isRepeating = isRepeating;
+ }
- abstract public void flatten(boolean selectedInUse, int[] sel, int size);
+ abstract public void flatten(boolean selectedInUse, int[] sel, int size);
// Simplify vector by brute-force flattening noNulls if isRepeating
// This can be used to reduce combinatorial explosion of code paths in VectorExpressions
// with many arguments.
- public void flattenRepeatingNulls(boolean selectedInUse, int[] sel, int size) {
+ protected void flattenRepeatingNulls(boolean selectedInUse, int[] sel,
+ int size) {
boolean nullFillValue;
@@ -120,13 +136,13 @@ public abstract class ColumnVector {
noNulls = false;
}
- public void flattenNoNulls(boolean selectedInUse, int[] sel, int size) {
+ protected void flattenNoNulls(boolean selectedInUse, int[] sel,
+ int size) {
if (noNulls) {
noNulls = false;
if (selectedInUse) {
for (int j = 0; j < size; j++) {
- int i = sel[j];
- isNull[i] = false;
+ isNull[sel[j]] = false;
}
} else {
Arrays.fill(isNull, 0, size, false);
@@ -155,8 +171,10 @@ public abstract class ColumnVector {
/**
* Set the element in this column vector from the given input vector.
+ * This method can assume that the output does not have isRepeating set.
*/
- public abstract void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector);
+ public abstract void setElement(int outElementNum, int inputElementNum,
+ ColumnVector inputVector);
/**
* Initialize the column vector. This method can be overridden by specific column vector types.
@@ -166,5 +184,33 @@ public abstract class ColumnVector {
public void init() {
// Do nothing by default
}
- }
+ /**
+ * Ensure the ColumnVector can hold at least size values.
+ * This method is deliberately *not* recursive because the complex types
+ * can easily have more (or fewer) children than the upper levels.
+ * @param size the new minimum size
+ * @param preserveData should the old data be preserved?
+ */
+ public void ensureSize(int size, boolean preserveData) {
+ if (isNull.length < size) {
+ boolean[] oldArray = isNull;
+ isNull = new boolean[size];
+ if (preserveData && !noNulls) {
+ if (isRepeating) {
+ isNull[0] = oldArray[0];
+ } else {
+ System.arraycopy(oldArray, 0, isNull, 0, oldArray.length);
+ }
+ }
+ }
+ }
+
+ /**
+ * Print the value for this column into the given string builder.
+ * @param buffer the buffer to print into
+ * @param row the id of the row to print
+ */
+ public abstract void stringifyValue(StringBuilder buffer,
+ int row);
+ }
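
For illustration only (not part of this commit): a hedged sketch of the setElement() contract spelled out above, using LongColumnVector and assuming it received the same repeating/null handling as the Bytes and Decimal vectors in this commit (LongColumnVector.java is changed by the commit but not shown here).

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    public class SetElementSketch {
      public static void main(String[] args) {
        LongColumnVector src = new LongColumnVector();
        src.isRepeating = true;      // every logical row reads src.vector[0]
        src.vector[0] = 42;

        LongColumnVector dst = new LongColumnVector();
        // The input row index is remapped to 0 for a repeating input; the
        // output row is assumed non-repeating, per the new contract.
        dst.setElement(7, 3, src);
        System.out.println(dst.vector[7] + " " + dst.isNull[7]);  // 42 false
      }
    }
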
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
index 5009a42..fe8ad85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
@@ -17,14 +17,10 @@
*/
package org.apache.hadoop.hive.ql.exec.vector;
-
import java.math.BigInteger;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
public class DecimalColumnVector extends ColumnVector {
@@ -39,8 +35,6 @@ public class DecimalColumnVector extends ColumnVector {
public short scale;
public short precision;
- private final HiveDecimalWritable writableObj = new HiveDecimalWritable();
-
public DecimalColumnVector(int precision, int scale) {
this(VectorizedRowBatch.DEFAULT_SIZE, precision, scale);
}
@@ -49,26 +43,31 @@ public class DecimalColumnVector extends ColumnVector {
super(size);
this.precision = (short) precision;
this.scale = (short) scale;
- final int len = size;
- vector = new HiveDecimalWritable[len];
- for (int i = 0; i < len; i++) {
+ vector = new HiveDecimalWritable[size];
+ for (int i = 0; i < size; i++) {
vector[i] = new HiveDecimalWritable(HiveDecimal.ZERO);
}
}
- @Override
- public Writable getWritableObject(int index) {
- if (isRepeating) {
- index = 0;
- }
- if (!noNulls && isNull[index]) {
- return NullWritable.get();
+ // Fill all the vector entries with the provided value
+ public void fill(HiveDecimal value) {
+ noNulls = true;
+ isRepeating = true;
+ if (vector[0] == null) {
+ vector[0] = new HiveDecimalWritable(value);
} else {
- writableObj.set(vector[index]);
- return writableObj;
+ vector[0].set(value);
}
}
+ // Fill the column vector with nulls
+ public void fillWithNulls() {
+ noNulls = false;
+ isRepeating = true;
+ vector[0] = null;
+ isNull[0] = true;
+ }
+
@Override
public void flatten(boolean selectedInUse, int[] sel, int size) {
// TODO Auto-generated method stub
@@ -76,12 +75,35 @@ public class DecimalColumnVector extends ColumnVector {
@Override
public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
- HiveDecimal hiveDec = ((DecimalColumnVector) inputVector).vector[inputElementNum].getHiveDecimal(precision, scale);
- if (hiveDec == null) {
- noNulls = false;
+ if (inputVector.isRepeating) {
+ inputElementNum = 0;
+ }
+ if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
+ HiveDecimal hiveDec =
+ ((DecimalColumnVector) inputVector).vector[inputElementNum]
+ .getHiveDecimal(precision, scale);
+ if (hiveDec == null) {
+ isNull[outElementNum] = true;
+ noNulls = false;
+ } else {
+ isNull[outElementNum] = false;
+ vector[outElementNum].set(hiveDec);
+ }
+ } else {
isNull[outElementNum] = true;
+ noNulls = false;
+ }
+ }
+
+ @Override
+ public void stringifyValue(StringBuilder buffer, int row) {
+ if (isRepeating) {
+ row = 0;
+ }
+ if (noNulls || !isNull[row]) {
+ buffer.append(vector[row].toString());
} else {
- vector[outElementNum].set(hiveDec);
+ buffer.append("null");
}
}
@@ -110,4 +132,20 @@ public class DecimalColumnVector extends ColumnVector {
HiveDecimal minimumNonZeroValue = HiveDecimal.create(BigInteger.ONE, scale);
vector[elementNum].set(minimumNonZeroValue);
}
+
+ @Override
+ public void ensureSize(int size, boolean preserveData) {
+ if (size > vector.length) {
+ super.ensureSize(size, preserveData);
+ HiveDecimalWritable[] oldArray = vector;
+ vector = new HiveDecimalWritable[size];
+ if (preserveData) {
+ // copy the old writables to avoid creating new objects for them
+ System.arraycopy(oldArray, 0, vector, 0 , oldArray.length);
+ for(int i= oldArray.length; i < vector.length; ++i) {
+ vector[i] = new HiveDecimalWritable(HiveDecimal.ZERO);
+ }
+ }
+ }
+ }
}
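
For illustration only (not part of this commit): a short sketch of the new DecimalColumnVector fill helpers. The precision/scale and the value are made up.

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;

    public class DecimalFillSketch {
      public static void main(String[] args) {
        DecimalColumnVector col = new DecimalColumnVector(38, 18);

        col.fill(HiveDecimal.create("123.45"));   // repeating, non-null column
        // col.isRepeating == true, col.noNulls == true, col.vector[0] holds 123.45

        col.fillWithNulls();                      // repeating, all-null column
        // col.noNulls == false, col.isNull[0] == true, col.vector[0] == null
      }
    }
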