Posted to commits@hive.apache.org by mg...@apache.org on 2019/09/05 15:24:27 UTC
[hive] branch master updated: HIVE-22028 Clean up Add Partition
(Miklos Gergely reviewed by Jesus Camacho Rodriguez)
This is an automated email from the ASF dual-hosted git repository.
mgergely pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0213afb HIVE-22028 Clean up Add Partition (Miklos Gergely reviewed by Jesus Camacho Rodriguez)
0213afb is described below
commit 0213afb8a31af1f48d009edd41cec9e6c8942354
Author: miklosgergely <mg...@cloudera.com>
AuthorDate: Mon Jul 22 02:24:14 2019 +0200
HIVE-22028 Clean up Add Partition (Miklos Gergely reviewed by Jesus Camacho Rodriguez)
---
.../hive/hcatalog/streaming/HiveEndPoint.java | 60 +---
.../partition/AlterTableAddPartitionDesc.java | 304 +++++++--------------
.../partition/AlterTableAddPartitionOperation.java | 196 ++++++++++++-
.../bootstrap/events/filesystem/FSTableEvent.java | 54 ++--
.../repl/bootstrap/load/table/LoadPartitions.java | 8 +-
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 160 ++---------
.../hadoop/hive/ql/parse/DDLSemanticAnalyzer.java | 72 +++--
.../hive/ql/parse/ImportSemanticAnalyzer.java | 92 +++----
.../results/clientpositive/add_part_multiple.q.out | 18 +-
.../clientpositive/drop_partitions_filter.q.out | 8 +-
.../clientpositive/llap/add_part_with_loc.q.out | 8 +-
.../clientpositive/spark/add_part_multiple.q.out | 18 +-
.../hive/streaming/HiveStreamingConnection.java | 12 +-
13 files changed, 478 insertions(+), 532 deletions(-)
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index fee7ffc..e249b77 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -25,12 +25,10 @@ import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
@@ -42,14 +40,10 @@ import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -229,18 +223,6 @@ public class HiveEndPoint {
return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo);
}
- private static UserGroupInformation getUserGroupInfo(String user)
- throws ImpersonationFailed {
- try {
- return UserGroupInformation.createProxyUser(
- user, UserGroupInformation.getLoginUser());
- } catch (IOException e) {
- LOG.error("Unable to get UserGroupInfo for user : " + user, e);
- throw new ImpersonationFailed(user,e);
- }
- }
-
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -467,12 +449,10 @@ public class HiveEndPoint {
Map<String, String> partSpec =
Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), ep.partitionVals);
- AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(ep.database, ep.table, true);
- String partLocation = new Path(tableObject.getDataLocation(),
- Warehouse.makePartPath(partSpec)).toString();
- addPartitionDesc.addPartition(partSpec, partLocation);
- Partition partition = Hive.convertAddSpecToMetaPartition(tableObject,
- addPartitionDesc.getPartition(0), conf);
+ Path location = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec));
+ location = new Path(Utilities.getQualifiedPath(conf, location));
+ Partition partition =
+ org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(tableObject, partSpec, location);
msClient.add_partition(partition);
}
catch (AlreadyExistsException e) {
@@ -486,36 +466,6 @@ public class HiveEndPoint {
}
}
- private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running Hive Query: " + sql);
- }
- driver.run(sql);
- return true;
- }
-
- private static String partSpecStr(List<FieldSchema> partKeys, ArrayList<String> partVals) {
- if (partKeys.size()!=partVals.size()) {
- throw new IllegalArgumentException("Partition values:" + partVals +
- ", does not match the partition Keys in table :" + partKeys );
- }
- StringBuilder buff = new StringBuilder(partKeys.size()*20);
- buff.append(" ( ");
- int i=0;
- for (FieldSchema schema : partKeys) {
- buff.append(schema.getName());
- buff.append("='");
- buff.append(partVals.get(i));
- buff.append("'");
- if (i!=partKeys.size()-1) {
- buff.append(",");
- }
- ++i;
- }
- buff.append(" )");
- return buff.toString();
- }
-
private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf, boolean secureMode)
throws ConnectionError {
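For reference, the streaming endpoint now builds the metastore Partition directly instead of routing it through an AlterTableAddPartitionDesc. A minimal sketch of the new flow, assuming msClient, conf, tableObject and the endpoint's partition values are in scope as in the class above (imports as in the hunk):

    Map<String, String> partSpec =
        Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), ep.partitionVals);
    // The partition directory lives under the table location ...
    Path location = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec));
    // ... and is qualified against the default file system, so the metastore stores a full URI.
    location = new Path(Utilities.getQualifiedPath(conf, location));
    Partition partition = org.apache.hadoop.hive.ql.metadata.Partition
        .createMetaPartitionObject(tableObject, partSpec, location);
    try {
      msClient.add_partition(partition);
    } catch (AlreadyExistsException e) {
      // Benign when createPartIfNotExists races with a concurrent writer.
    }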
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java
index e1aecbc..9339144 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.ddl.table.partition;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -32,45 +31,67 @@ import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
/**
- * DDL task description for ALTER TABLE ... DROP PARTITION ... commands.
+ * DDL task description for ALTER TABLE ... ADD PARTITION ... commands.
*/
@Explain(displayName = "Add Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public class AlterTableAddPartitionDesc implements DDLDesc, Serializable {
private static final long serialVersionUID = 1L;
+ /**
+ * Description of a partition to add.
+ */
+ @Explain(displayName = "Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public static class PartitionDesc {
- PartitionDesc(
- Map<String, String> partSpec, String location, Map<String, String> params) {
- this(partSpec, location);
- this.partParams = params;
- }
-
- PartitionDesc(Map<String, String> partSpec, String location) {
- this.partSpec = partSpec;
+ private final Map<String, String> partitionSpec;
+ private String location; // TODO: make location final too
+ private final Map<String, String> params;
+ private final String inputFormat;
+ private final String outputFormat;
+ private final int numBuckets;
+ private final List<FieldSchema> columns;
+ private final String serializationLib;
+ private final Map<String, String> serdeParams;
+ private final List<String> bucketColumns;
+ private final List<Order> sortColumns;
+ private final ColumnStatistics columnStats;
+ private final long writeId;
+
+ public PartitionDesc(Map<String, String> partitionSpec, String location, Map<String, String> params) {
+ this(partitionSpec, location, params, null, null, -1, null, null, null, null, null, null, -1);
+ }
+
+ public PartitionDesc(Map<String, String> partitionSpec, String location, Map<String, String> params,
+ String inputFormat, String outputFormat, int numBuckets, List<FieldSchema> columns, String serializationLib,
+ Map<String, String> serdeParams, List<String> bucketColumns, List<Order> sortColumns,
+ ColumnStatistics columnStats, long writeId) {
+ this.partitionSpec = partitionSpec;
this.location = location;
+ this.params = params;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.numBuckets = numBuckets;
+ this.columns = columns;
+ this.serializationLib = serializationLib;
+ this.serdeParams = serdeParams;
+ this.bucketColumns = bucketColumns;
+ this.sortColumns = sortColumns;
+ this.columnStats = columnStats;
+ this.writeId = writeId;
}
- Map<String, String> partSpec;
- Map<String, String> partParams;
- String location;
- String inputFormat = null;
- String outputFormat = null;
- int numBuckets = -1;
- List<FieldSchema> cols = null;
- String serializationLib = null;
- Map<String, String> serdeParams = null;
- List<String> bucketCols = null;
- List<Order> sortCols = null;
- ColumnStatistics colStats = null;
- long writeId = -1;
-
public Map<String, String> getPartSpec() {
- return partSpec;
+ return partitionSpec;
+ }
+
+ @Explain(displayName = "partition spec", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public String getPartSpecForExplain() {
+ return partitionSpec.toString();
}
/**
* @return location of partition in relation to table
*/
+ @Explain(displayName = "location", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public String getLocation() {
return location;
}
@@ -80,241 +101,102 @@ public class AlterTableAddPartitionDesc implements DDLDesc, Serializable {
}
public Map<String, String> getPartParams() {
- return partParams;
+ return params;
+ }
+
+ @Explain(displayName = "params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public String getPartParamsForExplain() {
+ return params.toString();
}
- public void setPartParams(Map<String, String> partParams) {
- this.partParams = partParams;
+ @Explain(displayName = "input format", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public String getInputFormat() {
+ return inputFormat;
+ }
+
+ @Explain(displayName = "output format", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public String getOutputFormat() {
+ return outputFormat;
}
public int getNumBuckets() {
return numBuckets;
}
- public void setNumBuckets(int numBuckets) {
- this.numBuckets = numBuckets;
+ @Explain(displayName = "num buckets", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public Integer getNumBucketsExplain() {
+ return numBuckets == -1 ? null : numBuckets;
}
+ @Explain(displayName = "columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public List<FieldSchema> getCols() {
- return cols;
- }
-
- public void setCols(List<FieldSchema> cols) {
- this.cols = cols;
+ return columns;
}
+ @Explain(displayName = "serialization lib", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public String getSerializationLib() {
return serializationLib;
}
- public void setSerializationLib(String serializationLib) {
- this.serializationLib = serializationLib;
- }
-
+ @Explain(displayName = "serde params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public Map<String, String> getSerdeParams() {
return serdeParams;
}
- public void setSerdeParams(Map<String, String> serdeParams) {
- this.serdeParams = serdeParams;
- }
-
+ @Explain(displayName = "bucket columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public List<String> getBucketCols() {
- return bucketCols;
- }
-
- public void setBucketCols(List<String> bucketCols) {
- this.bucketCols = bucketCols;
+ return bucketColumns;
}
+ @Explain(displayName = "sort columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public List<Order> getSortCols() {
- return sortCols;
+ return sortColumns;
}
- public void setSortCols(List<Order> sortCols) {
- this.sortCols = sortCols;
+ @Explain(displayName = "column stats", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public ColumnStatistics getColStats() {
+ return columnStats;
}
- public String getInputFormat() {
- return inputFormat;
+ public long getWriteId() {
+ return writeId;
}
-
- public void setInputFormat(String inputFormat) {
- this.inputFormat = inputFormat;
- }
-
- public String getOutputFormat() {
- return outputFormat;
- }
-
- public void setOutputFormat(String outputFormat) {
- this.outputFormat = outputFormat;
- }
-
- public ColumnStatistics getColStats() { return colStats; }
-
- public void setColStats(ColumnStatistics colStats) { this.colStats = colStats; }
-
- public long getWriteId() { return writeId; }
-
- public void setWriteId(long writeId) { this.writeId = writeId; }
}
- String tableName;
- String dbName;
- boolean ifNotExists;
- List<PartitionDesc> partitions = null;
- boolean replaceMode = false;
- private ReplicationSpec replicationSpec = null;
-
+ private final String dbName;
+ private final String tableName;
+ private final boolean ifNotExists;
+ private final List<PartitionDesc> partitions;
- /**
- * For serialization only.
- */
- public AlterTableAddPartitionDesc() {
- }
+ private ReplicationSpec replicationSpec = null; // TODO: make replicationSpec final too
- public AlterTableAddPartitionDesc(
- String dbName, String tableName, boolean ifNotExists) {
- super();
+ public AlterTableAddPartitionDesc(String dbName, String tableName, boolean ifNotExists,
+ List<PartitionDesc> partitions) {
this.dbName = dbName;
this.tableName = tableName;
this.ifNotExists = ifNotExists;
+ this.partitions = partitions;
}
- /**
- * Legacy single-partition ctor for ImportSemanticAnalyzer
- * @param dbName
- * database to add to.
- * @param tableName
- * table to add to.
- * @param partSpec
- * partition specification.
- * @param location
- * partition location, relative to table location.
- * @param params
- * partition parameters.
- */
- @Deprecated
- public AlterTableAddPartitionDesc(String dbName, String tableName,
- Map<String, String> partSpec, String location, Map<String, String> params) {
- super();
- this.dbName = dbName;
- this.tableName = tableName;
- this.ifNotExists = true;
- addPartition(partSpec, location, params);
- }
-
- public void addPartition(Map<String, String> partSpec, String location) {
- addPartition(partSpec, location, null);
- }
-
- private void addPartition(
- Map<String, String> partSpec, String location, Map<String, String> params) {
- if (this.partitions == null) {
- this.partitions = new ArrayList<PartitionDesc>();
- }
- this.partitions.add(new PartitionDesc(partSpec, location, params));
- }
-
- /**
- * @return database name
- */
+ @Explain(displayName = "db name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public String getDbName() {
return dbName;
}
- /**
- * @param dbName
- * database name
- */
- public void setDbName(String dbName) {
- this.dbName = dbName;
- }
-
- /**
- * @return the table we're going to add the partitions to.
- */
+ @Explain(displayName = "table name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public String getTableName() {
return tableName;
}
- /**
- * @param tableName
- * the table we're going to add the partitions to.
- */
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- /**
- * @return location of partition in relation to table
- */
- @Explain(displayName = "Location")
- public String getLocationForExplain() {
- if (this.partitions == null || this.partitions.isEmpty()) return "<no partition>";
- boolean isFirst = true;
- StringBuilder sb = new StringBuilder();
- for (PartitionDesc desc : this.partitions) {
- if (!isFirst) {
- sb.append(", ");
- }
- isFirst = false;
- sb.append(desc.location);
- }
- return sb.toString();
- }
-
- @Explain(displayName = "Spec")
- public String getPartSpecStringForExplain() {
- if (this.partitions == null || this.partitions.isEmpty()) return "<no partition>";
- boolean isFirst = true;
- StringBuilder sb = new StringBuilder();
- for (PartitionDesc desc : this.partitions) {
- if (!isFirst) {
- sb.append(", ");
- }
- isFirst = false;
- sb.append(desc.partSpec.toString());
- }
- return sb.toString();
- }
-
- /**
- * @return if the partition should only be added if it doesn't exist already
- */
+ @Explain(displayName = "if not exists", displayOnlyOnTrue = true,
+ explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public boolean isIfNotExists() {
- return this.ifNotExists;
+ return ifNotExists;
}
- /**
- * @param ifNotExists
- * if the part should be added only if it doesn't exist
- */
- public void setIfNotExists(boolean ifNotExists) {
- this.ifNotExists = ifNotExists;
- }
-
- public int getPartitionCount() {
- return this.partitions.size();
- }
-
- public PartitionDesc getPartition(int i) {
- return this.partitions.get(i);
- }
-
- /**
- * @param replaceMode Determine if this AddPartition should behave like a replace-into alter instead
- */
- public void setReplaceMode(boolean replaceMode){
- this.replaceMode = replaceMode;
- }
-
- /**
- * @return true if this AddPartition should behave like a replace-into alter instead
- */
- public boolean getReplaceMode() {
- return this.replaceMode;
+ @Explain(displayName = "partitions", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public List<PartitionDesc> getPartitions() {
+ return partitions;
}
/**
@@ -333,6 +215,6 @@ public class AlterTableAddPartitionDesc implements DDLDesc, Serializable {
if (replicationSpec == null) {
this.replicationSpec = new ReplicationSpec();
}
- return this.replicationSpec;
+ return replicationSpec;
}
}
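With the setters gone, callers assemble the descriptor in one shot. A minimal construction sketch (database, table and partition values are illustrative; ImmutableList is Guava, as used elsewhere in this patch):

    // Short form: spec + location + params. The three-argument constructor
    // delegates to the full one with nulls and -1 for the storage fields.
    Map<String, String> spec = Collections.singletonMap("ds", "2019-09-05");
    AlterTableAddPartitionDesc.PartitionDesc partition =
        new AlterTableAddPartitionDesc.PartitionDesc(spec, "ds=2019-09-05", null);

    // The enclosing desc is likewise built with its full partition list up front.
    AlterTableAddPartitionDesc desc = new AlterTableAddPartitionDesc(
        "default", "src_part", true, ImmutableList.of(partition));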
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java
index 488fa59..d8597a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java
@@ -18,14 +18,29 @@
package org.apache.hadoop.hive.ql.ddl.table.partition;
+import java.util.ArrayList;
+import java.util.BitSet;
import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.ddl.DDLOperation;
import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
import org.apache.hadoop.hive.ql.ddl.DDLUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
/**
* Operation process of adding a partition to a table.
@@ -37,10 +52,181 @@ public class AlterTableAddPartitionOperation extends DDLOperation<AlterTableAddP
@Override
public int execute() throws HiveException {
- List<Partition> parts = context.getDb().createPartitions(desc);
- for (Partition part : parts) {
- DDLUtils.addIfAbsentByName(new WriteEntity(part, WriteEntity.WriteType.INSERT), context);
- }
+ // TODO: catalog name everywhere in this method
+ Table table = context.getDb().getTable(desc.getDbName(), desc.getTableName());
+ long writeId = getWriteId(table);
+
+ List<Partition> partitions = getPartitions(table, writeId);
+ addPartitions(table, partitions, writeId);
return 0;
}
+
+ private long getWriteId(Table table) throws LockException {
+ // In case of replication, get the writeId from the source and use valid write Id list for replication.
+ if (desc.getReplicationSpec().isInReplicationScope() && desc.getPartitions().get(0).getWriteId() > 0) {
+ return desc.getPartitions().get(0).getWriteId();
+ } else {
+ AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(context.getConf(), table, true);
+ if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) {
+ return tableSnapshot.getWriteId();
+ } else {
+ return -1;
+ }
+ }
+ }
+
+ private List<Partition> getPartitions(Table table, long writeId) throws HiveException {
+ List<Partition> partitions = new ArrayList<>(desc.getPartitions().size());
+ for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : desc.getPartitions()) {
+ Partition partition = convertPartitionSpecToMetaPartition(table, partitionDesc);
+ if (partition != null && writeId > 0) {
+ partition.setWriteId(writeId);
+ }
+ partitions.add(partition);
+ }
+
+ return partitions;
+ }
+
+ private Partition convertPartitionSpecToMetaPartition(Table table,
+ AlterTableAddPartitionDesc.PartitionDesc partitionSpec) throws HiveException {
+ Path location = partitionSpec.getLocation() != null ? new Path(table.getPath(), partitionSpec.getLocation()) : null;
+ if (location != null) {
+ // Ensure that it is a fully qualified path (in most cases it will be since tbl.getPath() is fully qualified)
+ location = new Path(Utilities.getQualifiedPath(context.getConf(), location));
+ }
+
+ Partition partition = org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(
+ table, partitionSpec.getPartSpec(), location);
+
+ if (partitionSpec.getPartParams() != null) {
+ partition.setParameters(partitionSpec.getPartParams());
+ }
+ if (partitionSpec.getInputFormat() != null) {
+ partition.getSd().setInputFormat(partitionSpec.getInputFormat());
+ }
+ if (partitionSpec.getOutputFormat() != null) {
+ partition.getSd().setOutputFormat(partitionSpec.getOutputFormat());
+ }
+ if (partitionSpec.getNumBuckets() != -1) {
+ partition.getSd().setNumBuckets(partitionSpec.getNumBuckets());
+ }
+ if (partitionSpec.getCols() != null) {
+ partition.getSd().setCols(partitionSpec.getCols());
+ }
+ if (partitionSpec.getSerializationLib() != null) {
+ partition.getSd().getSerdeInfo().setSerializationLib(partitionSpec.getSerializationLib());
+ }
+ if (partitionSpec.getSerdeParams() != null) {
+ partition.getSd().getSerdeInfo().setParameters(partitionSpec.getSerdeParams());
+ }
+ if (partitionSpec.getBucketCols() != null) {
+ partition.getSd().setBucketCols(partitionSpec.getBucketCols());
+ }
+ if (partitionSpec.getSortCols() != null) {
+ partition.getSd().setSortCols(partitionSpec.getSortCols());
+ }
+ if (partitionSpec.getColStats() != null) {
+ partition.setColStats(partitionSpec.getColStats());
+ // Statistics will have an associated write Id for a transactional table. We need it to update column statistics.
+ partition.setWriteId(partitionSpec.getWriteId());
+ }
+ return partition;
+ }
+
+ private void addPartitions(Table table, List<Partition> partitions, long writeId) throws HiveException {
+ List<org.apache.hadoop.hive.ql.metadata.Partition> outPartitions = null;
+ if (!desc.getReplicationSpec().isInReplicationScope()) {
+ outPartitions = addPartitionsNoReplication(table, partitions);
+ } else {
+ outPartitions = addPartitionsWithReplication(table, partitions, writeId);
+ }
+
+ for (org.apache.hadoop.hive.ql.metadata.Partition outPartition : outPartitions) {
+ DDLUtils.addIfAbsentByName(new WriteEntity(outPartition, WriteEntity.WriteType.INSERT), context);
+ }
+ }
+
+ private List<org.apache.hadoop.hive.ql.metadata.Partition> addPartitionsNoReplication(Table table,
+ List<Partition> partitions) throws HiveException {
+ // TODO: normally, the result is not necessary; might make sense to pass false
+ List<org.apache.hadoop.hive.ql.metadata.Partition> outPartitions = new ArrayList<>();
+ for (Partition outPart : context.getDb().addPartition(partitions, desc.isIfNotExists(), true)) {
+ outPartitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, outPart));
+ }
+ return outPartitions;
+ }
+
+ private List<org.apache.hadoop.hive.ql.metadata.Partition> addPartitionsWithReplication(Table table,
+ List<Partition> partitions, long writeId) throws HiveException {
+ // For replication add-ptns, we need to follow an insert-if-not-exists, alter-if-exists scenario.
+ // TODO : ideally, we should push this mechanism to the metastore, because, otherwise, we have
+ // no choice but to iterate over the partitions here.
+
+ List<Partition> partitionsToAdd = new ArrayList<>();
+ List<Partition> partitionsToAlter = new ArrayList<>();
+ List<String> partitionNames = new ArrayList<>();
+ for (Partition partition : partitions){
+ partitionNames.add(getPartitionName(table, partition));
+ try {
+ Partition p = context.getDb().getPartition(desc.getDbName(), desc.getTableName(), partition.getValues());
+ if (desc.getReplicationSpec().allowReplacementInto(p.getParameters())){
+ ReplicationSpec.copyLastReplId(p.getParameters(), partition.getParameters());
+ partitionsToAlter.add(partition);
+ } // else ptn already exists, but we do nothing with it.
+ } catch (HiveException e){
+ if (e.getCause() instanceof NoSuchObjectException) {
+ // if the object does not exist, we want to add it.
+ partitionsToAdd.add(partition);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ List<org.apache.hadoop.hive.ql.metadata.Partition> outPartitions = new ArrayList<>();
+ for (Partition outPartition : context.getDb().addPartition(partitionsToAdd, desc.isIfNotExists(), true)) {
+ outPartitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, outPartition));
+ }
+
+ // In case of replication, statistics is obtained from the source, so do not update those on replica.
+ EnvironmentContext ec = new EnvironmentContext();
+ ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+ String validWriteIdList = getValidWriteIdList(table, writeId);
+ context.getDb().alterPartitions(desc.getDbName(), desc.getTableName(), partitionsToAlter, ec, validWriteIdList,
+ writeId);
+
+ for (Partition outPartition : context.getDb().getPartitionsByNames(desc.getDbName(), desc.getTableName(),
+ partitionNames)){
+ outPartitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, outPartition));
+ }
+
+ return outPartitions;
+ }
+
+ private String getPartitionName(Table table, Partition partition) throws HiveException {
+ try {
+ return Warehouse.makePartName(table.getPartitionKeys(), partition.getValues());
+ } catch (MetaException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ private String getValidWriteIdList(Table table, long writeId) throws LockException {
+ if (desc.getReplicationSpec().isInReplicationScope() && desc.getPartitions().get(0).getWriteId() > 0) {
+ // We need a valid writeId list for a transactional change. During replication we do not
+ // have a valid writeId list which was used for this on the source. But we know for sure
+ // that the writeId associated with it was valid then (otherwise the change would have
+ // failed on the source). So use a valid transaction list with only that writeId.
+ return new ValidReaderWriteIdList(TableName.getDbTable(table.getDbName(), table.getTableName()),
+ new long[0], new BitSet(), writeId).writeToString();
+ } else {
+ AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(context.getConf(), table, true);
+ if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) {
+ return tableSnapshot.getValidWriteIdList();
+ } else {
+ return null;
+ }
+ }
+ }
}
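One subtle step above is the write-id list synthesized for replicated transactional changes, where only the single writeId shipped from the source is known to be valid. That step in isolation (the table name is a placeholder):

    // An otherwise-empty valid reader list whose high-water mark is the replicated
    // writeId: no exception ids (empty array) and no aborted bits (empty BitSet).
    String validWriteIdList = new ValidReaderWriteIdList(
        TableName.getDbTable("default", "src_part"),
        new long[0], new BitSet(), writeId).writeToString();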
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 6b9ea77..5eb4c8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -41,9 +42,12 @@ import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration;
+import com.google.common.collect.ImmutableList;
+
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater;
@@ -146,7 +150,7 @@ public class FSTableEvent implements TableEvent {
//TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose.
for (Partition partition : metadata.getPartitions()) {
// TODO: this should ideally not create AddPartitionDesc per partition
- AlterTableAddPartitionDesc partsDesc = partitionDesc(fromPath, tblDesc, partition);
+ AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPath, tblDesc, partition);
descs.add(partsDesc);
}
return descs;
@@ -167,46 +171,42 @@ public class FSTableEvent implements TableEvent {
return partitions;
}
- private AlterTableAddPartitionDesc partitionDesc(Path fromPath,
- ImportTableDesc tblDesc, Partition partition) throws SemanticException {
+ private AlterTableAddPartitionDesc addPartitionDesc(Path fromPath, ImportTableDesc tblDesc, Partition partition)
+ throws SemanticException {
try {
- AlterTableAddPartitionDesc partsDesc =
- new AlterTableAddPartitionDesc(tblDesc.getDatabaseName(), tblDesc.getTableName(),
- EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
- partition.getSd().getLocation(), partition.getParameters());
- AlterTableAddPartitionDesc.PartitionDesc partDesc = partsDesc.getPartition(0);
- partDesc.setInputFormat(partition.getSd().getInputFormat());
- partDesc.setOutputFormat(partition.getSd().getOutputFormat());
- partDesc.setNumBuckets(partition.getSd().getNumBuckets());
- partDesc.setCols(partition.getSd().getCols());
- partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib());
- partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
- partDesc.setBucketCols(partition.getSd().getBucketCols());
- partDesc.setSortCols(partition.getSd().getSortCols());
- if (tblDesc.isExternal() && !replicationSpec().isMigratingToExternalTable()) {
- // we have to provide the source location so target location can be derived.
- partDesc.setLocation(partition.getSd().getLocation());
- } else {
+ Map<String, String> partitionSpec = EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues());
+
+ StorageDescriptor sd = partition.getSd();
+ String location = sd.getLocation();
+ if (!tblDesc.isExternal() || replicationSpec().isMigratingToExternalTable()) {
/**
* this is required for file listing of all files in a partition for managed table as described in
* {@link org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator}
*/
- partDesc.setLocation(new Path(fromPath,
- Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+ location = new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString();
}
- partsDesc.setReplicationSpec(replicationSpec());
+ ColumnStatistics columnStatistics = null;
+ long writeId = -1;
if (partition.isSetColStats()) {
ColumnStatistics colStats = partition.getColStats();
ColumnStatisticsDesc colStatsDesc = new ColumnStatisticsDesc(colStats.getStatsDesc());
colStatsDesc.setTableName(tblDesc.getTableName());
colStatsDesc.setDbName(tblDesc.getDatabaseName());
- partDesc.setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj(), colStats.getEngine()));
- long writeId = replicationSpec().isMigratingToTxnTable() ?
+ columnStatistics = new ColumnStatistics(colStatsDesc, colStats.getStatsObj(), colStats.getEngine());
+ writeId = replicationSpec().isMigratingToTxnTable() ?
ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID : partition.getWriteId();
- partDesc.setWriteId(writeId);
}
- return partsDesc;
+
+ AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc(
+ partitionSpec, location, partition.getParameters(), sd.getInputFormat(), sd.getOutputFormat(),
+ sd.getNumBuckets(), sd.getCols(), sd.getSerdeInfo().getSerializationLib(), sd.getSerdeInfo().getParameters(),
+ sd.getBucketCols(), sd.getSortCols(), columnStatistics, writeId);
+
+ AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(tblDesc.getDatabaseName(),
+ tblDesc.getTableName(), true, ImmutableList.of(partitionDesc));
+ addPartitionDesc.setReplicationSpec(replicationSpec());
+ return addPartitionDesc;
} catch (Exception e) {
throw new SemanticException(e);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index c728e2d..40020ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -182,7 +182,7 @@ public class LoadPartitions {
*/
private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
throws MetaException, HiveException {
- AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0);
+ AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
partSpec.setLocation(replicaWarehousePartitionLocation.toString());
@@ -362,7 +362,7 @@ public class LoadPartitions {
boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null);
Map<String, String> lastReplicatedPartSpec = null;
if (!encounteredTheLastReplicatedPartition) {
- lastReplicatedPartSpec = lastPartitionReplicated.getPartition(0).getPartSpec();
+ lastReplicatedPartSpec = lastPartitionReplicated.getPartitions().get(0).getPartSpec();
LOG.info("Start processing from partition info spec : {}",
StringUtils.mapToString(lastReplicatedPartSpec));
}
@@ -370,13 +370,13 @@ public class LoadPartitions {
Iterator<AlterTableAddPartitionDesc> partitionIterator = event.partitionDescriptions(tableDesc).iterator();
while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) {
AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next();
- Map<String, String> currentSpec = addPartitionDesc.getPartition(0).getPartSpec();
+ Map<String, String> currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
}
while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) {
AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next();
- Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
+ Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
Task<?> ptnRootTask = null;
ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec);
switch (loadPtnType) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 0ea516f..0730ca6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -85,7 +85,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
@@ -115,7 +114,6 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc;
import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableDropPartitionDesc;
import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
@@ -135,7 +133,6 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -3028,143 +3025,46 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
- public List<Partition> createPartitions(AlterTableAddPartitionDesc addPartitionDesc) throws HiveException {
- // TODO: catalog name everywhere in this method
- Table tbl = getTable(addPartitionDesc.getDbName(), addPartitionDesc.getTableName());
- int size = addPartitionDesc.getPartitionCount();
- List<org.apache.hadoop.hive.metastore.api.Partition> in =
- new ArrayList<org.apache.hadoop.hive.metastore.api.Partition>(size);
- long writeId;
- String validWriteIdList;
-
- // In case of replication, get the writeId from the source and use valid write Id list
- // for replication.
- if (addPartitionDesc.getReplicationSpec().isInReplicationScope() &&
- addPartitionDesc.getPartition(0).getWriteId() > 0) {
- writeId = addPartitionDesc.getPartition(0).getWriteId();
- // We need a valid writeId list for a transactional change. During replication we do not
- // have a valid writeId list which was used for this on the source. But we know for sure
- // that the writeId associated with it was valid then (otherwise the change would have
- // failed on the source). So use a valid transaction list with only that writeId.
- validWriteIdList = new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
- tbl.getTableName()),
- new long[0], new BitSet(), writeId).writeToString();
- } else {
- AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
- if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) {
- writeId = tableSnapshot.getWriteId();
- validWriteIdList = tableSnapshot.getValidWriteIdList();
- } else {
- writeId = -1;
- validWriteIdList = null;
- }
- }
- for (int i = 0; i < size; ++i) {
- org.apache.hadoop.hive.metastore.api.Partition tmpPart =
- convertAddSpecToMetaPartition(tbl, addPartitionDesc.getPartition(i), conf);
- if (tmpPart != null && writeId > 0) {
- tmpPart.setWriteId(writeId);
- }
- in.add(tmpPart);
- }
- List<Partition> out = new ArrayList<Partition>();
+ public List<org.apache.hadoop.hive.metastore.api.Partition> addPartition(
+ List<org.apache.hadoop.hive.metastore.api.Partition> partitions, boolean ifNotExists, boolean needResults)
+ throws HiveException {
try {
- if (!addPartitionDesc.getReplicationSpec().isInReplicationScope()){
- // TODO: normally, the result is not necessary; might make sense to pass false
- for (org.apache.hadoop.hive.metastore.api.Partition outPart
- : getMSC().add_partitions(in, addPartitionDesc.isIfNotExists(), true)) {
- out.add(new Partition(tbl, outPart));
- }
- } else {
-
- // For replication add-ptns, we need to follow a insert-if-not-exist, alter-if-exists scenario.
- // TODO : ideally, we should push this mechanism to the metastore, because, otherwise, we have
- // no choice but to iterate over the partitions here.
-
- List<org.apache.hadoop.hive.metastore.api.Partition> partsToAdd = new ArrayList<>();
- List<org.apache.hadoop.hive.metastore.api.Partition> partsToAlter = new ArrayList<>();
- List<String> part_names = new ArrayList<>();
- for (org.apache.hadoop.hive.metastore.api.Partition p: in){
- part_names.add(Warehouse.makePartName(tbl.getPartitionKeys(), p.getValues()));
- try {
- org.apache.hadoop.hive.metastore.api.Partition ptn =
- getMSC().getPartition(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), p.getValues());
- if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn.getParameters())){
- ReplicationSpec.copyLastReplId(ptn.getParameters(), p.getParameters());
- partsToAlter.add(p);
- } // else ptn already exists, but we do nothing with it.
- } catch (NoSuchObjectException nsoe){
- // if the object does not exist, we want to add it.
- partsToAdd.add(p);
- }
- }
- for (org.apache.hadoop.hive.metastore.api.Partition outPart
- : getMSC().add_partitions(partsToAdd, addPartitionDesc.isIfNotExists(), true)) {
- out.add(new Partition(tbl, outPart));
- }
- EnvironmentContext ec = new EnvironmentContext();
- // In case of replication, statistics is obtained from the source, so do not update those
- // on replica.
- ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
- getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),
- partsToAlter, ec, validWriteIdList, writeId);
-
- for ( org.apache.hadoop.hive.metastore.api.Partition outPart :
- getMSC().getPartitionsByNames(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),part_names)){
- out.add(new Partition(tbl,outPart));
- }
- }
+ return getMSC().add_partitions(partitions, ifNotExists, needResults);
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
}
- return out;
}
- public static org.apache.hadoop.hive.metastore.api.Partition convertAddSpecToMetaPartition(
- Table tbl, AlterTableAddPartitionDesc.PartitionDesc addSpec, final HiveConf conf) throws HiveException {
- Path location = addSpec.getLocation() != null
- ? new Path(tbl.getPath(), addSpec.getLocation()) : null;
- if (location != null) {
- // Ensure that it is a full qualified path (in most cases it will be since tbl.getPath() is full qualified)
- location = new Path(Utilities.getQualifiedPath(conf, location));
- }
- org.apache.hadoop.hive.metastore.api.Partition part =
- Partition.createMetaPartitionObject(tbl, addSpec.getPartSpec(), location);
- if (addSpec.getPartParams() != null) {
- part.setParameters(addSpec.getPartParams());
- }
- if (addSpec.getInputFormat() != null) {
- part.getSd().setInputFormat(addSpec.getInputFormat());
- }
- if (addSpec.getOutputFormat() != null) {
- part.getSd().setOutputFormat(addSpec.getOutputFormat());
- }
- if (addSpec.getNumBuckets() != -1) {
- part.getSd().setNumBuckets(addSpec.getNumBuckets());
- }
- if (addSpec.getCols() != null) {
- part.getSd().setCols(addSpec.getCols());
- }
- if (addSpec.getSerializationLib() != null) {
- part.getSd().getSerdeInfo().setSerializationLib(addSpec.getSerializationLib());
- }
- if (addSpec.getSerdeParams() != null) {
- part.getSd().getSerdeInfo().setParameters(addSpec.getSerdeParams());
- }
- if (addSpec.getBucketCols() != null) {
- part.getSd().setBucketCols(addSpec.getBucketCols());
+ public org.apache.hadoop.hive.metastore.api.Partition getPartition(String dbName, String tableName,
+ List<String> params) throws HiveException {
+ try {
+ return getMSC().getPartition(dbName, tableName, params);
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
}
- if (addSpec.getSortCols() != null) {
- part.getSd().setSortCols(addSpec.getSortCols());
+ }
+
+ public void alterPartitions(String dbName, String tableName,
+ List<org.apache.hadoop.hive.metastore.api.Partition> partitions, EnvironmentContext ec, String validWriteIdList,
+ long writeId) throws HiveException {
+ try {
+ getMSC().alter_partitions(dbName, tableName, partitions, ec, validWriteIdList, writeId);
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
}
- if (addSpec.getColStats() != null) {
- part.setColStats(addSpec.getColStats());
- // Statistics will have an associated write Id for a transactional table. We need it to
- // update column statistics.
- part.setWriteId(addSpec.getWriteId());
+ }
+
+ public List<org.apache.hadoop.hive.metastore.api.Partition> getPartitionsByNames(String dbName, String tableName,
+ List<String> partitionNames) throws HiveException {
+ try {
+ return getMSC().getPartitionsByNames(dbName, tableName, partitionNames);
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
}
- return part;
}
public Partition getPartition(Table tbl, Map<String, String> partSpec,
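The net effect in Hive.java: the monolithic createPartitions is gone, and AlterTableAddPartitionOperation composes these thin, exception-translating wrappers instead. A sketch of a caller, where db is a Hive instance and the partition lists are prepared metastore objects:

    // Plain add; needResults=true returns the partitions as the metastore created them.
    List<org.apache.hadoop.hive.metastore.api.Partition> created =
        db.addPartition(partitions, /* ifNotExists */ true, /* needResults */ true);

    // Replication path: alter pre-existing partitions without updating stats on the replica.
    EnvironmentContext ec = new EnvironmentContext();
    ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
    db.alterPartitions(dbName, tableName, partitionsToAlter, ec, validWriteIdList, writeId);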
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 99ce46e..3235024 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -146,7 +146,6 @@ import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableSetSerdePropsDesc;
import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableSetSkewedLocationDesc;
import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableSkewedByDesc;
import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableUnarchiveDesc;
-import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc.PartitionDesc;
import org.apache.hadoop.hive.ql.ddl.view.AlterMaterializedViewRewriteDesc;
import org.apache.hadoop.hive.ql.ddl.view.DropMaterializedViewDesc;
import org.apache.hadoop.hive.ql.ddl.view.DropViewDesc;
@@ -3509,15 +3508,15 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
// ^(TOK_ALTERTABLE_ADDPARTS identifier ifNotExists? alterStatementSuffixAddPartitionsElement+)
boolean ifNotExists = ast.getChild(0).getType() == HiveParser.TOK_IFNOTEXISTS;
- Table tab = getTable(qualified);
- boolean isView = tab.isView();
- validateAlterTableType(tab, AlterTableType.ADDPARTITION, expectView);
- outputs.add(new WriteEntity(tab,
+ Table table = getTable(qualified);
+ boolean isView = table.isView();
+ validateAlterTableType(table, AlterTableType.ADDPARTITION, expectView);
+ outputs.add(new WriteEntity(table,
/*use DDL_EXCLUSIVE to cause X lock to prevent races between concurrent add partition calls
with IF NOT EXISTS. w/o this 2 concurrent calls to add the same partition may both add
data since for transactional tables creating partition metadata and moving data there are
2 separate actions. */
- ifNotExists && AcidUtils.isTransactionalTable(tab) ? WriteType.DDL_EXCLUSIVE
+ ifNotExists && AcidUtils.isTransactionalTable(table) ? WriteType.DDL_EXCLUSIVE
: WriteEntity.WriteType.DDL_SHARED));
int numCh = ast.getChildCount();
@@ -3526,17 +3525,17 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
String currentLocation = null;
Map<String, String> currentPart = null;
// Parser has done some verification, so the order of tokens doesn't need to be verified here.
- AlterTableAddPartitionDesc addPartitionDesc =
- new AlterTableAddPartitionDesc(tab.getDbName(), tab.getTableName(), ifNotExists);
+
+ List<AlterTableAddPartitionDesc.PartitionDesc> partitions = new ArrayList<>();
for (int num = start; num < numCh; num++) {
ASTNode child = (ASTNode) ast.getChild(num);
switch (child.getToken().getType()) {
case HiveParser.TOK_PARTSPEC:
if (currentPart != null) {
- addPartitionDesc.addPartition(currentPart, currentLocation);
+ partitions.add(createPartitionDesc(table, currentLocation, currentPart));
currentLocation = null;
}
- currentPart = getValidatedPartSpec(tab, child, conf, true);
+ currentPart = getValidatedPartSpec(table, child, conf, true);
validatePartitionValues(currentPart); // validate reserved values
break;
case HiveParser.TOK_PARTITIONLOCATION:
@@ -3554,31 +3553,21 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
// add the last one
if (currentPart != null) {
- addPartitionDesc.addPartition(currentPart, currentLocation);
+ partitions.add(createPartitionDesc(table, currentLocation, currentPart));
}
- if (this.conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
- for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) {
- PartitionDesc desc = addPartitionDesc.getPartition(index);
- if (desc.getLocation() == null) {
- if (desc.getPartParams() == null) {
- desc.setPartParams(new HashMap<String, String>());
- }
- StatsSetupConst.setStatsStateForCreateTable(desc.getPartParams(),
- MetaStoreUtils.getColumnNames(tab.getCols()), StatsSetupConst.TRUE);
- }
- }
- }
-
- if (addPartitionDesc.getPartitionCount() == 0) {
+ if (partitions.isEmpty()) {
// nothing to do
return;
}
+ AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(table.getDbName(),
+ table.getTableName(), ifNotExists, partitions);
+
Task<DDLWork> ddlTask =
TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc));
rootTasks.add(ddlTask);
- handleTransactionalTable(tab, addPartitionDesc, ddlTask);
+ handleTransactionalTable(table, addPartitionDesc, ddlTask);
if (isView) {
// Compile internal query to capture underlying table partition dependencies
@@ -3589,8 +3578,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
cmd.append(HiveUtils.unparseIdentifier(qualified[1]));
cmd.append(" WHERE ");
boolean firstOr = true;
- for (int i = 0; i < addPartitionDesc.getPartitionCount(); ++i) {
- AlterTableAddPartitionDesc.PartitionDesc partitionDesc = addPartitionDesc.getPartition(i);
+ for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : partitions) {
if (firstOr) {
firstOr = false;
} else {
@@ -3623,6 +3611,17 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
+ private AlterTableAddPartitionDesc.PartitionDesc createPartitionDesc(Table table, String currentLocation,
+ Map<String, String> currentPart) {
+ Map<String, String> params = null;
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER) && currentLocation == null) {
+ params = new HashMap<String, String>();
+ StatsSetupConst.setStatsStateForCreateTable(params,
+ MetaStoreUtils.getColumnNames(table.getCols()), StatsSetupConst.TRUE);
+ }
+ return new AlterTableAddPartitionDesc.PartitionDesc(currentPart, currentLocation, params);
+ }
+
/**
* Add partition for Transactional tables needs to add (copy/rename) the data so that it lands
* in a delta_x_x/ folder in the partition dir.
@@ -3635,13 +3634,12 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
Long writeId = null;
int stmtId = 0;
- for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) {
- PartitionDesc desc = addPartitionDesc.getPartition(index);
- if (desc.getLocation() != null) {
- AcidUtils.validateAcidPartitionLocation(desc.getLocation(), conf);
+ for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : addPartitionDesc.getPartitions()) {
+ if (partitionDesc.getLocation() != null) {
+ AcidUtils.validateAcidPartitionLocation(partitionDesc.getLocation(), conf);
if(addPartitionDesc.isIfNotExists()) {
//Don't add partition data if it already exists
- Partition oldPart = getPartition(tab, desc.getPartSpec(), false);
+ Partition oldPart = getPartition(tab, partitionDesc.getPartSpec(), false);
if(oldPart != null) {
continue;
}
@@ -3657,15 +3655,15 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
}
stmtId = getTxnMgr().getStmtIdAndIncrement();
}
- LoadTableDesc loadTableWork = new LoadTableDesc(new Path(desc.getLocation()),
- Utilities.getTableDesc(tab), desc.getPartSpec(),
+ LoadTableDesc loadTableWork = new LoadTableDesc(new Path(partitionDesc.getLocation()),
+ Utilities.getTableDesc(tab), partitionDesc.getPartSpec(),
LoadTableDesc.LoadFileType.KEEP_EXISTING, //not relevant - creating new partition
writeId);
loadTableWork.setStmtId(stmtId);
loadTableWork.setInheritTableSpecs(true);
try {
- desc.setLocation(new Path(tab.getDataLocation(),
- Warehouse.makePartPath(desc.getPartSpec())).toString());
+ partitionDesc.setLocation(new Path(tab.getDataLocation(),
+ Warehouse.makePartPath(partitionDesc.getPartSpec())).toString());
}
catch (MetaException ex) {
throw new SemanticException("Could not determine partition path due to: "
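Taken together, the analyzer now collects one immutable PartitionDesc per PARTITION clause and builds the desc once at the end, roughly:

    // Stats state is pre-seeded by createPartitionDesc only when autogather is on
    // and the clause carries no explicit LOCATION.
    List<AlterTableAddPartitionDesc.PartitionDesc> partitions = new ArrayList<>();
    partitions.add(createPartitionDesc(table, currentLocation, currentPart));
    AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(
        table.getDbName(), table.getTableName(), ifNotExists, partitions);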
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 687122a..e955989 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -70,6 +71,9 @@ import org.apache.hadoop.mapred.OutputFormat;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import java.io.IOException;
@@ -322,10 +326,17 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf()));
}
- if ((parsedTableName != null) && (!parsedTableName.isEmpty())) {
+ if (StringUtils.isNotBlank(parsedTableName)) {
tblDesc.setTableName(parsedTableName);
}
+ if (tblDesc.getTableName() == null) {
+ // Either we got the tablename from the IMPORT statement (first priority) or from the export dump.
+ throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
+ } else {
+ x.getConf().set("import.destination.table", tblDesc.getTableName());
+ }
+
List<AlterTableAddPartitionDesc> partitionDescs = new ArrayList<>();
Iterable<Partition> partitions = rv.getPartitions();
for (Partition partition : partitions) {
@@ -343,7 +354,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
for (Iterator<AlterTableAddPartitionDesc> partnIter = partitionDescs
.listIterator(); partnIter.hasNext(); ) {
AlterTableAddPartitionDesc addPartitionDesc = partnIter.next();
- if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
+ if (!found && addPartitionDesc.getPartitions().get(0).getPartSpec().equals(parsedPartSpec)) {
found = true;
} else {
partnIter.remove();
@@ -356,17 +367,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- if (tblDesc.getTableName() == null) {
- // Either we got the tablename from the IMPORT statement (first priority)
- // or from the export dump.
- throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
- } else {
- x.getConf().set("import.destination.table", tblDesc.getTableName());
- for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) {
- addPartitionDesc.setTableName(tblDesc.getTableName());
- }
- }
-
Warehouse wh = new Warehouse(x.getConf());
Table table = tableIfExists(tblDesc, x.getHive());
boolean tableExists = false;
@@ -410,36 +410,32 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return tableExists;
}
- private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition(
- Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition,
- ReplicationSpec replicationSpec, HiveConf conf)
+ private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition(Path fromPath, String dbName,
+ ImportTableDesc tblDesc, Partition partition, ReplicationSpec replicationSpec, HiveConf conf)
throws MetaException, SemanticException {
- AlterTableAddPartitionDesc partsDesc = new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(),
- EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
- partition.getSd().getLocation(), partition.getParameters());
- AlterTableAddPartitionDesc.PartitionDesc partDesc = partsDesc.getPartition(0);
- partDesc.setInputFormat(partition.getSd().getInputFormat());
- partDesc.setOutputFormat(partition.getSd().getOutputFormat());
- partDesc.setNumBuckets(partition.getSd().getNumBuckets());
- partDesc.setCols(partition.getSd().getCols());
- partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib());
- partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
- partDesc.setBucketCols(partition.getSd().getBucketCols());
- partDesc.setSortCols(partition.getSd().getSortCols());
- if (replicationSpec.isInReplicationScope() && tblDesc.isExternal()
- && !replicationSpec.isMigratingToExternalTable()) {
- String newLocation = ReplExternalTables
- .externalTableLocation(conf, partition.getSd().getLocation());
- LOG.debug("partition {} has data location: {}", partition, newLocation);
- partDesc.setLocation(newLocation);
+ Map<String, String> partitionSpec = EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues());
+
+ StorageDescriptor sd = partition.getSd();
+
+ String location = null;
+ if (replicationSpec.isInReplicationScope() && tblDesc.isExternal() &&
+ !replicationSpec.isMigratingToExternalTable()) {
+ location = ReplExternalTables.externalTableLocation(conf, partition.getSd().getLocation());
+ LOG.debug("partition {} has data location: {}", partition, location);
} else {
- partDesc.setLocation(new Path(fromPath,
- Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+ location = new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString();
}
+
+ long writeId = -1;
if (tblDesc.getReplWriteId() != null) {
- partDesc.setWriteId(tblDesc.getReplWriteId());
+ writeId = tblDesc.getReplWriteId();
}
- return partsDesc;
+
+ AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc(
+ partitionSpec, location, partition.getParameters(), sd.getInputFormat(), sd.getOutputFormat(),
+ sd.getNumBuckets(), sd.getCols(), sd.getSerdeInfo().getSerializationLib(), sd.getSerdeInfo().getParameters(),
+ sd.getBucketCols(), sd.getSortCols(), null, writeId);
+ return new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(), true, ImmutableList.of(partitionDesc));
}
private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
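The shape of the cleanup is clearest in the hunk above: AlterTableAddPartitionDesc changes from a mutable object populated through a chain of setters into an immutable value built in a single constructor call. A condensed before/after sketch, using only names visible in these hunks (argument roles beyond what the hunk shows are assumptions):

    // Before (mutable): create, then mutate via getPartition(0) and setters.
    //   AlterTableAddPartitionDesc desc = new AlterTableAddPartitionDesc(db, tbl, spec, loc, params);
    //   desc.getPartition(0).setInputFormat(...);  // ...and a dozen more setters
    // After (immutable): everything goes through the constructors once.
    AlterTableAddPartitionDesc.PartitionDesc part = new AlterTableAddPartitionDesc.PartitionDesc(
        partitionSpec, location, partition.getParameters(), sd.getInputFormat(), sd.getOutputFormat(),
        sd.getNumBuckets(), sd.getCols(), sd.getSerdeInfo().getSerializationLib(),
        sd.getSerdeInfo().getParameters(), sd.getBucketCols(), sd.getSortCols(),
        null /* 13th argument as in the hunk above; assumed to be column stats */, writeId);
    AlterTableAddPartitionDesc desc = new AlterTableAddPartitionDesc(
        dbName, tblDesc.getTableName(), true /* ifNotExists */, ImmutableList.of(part));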
@@ -566,11 +562,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
ImportTableDesc tblDesc, Table table, Warehouse wh, AlterTableAddPartitionDesc addPartitionDesc,
ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException {
- addPartitionDesc.setReplaceMode(true);
if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
addPartitionDesc.setReplicationSpec(replicationSpec);
}
- AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0);
+ AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
if (ptn == null) {
fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
} else if (!externalTablePartition(tblDesc, replicationSpec)) {
@@ -584,7 +579,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
ReplicationSpec replicationSpec,
EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
throws MetaException, IOException, HiveException {
- AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0);
+ AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
boolean isAutoPurge = false;
boolean needRecycle = false;
boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
@@ -1038,7 +1033,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getLOG().debug("table partitioned");
for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) {
- Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
+ Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
x.getTasks().add(addSinglePartition(
@@ -1235,7 +1230,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
if (updatedMetadata != null) {
updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
- addPartitionDesc.getPartition(0).getPartSpec());
+ addPartitionDesc.getPartitions().get(0).getPartSpec());
}
}
} else if (!replicationSpec.isMetadataOnly()
@@ -1289,7 +1284,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getLOG().debug("table partitioned");
for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) {
addPartitionDesc.setReplicationSpec(replicationSpec);
- Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
+ Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
if (isOldTableValid) {
// If existing table is valid but the partition spec is different, then ignore partition
@@ -1308,15 +1303,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getTasks().add(addSinglePartition(
tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
if (updatedMetadata != null) {
- updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
- addPartitionDesc.getPartition(0).getPartSpec());
+ updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec);
}
} else {
x.getTasks().add(alterSinglePartition(
tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
if (updatedMetadata != null) {
- updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
- addPartitionDesc.getPartition(0).getPartSpec());
+ updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec);
}
}
} else {
@@ -1331,8 +1324,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
}
if (updatedMetadata != null) {
- updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
- addPartitionDesc.getPartition(0).getPartSpec());
+ updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec);
}
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
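The call-site churn in the hunks above follows one mechanical pattern: the removed convenience accessor getPartition(0) becomes getPartitions().get(0), and where the same partition spec was fetched repeatedly it is hoisted into a local and reused. A minimal sketch of the resulting pattern (a trimmed composite of the hunks above, not a verbatim excerpt):

    Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
    org.apache.hadoop.hive.ql.metadata.Partition ptn = x.getHive().getPartition(table, partSpec, false);
    if (updatedMetadata != null) {
      // Reuse the hoisted local instead of re-reading it from the descriptor.
      updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec);
    }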
diff --git a/ql/src/test/results/clientpositive/add_part_multiple.q.out b/ql/src/test/results/clientpositive/add_part_multiple.q.out
index 81454f7..8fbbe2a 100644
--- a/ql/src/test/results/clientpositive/add_part_multiple.q.out
+++ b/ql/src/test/results/clientpositive/add_part_multiple.q.out
@@ -30,8 +30,22 @@ STAGE DEPENDENCIES:
STAGE PLANS:
Stage: Stage-0
Add Partition
-#### A masked pattern was here ####
- Spec: {ds=2010-01-01}, {ds=2010-02-01}, {ds=2010-03-01}, {ds=2010-04-01}
+ db name: default
+ partitions:
+ Partition
+ location: A
+ partition spec: {ds=2010-01-01}
+ Partition
+ location: B
+ partition spec: {ds=2010-02-01}
+ Partition
+ params: {totalSize=0, numRows=0, rawDataSize=0, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}, numFiles=0, numFilesErasureCoded=0}
+ partition spec: {ds=2010-03-01}
+ Partition
+ location: C
+ partition spec: {ds=2010-04-01}
+ table name: add_part_test_n1
+ if not exists: true
PREHOOK: query: ALTER TABLE add_part_test_n1 ADD IF NOT EXISTS
PARTITION (ds='2010-01-01') location 'A'
diff --git a/ql/src/test/results/clientpositive/drop_partitions_filter.q.out b/ql/src/test/results/clientpositive/drop_partitions_filter.q.out
index 457fa0b..edfbbf7 100644
--- a/ql/src/test/results/clientpositive/drop_partitions_filter.q.out
+++ b/ql/src/test/results/clientpositive/drop_partitions_filter.q.out
@@ -33,8 +33,12 @@ STAGE DEPENDENCIES:
STAGE PLANS:
Stage: Stage-0
Add Partition
-#### A masked pattern was here ####
- Spec: {c=US, d=1}
+ db name: default
+ partitions:
+ Partition
+ params: {totalSize=0, numRows=0, rawDataSize=0, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true"}}, numFiles=0, numFilesErasureCoded=0}
+ partition spec: {c=US, d=1}
+ table name: ptestfilter_n1
PREHOOK: query: alter table ptestfilter_n1 add partition (c='US', d=1)
PREHOOK: type: ALTERTABLE_ADDPARTS
diff --git a/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out b/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out
index b307c02..1c58917 100644
--- a/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out
+++ b/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out
@@ -31,8 +31,12 @@ STAGE DEPENDENCIES:
STAGE PLANS:
Stage: Stage-0
Add Partition
-#### A masked pattern was here ####
- Spec: {day=20110102}
+ db name: default
+ partitions:
+ Partition
+ location: hdfs://### HDFS PATH ###
+ partition spec: {day=20110102}
+ table name: supply
Stage: Stage-1
Move Operator
diff --git a/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out b/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out
index 81454f7..8fbbe2a 100644
--- a/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out
+++ b/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out
@@ -30,8 +30,22 @@ STAGE DEPENDENCIES:
STAGE PLANS:
Stage: Stage-0
Add Partition
-#### A masked pattern was here ####
- Spec: {ds=2010-01-01}, {ds=2010-02-01}, {ds=2010-03-01}, {ds=2010-04-01}
+ db name: default
+ partitions:
+ Partition
+ location: A
+ partition spec: {ds=2010-01-01}
+ Partition
+ location: B
+ partition spec: {ds=2010-02-01}
+ Partition
+ params: {totalSize=0, numRows=0, rawDataSize=0, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}, numFiles=0, numFilesErasureCoded=0}
+ partition spec: {ds=2010-03-01}
+ Partition
+ location: C
+ partition spec: {ds=2010-04-01}
+ table name: add_part_test_n1
+ if not exists: true
PREHOOK: query: ALTER TABLE add_part_test_n1 ADD IF NOT EXISTS
PARTITION (ds='2010-01-01') location 'A'
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f2beafe..f4e71f9 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.thrift.TException;
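In the hunk that follows, the streaming side stops constructing a throwaway AlterTableAddPartitionDesc just to turn it back into a metastore Partition; it now builds the qualified partition location and the Partition object directly. A condensed sketch of the new flow (every call appears in the hunk below; error handling omitted):

    Map<String, String> partSpec =
        Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues);
    Path location = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec));
    location = new Path(Utilities.getQualifiedPath(conf, location)); // fully qualify the path
    String partName = Warehouse.makePartName(tableObject.getPartitionKeys(), partitionValues);
    Partition partition = org.apache.hadoop.hive.ql.metadata.Partition
        .createMetaPartitionObject(tableObject, partSpec, location);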
@@ -437,11 +437,13 @@ public class HiveStreamingConnection implements StreamingConnection {
try {
Map<String, String> partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues);
- AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(database, table, true);
+
+ Path location = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec));
+ location = new Path(Utilities.getQualifiedPath(conf, location));
+ partLocation = location.toString();
partName = Warehouse.makePartName(tableObject.getPartitionKeys(), partitionValues);
- partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString();
- addPartitionDesc.addPartition(partSpec, partLocation);
- Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf);
+ Partition partition =
+ org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(tableObject, partSpec, location);
if (getMSC() == null) {
// We assume it doesn't exist if we can't check it