Posted to commits@hive.apache.org by om...@apache.org on 2015/11/18 23:41:06 UTC
[24/34] hive git commit: HIVE-11981: ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)
HIVE-11981: ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/30f20e99
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/30f20e99
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/30f20e99
Branch: refs/heads/master-fixed
Commit: 30f20e992e05754efc4b984030b01f0184e0359d
Parents: f5f9f30
Author: Matt McCline <mm...@hortonworks.com>
Authored: Wed Nov 18 05:11:35 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Wed Nov 18 13:56:47 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../mapreduce/FosterStorageHandler.java | 37 +
.../hive/hcatalog/mapreduce/InputJobInfo.java | 8 +-
.../hive/hcatalog/streaming/TestStreaming.java | 2 +
.../streaming/mutate/StreamingAssert.java | 2 +
.../hive/ql/txn/compactor/TestCompactor.java | 246 ++-
.../test/resources/testconfiguration.properties | 16 +
.../hive/llap/io/api/impl/LlapInputFormat.java | 40 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 7 +-
.../hadoop/hive/ql/exec/FetchOperator.java | 7 +
.../apache/hadoop/hive/ql/exec/MapOperator.java | 22 +-
.../hadoop/hive/ql/exec/TableScanOperator.java | 20 +
.../apache/hadoop/hive/ql/exec/Utilities.java | 104 +-
.../ql/exec/spark/SparkReduceRecordHandler.java | 11 +-
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 2 +-
.../hive/ql/exec/tez/ReduceRecordSource.java | 18 +-
.../hive/ql/exec/vector/VectorExtractRow.java | 9 +-
.../ql/exec/vector/VectorGroupByOperator.java | 2 +-
.../exec/vector/VectorMapJoinBaseOperator.java | 2 +-
.../exec/vector/VectorSMBMapJoinOperator.java | 2 +-
.../ql/exec/vector/VectorizationContext.java | 35 +-
.../ql/exec/vector/VectorizedBatchUtil.java | 204 +-
.../ql/exec/vector/VectorizedColumnarSerDe.java | 277 ---
.../ql/exec/vector/VectorizedRowBatchCtx.java | 509 ++---
.../mapjoin/VectorMapJoinCommonOperator.java | 66 +-
.../VectorMapJoinGenerateResultOperator.java | 5 +-
.../hadoop/hive/ql/io/HiveInputFormat.java | 5 +
.../apache/hadoop/hive/ql/io/IOConstants.java | 11 +
.../io/SelfDescribingInputFormatInterface.java | 27 +
.../hive/ql/io/VectorizedRCFileInputFormat.java | 81 -
.../ql/io/VectorizedRCFileRecordReader.java | 261 ---
.../ql/io/orc/ConversionTreeReaderFactory.java | 38 -
.../apache/hadoop/hive/ql/io/orc/OrcFile.java | 2 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 60 +-
.../hadoop/hive/ql/io/orc/OrcOutputFormat.java | 84 +-
.../hive/ql/io/orc/OrcRawRecordMerger.java | 61 +-
.../apache/hadoop/hive/ql/io/orc/OrcUtils.java | 547 ++++++
.../apache/hadoop/hive/ql/io/orc/Reader.java | 15 +
.../hive/ql/io/orc/RecordReaderFactory.java | 274 ---
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 17 +-
.../hadoop/hive/ql/io/orc/SchemaEvolution.java | 185 ++
.../hive/ql/io/orc/TreeReaderFactory.java | 182 +-
.../ql/io/orc/VectorizedOrcAcidRowReader.java | 45 +-
.../ql/io/orc/VectorizedOrcInputFormat.java | 45 +-
.../parquet/VectorizedParquetInputFormat.java | 25 +-
.../hive/ql/optimizer/GenMapRedUtils.java | 18 +
.../hive/ql/optimizer/SimpleFetchOptimizer.java | 1 +
.../hive/ql/optimizer/physical/Vectorizer.java | 502 +++--
.../ql/optimizer/physical/Vectorizer.java.orig | 1744 ------------------
.../apache/hadoop/hive/ql/plan/BaseWork.java | 30 +-
.../hadoop/hive/ql/plan/PartitionDesc.java | 15 +-
.../hive/ql/plan/VectorPartitionConversion.java | 166 ++
.../hive/ql/plan/VectorPartitionDesc.java | 110 ++
.../ql/exec/vector/TestVectorRowObject.java | 5 +-
.../hive/ql/exec/vector/TestVectorSerDeRow.java | 10 +-
.../exec/vector/TestVectorizedRowBatchCtx.java | 357 ----
.../hive/ql/io/orc/TestInputOutputFormat.java | 34 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 42 +
.../ql/io/orc/TestOrcRawRecordMerger.java.orig | 1150 ++++++++++++
.../schema_evol_orc_acid_mapwork_part.q | 171 ++
.../schema_evol_orc_acid_mapwork_table.q | 129 ++
.../schema_evol_orc_acidvec_mapwork_part.q | 171 ++
.../schema_evol_orc_acidvec_mapwork_table.q | 129 ++
.../schema_evol_orc_nonvec_fetchwork_part.q | 96 +
.../schema_evol_orc_nonvec_fetchwork_table.q | 56 +
.../schema_evol_orc_nonvec_mapwork_part.q | 96 +
.../schema_evol_orc_nonvec_mapwork_table.q | 56 +
.../schema_evol_orc_vec_mapwork_part.q | 96 +
.../schema_evol_orc_vec_mapwork_table.q | 56 +
.../schema_evol_text_fetchwork_table.q | 56 +
.../schema_evol_text_mapwork_table.q | 56 +
.../schema_evol_text_nonvec_fetchwork_part.q | 96 +
.../schema_evol_text_nonvec_fetchwork_table.q | 66 +
.../schema_evol_text_nonvec_mapwork_part.q | 96 +
.../schema_evol_text_nonvec_mapwork_table.q | 66 +
.../schema_evol_orc_acid_mapwork_part.q.out | 1035 +++++++++++
.../schema_evol_orc_acid_mapwork_table.q.out | 649 +++++++
.../schema_evol_orc_acidvec_mapwork_part.q.out | 1035 +++++++++++
.../schema_evol_orc_acidvec_mapwork_table.q.out | 649 +++++++
.../schema_evol_orc_nonvec_fetchwork_part.q.out | 642 +++++++
...schema_evol_orc_nonvec_fetchwork_table.q.out | 298 +++
.../schema_evol_orc_nonvec_mapwork_part.q.out | 642 +++++++
.../schema_evol_orc_nonvec_mapwork_table.q.out | 298 +++
.../schema_evol_orc_vec_mapwork_part.q.out | 642 +++++++
.../schema_evol_orc_vec_mapwork_table.q.out | 298 +++
.../schema_evol_text_fetchwork_table.q.out | 298 +++
.../schema_evol_text_mapwork_table.q.out | 298 +++
...schema_evol_text_nonvec_fetchwork_part.q.out | 642 +++++++
...chema_evol_text_nonvec_fetchwork_table.q.out | 297 +++
.../schema_evol_text_nonvec_mapwork_part.q.out | 642 +++++++
.../schema_evol_text_nonvec_mapwork_table.q.out | 297 +++
.../tez/schema_evol_orc_acid_mapwork_part.q.out | 1035 +++++++++++
.../schema_evol_orc_acid_mapwork_table.q.out | 649 +++++++
.../schema_evol_orc_acidvec_mapwork_part.q.out | 1035 +++++++++++
.../schema_evol_orc_acidvec_mapwork_table.q.out | 649 +++++++
.../schema_evol_orc_nonvec_fetchwork_part.q.out | 642 +++++++
...schema_evol_orc_nonvec_fetchwork_table.q.out | 298 +++
.../schema_evol_orc_nonvec_mapwork_part.q.out | 642 +++++++
.../schema_evol_orc_nonvec_mapwork_table.q.out | 298 +++
.../tez/schema_evol_orc_vec_mapwork_part.q.out | 642 +++++++
.../tez/schema_evol_orc_vec_mapwork_table.q.out | 298 +++
.../tez/schema_evol_text_fetchwork_table.q.out | 298 +++
.../tez/schema_evol_text_mapwork_table.q.out | 298 +++
...schema_evol_text_nonvec_fetchwork_part.q.out | 642 +++++++
...chema_evol_text_nonvec_fetchwork_table.q.out | 297 +++
.../schema_evol_text_nonvec_mapwork_part.q.out | 642 +++++++
.../schema_evol_text_nonvec_mapwork_table.q.out | 297 +++
.../tez/vector_partition_diff_num_cols.q.out | 1 +
.../vector_partition_diff_num_cols.q.out | 1 +
.../hive/serde2/typeinfo/TypeInfoUtils.java | 36 +
.../hive/ql/exec/vector/VectorizedRowBatch.java | 20 +
111 files changed, 22617 insertions(+), 4063 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 953e52c..7cab9ae 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -969,6 +969,10 @@ public class HiveConf extends Configuration {
"The threshold for the input file size of the small tables; if the file size is smaller \n" +
"than this threshold, it will try to convert the common join into map join"),
+
+ HIVE_SCHEMA_EVOLUTION("hive.exec.schema.evolution", true,
+ "Use schema evolution to convert self-describing file format's data to the schema desired by the reader."),
+
HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0,
"A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."),
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 5a95467..bc56d77 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -35,6 +36,10 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -48,6 +53,8 @@ import java.util.Map;
*/
public class FosterStorageHandler extends DefaultStorageHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(FosterStorageHandler.class);
+
public Configuration conf;
/** The directory under which data is initially written for a non partitioned table */
protected static final String TEMP_DIR_NAME = "_TEMP";
@@ -98,6 +105,36 @@ public class FosterStorageHandler extends DefaultStorageHandler {
@Override
public void configureInputJobProperties(TableDesc tableDesc,
Map<String, String> jobProperties) {
+
+ try {
+ Map<String, String> tableProperties = tableDesc.getJobProperties();
+
+ String jobInfoProperty = tableProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ if (jobInfoProperty != null) {
+
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobInfoProperty);
+
+ HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+ HCatSchema dataColumns = tableInfo.getDataColumns();
+ List<HCatFieldSchema> dataFields = dataColumns.getFields();
+ StringBuilder columnNamesSb = new StringBuilder();
+ StringBuilder typeNamesSb = new StringBuilder();
+ for (HCatFieldSchema dataField : dataFields) {
+ if (columnNamesSb.length() > 0) {
+ columnNamesSb.append(",");
+ typeNamesSb.append(":");
+ }
+ columnNamesSb.append(dataField.getName());
+ typeNamesSb.append(dataField.getTypeString());
+ }
+ jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString());
+ jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString());
+
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to set output path", e);
+ }
+
}
@Override
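Note: the two new job properties carry the table schema in the same shape as the existing columns / columns.types properties: names are comma separated, types are colon separated (compare the "id,msg" / "bigint:string" values set in the streaming tests below). A small standalone sketch of the string building, using hypothetical (name, type) pairs in place of HCatFieldSchema objects:

    import java.util.Arrays;
    import java.util.List;

    public class SchemaPropertyFormatExample {
      public static void main(String[] args) {
        // Hypothetical (name, type) pairs standing in for HCatFieldSchema entries.
        List<String[]> fields = Arrays.asList(
            new String[]{"id", "bigint"},
            new String[]{"msg", "string"});
        StringBuilder names = new StringBuilder();
        StringBuilder types = new StringBuilder();
        for (String[] f : fields) {
          if (names.length() > 0) {
            names.append(",");  // column names are comma separated
            types.append(":");  // column types are colon separated
          }
          names.append(f[0]);
          types.append(f[1]);
        }
        System.out.println(names);  // id,msg
        System.out.println(types);  // bigint:string
      }
    }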
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 1f23f3f..7ec6ae3 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -182,9 +182,11 @@ public class InputJobInfo implements Serializable {
ObjectInputStream partInfoReader =
new ObjectInputStream(new InflaterInputStream(ois));
partitions = (List<PartInfo>)partInfoReader.readObject();
- for (PartInfo partInfo : partitions) {
- if (partInfo.getTableInfo() == null) {
- partInfo.setTableInfo(this.tableInfo);
+ if (partitions != null) {
+ for (PartInfo partInfo : partitions) {
+ if (partInfo.getTableInfo() == null) {
+ partInfo.setTableInfo(this.tableInfo);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 806dbdb..1723ff1 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -451,6 +451,8 @@ public class TestStreaming {
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionPath.toString());
job.set("bucket_count", Integer.toString(buckets));
+ job.set("columns", "id,msg");
+ job.set("columns.types", "bigint:string");
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
InputSplit[] splits = inf.getSplits(job, buckets);
Assert.assertEquals(buckets, splits.length);
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index 477ed8c..339e9ef 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -128,6 +128,8 @@ public class StreamingAssert {
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionLocation.toString());
job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
+ job.set("columns", "id,msg");
+ job.set("columns.types", "bigint:string");
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
InputSplit[] splits = inputFormat.getSplits(job, 1);
assertEquals(1, splits.length);
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index e2910dd..dabe434 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -128,7 +128,194 @@ public class TestCompactor {
driver.close();
}
}
-
+
+ /**
+ * Simple schema evolution: add columns with partitioning.
+ * @throws Exception
+ */
+ @Test
+ public void schemaEvolutionAddColDynamicPartitioningInsert() throws Exception {
+ String tblName = "dpct";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " PARTITIONED BY(ds string)" +
+ " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+
+ // First INSERT round.
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " +
+ "'today'), (2, 'wilma', 'yesterday')", driver);
+
+ // ALTER TABLE ... ADD COLUMNS
+ executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver);
+
+ // Validate there is an added NULL for column c.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+
+ // Second INSERT round with new inserts into previously existing partition 'yesterday'.
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values " +
+ "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " +
+ "(5, 'doc', 1902, 'yesterday')",
+ driver);
+
+ // Validate the new insertions for column c.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+ Initiator initiator = new Initiator();
+ initiator.setThreadId((int)initiator.getId());
+ conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+ initiator.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean();
+ stop.set(true);
+ initiator.init(stop, new AtomicBoolean());
+ initiator.run();
+
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(4, compacts.size());
+ SortedSet<String> partNames = new TreeSet<String>();
+ for (int i = 0; i < compacts.size(); i++) {
+ Assert.assertEquals("default", compacts.get(i).getDbname());
+ Assert.assertEquals(tblName, compacts.get(i).getTablename());
+ Assert.assertEquals("initiated", compacts.get(i).getState());
+ partNames.add(compacts.get(i).getPartitionname());
+ }
+ List<String> names = new ArrayList<String>(partNames);
+ Assert.assertEquals("ds=last_century", names.get(0));
+ Assert.assertEquals("ds=soon", names.get(1));
+ Assert.assertEquals("ds=today", names.get(2));
+ Assert.assertEquals("ds=yesterday", names.get(3));
+
+ // Validate after compaction.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+ }
+
+ @Test
+ public void schemaEvolutionAddColDynamicPartitioningUpdate() throws Exception {
+ String tblName = "udpct";
+ List<String> colNames = Arrays.asList("a", "b");
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " PARTITIONED BY(ds string)" +
+ " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " +
+ "'today'), (2, 'wilma', 'yesterday')", driver);
+
+ executeStatementOnDriver("update " + tblName + " set b = 'barney'", driver);
+
+ // Validate the update.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\tyesterday", valuesReadFromHiveDriver.get(1));
+
+ // ALTER TABLE ... ADD COLUMNS
+ executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver);
+
+ // Validate there is an added NULL for column c.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+
+ // Second INSERT round with new inserts into previously existing partition 'yesterday'.
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values " +
+ "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " +
+ "(5, 'doc', 1902, 'yesterday')",
+ driver);
+
+ // Validate the new insertions for column c.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+ executeStatementOnDriver("update " + tblName + " set c = 2000", driver);
+
+ // Validate the update of new column c, even in old rows.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4));
+
+ Initiator initiator = new Initiator();
+ initiator.setThreadId((int)initiator.getId());
+ // Set to 1 so insert doesn't set it off but update does
+ conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 1);
+ initiator.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean();
+ stop.set(true);
+ initiator.init(stop, new AtomicBoolean());
+ initiator.run();
+
+ CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(4, compacts.size());
+ SortedSet<String> partNames = new TreeSet<String>();
+ for (int i = 0; i < compacts.size(); i++) {
+ Assert.assertEquals("default", compacts.get(i).getDbname());
+ Assert.assertEquals(tblName, compacts.get(i).getTablename());
+ Assert.assertEquals("initiated", compacts.get(i).getState());
+ partNames.add(compacts.get(i).getPartitionname());
+ }
+ List<String> names = new ArrayList<String>(partNames);
+ Assert.assertEquals("ds=last_century", names.get(0));
+ Assert.assertEquals("ds=soon", names.get(1));
+ Assert.assertEquals("ds=today", names.get(2));
+ Assert.assertEquals("ds=yesterday", names.get(3));
+
+ // Validate after compaction.
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+ valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+ Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0));
+ Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1));
+ Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2));
+ Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3));
+ Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4));
+ }
+
/**
* After each major compaction, stats need to be updated on each column of the
* table/partition which previously had stats.
@@ -255,7 +442,9 @@ public class TestCompactor {
t.run();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
+ if (1 != compacts.size()) {
+ Assert.fail("Expecting 1 file and found " + compacts.size() + " files " + compacts.toString());
+ }
Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName,
@@ -409,6 +598,8 @@ public class TestCompactor {
String dbName = "default";
String tblName = "cws";
List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -452,9 +643,12 @@ public class TestCompactor {
}
}
Arrays.sort(names);
- Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"});
- checkExpectedTxnsPresent(null, new Path[]{resultFile}, 0, 1L, 4L);
+ String[] expected = new String[]{"delta_0000001_0000002",
+ "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+ if (!Arrays.deepEquals(expected, names)) {
+ Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+ }
+ checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
} finally {
connection.close();
@@ -466,6 +660,8 @@ public class TestCompactor {
String dbName = "default";
String tblName = "cws";
List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -500,10 +696,12 @@ public class TestCompactor {
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat =
fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
- Assert.assertEquals(1, stat.length);
+ if (1 != stat.length) {
+ Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+ }
String name = stat[0].getPath().getName();
Assert.assertEquals(name, "base_0000004");
- checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
} finally {
connection.close();
}
@@ -514,6 +712,8 @@ public class TestCompactor {
String dbName = "default";
String tblName = "cws";
List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -561,9 +761,12 @@ public class TestCompactor {
}
}
Arrays.sort(names);
- Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
- "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"});
- checkExpectedTxnsPresent(null, new Path[]{resultDelta}, 0, 1L, 4L);
+ String[] expected = new String[]{"delta_0000001_0000002",
+ "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"};
+ if (!Arrays.deepEquals(expected, names)) {
+ Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+ }
+ checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
} finally {
connection.close();
}
@@ -574,6 +777,8 @@ public class TestCompactor {
String dbName = "default";
String tblName = "cws";
List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -613,10 +818,17 @@ public class TestCompactor {
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat =
fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
- Assert.assertEquals(1, stat.length);
+ if (1 != stat.length) {
+ Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
+ }
String name = stat[0].getPath().getName();
- Assert.assertEquals(name, "base_0000006");
- checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+ if (!name.equals("base_0000006")) {
+ Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
+ }
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
} finally {
connection.close();
}
@@ -642,7 +854,8 @@ public class TestCompactor {
}
}
- private void checkExpectedTxnsPresent(Path base, Path[] deltas, int bucket, long min, long max)
+ private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty,
+ String columnTypesProperty, int bucket, long min, long max)
throws IOException {
ValidTxnList txnList = new ValidTxnList() {
@Override
@@ -678,8 +891,11 @@ public class TestCompactor {
OrcInputFormat aif = new OrcInputFormat();
+ Configuration conf = new Configuration();
+ conf.set("columns", columnNamesProperty);
+ conf.set("columns.types", columnTypesProperty);
AcidInputFormat.RawReader<OrcStruct> reader =
- aif.getRawReader(new Configuration(), false, bucket, txnList, base, deltas);
+ aif.getRawReader(conf, false, bucket, txnList, base, deltas);
RecordIdentifier identifier = reader.createKey();
OrcStruct value = reader.createValue();
long currentTxn = min;
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 402914c..ece43cc 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -162,6 +162,22 @@ minitez.query.files.shared=alter_merge_2_orc.q,\
ptf_matchpath.q,\
ptf_streaming.q,\
sample1.q,\
+ schema_evol_text_nonvec_mapwork_table.q,\
+ schema_evol_text_nonvec_fetchwork_table.q,\
+ schema_evol_orc_nonvec_fetchwork_part.q,\
+ schema_evol_orc_nonvec_mapwork_part.q,\
+ schema_evol_text_nonvec_fetchwork_part.q,\
+ schema_evol_text_nonvec_mapwork_part.q,\
+ schema_evol_orc_acid_mapwork_part.q,\
+ schema_evol_orc_acid_mapwork_table.q,\
+ schema_evol_orc_acidvec_mapwork_table.q,\
+ schema_evol_orc_acidvec_mapwork_part.q,\
+ schema_evol_orc_vec_mapwork_part.q,\
+ schema_evol_text_fetchwork_table.q,\
+ schema_evol_text_mapwork_table.q,\
+ schema_evol_orc_vec_mapwork_table.q,\
+ schema_evol_orc_nonvec_mapwork_table.q,\
+ schema_evol_orc_nonvec_fetchwork_table.q,\
selectDistinctStar.q,\
script_env_var1.q,\
script_env_var2.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 51f4c8e..b57366c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -33,11 +33,13 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
@@ -54,7 +56,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
public class LlapInputFormat
- implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface {
+ implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface,
+ SelfDescribingInputFormatInterface {
@SuppressWarnings("rawtypes")
private final InputFormat sourceInputFormat;
private final ColumnVectorProducer cvp;
@@ -104,6 +107,8 @@ public class LlapInputFormat
private final SearchArgument sarg;
private final String[] columnNames;
private final VectorizedRowBatchCtx rbCtx;
+ private final boolean[] columnsToIncludeTruncated;
+ private final Object[] partitionValues;
private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
private ColumnVectorBatch lastCvb = null;
@@ -118,19 +123,28 @@ public class LlapInputFormat
private long firstReturnTime;
public LlapRecordReader(
- JobConf job, FileSplit split, List<Integer> includedCols, String hostName) {
+ JobConf job, FileSplit split, List<Integer> includedCols, String hostName)
+ throws IOException {
this.split = split;
this.columnIds = includedCols;
this.sarg = ConvertAstToSearchArg.createFromConf(job);
this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
this.counters = new QueryFragmentCounters(job);
this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
- try {
- rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(job, split);
- } catch (Exception e) {
- throw new RuntimeException(e);
+
+ MapWork mapWork = Utilities.getMapWork(job);
+ rbCtx = mapWork.getVectorizedRowBatchCtx();
+
+ columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(job);
+
+ int partitionColumnCount = rbCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ rbCtx.getPartitionValues(rbCtx, job, split, partitionValues);
+ } else {
+ partitionValues = null;
}
+
startRead();
}
@@ -143,10 +157,8 @@ public class LlapInputFormat
// Add partition cols if necessary (see VectorizedOrcInputFormat for details).
boolean wasFirst = isFirst;
if (isFirst) {
- try {
- rbCtx.addPartitionColsToBatch(value);
- } catch (HiveException e) {
- throw new IOException(e);
+ if (partitionValues != null) {
+ rbCtx.addPartitionColsToBatch(value, partitionValues);
}
isFirst = false;
}
@@ -244,11 +256,7 @@ public class LlapInputFormat
@Override
public VectorizedRowBatch createValue() {
- try {
- return rbCtx.createVectorizedRowBatch();
- } catch (HiveException e) {
- throw new RuntimeException("Error creating a batch", e);
- }
+ return rbCtx.createVectorizedRowBatch(columnsToIncludeTruncated);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 39a881a..892587a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -484,8 +484,13 @@ public enum ErrorMsg {
INVALID_FILE_FORMAT_IN_LOAD(30019, "The file that you are trying to load does not match the" +
- " file format of the destination table.")
+ " file format of the destination table."),
+ SCHEMA_REQUIRED_TO_READ_ACID_TABLES(30020, "Neither the configuration variables " +
+ "schema.evolution.columns / schema.evolution.columns.types " +
+ "nor the " +
+ "columns / columns.types " +
+ "are set. Table schema information is required to read ACID tables")
;
private int errorCode;
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 157115b..ad36093 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -132,6 +132,10 @@ public class FetchOperator implements Serializable {
this.job = job;
this.work = work;
this.operator = operator;
+ if (operator instanceof TableScanOperator) {
+ Utilities.addTableSchemaToConf(job,
+ (TableScanOperator) operator);
+ }
this.vcCols = vcCols;
this.hasVC = vcCols != null && !vcCols.isEmpty();
this.isStatReader = work.getTblDesc() == null;
@@ -599,6 +603,9 @@ public class FetchOperator implements Serializable {
}
private boolean needConversion(PartitionDesc partitionDesc) {
+ if (Utilities.isInputFileFormatSelfDescribing(partitionDesc)) {
+ return false;
+ }
return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index c2a5726..99724c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.MapOperator.MapOpCtx;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
@@ -202,8 +204,13 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
opCtx.partName = String.valueOf(partSpec);
opCtx.deserializer = pd.getDeserializer(hconf);
- StructObjectInspector partRawRowObjectInspector =
- (StructObjectInspector) opCtx.deserializer.getObjectInspector();
+ StructObjectInspector partRawRowObjectInspector;
+ if (Utilities.isInputFileFormatSelfDescribing(pd)) {
+ partRawRowObjectInspector = tableRowOI;
+ } else {
+ partRawRowObjectInspector =
+ (StructObjectInspector) opCtx.deserializer.getObjectInspector();
+ }
opCtx.partTblObjectInspectorConverter =
ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI);
@@ -304,8 +311,15 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
TableDesc tableDesc = pd.getTableDesc();
Deserializer partDeserializer = pd.getDeserializer(hconf);
- StructObjectInspector partRawRowObjectInspector =
- (StructObjectInspector) partDeserializer.getObjectInspector();
+
+ StructObjectInspector partRawRowObjectInspector;
+ if (Utilities.isInputFileFormatSelfDescribing(pd)) {
+ Deserializer tblDeserializer = tableDesc.getDeserializer(hconf);
+ partRawRowObjectInspector = (StructObjectInspector) tblDeserializer.getObjectInspector();
+ } else {
+ partRawRowObjectInspector =
+ (StructObjectInspector) partDeserializer.getObjectInspector();
+ }
StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc);
if ((tblRawRowObjectInspector == null) ||
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 6e4f474..90c83e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -67,6 +67,13 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
private String defaultPartitionName;
+ /**
+ * These values are saved during MapWork, FetchWork, etc. preparation and later added to the
+ * JobConf of each task.
+ */
+ private String schemaEvolutionColumns;
+ private String schemaEvolutionColumnsTypes;
+
public TableDesc getTableDesc() {
return tableDesc;
}
@@ -75,6 +82,19 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
this.tableDesc = tableDesc;
}
+ public void setSchemaEvolution(String schemaEvolutionColumns, String schemaEvolutionColumnsTypes) {
+ this.schemaEvolutionColumns = schemaEvolutionColumns;
+ this.schemaEvolutionColumnsTypes = schemaEvolutionColumnsTypes;
+ }
+
+ public String getSchemaEvolutionColumns() {
+ return schemaEvolutionColumns;
+ }
+
+ public String getSchemaEvolutionColumnsTypes() {
+ return schemaEvolutionColumnsTypes;
+ }
+
/**
* Other than gathering statistics for the ANALYZE command, the table scan operator
* does not do anything special other than just forwarding the row. Since the table
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index de2eb98..0700e2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -106,6 +106,7 @@ import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -121,6 +122,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -128,11 +131,14 @@ import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -173,6 +179,12 @@ import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
@@ -473,11 +485,6 @@ public final class Utilities {
}
}
- public static Map<Integer, String> getMapWorkVectorScratchColumnTypeMap(Configuration hiveConf) {
- MapWork mapWork = getMapWork(hiveConf);
- return mapWork.getVectorScratchColumnTypeMap();
- }
-
public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
try {
Graph stageGraph = plan.getQueryPlan().getStageGraph();
@@ -3782,6 +3789,22 @@ public final class Utilities {
return false;
}
+ /**
+ * @param conf
+ * @return the configured VectorizedRowBatchCtx for a MapWork task.
+ */
+ public static VectorizedRowBatchCtx getVectorizedRowBatchCtx(Configuration conf) {
+ VectorizedRowBatchCtx result = null;
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) &&
+ Utilities.getPlanPath(conf) != null) {
+ MapWork mapWork = Utilities.getMapWork(conf);
+ if (mapWork != null && mapWork.getVectorMode()) {
+ result = mapWork.getVectorizedRowBatchCtx();
+ }
+ }
+ return result;
+ }
+
public static boolean isVectorMode(Configuration conf, MapWork mapWork) {
return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
&& mapWork.getVectorMode();
@@ -3993,6 +4016,7 @@ public final class Utilities {
}
}
+
public static List<String> getStatsTmpDirs(BaseWork work, Configuration conf) {
List<String> statsTmpDirs = new ArrayList<>();
@@ -4020,4 +4044,74 @@ public final class Utilities {
}
return statsTmpDirs;
}
+
+ public static boolean isInputFileFormatSelfDescribing(PartitionDesc pd) {
+ Class<?> inputFormatClass = pd.getInputFileFormatClass();
+ return SelfDescribingInputFormatInterface.class.isAssignableFrom(inputFormatClass);
+ }
+
+ public static boolean isInputFileFormatVectorized(PartitionDesc pd) {
+ Class<?> inputFormatClass = pd.getInputFileFormatClass();
+ return VectorizedInputFormatInterface.class.isAssignableFrom(inputFormatClass);
+ }
+
+ public static void addSchemaEvolutionToTableScanOperator(Table table,
+ TableScanOperator tableScanOp) {
+ String colNames = MetaStoreUtils.getColumnNamesFromFieldSchema(table.getSd().getCols());
+ String colTypes = MetaStoreUtils.getColumnTypesFromFieldSchema(table.getSd().getCols());
+ tableScanOp.setSchemaEvolution(colNames, colTypes);
+ }
+
+ public static void addSchemaEvolutionToTableScanOperator(StructObjectInspector structOI,
+ TableScanOperator tableScanOp) {
+ String colNames = ObjectInspectorUtils.getFieldNames(structOI);
+ String colTypes = ObjectInspectorUtils.getFieldTypes(structOI);
+ tableScanOp.setSchemaEvolution(colNames, colTypes);
+ }
+
+ public static void unsetSchemaEvolution(Configuration conf) {
+ conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+ conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+ }
+
+ public static void addTableSchemaToConf(Configuration conf,
+ TableScanOperator tableScanOp) {
+ String schemaEvolutionColumns = tableScanOp.getSchemaEvolutionColumns();
+ if (schemaEvolutionColumns != null) {
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, tableScanOp.getSchemaEvolutionColumns());
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, tableScanOp.getSchemaEvolutionColumnsTypes());
+ } else {
+ LOG.info("schema.evolution.columns and schema.evolution.columns.types not available");
+ }
+ }
+
+ /**
+ * Create row key and value object inspectors for reduce vectorization.
+ * The row object inspector used by ReduceWork needs to be a **standard**
+ * struct object inspector, not just any struct object inspector.
+ * @param keyInspector
+ * @param valueInspector
+ * @return OI
+ * @throws HiveException
+ */
+ public static StandardStructObjectInspector constructVectorizedReduceRowOI(
+ StructObjectInspector keyInspector, StructObjectInspector valueInspector)
+ throws HiveException {
+
+ ArrayList<String> colNames = new ArrayList<String>();
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
+ for (StructField field: fields) {
+ colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+ ois.add(field.getFieldObjectInspector());
+ }
+ fields = valueInspector.getAllStructFieldRefs();
+ for (StructField field: fields) {
+ colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+ ois.add(field.getFieldObjectInspector());
+ }
+ StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+
+ return rowObjectInspector;
+ }
}
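Note: taken together, the new Utilities helpers sketch the intended flow: the table schema is attached to the TableScanOperator while the plan is built, and copied into the task configuration under the IOConstants keys when the scan is set up (see the FetchOperator change above). A rough fragment, assuming a resolved Table and TableScanOperator from an existing plan; this is a sketch, not code from the commit:

    // Plan time: remember the table's column names/types on the scan operator.
    Utilities.addSchemaEvolutionToTableScanOperator(table, tableScanOp);

    // Task setup: publish them so self-describing readers (e.g. ORC) can evolve
    // the file schema to the reader schema.
    JobConf jobConf = new JobConf();
    Utilities.addTableSchemaToConf(jobConf, tableScanOp);
    // jobConf now holds IOConstants.SCHEMA_EVOLUTION_COLUMNS and
    // IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES.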
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 5fbefec..439e0df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -153,10 +154,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
/* vectorization only works with struct object inspectors */
valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
- ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = VectorizedBatchUtil.
- constructVectorizedRowBatch(keyStructInspector,
- valueStructInspectors[tag], gWork.getVectorScratchColumnTypeMap());
- batches[tag] = pair.getFirst();
final int totalColumns = keysColumnOffset
+ valueStructInspectors[tag].getAllStructFieldRefs().size();
valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
@@ -165,7 +162,11 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
.genVectorStructExpressionWritables(valueStructInspectors[tag])));
- rowObjectInspector[tag] = pair.getSecond();
+ rowObjectInspector[tag] = Utilities.constructVectorizedReduceRowOI(keyStructInspector,
+ valueStructInspectors[tag]);
+ batches[tag] = gWork.getVectorizedRowBatchCtx().createVectorizedRowBatch();
+
+
} else {
ois.add(keyObjectInspector);
ois.add(valueObjectInspector[tag]);
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 8768847..efcf88c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -244,7 +244,7 @@ public class ReduceRecordProcessor extends RecordProcessor{
boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode();
sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc,
valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
- redWork.getVectorScratchColumnTypeMap());
+ redWork.getVectorizedRowBatchCtx());
ois[tag] = sources[tag].getObjectInspector();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index aff5765..1f75d07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -22,11 +22,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -35,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -52,7 +50,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -124,7 +121,7 @@ public class ReduceRecordSource implements RecordSource {
void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag,
- Map<Integer, String> vectorScratchColumnTypeMap)
+ VectorizedRowBatchCtx batchContext)
throws Exception {
ObjectInspector keyObjectInspector;
@@ -175,10 +172,9 @@ public class ReduceRecordSource implements RecordSource {
.asList(VectorExpressionWriterFactory
.genVectorStructExpressionWritables(valueStructInspectors)));
- ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
- VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap);
- rowObjectInspector = pair.getSecond();
- batch = pair.getFirst();
+ rowObjectInspector = Utilities.constructVectorizedReduceRowOI(keyStructInspector,
+ valueStructInspectors);
+ batch = batchContext.createVectorizedRowBatch();
// Setup vectorized deserialization for the key and value.
BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeyDeserializer;
@@ -186,7 +182,7 @@ public class ReduceRecordSource implements RecordSource {
keyBinarySortableDeserializeToRow =
new VectorDeserializeRow<BinarySortableDeserializeRead>(
new BinarySortableDeserializeRead(
- VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+ VectorizedBatchUtil.typeInfosFromStructObjectInspector(
keyStructInspector),
binarySortableSerDe.getSortOrders()));
keyBinarySortableDeserializeToRow.init(0);
@@ -196,7 +192,7 @@ public class ReduceRecordSource implements RecordSource {
valueLazyBinaryDeserializeToRow =
new VectorDeserializeRow<LazyBinaryDeserializeRead>(
new LazyBinaryDeserializeRead(
- VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+ VectorizedBatchUtil.typeInfosFromStructObjectInspector(
valueStructInspectors)));
valueLazyBinaryDeserializeToRow.init(firstValueColumnOffset);
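
The switch from primitiveTypeInfosFromStructObjectInspector to typeInfosFromStructObjectInspector (the helper added in the VectorizedBatchUtil changes further down) means the key/value deserializers are handed TypeInfo[] rather than PrimitiveTypeInfo[]. A small, self-contained illustration of that helper; the wrapper class and toy column names are made up for the example:

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

    public class TypeInfoHelperSketch {
      public static void main(String[] args) throws Exception {
        // A toy KEY struct: a string column and a bigint column.
        StructObjectInspector keyOI = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("k1", "k2"),
            Arrays.<ObjectInspector>asList(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaLongObjectInspector));
        // The helper now returns TypeInfo[], so non-primitive key/value columns are
        // no longer ruled out by the signature alone.
        TypeInfo[] keyTypeInfos = VectorizedBatchUtil.typeInfosFromStructObjectInspector(keyOI);
        for (TypeInfo ti : keyTypeInfos) {
          System.out.println(ti.getTypeName());   // prints "string", then "bigint"
        }
      }
    }
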
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index 94a60be..4100bc5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -468,6 +468,9 @@ public abstract class VectorExtractRow {
int start = colVector.start[adjustedIndex];
int length = colVector.length[adjustedIndex];
+ if (value == null) {
+ LOG.info("null string entry: batchIndex " + batchIndex + " columnIndex " + columnIndex);
+ }
// Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
text.set(value, start, length);
@@ -727,9 +730,9 @@ public abstract class VectorExtractRow {
}
public void extractRow(int batchIndex, Object[] objects) {
- int i = 0;
- for (Extractor extracter : extracters) {
- objects[i++] = extracter.extract(batchIndex);
+ for (int i = 0; i < extracters.length; i++) {
+ Extractor extracter = extracters[i];
+ objects[i] = extracter.extract(batchIndex);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 35bbaef..0524c08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -813,7 +813,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
outputFieldNames, objectInspectors);
if (isVectorOutput) {
vrbCtx = new VectorizedRowBatchCtx();
- vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) outputObjInspector);
+ vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames());
outputBatch = vrbCtx.createVectorizedRowBatch();
vectorAssignRowSameBatch = new VectorAssignRowSameBatch();
vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index e378d0d..4b1d9ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -90,7 +90,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
super.initializeOp(hconf);
vrbCtx = new VectorizedRowBatchCtx();
- vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+ vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
outputBatch = vrbCtx.createVectorizedRowBatch();
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index dcd2d57..9ff9b77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -146,7 +146,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
super.initializeOp(hconf);
vrbCtx = new VectorizedRowBatchCtx();
- vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+ vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
outputBatch = vrbCtx.createVectorizedRowBatch();
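
All three vectorized operators touched above (VectorGroupByOperator, VectorMapJoinBaseOperator, VectorSMBMapJoinOperator) now initialize their output batch context the same way: VectorizedRowBatchCtx.init takes the output struct object inspector plus the scratch column type names from the VectorizationContext, instead of a column-index-to-type map. A rough standalone sketch of that pattern; the object inspectors and scratch types below are placeholders:

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class OutputBatchSketch {
      public static void main(String[] args) throws Exception {
        // Stand-in for the operator's outputObjInspector: two output columns.
        StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("_col0", "_col1"),
            Arrays.<ObjectInspector>asList(
                PrimitiveObjectInspectorFactory.writableLongObjectInspector,
                PrimitiveObjectInspectorFactory.writableDoubleObjectInspector));
        // Stand-in for vOutContext.getScratchColumnTypeNames(): one bigint scratch column.
        String[] scratchColumnTypeNames = new String[] {"bigint"};

        VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx();
        vrbCtx.init(outputOI, scratchColumnTypeNames);
        VectorizedRowBatch outputBatch = vrbCtx.createVectorizedRowBatch();
        // Expect the data columns plus the scratch column.
        System.out.println("columns in output batch: " + outputBatch.numCols);
      }
    }
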
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index e7a829e..95a4b9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -144,6 +144,8 @@ public class VectorizationContext {
VectorExpressionDescriptor vMap;
+ private List<String> initialColumnNames;
+
private List<Integer> projectedColumns;
private List<String> projectionColumnNames;
private Map<String, Integer> projectionColumnMap;
@@ -161,6 +163,7 @@ public class VectorizationContext {
LOG.debug("VectorizationContext consructor contextName " + contextName + " level "
+ level + " initialColumnNames " + initialColumnNames);
}
+ this.initialColumnNames = initialColumnNames;
this.projectionColumnNames = initialColumnNames;
projectedColumns = new ArrayList<Integer>();
@@ -183,6 +186,7 @@ public class VectorizationContext {
if (LOG.isDebugEnabled()) {
LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level);
}
+ initialColumnNames = new ArrayList<String>();
projectedColumns = new ArrayList<Integer>();
projectionColumnNames = new ArrayList<String>();
projectionColumnMap = new HashMap<String, Integer>();
@@ -198,6 +202,7 @@ public class VectorizationContext {
this.contextName = contextName;
level = vContext.level + 1;
LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level);
+ this.initialColumnNames = vContext.initialColumnNames;
this.projectedColumns = new ArrayList<Integer>();
this.projectionColumnNames = new ArrayList<String>();
this.projectionColumnMap = new HashMap<String, Integer>();
@@ -210,6 +215,7 @@ public class VectorizationContext {
// Add an initial column to a vectorization context when
// a vectorized row batch is being created.
public void addInitialColumn(String columnName) {
+ initialColumnNames.add(columnName);
int index = projectedColumns.size();
projectedColumns.add(index);
projectionColumnNames.add(columnName);
@@ -238,6 +244,10 @@ public class VectorizationContext {
projectionColumnMap.put(columnName, vectorBatchColIndex);
}
+ public List<String> getInitialColumnNames() {
+ return initialColumnNames;
+ }
+
public List<Integer> getProjectedColumns() {
return projectedColumns;
}
@@ -1038,7 +1048,9 @@ public class VectorizationContext {
VectorExpressionDescriptor.Descriptor descriptor = builder.build();
Class<?> vclass = this.vMap.getVectorExpressionClass(udfClass, descriptor);
if (vclass == null) {
- LOG.info("No vector udf found for " + udfClass.getSimpleName() + ", descriptor: " + descriptor);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No vector udf found for "+udfClass.getSimpleName() + ", descriptor: "+descriptor);
+ }
return null;
}
Mode childrenMode = getChildrenMode(mode, udfClass);
@@ -2337,11 +2349,11 @@ public class VectorizationContext {
return ColumnVector.Type.DECIMAL;
default:
- throw new HiveException("Unexpected primitive type category " + primitiveCategory);
+ throw new RuntimeException("Unexpected primitive type category " + primitiveCategory);
}
}
default:
- throw new HiveException("Unexpected type category " +
+ throw new RuntimeException("Unexpected type category " +
typeInfo.getCategory());
}
}
@@ -2452,13 +2464,16 @@ public class VectorizationContext {
return firstOutputColumnIndex;
}
- public Map<Integer, String> getScratchColumnTypeMap() {
- Map<Integer, String> map = new HashMap<Integer, String>();
+ public String[] getScratchColumnTypeNames() {
+ String[] result = new String[ocm.outputColCount];
for (int i = 0; i < ocm.outputColCount; i++) {
- String type = ocm.outputColumnsTypes[i];
- map.put(i+this.firstOutputColumnIndex, type);
+ String typeName = ocm.outputColumnsTypes[i];
+ if (typeName.equalsIgnoreCase("long")) {
+ typeName = "bigint"; // Convert our synonym to a real Hive type name.
+ }
+ result[i] = typeName;
}
- return map;
+ return result;
}
@Override
@@ -2478,9 +2493,7 @@ public class VectorizationContext {
}
sb.append("sorted projectionColumnMap ").append(sortedColumnMap).append(", ");
- Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger);
- sortedScratchColumnTypeMap.putAll(getScratchColumnTypeMap());
- sb.append("sorted scratchColumnTypeMap ").append(sortedScratchColumnTypeMap);
+ sb.append("scratchColumnTypeNames ").append(getScratchColumnTypeNames().toString());
return sb.toString();
}
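
The new getScratchColumnTypeNames() accessor returns plain Hive type names (note the "long" -> "bigint" rewrite above, since "long" is only an internal synonym), so callers can feed them directly to the type-info parser. A tiny illustration, using hard-coded names where the real caller would use the accessor:

    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class ScratchTypeNameSketch {
      public static void main(String[] args) {
        // In the real code these come from VectorizationContext.getScratchColumnTypeNames().
        String[] scratchColumnTypeNames = {"bigint", "double", "decimal(10,2)"};
        for (String typeName : scratchColumnTypeNames) {
          // Each name is a valid Hive type string; "long" would not parse here,
          // which is why the accessor rewrites it to "bigint".
          TypeInfo ti = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
          System.out.println(typeName + " -> " + ti.getCategory());
        }
      }
    }
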
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 3d6d6e0..d75d185 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -56,9 +56,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -114,16 +118,24 @@ public class VectorizedBatchUtil {
batch.size = size;
}
- /**
- * Convert an ObjectInspector into a ColumnVector of the appropriate
- * type.
- */
- public static ColumnVector createColumnVector(ObjectInspector inspector
- ) throws HiveException {
- switch(inspector.getCategory()) {
- case PRIMITIVE:
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) inspector;
- switch(poi.getPrimitiveCategory()) {
+ public static ColumnVector createColumnVector(String typeName) {
+ typeName = typeName.toLowerCase();
+
+ // Allow undecorated CHAR and VARCHAR to support scratch column type names.
+ if (typeName.equals("char") || typeName.equals("varchar")) {
+ return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+ }
+
+ TypeInfo typeInfo = (TypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+ return createColumnVector(typeInfo);
+ }
+
+ public static ColumnVector createColumnVector(TypeInfo typeInfo) {
+ switch(typeInfo.getCategory()) {
+ case PRIMITIVE:
+ {
+ PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+ switch(primitiveTypeInfo.getPrimitiveCategory()) {
case BOOLEAN:
case BYTE:
case SHORT:
@@ -143,142 +155,53 @@ public class VectorizedBatchUtil {
case VARCHAR:
return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
case DECIMAL:
- DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
+ DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
tInfo.precision(), tInfo.scale());
default:
- throw new HiveException("Vectorizaton is not supported for datatype:"
- + poi.getPrimitiveCategory());
+ throw new RuntimeException("Vectorization is not supported for datatype:"
+ + primitiveTypeInfo.getPrimitiveCategory());
}
- case STRUCT: {
- StructObjectInspector soi = (StructObjectInspector) inspector;
- List<? extends StructField> fieldList = soi.getAllStructFieldRefs();
- ColumnVector[] children = new ColumnVector[fieldList.size()];
+ }
+ case STRUCT:
+ {
+ StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+ List<TypeInfo> typeInfoList = structTypeInfo.getAllStructFieldTypeInfos();
+ ColumnVector[] children = new ColumnVector[typeInfoList.size()];
for(int i=0; i < children.length; ++i) {
children[i] =
- createColumnVector(fieldList.get(i).getFieldObjectInspector());
+ createColumnVector(typeInfoList.get(i));
}
return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
children);
}
- case UNION: {
- UnionObjectInspector uoi = (UnionObjectInspector) inspector;
- List<ObjectInspector> fieldList = uoi.getObjectInspectors();
- ColumnVector[] children = new ColumnVector[fieldList.size()];
+ case UNION:
+ {
+ UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+ List<TypeInfo> typeInfoList = unionTypeInfo.getAllUnionObjectTypeInfos();
+ ColumnVector[] children = new ColumnVector[typeInfoList.size()];
for(int i=0; i < children.length; ++i) {
- children[i] = createColumnVector(fieldList.get(i));
+ children[i] = createColumnVector(typeInfoList.get(i));
}
return new UnionColumnVector(VectorizedRowBatch.DEFAULT_SIZE, children);
}
- case LIST: {
- ListObjectInspector loi = (ListObjectInspector) inspector;
+ case LIST:
+ {
+ ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
return new ListColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
- createColumnVector(loi.getListElementObjectInspector()));
+ createColumnVector(listTypeInfo.getListElementTypeInfo()));
}
- case MAP: {
- MapObjectInspector moi = (MapObjectInspector) inspector;
+ case MAP:
+ {
+ MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
return new MapColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
- createColumnVector(moi.getMapKeyObjectInspector()),
- createColumnVector(moi.getMapValueObjectInspector()));
+ createColumnVector(mapTypeInfo.getMapKeyTypeInfo()),
+ createColumnVector(mapTypeInfo.getMapValueTypeInfo()));
}
- default:
- throw new HiveException("Vectorization is not supported for datatype:"
- + inspector.getCategory());
- }
-
- }
-
- /**
- * Walk through the object inspector and add column vectors
- *
- * @param oi
- * @param cvList
- * ColumnVectors are populated in this list
- */
- private static void allocateColumnVector(StructObjectInspector oi,
- List<ColumnVector> cvList) throws HiveException {
- if (cvList == null) {
- throw new HiveException("Null columnvector list");
- }
- if (oi == null) {
- return;
- }
- final List<? extends StructField> fields = oi.getAllStructFieldRefs();
- for(StructField field : fields) {
- ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
- cvList.add(createColumnVector(fieldObjectInspector));
- }
- }
-
-
- /**
- * Create VectorizedRowBatch from ObjectInspector
- *
- * @param oi
- * @return
- * @throws HiveException
- */
- public static VectorizedRowBatch constructVectorizedRowBatch(
- StructObjectInspector oi) throws HiveException {
- final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
- allocateColumnVector(oi, cvList);
- final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
- int i = 0;
- for(ColumnVector cv : cvList) {
- result.cols[i++] = cv;
- }
- return result;
- }
-
- /**
- * Create VectorizedRowBatch from key and value object inspectors
- * The row object inspector used by ReduceWork needs to be a **standard**
- * struct object inspector, not just any struct object inspector.
- * @param keyInspector
- * @param valueInspector
- * @param vectorScratchColumnTypeMap
- * @return VectorizedRowBatch, OI
- * @throws HiveException
- */
- public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
- StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
- throws HiveException {
-
- ArrayList<String> colNames = new ArrayList<String>();
- ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
- List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
- for (StructField field: fields) {
- colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
- }
- fields = valueInspector.getAllStructFieldRefs();
- for (StructField field: fields) {
- colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
+ default:
+ throw new RuntimeException("Vectorization is not supported for datatype:"
+ + typeInfo.getCategory());
}
- StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
-
- VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
- batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
- return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
- }
-
- /**
- * Iterates through all columns in a given row and populates the batch
- *
- * @param row
- * @param oi
- * @param rowIndex
- * @param batch
- * @param buffer
- * @throws HiveException
- */
- public static void addRowToBatch(Object row, StructObjectInspector oi,
- int rowIndex,
- VectorizedRowBatch batch,
- DataOutputBuffer buffer
- ) throws HiveException {
- addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
}
/**
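
With the hunk above, createColumnVector is driven by TypeInfo, or by a Hive type name string, rather than by an ObjectInspector, and it tolerates undecorated "char"/"varchar" so scratch column type names work. A short usage sketch; the class name and example types are arbitrary:

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;

    public class CreateColumnVectorSketch {
      public static void main(String[] args) {
        // Primitive type name -> LongColumnVector.
        ColumnVector longCol = VectorizedBatchUtil.createColumnVector("bigint");
        // Undecorated char is accepted for scratch columns and maps to BytesColumnVector.
        ColumnVector charCol = VectorizedBatchUtil.createColumnVector("char");
        // Complex type names are parsed into TypeInfo and handled recursively.
        ColumnVector mapCol = VectorizedBatchUtil.createColumnVector("map<string,int>");
        System.out.println(longCol instanceof LongColumnVector);   // true
        System.out.println(charCol instanceof BytesColumnVector);  // true
        System.out.println(mapCol.getClass().getSimpleName());     // MapColumnVector
      }
    }
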
@@ -621,31 +544,30 @@ public class VectorizedBatchUtil {
return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids);
}
- public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector(
+ public static String[] columnNamesFromStructObjectInspector(
StructObjectInspector structObjectInspector) throws HiveException {
List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
- PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()];
+ String[] result = new String[fields.size()];
int i = 0;
for(StructField field : fields) {
- TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
- field.getFieldObjectInspector().getTypeName());
- result[i++] = (PrimitiveTypeInfo) typeInfo;
+ result[i++] = field.getFieldName();
}
return result;
}
- public static PrimitiveTypeInfo[] primitiveTypeInfosFromTypeNames(
- String[] typeNames) throws HiveException {
-
- PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[typeNames.length];
+ public static TypeInfo[] typeInfosFromTypeNames(String[] typeNames) throws HiveException {
+ ArrayList<TypeInfo> typeInfoList =
+ TypeInfoUtils.typeInfosFromTypeNames(Arrays.asList(typeNames));
+ return typeInfoList.toArray(new TypeInfo[0]);
+ }
- for(int i = 0; i < typeNames.length; i++) {
- TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]);
- result[i] = (PrimitiveTypeInfo) typeInfo;
- }
- return result;
+ public static TypeInfo[] typeInfosFromStructObjectInspector(
+ StructObjectInspector structObjectInspector) {
+ ArrayList<TypeInfo> typeInfoList =
+ TypeInfoUtils.typeInfosFromStructObjectInspector(structObjectInspector);
+ return typeInfoList.toArray(new TypeInfo[0]);
}
static ColumnVector cloneColumnVector(ColumnVector source