Posted to commits@hive.apache.org by ek...@apache.org on 2018/04/18 01:24:22 UTC
[1/2] hive git commit: HIVE-18739 - Add support for Import/Export
from Acid table (Eugene Koifman, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master bd6b58258 -> 699c5768c
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index a4df509..4b2f961 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -191,8 +189,9 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
checkExpected(rs, expected, "Unexpected row count after ctas from non acid table");
runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values));
+ //todo: try this with acid default - it seems making table acid in listener is too late
runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES ('transactional" +
- "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);//todo: try this with acid default - it seem makeing table acid in listener is too late
+ "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);
rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas2 order by ROW__ID");
String expected2[][] = {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00000"},
@@ -234,7 +233,7 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
/**
* Insert into unbucketed acid table from union all query
- * Union All is flattend so nested subdirs are created and acid move drops them since
+ * Union All is flattened so nested subdirs are created and acid move drops them since
* delta dirs have unique names
*/
@Test
@@ -529,11 +528,26 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
CommandProcessorResponse cpr = runStatementOnDriverNegative("create table myctas " +
"clustered by (a) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true') as " +
"select a, b from " + Table.NONACIDORCTBL);
- int j = ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode();//this code doesn't propagate
+ int j = ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(); //this code doesn't propagate
// Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode());
Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support"));
}
/**
+ * Currently CTAS doesn't support partitioned tables. Correspondingly Acid only supports CTAS for
+ * un-partitioned tables. This test is here to make sure that if CTAS is made to support
+ * partitioned tables, it raises a red flag for Acid.
+ */
+ @Test
+ public void testCtasPartitioned() throws Exception {
+ runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + "(a,b) values(1,2),(1,3)");
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("create table myctas partitioned " +
+ "by (b int) stored as " +
+ "ORC TBLPROPERTIES ('transactional'='true') as select a, b from " + Table.NONACIDORCTBL);
+ int j = ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode();//this code doesn't propagate
+ Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support " +
+ "partitioning in the target table"));
+ }
+ /**
* Tests to check that we are able to use vectorized acid reader,
* VectorizedOrcAcidRowBatchReader, when reading "original" files,
* i.e. those that were written before the table was converted to acid.
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 108add0..7ba053d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2033,7 +2033,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
* @param partition
* @return
*/
- private Partition deepCopy(Partition partition) {
+ protected Partition deepCopy(Partition partition) {
Partition copy = null;
if (partition != null) {
copy = new Partition(partition);
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index c5da7b5..a7acdcb 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -3033,23 +3033,7 @@ public class ObjectStore implements RawStore, Configurable {
throw new NoSuchObjectException(getCatalogQualifiedTableName(catName, dbName, tableName)
+ " table not found");
}
- List<FieldSchema> partCols = table.getPartitionKeys();
- int numPartKeys = partCols.size();
- if (part_vals.size() > numPartKeys) {
- throw new MetaException("Incorrect number of partition values."
- + " numPartKeys=" + numPartKeys + ", part_val=" + part_vals.size());
- }
- partCols = partCols.subList(0, part_vals.size());
- // Construct a pattern of the form: partKey=partVal/partKey2=partVal2/...
- // where partVal is either the escaped partition value given as input,
- // or a regex of the form ".*"
- // This works because the "=" and "/" separating key names and partition key/values
- // are not escaped.
- String partNameMatcher = Warehouse.makePartName(partCols, part_vals, ".*");
- // add ".*" to the regex to match anything else afterwards the partial spec.
- if (part_vals.size() < numPartKeys) {
- partNameMatcher += ".*";
- }
+ String partNameMatcher = MetaStoreUtils.makePartNameMatcher(table, part_vals);
Query query = queryWrapper.query = pm.newQuery(MPartition.class);
StringBuilder queryFilter = new StringBuilder("table.database.name == dbName");
queryFilter.append(" && table.database.catalogName == catName");
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 8ea6051..d022bc0 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -934,6 +934,26 @@ public class MetaStoreUtils {
}
return pvals;
}
+ public static String makePartNameMatcher(Table table, List<String> partVals) throws MetaException {
+ List<FieldSchema> partCols = table.getPartitionKeys();
+ int numPartKeys = partCols.size();
+ if (partVals.size() > numPartKeys) {
+ throw new MetaException("Incorrect number of partition values."
+ + " numPartKeys=" + numPartKeys + ", part_val=" + partVals);
+ }
+ partCols = partCols.subList(0, partVals.size());
+ // Construct a pattern of the form: partKey=partVal/partKey2=partVal2/...
+ // where partVal is either the escaped partition value given as input,
+ // or a regex of the form ".*"
+ // This works because the "=" and "/" separating key names and partition key/values
+ // are not escaped.
+ String partNameMatcher = Warehouse.makePartName(partCols, partVals, ".*");
+ // add ".*" to the regex to match anything else afterwards the partial spec.
+ if (partVals.size() < numPartKeys) {
+ partNameMatcher += ".*";
+ }
+ return partNameMatcher;
+ }
/**
* @param schema1: The first schema to be compared
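To illustrate the matcher that makePartNameMatcher() builds above, here is a small self-contained sketch (hypothetical names; it skips the value escaping that Warehouse.makePartName performs): for partition columns (p, q) and the partial value list ["1"], the pattern comes out as "p=1" plus a trailing ".*", which is then applied to stored partition names via String.matches.

import java.util.Arrays;
import java.util.List;

public class PartNameMatcherSketch {
  // Simplified stand-in for the logic above: empty values become ".*",
  // and a partial spec gets a trailing ".*" appended (no value escaping here).
  static String makeMatcher(List<String> partCols, List<String> partVals, int numPartKeys) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < partVals.size(); i++) {
      if (i > 0) {
        sb.append('/');
      }
      String v = partVals.get(i);
      sb.append(partCols.get(i)).append('=').append(v == null || v.isEmpty() ? ".*" : v);
    }
    if (partVals.size() < numPartKeys) {
      sb.append(".*"); // match whatever follows the partial spec
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    List<String> cols = Arrays.asList("p", "q");
    String matcher = makeMatcher(cols, Arrays.asList("1"), cols.size());
    System.out.println(matcher);                      // p=1.*
    System.out.println("p=1/q=2".matches(matcher));   // true
    System.out.println("p=2/q=2".matches(matcher));   // false
  }
}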
[2/2] hive git commit: HIVE-18739 - Add support for Import/Export
from Acid table (Eugene Koifman, reviewed by Sergey Shelukhin)
Posted by ek...@apache.org.
HIVE-18739 - Add support for Import/Export from Acid table (Eugene Koifman, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/699c5768
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/699c5768
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/699c5768
Branch: refs/heads/master
Commit: 699c5768c88967abd507122d775bd5955ca45218
Parents: bd6b582
Author: Eugene Koifman <ek...@apache.org>
Authored: Tue Apr 17 18:23:13 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Tue Apr 17 18:23:48 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/common/JavaUtils.java | 3 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +-
.../apache/hadoop/hive/ql/exec/ExportTask.java | 1 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 4 +
.../apache/hadoop/hive/ql/metadata/Hive.java | 9 +-
.../ql/metadata/SessionHiveMetaStoreClient.java | 217 ++++++++
.../hive/ql/parse/BaseSemanticAnalyzer.java | 7 +
.../hive/ql/parse/ExportSemanticAnalyzer.java | 19 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 111 ++--
.../hive/ql/parse/SemanticAnalyzerFactory.java | 3 +
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 258 ++++++++-
.../apache/hadoop/hive/ql/plan/CopyWork.java | 6 +-
.../apache/hadoop/hive/ql/plan/ExportWork.java | 28 +-
.../hadoop/hive/ql/plan/ImportTableDesc.java | 2 +-
.../hadoop/hive/ql/session/SessionState.java | 10 +-
.../hadoop/hive/ql/TestTxnAddPartition.java | 2 +-
.../org/apache/hadoop/hive/ql/TestTxnExIm.java | 538 +++++++++++++++++++
.../apache/hadoop/hive/ql/TestTxnLoadData.java | 2 +-
.../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 24 +-
.../hive/metastore/HiveMetaStoreClient.java | 2 +-
.../hadoop/hive/metastore/ObjectStore.java | 18 +-
.../hive/metastore/utils/MetaStoreUtils.java | 20 +
22 files changed, 1197 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index 75c07b4..7894ec1 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -228,10 +228,11 @@ public final class JavaUtils {
@Override
public boolean accept(Path path) {
String name = path.getName();
+ //todo: what if this is a base?
if (!name.startsWith(DELTA_PREFIX + "_")) return false;
String idStr = name.substring(DELTA_PREFIX.length() + 1, DELTA_PREFIX.length() + 1 + DELTA_DIGITS_LEN);
try {
- Long.parseLong(idStr);
+ Long.parseLong(idStr);//what for? sanity check?
} catch (NumberFormatException ex) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 2466da9..c8cb8a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3969,7 +3969,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
* how this is desirable.
*
* As of HIVE-14993, WriteEntity with different WriteType must be considered different.
- * So WriteEntity create in DDLTask cause extra output in golden files, but only because
+ * So WriteEntity created in DDLTask causes extra output in golden files, but only because
* DDLTask sets a different WriteType for the same Entity.
*
* In the spirit of bug-for-bug compatibility, this method ensures we only add new
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
index 91af814..aba6591 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -52,6 +52,7 @@ public class ExportTask extends Task<ExportWork> implements Serializable {
conf, false);
Hive db = getHive();
LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir());
+ work.acidPostProcess(db);
TableExport tableExport = new TableExport(
exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf
);
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2b1960c..4760b85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1295,6 +1295,10 @@ public class AcidUtils {
!isInsertOnlyTable(table.getParameters());
}
+ public static boolean isFullAcidTable(Map<String, String> params) {
+ return isTransactionalTable(params) && !isInsertOnlyTable(params);
+ }
+
public static boolean isTransactionalTable(org.apache.hadoop.hive.metastore.api.Table table) {
return table != null && table.getParameters() != null &&
isTablePropertyTransactional(table.getParameters());
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 90b6836..009a890 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1727,7 +1727,7 @@ public class Hive {
// to ACID updates. So the are not themselves ACID.
// Note: this assumes both paths are qualified; which they are, currently.
- if (isMmTableWrite && loadPath.equals(newPartPath)) {
+ if ((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath)) {
// MM insert query, move itself is a no-op.
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)");
@@ -2305,7 +2305,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
// Note: this assumes both paths are qualified; which they are, currently.
- if (isMmTable && loadPath.equals(tbl.getPath())) {
+ if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) {
+ /**
+ * some operations on Transactional tables (e.g. Import) write directly to the final location
+ * and avoid the 'move' operation. Since MoveTask does other things, setting 'loadPath' to be
+ * the table/partition path indicates that the 'file move' part of MoveTask is not needed.
+ */
if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
Utilities.FILE_OP_LOGGER.debug(
"not moving " + loadPath + " to " + tbl.getPath() + " (MM)");
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index 51df754..d89df48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.TableMeta;
@@ -467,6 +468,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
ss.getTempTables().put(dbName, tables);
}
tables.put(tblName, tTable);
+ createTempTable(tbl);
}
private org.apache.hadoop.hive.metastore.api.Table getTempTable(String dbName, String tableName) {
@@ -655,6 +657,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
throw new MetaException(
"Could not find temp table entry for " + StatsUtils.getFullyQualifiedTableName(dbName, tableName));
}
+ removeTempTable(table);
// Delete table data
if (deleteData && !MetaStoreUtils.isExternalTable(table)) {
@@ -788,4 +791,218 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
}
return true;
}
+
+ /**
+ * This stores partition information for a temp table.
+ */
+ public static final class TempTable {
+ private final org.apache.hadoop.hive.metastore.api.Table tTable;
+ private final PartitionTree pTree;
+ TempTable(org.apache.hadoop.hive.metastore.api.Table t) {
+ assert t != null;
+ this.tTable = t;
+ pTree = t.getPartitionKeysSize() > 0 ? new PartitionTree(tTable) : null;
+ }
+ private void addPartition(Partition p) throws AlreadyExistsException, MetaException {
+ assertPartitioned();
+ pTree.addPartition(p);
+ }
+ private Partition getPartition(String partName) throws MetaException {
+ assertPartitioned();
+ return pTree.getPartition(partName);
+ }
+ private List<Partition> getPartitions(List<String> partialPartVals) throws MetaException {
+ assertPartitioned();
+ return pTree.getPartitions(partialPartVals);
+ }
+ private void assertPartitioned() throws MetaException {
+ if(tTable.getPartitionKeysSize() <= 0) {
+ throw new MetaException(Warehouse.getQualifiedName(tTable) + " is not partitioned");
+ }
+ }
+
+ /**
+ * Always clone objects before adding or returning them so that callers don't modify them
+ * via references.
+ */
+ private static final class PartitionTree {
+ private final Map<String, Partition> parts = new HashMap<>();
+ private final org.apache.hadoop.hive.metastore.api.Table tTable;
+
+ private PartitionTree(org.apache.hadoop.hive.metastore.api.Table t) {
+ this.tTable = t;
+ }
+ private void addPartition(Partition p) throws AlreadyExistsException, MetaException {
+ String partName = Warehouse.makePartName(tTable.getPartitionKeys(), p.getValues());
+ if(parts.putIfAbsent(partName, p) != null) {
+ throw new AlreadyExistsException("Partition " + partName + " already exists");
+ }
+ }
+ /**
+ * @param partName - "p=1/q=2" full partition name {@link Warehouse#makePartName(List, List)}
+ * @return null if doesn't exist
+ */
+ private Partition getPartition(String partName) {
+ return parts.get(partName);
+ }
+ /**
+ * Given values for the first N partition columns, returns all matching Partitions.
+ * The list is a partial list of partition values in the same order as partition columns.
+ * Missing values should be represented as "" (empty strings). May provide fewer values.
+ * So if part cols are a,b,c, {"",2} is a valid list
+ * {@link MetaStoreUtils#getPvals(List, Map)}
+ *
+ */
+ private List<Partition> getPartitions(List<String> partialPartVals) throws MetaException {
+ String partNameMatcher = MetaStoreUtils.makePartNameMatcher(tTable, partialPartVals);
+ List<Partition> matchedPartitions = new ArrayList<>();
+ for(String key : parts.keySet()) {
+ if(key.matches(partNameMatcher)) {
+ matchedPartitions.add(parts.get(key));
+ }
+ }
+ return matchedPartitions;
+ }
+ }
+ }
+ /**
+ * Loading Dynamic Partitions calls this.
+ * Hive.loadPartition() calls this, which in turn can be called from Hive.loadDynamicPartitions(),
+ * among others.
+ * @param partition
+ * The partition to add
+ * @return the partition added
+ */
+ @Override
+ public org.apache.hadoop.hive.metastore.api.Partition add_partition(
+ org.apache.hadoop.hive.metastore.api.Partition partition) throws TException {
+ // First try temp table
+ org.apache.hadoop.hive.metastore.api.Table table =
+ getTempTable(partition.getDbName(), partition.getTableName());
+ if (table == null) {
+ //(assume) not a temp table - Try underlying client
+ return super.add_partition(partition);
+ }
+ TempTable tt = getTempTable(table);
+ if(tt == null) {
+ throw new IllegalStateException("TempTable not found for " +
+ Warehouse.getQualifiedName(table));
+ }
+ tt.addPartition(deepCopy(partition));
+ return partition;
+ }
+ /**
+ * @param partialPvals partition values, can be partial. This really means that missing values
+ * are represented by empty strings.
+ * @param maxParts maximum number of partitions to fetch, or -1 for all
+ */
+ @Override
+ public List<Partition> listPartitionsWithAuthInfo(String dbName,
+ String tableName, List<String> partialPvals, short maxParts, String userName,
+ List<String> groupNames) throws TException {
+ org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbName, tableName);
+ if (table == null) {
+ //(assume) not a temp table - Try underlying client
+ return super.listPartitionsWithAuthInfo(dbName, tableName, partialPvals, maxParts, userName,
+ groupNames);
+ }
+ TempTable tt = getTempTable(table);
+ if(tt == null) {
+ throw new IllegalStateException("TempTable not found for " +
+ Warehouse.getQualifiedName(table));
+ }
+ List<Partition> parts = tt.getPartitions(partialPvals);
+ List<Partition> matchedParts = new ArrayList<>();
+ for(int i = 0; i < (maxParts <= 0 ? parts.size() : maxParts); i++) {
+ matchedParts.add(deepCopy(parts.get(i)));
+ }
+ return matchedParts;
+ }
+
+ /**
+ * Returns a list of partition names, i.e. "p=1/q=2" type strings. The values (RHS of =) are
+ * escaped.
+ */
+ @Override
+ public List<String> listPartitionNames(String dbName, String tableName,
+ short maxParts) throws TException {
+ org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbName, tableName);
+ if (table == null) {
+ //(assume) not a temp table - Try underlying client
+ return super.listPartitionNames(dbName, tableName, maxParts);
+ }
+ TempTable tt = getTempTable(table);
+ if(tt == null) {
+ throw new IllegalStateException("TempTable not found for " +
+ Warehouse.getQualifiedName(table));
+ }
+ List<String> partVals = new ArrayList<>();
+ partVals.add(""); //to get all partitions
+ List<Partition> parts = tt.getPartitions(partVals);
+ List<String> matchedParts = new ArrayList<>();
+ for(int i = 0; i < (maxParts <= 0 ? parts.size() : maxParts); i++) {
+ matchedParts.add(
+ Warehouse.makePartName(tt.tTable.getPartitionKeys(), parts.get(i).getValues()));
+ }
+ return matchedParts;
+ }
+ /**
+ * partNames are like "p=1/q=2" type strings. The values (RHS of =) are escaped.
+ */
+ @Override
+ public List<Partition> getPartitionsByNames(String db_name, String tblName,
+ List<String> partNames) throws TException {
+ org.apache.hadoop.hive.metastore.api.Table table = getTempTable(db_name, tblName);
+ if (table == null) {
+ //(assume) not a temp table - Try underlying client
+ return super.getPartitionsByNames(db_name, tblName, partNames);
+ }
+ TempTable tt = getTempTable(table);
+ if(tt == null) {
+ throw new IllegalStateException("TempTable not found for " + tblName);
+ }
+ List<Partition> matchedParts = new ArrayList<>();
+ for(String partName : partNames) {
+ Partition p = tt.getPartition(partName);
+ if(p != null) {
+ matchedParts.add(deepCopy(p));
+ }
+ }
+ return matchedParts;
+ }
+
+ private static TempTable getTempTable(org.apache.hadoop.hive.metastore.api.Table t) {
+ String qualifiedTableName = Warehouse.
+ getQualifiedName(t.getDbName().toLowerCase(), t.getTableName().toLowerCase());
+ SessionState ss = SessionState.get();
+ if (ss == null) {
+ LOG.debug("No current SessionState, skipping temp partitions");
+ return null;
+ }
+ return ss.getTempPartitions().get(qualifiedTableName);
+ }
+ private static void removeTempTable(org.apache.hadoop.hive.metastore.api.Table t) {
+ SessionState ss = SessionState.get();
+ if (ss == null) {
+ LOG.debug("No current SessionState, skipping temp partitions");
+ return;
+ }
+ ss.getTempPartitions().remove(Warehouse.getQualifiedName(t));
+ }
+ private static void createTempTable(org.apache.hadoop.hive.metastore.api.Table t) {
+ if(t.getPartitionKeysSize() <= 0) {
+ //do nothing as it's not a partitioned table
+ return;
+ }
+ SessionState ss = SessionState.get();
+ if (ss == null) {
+ LOG.debug("No current SessionState, skipping temp partitions");
+ return;
+ }
+ TempTable tt = new TempTable(t);
+ String qualifiedName = Warehouse.getQualifiedName(t);
+ if(ss.getTempPartitions().putIfAbsent(qualifiedName, tt) != null) {
+ throw new IllegalStateException("TempTable for " + qualifiedName + " already exists");
+ }
+ }
}
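As a rough, self-contained sketch of how the PartitionTree above resolves a partial value list such as {"", "2"} (the empty string meaning "any value", per the getPartitions() javadoc): partitions are kept in a map keyed by their full name and looked up with a regex built from the partial spec. The class, columns and values below are made up for illustration; the real code delegates to Warehouse.makePartName and MetaStoreUtils.makePartNameMatcher and returns deep copies.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TempPartitionTreeSketch {
  private final List<String> partCols;
  // keyed by full partition name, e.g. "a=1/b=2", as in PartitionTree above
  private final Map<String, List<String>> parts = new HashMap<>();

  TempPartitionTreeSketch(List<String> partCols) {
    this.partCols = partCols;
  }

  void addPartition(List<String> vals) {
    String name = name(vals);
    if (parts.putIfAbsent(name, vals) != null) {
      throw new IllegalStateException("Partition " + name + " already exists");
    }
  }

  // "" in a position means "match any value for that column"
  List<String> getPartitionNames(List<String> partialVals) {
    StringBuilder matcher = new StringBuilder();
    for (int i = 0; i < partialVals.size(); i++) {
      if (i > 0) {
        matcher.append('/');
      }
      String v = partialVals.get(i);
      matcher.append(partCols.get(i)).append('=').append(v.isEmpty() ? ".*" : v);
    }
    if (partialVals.size() < partCols.size()) {
      matcher.append(".*");
    }
    List<String> matched = new ArrayList<>();
    for (String name : parts.keySet()) {
      if (name.matches(matcher.toString())) {
        matched.add(name);
      }
    }
    return matched;
  }

  private String name(List<String> vals) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < partCols.size(); i++) {
      if (i > 0) {
        sb.append('/');
      }
      sb.append(partCols.get(i)).append('=').append(vals.get(i));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    TempPartitionTreeSketch t = new TempPartitionTreeSketch(Arrays.asList("a", "b"));
    t.addPartition(Arrays.asList("1", "2"));
    t.addPartition(Arrays.asList("1", "3"));
    System.out.println(t.getPartitionNames(Arrays.asList("", "2"))); // [a=1/b=2]
  }
}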
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 59130ca..85d1cff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -1423,11 +1423,18 @@ public abstract class BaseSemanticAnalyzer {
public TableSpec(Hive db, String tableName, Map<String, String> partSpec)
throws HiveException {
+ this(db, tableName, partSpec, false);
+ }
+ public TableSpec(Hive db, String tableName, Map<String, String> partSpec, boolean allowPartialPartitionsSpec)
+ throws HiveException {
Table table = db.getTable(tableName);
tableHandle = table;
this.tableName = table.getDbName() + "." + table.getTableName();
if (partSpec == null) {
specType = SpecType.TABLE_ONLY;
+ } else if(allowPartialPartitionsSpec) {
+ partitions = db.getPartitions(table, partSpec);
+ specType = SpecType.STATIC_PARTITION;
} else {
Partition partition = db.getPartition(table, partSpec, false);
if (partition == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 33f426c..d3c62a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -20,15 +20,22 @@ package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
import org.apache.hadoop.hive.ql.plan.ExportWork;
+import javax.annotation.Nullable;
+import java.util.Set;
+
/**
* ExportSemanticAnalyzer.
*
@@ -41,6 +48,13 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
@Override
public void analyzeInternal(ASTNode ast) throws SemanticException {
+ rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs));
+ }
+ /**
+ * @param acidTableName - table name in db.table format; not NULL if exporting Acid table
+ */
+ static Task<ExportWork> analyzeExport(ASTNode ast, @Nullable String acidTableName, Hive db,
+ HiveConf conf, Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
Tree tableTree = ast.getChild(0);
Tree toTree = ast.getChild(1);
@@ -94,9 +108,8 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
String exportRootDirName = tmpPath;
// Configure export work
ExportWork exportWork =
- new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast));
+ new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName);
// Create an export task and add it as a root task
- Task<ExportWork> exportTask = TaskFactory.get(exportWork);
- rootTasks.add(exportTask);
+ return TaskFactory.get(exportWork);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 2fbe46d..ac44be5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -249,15 +249,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
} catch (Exception e) {
throw new HiveException(e);
}
- boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());
if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
tblDesc.setReplicationSpec(replicationSpec);
StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
}
- if (isExternalSet){
- if (isSourceMm) {
+ if (isExternalSet) {
+ if (AcidUtils.isInsertOnlyTable(tblDesc.getTblProps())) {
throw new SemanticException("Cannot import an MM table as external");
}
tblDesc.setExternal(isExternalSet);
@@ -328,32 +327,28 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
Long writeId = 0L; // Initialize with 0 for non-ACID and non-MM tables.
- if (((table != null) && AcidUtils.isTransactionalTable(table))
- || AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())) {
+ int stmtId = 0;
+ if ((tableExists && AcidUtils.isTransactionalTable(table))
+ || (!tableExists && AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps()))) {
+ //if importing into existing transactional table or will create a new transactional table
+ //(because Export was done from transactional table), need a writeId
// Explain plan doesn't open a txn and hence no need to allocate write id.
if (x.getCtx().getExplainConfig() == null) {
writeId = txnMgr.getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName());
+ stmtId = txnMgr.getStmtIdAndIncrement();
}
}
- int stmtId = 0;
- // TODO [MM gap?]: bad merge; tblDesc is no longer CreateTableDesc, but ImportTableDesc.
- // We need to verify the tests to see if this works correctly.
- /*
- if (isAcid(writeId)) {
- tblDesc.setInitialMmWriteId(writeId);
- }
- */
if (!replicationSpec.isInReplicationScope()) {
createRegularImportTasks(
tblDesc, partitionDescs,
isPartSpecSet, replicationSpec, table,
- fromURI, fs, wh, x, writeId, stmtId, isSourceMm);
+ fromURI, fs, wh, x, writeId, stmtId);
} else {
createReplImportTasks(
tblDesc, partitionDescs,
replicationSpec, waitOnPrecursor, table,
- fromURI, fs, wh, x, writeId, stmtId, isSourceMm, updatedMetadata);
+ fromURI, fs, wh, x, writeId, stmtId, updatedMetadata);
}
return tableExists;
}
@@ -385,17 +380,27 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return tblDesc;
}
-
private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
Long writeId, int stmtId, boolean isSourceMm) {
+ assert table != null;
+ assert table.getParameters() != null;
Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
Path destPath = null, loadPath = null;
LoadFileType lft;
- if (AcidUtils.isInsertOnlyTable(table)) {
+ if (AcidUtils.isTransactionalTable(table)) {
String mmSubdir = replace ? AcidUtils.baseDir(writeId)
: AcidUtils.deltaSubdir(writeId, writeId, stmtId);
destPath = new Path(tgtPath, mmSubdir);
+ /**
+ * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition
+ * directory, i.e. the final destination for these files. This has to be a copy to preserve
+ * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem.
+ * So setting 'loadPath' this way will make
+ * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean,
+ * boolean, Long, int)}
+ * skip the unnecessary file (rename) operation but it will perform other things.
+ */
loadPath = tgtPath;
lft = LoadFileType.KEEP_EXISTING;
} else {
@@ -406,8 +411,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("adding import work for table with source location: " +
- dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId +
- " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
+ dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId +
+ " for " + table.getTableName() + ": " +
+ (AcidUtils.isFullAcidTable(table) ? "acid" :
+ (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
+ )
+ );
}
Task<?> copyTask = null;
@@ -422,6 +431,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadTableDesc loadTableWork = new LoadTableDesc(
loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
loadTableWork.setStmtId(stmtId);
+ //if importing into an existing table, the FileFormat is checked by
+ // ImportSemanticAnalyzer.checkTable()
MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
copyTask.addDependentTask(loadTableTask);
@@ -429,14 +440,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return loadTableTask;
}
- /**
- * todo: this is odd: write id allocated for all write operations on ACID tables. what is this supposed to check?
- */
- @Deprecated
- private static boolean isAcid(Long writeId) {
- return (writeId != null) && (writeId != 0);
- }
-
private static Task<?> createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){
return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
}
@@ -496,16 +499,20 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
+ partSpecToString(partSpec.getPartSpec())
+ " with source location: " + srcLocation);
Path tgtLocation = new Path(partSpec.getLocation());
- Path destPath = !AcidUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtLocation)
+ Path destPath = !AcidUtils.isTransactionalTable(table.getParameters()) ?
+ x.getCtx().getExternalTmpPath(tgtLocation)
: new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
- Path moveTaskSrc = !AcidUtils.isInsertOnlyTable(table.getParameters()) ? destPath : tgtLocation;
+ Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ? destPath : tgtLocation;
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("adding import work for partition with source location: "
+ srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
- + writeId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
+ + writeId + " for " + partSpecToString(partSpec.getPartSpec()) + ": " +
+ (AcidUtils.isFullAcidTable(table) ? "acid" :
+ (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
+ )
+ );
}
-
Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(
@@ -601,7 +608,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return sb.toString();
}
- public static void checkTable(Table table, ImportTableDesc tableDesc,
+ private static void checkTable(Table table, ImportTableDesc tableDesc,
ReplicationSpec replicationSpec, HiveConf conf)
throws SemanticException, URISyntaxException {
// This method gets called only in the scope that a destination table already exists, so
@@ -634,7 +641,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
// table in the statement, if a destination partitioned table exists, so long as it is actually
// not external itself. Is that the case? Why?
{
- if ( (tableDesc.isExternal()) // IMPORT statement speicified EXTERNAL
+ if ((tableDesc.isExternal()) // IMPORT statement specified EXTERNAL
&& (!table.isPartitioned() || !table.getTableType().equals(TableType.EXTERNAL_TABLE))
){
throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg(
@@ -824,8 +831,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
private static void createRegularImportTasks(
ImportTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet,
ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh,
- EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm)
- throws HiveException, URISyntaxException, IOException, MetaException {
+ EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
+ throws HiveException, IOException, MetaException {
+
+ final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());
if (table != null) {
if (table.isPartitioned()) {
@@ -851,7 +860,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
Path tgtPath = new Path(table.getDataLocation().toString());
FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
- loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, isSourceMm);
+ loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId,
+ isSourceMm);
}
// Set this to read because we can't overwrite any existing partitions
x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
@@ -859,7 +869,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getLOG().debug("table " + tblDesc.getTableName() + " does not exist");
Task<?> t = createTableTask(tblDesc, x);
- table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
+ table = createNewTableMetadataObject(tblDesc);
+
Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
// Since we are going to be creating a new table in a db, we should mark that db as a write entity
@@ -889,19 +900,26 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG());
- if (isSourceMm) { // since target table doesn't exist, it should inherit soruce table's properties
- Map<String, String> tblproperties = table.getParameters();
- tblproperties.put("transactional", "true");
- tblproperties.put("transactional_properties", "insert_only");
- table.setParameters(tblproperties);
- }
- t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, writeId, stmtId, isSourceMm));
+ t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x,
+ writeId, stmtId, isSourceMm));
}
}
x.getTasks().add(t);
}
}
+ private static Table createNewTableMetadataObject(ImportTableDesc tblDesk)
+ throws SemanticException {
+ Table newTable = new Table(tblDesk.getDatabaseName(), tblDesk.getTableName());
+ //so that we know the type of table we are creating: acid/MM to match what was exported
+ newTable.setParameters(tblDesk.getTblProps());
+ if(tblDesk.isExternal() && AcidUtils.isTransactionalTable(newTable)) {
+ throw new SemanticException("External tables may not be transactional: " +
+ Warehouse.getQualifiedName(tblDesk.getDatabaseName(), tblDesk.getTableName()));
+ }
+ return newTable;
+ }
+
private static Task<?> createImportCommitTask(
HiveConf conf, String dbName, String tblName, Long writeId, int stmtId, boolean isMmTable) {
// TODO: noop, remove?
@@ -918,11 +936,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
List<AddPartitionDesc> partitionDescs,
ReplicationSpec replicationSpec, boolean waitOnPrecursor,
Table table, URI fromURI, FileSystem fs, Warehouse wh,
- EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm,
+ EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId,
UpdatedMetaDataTracker updatedMetadata)
throws HiveException, URISyntaxException, IOException, MetaException {
Task<?> dropTblTask = null;
+ final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());
WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
// Normally, on import, trying to create a table or a partition in a db that does not yet exist
@@ -999,7 +1018,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
Task t = createTableTask(tblDesc, x);
- table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
+ table = createNewTableMetadataObject(tblDesc);
if (!replicationSpec.isMetadataOnly()) {
if (isPartitioned(tblDesc)) {
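For a concrete sense of the copy destination the transactional branch of loadTable() above produces: the archive data is copied into a delta subdirectory under the table (or partition) location, and loadPath is then set to the table path so that the subsequent MoveTask skips the file move. The sketch below is illustrative only, not AcidUtils.deltaSubdir itself; the delta_<writeId>_<writeId>_<stmtId> naming with zero-padded ids is assumed from the test output in this commit (e.g. warehouse/myctas2/delta_0000001_0000001_0000), and the paths are hypothetical.

public class ImportDestPathSketch {
  // Illustrative stand-in for AcidUtils.deltaSubdir(writeId, writeId, stmtId);
  // padding widths are assumed from directory names seen in this commit's tests.
  static String deltaSubdir(long writeId, int stmtId) {
    return String.format("delta_%07d_%07d_%04d", writeId, writeId, stmtId);
  }

  public static void main(String[] args) {
    String tgtPath = "/warehouse/acidtbl";  // table (or partition) location
    long writeId = 1L;                      // allocated by the txn manager
    int stmtId = 0;
    // CopyTask copies the archive files here; MoveTask then skips the file move
    // because loadPath is set to the table path itself.
    System.out.println(tgtPath + "/" + deltaSubdir(writeId, stmtId));
    // -> /warehouse/acidtbl/delta_0000001_0000001_0000
  }
}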
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index f46a7b5..8200463 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -215,6 +215,9 @@ public final class SemanticAnalyzerFactory {
case HiveParser.TOK_LOAD:
return new LoadSemanticAnalyzer(queryState);
case HiveParser.TOK_EXPORT:
+ if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) {
+ return new UpdateDeleteSemanticAnalyzer(queryState);
+ }
return new ExportSemanticAnalyzer(queryState);
case HiveParser.TOK_IMPORT:
return new ImportSemanticAnalyzer(queryState);
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 0effd92..2f3b07f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -17,9 +17,8 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,27 +28,44 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import org.antlr.runtime.TokenRewriteStream;
+import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExportWork;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -59,6 +75,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
* updates and deletes instead.
*/
public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
+ private static final Logger LOG = LoggerFactory.getLogger(UpdateDeleteSemanticAnalyzer.class);
private boolean useSuper = false;
@@ -84,6 +101,9 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
case HiveParser.TOK_MERGE:
analyzeMerge(tree);
break;
+ case HiveParser.TOK_EXPORT:
+ analyzeAcidExport(tree);
+ break;
default:
throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
"UpdateDeleteSemanticAnalyzer");
@@ -99,6 +119,228 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
return currentOperation == Context.Operation.DELETE;
}
+ /**
+ * Exporting an Acid table is more complicated than exporting a flat table. It may contain
+ * delete events, which can only be interpreted properly within the context of the
+ * table/metastore where they were generated. It may also contain insert events that belong to
+ * transactions that aborted, where the same constraints apply.
+ * In order to make the export artifact free of these constraints, the export does an
+ * insert into tmpTable select * from <export table> to filter/apply the events in the current
+ * context and then exports the tmpTable. This export artifact can then be imported into any
+ * table on any cluster (subject to schema checks etc).
+ * See {@link #analyzeAcidExport(ASTNode)}
+ * @param tree Export statement
+ * @return true if exporting an Acid table.
+ */
+ public static boolean isAcidExport(ASTNode tree) throws SemanticException {
+ assert tree != null && tree.getToken() != null &&
+ tree.getToken().getType() == HiveParser.TOK_EXPORT;
+ Tree tokTab = tree.getChild(0);
+ assert tokTab != null && tokTab.getType() == HiveParser.TOK_TAB;
+ Table tableHandle = null;
+ try {
+ tableHandle = getTable((ASTNode) tokTab.getChild(0), Hive.get(), false);
+ } catch(HiveException ex) {
+ throw new SemanticException(ex);
+ }
+
+ //tableHandle can be null if table doesn't exist
+ return tableHandle != null && AcidUtils.isFullAcidTable(tableHandle);
+ }
+ private static String getTmptTableNameForExport(Table exportTable) {
+ String tmpTableDb = exportTable.getDbName();
+ String tmpTableName = exportTable.getTableName() + "_" +
+ UUID.randomUUID().toString().replace('-', '_');
+ return Warehouse.getQualifiedName(tmpTableDb, tmpTableName);
+ }
+
+ /**
+ * See {@link #isAcidExport(ASTNode)}
+ * 1. create the temp table T
+ * 2. compile 'insert into T select * from acidTable'
+ * 3. compile 'export acidTable' (acidTable will be replaced with T during execution)
+ * 4. create task to drop T
+ *
+ * Using a true temp (session level) table means it should not affect replication and the table
+ * is not visible outside the Session that created it, for security.
+ */
+ private void analyzeAcidExport(ASTNode ast) throws SemanticException {
+ assert ast != null && ast.getToken() != null &&
+ ast.getToken().getType() == HiveParser.TOK_EXPORT;
+ ASTNode tableTree = (ASTNode)ast.getChild(0);
+ assert tableTree != null && tableTree.getType() == HiveParser.TOK_TAB;
+ ASTNode tokRefOrNameExportTable = (ASTNode) tableTree.getChild(0);
+ Table exportTable = getTargetTable(tokRefOrNameExportTable);
+ assert AcidUtils.isFullAcidTable(exportTable);
+
+ //need to create the table "manually" rather than creating a task since it has to exist to
+ // compile the insert into T...
+ String newTableName = getTmptTableNameForExport(exportTable); //this is db.table
+ Map<String, String> tblProps = new HashMap<>();
+ tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString());
+ CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName,
+ false, true, null,
+ null, null, null, null,
+ tblProps,
+ true, //important so we get an exception on name collision
+ Warehouse.getQualifiedName(exportTable.getTTable()), false);
+ Table newTable;
+ try {
+ ReadEntity dbForTmpTable = new ReadEntity(db.getDatabase(exportTable.getDbName()));
+ inputs.add(dbForTmpTable); //so the plan knows we are 'reading' this db - locks, security...
+ DDLTask createTableTask = (DDLTask) TaskFactory.get(
+ new DDLWork(new HashSet<>(), new HashSet<>(), ctlt), conf);
+ createTableTask.setConf(conf); //above get() doesn't set it
+ createTableTask.execute(new DriverContext(new Context(conf)));
+ newTable = db.getTable(newTableName);
+ } catch(IOException|HiveException ex) {
+ throw new SemanticException(ex);
+ }
+
+ //now generate insert statement
+ //insert into newTableName select * from ts <where partition spec>
+ StringBuilder rewrittenQueryStr = generateExportQuery(newTable.getPartCols(),
+ tokRefOrNameExportTable, tableTree, newTableName);
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ rewrittenCtx.setIsUpdateDeleteMerge(false); //it's set in parseRewrittenQuery()
+ ASTNode rewrittenTree = rr.rewrittenTree;
+ try {
+ useSuper = true;
+ //newTable has to exist at this point to compile
+ super.analyze(rewrittenTree, rewrittenCtx);
+ } finally {
+ useSuper = false;
+ }
+ //now we have the rootTasks set up for Insert ... Select
+ removeStatsTasks(rootTasks);
+ //now make an ExportTask from temp table
+ /*analyzeExport() creates TableSpec which in turn tries to build
+ "public List<Partition> partitions" by looking in the metastore to find Partitions matching
+ the partition spec in the Export command. These of course don't exist yet since we've not
+ run the insert stmt yet.
+ */
+ Task<ExportWork> exportTask = ExportSemanticAnalyzer.analyzeExport(ast, newTableName,
+ db, conf, inputs, outputs);
+
+ AlterTableDesc alterTblDesc = null;
+ {
+ /**
+ * add an alter table task to set transactional props
+ * do it after populating temp table so that it's written as non-transactional table but
+ * update props before export so that export archive metadata has these props. This way when
+ * IMPORT is done for this archive and target table doesn't exist, it will be created as Acid.
+ */
+ alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
+ HashMap<String, String> mapProps = new HashMap<>();
+ mapProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
+ alterTblDesc.setProps(mapProps);
+ alterTblDesc.setOldName(newTableName);
+ }
+ addExportTask(rootTasks, exportTask, TaskFactory.get(
+ new DDLWork(getInputs(), getOutputs(), alterTblDesc)));
+
+ {
+ /**
+ * Now make a task to drop temp table
+ * {@link DDLSemanticAnalyzer#analyzeDropTable(ASTNode ast, TableType expectedType)}
+ */
+ ReplicationSpec replicationSpec = new ReplicationSpec();
+ DropTableDesc dropTblDesc = new DropTableDesc(newTableName, TableType.MANAGED_TABLE,
+ false, true, replicationSpec);
+ Task<DDLWork> dropTask =
+ TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), conf);
+ exportTask.addDependentTask(dropTask);
+ }
+ markReadEntityForUpdate();
+ if(ctx.isExplainPlan()) {
+ try {
+ //so that "explain" doesn't "leak" tmp tables
+ db.dropTable(newTable.getDbName(), newTable.getTableName(), true, true, true);
+ } catch(HiveException ex) {
+ LOG.warn("Unable to drop " + newTableName + " due to: " + ex.getMessage(), ex);
+ }
+ }
+ }
+ /**
+ * generate
+ * insert into newTableName select * from ts <where partition spec>
+ * for EXPORT command
+ */
+ private StringBuilder generateExportQuery(List<FieldSchema> partCols,
+ ASTNode tokRefOrNameExportTable, ASTNode tableTree, String newTableName)
+ throws SemanticException {
+ StringBuilder rewrittenQueryStr = new StringBuilder("insert into ").append(newTableName);
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+ rewrittenQueryStr.append(" select * from ").append(getFullTableNameForSQL(tokRefOrNameExportTable));
+ //builds partition spec so we can build suitable WHERE clause
+ TableSpec exportTableSpec = new TableSpec(db, conf, tableTree, false, true);
+ if(exportTableSpec.getPartSpec() != null) {
+ StringBuilder whereClause = null;
+ for(Map.Entry<String, String> ent : exportTableSpec.getPartSpec().entrySet()) {
+ if(ent.getValue() == null) {
+ continue; //partial spec
+ }
+ if(whereClause == null) {
+ whereClause = new StringBuilder(" WHERE ");
+ }
+ if(whereClause.length() > " WHERE ".length()) {
+ whereClause.append(" AND ");
+ }
+ whereClause.append(HiveUtils.unparseIdentifier(ent.getKey(), conf))
+ .append(" = ").append(ent.getValue());
+ }
+ if(whereClause != null) {
+ rewrittenQueryStr.append(whereClause);
+ }
+ }
+ return rewrittenQueryStr;
+ }
+ /**
+ * Makes the exportTask run after all other tasks of the "insert into T ..." are done.
+ */
+ private void addExportTask(List<Task<? extends Serializable>> rootTasks,
+ Task<ExportWork> exportTask, Task<DDLWork> alterTable) {
+ for(Task<? extends Serializable> t : rootTasks) {
+ if(t.getNumChild() <= 0) {
+ //todo: ConditionalTask#addDependentTask(Task) doesn't do the right thing: HIVE-18978
+ t.addDependentTask(alterTable);
+ //this is a leaf so add exportTask to follow it
+ alterTable.addDependentTask(exportTask);
+ } else {
+ addExportTask(t.getDependentTasks(), exportTask, alterTable);
+ }
+ }
+ }
+ private List<Task<? extends Serializable>> findStatsTasks(
+ List<Task<? extends Serializable>> rootTasks, List<Task<? extends Serializable>> statsTasks) {
+ for(Task<? extends Serializable> t : rootTasks) {
+ if (t instanceof StatsTask) {
+ if(statsTasks == null) {
+ statsTasks = new ArrayList<>();
+ }
+ statsTasks.add(t);
+ }
+ if(t.getDependentTasks() != null) {
+ statsTasks = findStatsTasks(t.getDependentTasks(), statsTasks);
+ }
+ }
+ return statsTasks;
+ }
+ private void removeStatsTasks(List<Task<? extends Serializable>> rootTasks) {
+ List<Task<? extends Serializable>> statsTasks = findStatsTasks(rootTasks, null);
+ if(statsTasks == null) {
+ return;
+ }
+ for(Task<? extends Serializable> statsTask : statsTasks) {
+ if(statsTask.getParentTasks() == null) {
+ continue; //should never happen
+ }
+ for(Task<? extends Serializable> t : new ArrayList<>(statsTask.getParentTasks())) {
+ t.removeDependentTask(statsTask);
+ }
+ }
+ }
private void analyzeUpdate(ASTNode tree) throws SemanticException {
currentOperation = Context.Operation.UPDATE;
reparseAndSuperAnalyze(tree);
@@ -219,6 +461,13 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
* @return the Metastore representation of the target table
*/
private Table getTargetTable(ASTNode tabRef) throws SemanticException {
+ return getTable(tabRef, db, true);
+ }
+ /**
+ * @param throwException if false, return null if table doesn't exist, else throw
+ */
+ private static Table getTable(ASTNode tabRef, Hive db, boolean throwException)
+ throws SemanticException {
String[] tableName;
Table mTable;
switch (tabRef.getType()) {
@@ -232,7 +481,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef);
}
try {
- mTable = db.getTable(tableName[0], tableName[1]);
+ mTable = db.getTable(tableName[0], tableName[1], throwException);
} catch (InvalidTableException e) {
LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
+ e.getMessage());
@@ -300,6 +549,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
}
rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
+ rewrittenCtx.setExplainPlan(ctx.isExplainPlan());
rewrittenCtx.setIsUpdateDeleteMerge(true);
rewrittenCtx.setCmd(rewrittenQueryStr.toString());
@@ -581,7 +831,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
}
/**
* Here we take a Merge statement AST and generate a semantically equivalent multi-insert
- * statement to exectue. Each Insert leg represents a single WHEN clause. As much as possible,
+ * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible,
* the new SQL statement is made to look like the input SQL statement so that it's easier to map
* Query Compiler errors from generated SQL to original one this way.
* The generated SQL is a complete representation of the original input for the same reason.
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 1e405a5..c0e4a43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -93,8 +93,10 @@ public class CopyWork implements Serializable {
return errorOnSrcEmpty;
}
- /** Whether the copy should ignore MM directories in the source, and copy their content to
- * destination directly, rather than copying the directories themselves. */
+ /**
+ * Whether the copy should ignore MM directories in the source, and copy their content to
+ * the destination directly, rather than copying the directories themselves.
+ */
public void setSkipSourceMmDirs(boolean isMm) {
this.isSkipMmDirs = isMm;
}
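A hypothetical example of what setSkipSourceMmDirs(true) means for the copy (paths are illustrative): given a source laid out as

    src/delta_0000001_0000001_0000/000000_0

the copy places 000000_0 directly under the destination rather than recreating the delta_0000001_0000001_0000 directory there.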
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
index 9093f48..72ce798 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
@@ -17,15 +17,20 @@
*/
package org.apache.hadoop.hive.ql.plan;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.plan.Explain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Serializable;
@Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT,
Explain.Level.EXTENDED })
public class ExportWork implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(ExportWork.class);
private static final long serialVersionUID = 1L;
@@ -33,13 +38,18 @@ public class ExportWork implements Serializable {
private TableSpec tableSpec;
private ReplicationSpec replicationSpec;
private String astRepresentationForErrorMsg;
+ private String qualifiedTableName;
+ /**
+ * @param qualifiedTable if exporting an Acid table, this is the temp table; null otherwise
+ */
public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec,
- String astRepresentationForErrorMsg) {
+ String astRepresentationForErrorMsg, String qualifiedTable) {
this.exportRootDirName = exportRootDirName;
this.tableSpec = tableSpec;
this.replicationSpec = replicationSpec;
this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+ this.qualifiedTableName = qualifiedTable;
}
public String getExportRootDir() {
@@ -70,4 +80,18 @@ public class ExportWork implements Serializable {
this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
}
+ /**
+ * When exporting an Acid table, change the "pointer" to the temp table.
+ * This has to be done after the temp table is populated and all necessary Partition objects
+ * exist in the metastore.
+ * See {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer#isAcidExport(ASTNode)}
+ * for more info.
+ */
+ public void acidPostProcess(Hive db) throws HiveException {
+ if(qualifiedTableName != null) {
+ LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName +
+ " using partSpec=" + tableSpec.partSpec);
+ tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true);
+ }
+ }
}
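A minimal sketch of how the new constructor argument and acidPostProcess() are meant to work together (the temp-table name and the surrounding call sites are hypothetical, not part of this patch):

    // compile time: for an Acid export, record the temp table the export should actually read
    ExportWork work = new ExportWork(exportRootDir, tableSpec, replicationSpec,
        astRepresentationForErrorMsg, "default.t__temp_export");
    ...
    // execution time: once the temp table and its partitions exist in the metastore
    work.acidPostProcess(db); // repoints tableSpec at default.t__temp_export

For a non-Acid export the last constructor argument is null and acidPostProcess() is a no-op.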
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index 5fbd33f..b137cd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -60,7 +60,7 @@ public class ImportTableDesc {
this.createTblDesc = new CreateTableDesc(dbName,
table.getTableName(),
false, // isExternal: set to false here, can be overwritten by the IMPORT stmt
- table.isTemporary(),
+ false,
table.getSd().getCols(),
table.getPartitionKeys(),
table.getSd().getBucketCols(),
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 993357d..60b63d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -116,9 +117,10 @@ public class SessionState {
static final String LOCK_FILE_NAME = "inuse.lck";
static final String INFO_FILE_NAME = "inuse.info";
- private final Map<String, Map<String, Table>> tempTables = new HashMap<String, Map<String, Table>>();
+ private final Map<String, Map<String, Table>> tempTables = new HashMap<>();
private final Map<String, Map<String, ColumnStatisticsObj>> tempTableColStats =
new HashMap<String, Map<String, ColumnStatisticsObj>>();
+ private final Map<String, SessionHiveMetaStoreClient.TempTable> tempPartitions = new HashMap<>();
protected ClassLoader parentLoader;
@@ -535,7 +537,8 @@ public class SessionState {
* Singleton Session object per thread.
*
**/
- private static ThreadLocal<SessionStates> tss = new ThreadLocal<SessionStates>() {
+ private static InheritableThreadLocal<SessionStates> tss =
+ new InheritableThreadLocal<SessionStates>() {
@Override
protected SessionStates initialValue() {
return new SessionStates();
@@ -1866,6 +1869,9 @@ public class SessionState {
public Map<String, Map<String, Table>> getTempTables() {
return tempTables;
}
+ public Map<String, SessionHiveMetaStoreClient.TempTable> getTempPartitions() {
+ return tempPartitions;
+ }
public Map<String, Map<String, ColumnStatisticsObj>> getTempTableColStats() {
return tempTableColStats;
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
index c821365..7f7bc11 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
@@ -49,7 +49,7 @@ public class TestTxnAddPartition extends TxnCommandsBaseForTests {
static final private Logger LOG = LoggerFactory.getLogger(TestTxnAddPartition.class);
private static final String TEST_DATA_DIR =
new File(System.getProperty("java.io.tmpdir") +
- File.separator + TestTxnLoadData.class.getCanonicalName()
+ File.separator + TestTxnAddPartition.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");
@Rule
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
new file mode 100644
index 0000000..0e53697
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Tests for IMPORT/EXPORT of transactional tables.
+ */
+public class TestTxnExIm extends TxnCommandsBaseForTests {
+ private static final Logger LOG = LoggerFactory.getLogger(TestTxnExIm.class);
+ private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+ File.separator + TestTxnExIm.class.getCanonicalName() + "-" + System.currentTimeMillis()
+ ).getPath().replaceAll("\\\\", "/");
+
+ @Override
+ String getTestDataDir() {
+ return TEST_DATA_DIR;
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
+ }
+
+ /**
+ * simplest export test.
+ */
+ @Test
+ public void testExport() throws Exception {
+ int[][] rows1 = {{1, 2}, {3, 4}};
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists TImport ");
+ runStatementOnDriver("create table T (a int, b int) stored as ORC");
+ runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES " +
+ "('transactional'='false')");
+ runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1));
+ List<String> rs = runStatementOnDriver("select * from T order by a,b");
+ Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs);
+
+ String exportStmt = "export table T to '" + getTestDataDir() + "/export'";
+ rs = runStatementOnDriver("explain " + exportStmt);
+ StringBuilder sb = new StringBuilder("*** " + exportStmt);
+ for (String r : rs) {
+ sb.append("\n").append(r);
+ }
+ LOG.error(sb.toString());
+
+ runStatementOnDriver(exportStmt);
+ //verify data
+ runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'");
+ List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b");
+ Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs1);
+ }
+
+ /**
+ * The update/delete causes a MergeFileTask to be executed.
+ */
+ @Test
+ public void testExportMerge() throws Exception {
+ int[][] rows1 = {{1, 2}, {3, 4}};
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists TImport ");
+ runStatementOnDriver("create table T (a int, b int) stored as ORC");
+ runStatementOnDriver("create table TImport (a int, b int) stored as ORC TBLPROPERTIES " +
+ "('transactional'='false')");
+ runStatementOnDriver("insert into T(a,b) " + makeValuesClause(rows1));
+ runStatementOnDriver("update T set b = 17 where a = 1");
+ int[][] rows2 = {{1, 17}, {3, 4}};
+ List<String> rs = runStatementOnDriver("select * from T order by a,b");
+ Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs);
+
+ String exportStmt = "export table T to '" + getTestDataDir() + "/export'";
+ rs = runStatementOnDriver("explain " + exportStmt);
+ StringBuilder sb = new StringBuilder("*** " + exportStmt);
+ for (String r : rs) {
+ sb.append("\n").append(r);
+ }
+ LOG.error(sb.toString());
+
+ runStatementOnDriver(exportStmt);
+ //verify data
+ runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'");
+ List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b");
+ Assert.assertEquals("Content didn't match rs", stringifyValues(rows2), rs1);
+ }
+
+ /**
+ * export partitioned table with full partition spec.
+ */
+ @Test
+ public void testExportPart() throws Exception {
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ int[][] rows1 = {{1, 2, 1}, {3, 4, 2}};
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists TImport ");
+ runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int) stored as " +
+ "ORC TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as ORC");
+ runStatementOnDriver("insert into T partition(p)" + makeValuesClause(rows1));
+ runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'");
+ /*
+target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1519423568221/
+├── export
+│ ├── _metadata
+│ └── p=1
+│ └── delta_0000001_0000001_0000
+│ └── bucket_00000
+*/
+ runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'");
+ List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b");
+ int[][] res = {{1, 2, 1}};
+ Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1);
+ }
+
+ /**
+ * Export partitioned table with partial partition spec.
+ */
+ @Test
+ public void testExportPartPartial() throws Exception {
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ int[][] rows1 = {{1, 2, 1, 1}, {3, 4, 2, 2}, {5, 6, 1, 2}, {7, 8, 2, 2}};
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists TImport ");
+ runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int, q int) " +
+ "stored as ORC TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table T (a int, b int) partitioned by (p int, q int) stored as " +
+ "ORC");
+ runStatementOnDriver("insert into T partition(p,q)" + makeValuesClause(rows1));
+
+ runStatementOnDriver("export table T partition(p=1) to '" + getTestDataDir() + "/export'");
+ runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'");
+ List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b");
+ int[][] res = {{1, 2, 1, 1}, {5, 6, 1, 2}};
+ Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1);
+ /* Here is the layout we expect
+target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
+├── export
+│ ├── _metadata
+│ └── p=1
+│ ├── q=1
+│ │ └── 000002_0
+│ └── q=2
+│ └── 000001_0
+└── warehouse
+ ├── acidtbl
+ ├── acidtblpart
+ ├── nonacidnonbucket
+ ├── nonacidorctbl
+ ├── nonacidorctbl2
+ ├── t
+ │ ├── p=1
+ │ │ ├── q=1
+ │ │ │ └── delta_0000001_0000001_0000
+ │ │ │ ├── _orc_acid_version
+ │ │ │ └── bucket_00000
+ │ │ └── q=2
+ │ │ └── delta_0000001_0000001_0000
+ │ │ ├── _orc_acid_version
+ │ │ └── bucket_00000
+ │ └── p=2
+ │ └── q=2
+ │ └── delta_0000001_0000001_0000
+ │ ├── _orc_acid_version
+ │ └── bucket_00000
+ └── timport
+ └── p=1
+ ├── q=1
+ │ └── 000002_0
+ └── q=2
+ └── 000001_0
+
+23 directories, 11 files
+*/
+ }
+ /**
+ * Specifies a partial partition spec that omits the top/first partition column(s).
+ */
+ @Test
+ public void testExportPartPartial2() throws Exception {
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ int[][] rows1 = {{1, 2, 1, 1}, {3, 4, 2, 2}, {5, 6, 1, 2}, {7, 8, 2, 2}};
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists TImport ");
+ runStatementOnDriver("create table TImport (a int, b int) partitioned by (p int, q int)" +
+ " stored as ORC TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table T (a int, b int) partitioned by (p int, q int) " +
+ "stored as ORC");
+ runStatementOnDriver("insert into T partition(p,q)" + makeValuesClause(rows1));
+
+ runStatementOnDriver("export table T partition(q=2) to '" + getTestDataDir() + "/export'");
+ runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'");
+ List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b");
+ int[][] res = {{3, 4, 2, 2}, {5, 6, 1, 2}, {7, 8, 2, 2}};
+ Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1);
+ }
+ @Test
+ public void testExportPartPartial3() throws Exception {
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ int[][] rows1 = {{1, 1, 1, 2}, {3, 2, 3, 8}, {5, 1, 2, 6}, {7, 2, 2, 8}};
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists TImport ");
+ runStatementOnDriver("create table TImport (a int) partitioned by (p int, q int, r int)" +
+ " stored as ORC TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table T (a int) partitioned by (p int, q int, r int) " +
+ "stored as ORC");
+ runStatementOnDriver("insert into T partition(p,q,r)" + makeValuesClause(rows1));
+
+ runStatementOnDriver("export table T partition(p=2,r=8) to '" + getTestDataDir() + "/export'");
+ runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'");
+ List<String> rs1 = runStatementOnDriver("select * from TImport order by a");
+ int[][] res = {{3, 2, 3, 8}, {7, 2, 2, 8}};
+ Assert.assertEquals("Content didn't match rs", stringifyValues(res), rs1);
+ }
+
+ @Test
+ public void testExportBucketed() throws Exception {
+ int[][] rows1 = {{1, 2}, {1, 3}, {2, 4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(rows1));
+ runStatementOnDriver("export table " + Table.ACIDTBL + " to '" + getTestDataDir()
+ + "/export'");
+ runStatementOnDriver("drop table if exists TImport ");
+ runStatementOnDriver("create table TImport (a int, b int) clustered by (a) into 2 buckets" +
+ " stored as ORC TBLPROPERTIES ('transactional'='false')");
+
+ runStatementOnDriver("import table TImport from '" + getTestDataDir() + "/export'");
+ List<String> rs1 = runStatementOnDriver("select * from TImport order by a, b");
+ Assert.assertEquals("Content didn't match rs", stringifyValues(rows1), rs1);
+ }
+
+ @Ignore
+ @Test
+ public void testCTLT() throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("create table T like " + Table.ACIDTBL + " TBLPROPERTIES " +
+ "('transactional'='true')");
+// runStatementOnDriver("create table T like " + Table.ACIDTBL);
+ List<String> rs = runStatementOnDriver("show create table T");
+ StringBuilder sb = new StringBuilder("*show create table");
+ for (String r : rs) {
+ sb.append("\n").append(r);
+ }
+ LOG.error(sb.toString());
+ }
+
+ /**
+ * tests import where target table already exists.
+ */
+ @Test
+ public void testImport() throws Exception {
+ testImport(false, true);
+ }
+ /**
+ * tests vectorized import where the target table already exists.
+ */
+ @Test
+ public void testImportVectorized() throws Exception {
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ testImport(true, true);
+ }
+ /**
+ * tests import where the target table does not exist.
+ */
+ @Test
+ public void testImportNoTarget() throws Exception {
+ testImport(false, false);
+ }
+ /**
+ * MM tables already work - see mm_exim.q.
+ * Export creates a bunch of metadata in addition to data, including all table props/IF/OF etc.
+ * Import from an 'export' archive can create a table (with any name specified) or add data into an existing table.
+ * If importing into an existing (un-partitioned) table, it must be empty.
+ * If Import creates the table, it will be exactly like the exported one except for the name.
+ */
+ private void testImport(boolean isVectorized, boolean existingTarget) throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ if(existingTarget) {
+ runStatementOnDriver("create table T (a int, b int) stored as orc");
+ }
+ //Tstage is just a simple way to generate test data
+ runStatementOnDriver("create table Tstage (a int, b int) stored as orc " +
+ "tblproperties('transactional'='true')");
+ //this creates an ORC data file with correct schema under table root
+ runStatementOnDriver("insert into Tstage values(1,2),(3,4),(5,6)");
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+ //runStatementOnDriver("truncate table Tstage");
+
+ //load into existing empty table T
+ runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'");
+
+ String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+ "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+ String[][] expected = new String[][] {
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+ "t/delta_0000001_0000001_0000/000000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4",
+ "t/delta_0000001_0000001_0000/000000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t5\t6",
+ "t/delta_0000001_0000001_0000/000000_0"}};
+ checkResult(expected, testQuery, isVectorized, "import existing table");
+
+ runStatementOnDriver("update T set a = 0 where b = 6");
+ String[][] expected2 = new String[][] {
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+ "t/delta_0000001_0000001_0000/000000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4",
+ "t/delta_0000001_0000001_0000/000000_0"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t6",
+ "t/delta_0000002_0000002_0000/bucket_00000"}};
+ checkResult(expected2, testQuery, isVectorized, "update imported table");
+
+ runStatementOnDriver("alter table T compact 'minor'");
+ TestTxnCommands2.runWorker(hiveConf);
+ String[][] expected3 = new String[][] {
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+ "t/delta_0000001_0000002/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4",
+ "t/delta_0000001_0000002/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t6",
+ "t/delta_0000001_0000002/bucket_00000"}};
+ checkResult(expected3, testQuery, isVectorized, "minor compact imported table");
+
+ }
+
+ @Test
+ public void testImportPartitioned() throws Exception {
+ boolean isVectorized = false;
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc");
+ //Tstage is just a simple way to generate test data
+ runStatementOnDriver("create table Tstage (a int, b int) partitioned by (p int) stored" +
+ " as orc tblproperties('transactional'='false')");
+ //this creates an ORC data file with correct schema under table root
+ runStatementOnDriver("insert into Tstage values(1,2,10),(3,4,11),(5,6,12)");
+ //now we have an archive with 3 partitions
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+
+ //make the partition in Target not empty
+ runStatementOnDriver("insert into T values(0,0,10)");
+ //load partition that doesn't exist in T
+ runStatementOnDriver("import table T PARTITION(p=11) from '" + getWarehouseDir() + "/1'");
+ //load partition that doesn't exist in T
+ runStatementOnDriver("import table T PARTITION(p=12) from '" + getWarehouseDir() + "/1'");
+ String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+ "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+ String[][] expected = new String[][] {
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0",
+ "t/p=10/delta_0000001_0000001_0000/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4",
+ "t/p=11/delta_0000002_0000002_0000/000000_0"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
+ "t/p=12/delta_0000003_0000003_0000/000000_0"}};
+ checkResult(expected, testQuery, isVectorized, "import existing table");
+ }
+
+ /**
+ * Tests selective partitioned import where the target table needs to be created.
+ * The export is made from an Acid table so that the target table is created as Acid.
+ */
+ @Test
+ public void testImportPartitionedCreate() throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ //Tstage is just a simple way to generate test data
+ runStatementOnDriver("create table Tstage (a int, b int) partitioned by (p int) stored" +
+ " as orc");
+ int[][] data = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+ //this creates an ORC data file with correct schema under table root
+ runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(data));
+ //now we have an archive with 3 partitions
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+
+ /*
+ * Load partitions that don't exist in T.
+ * There is some parallelism going on when loading more than 1 partition, which is not fully
+ * understood. In testImportPartitionedCreate2() that's expected since each partition is
+ * loaded in parallel; why it happens here is unclear.
+ * The file name changes from run to run between 000000_0, 000001_0 and 000002_0.
+ * The data is correct but this causes ROW__ID.bucketId/file names to change.
+ */
+ runStatementOnDriver("import table T PARTITION(p=10) from '" + getWarehouseDir() + "/1'");
+ runStatementOnDriver("import table T PARTITION(p=11) from '" + getWarehouseDir() + "/1'");
+ runStatementOnDriver("import table T PARTITION(p=12) from '" + getWarehouseDir() + "/1'");
+
+ //verify data
+ List<String> rs = runStatementOnDriver("select a, b, p from T order by a,b,p");
+ Assert.assertEquals("reading imported data",
+ TestTxnCommands2.stringifyValues(data), rs);
+ //verify that we are indeed doing an Acid write (import)
+ rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME");
+ Assert.assertEquals(3, rs.size());
+ Assert.assertTrue(rs.get(0).contains("t/p=10/delta_0000001_0000001_0000/00000"));
+ Assert.assertTrue(rs.get(1).contains("t/p=11/delta_0000002_0000002_0000/00000"));
+ Assert.assertTrue(rs.get(2).contains("t/p=12/delta_0000003_0000003_0000/00000"));
+ }
+
+ /**
+ * Imports all partitions from the archive - the target table needs to be created.
+ * The export is made from an Acid table so that the target table is created as Acid.
+ */
+ @Test
+ public void testImportPartitionedCreate2() throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ //Tstage is just a simple way to generate test data
+ runStatementOnDriver("create table Tstage (a int, b int) partitioned by (p int) stored" +
+ " as orc");
+ int[][] data = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+ //this creates an ORC data file with correct schema under table root
+ runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(data));
+ //now we have an archive with 3 partitions
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+
+ /*
+ * Load the entire archive.
+ * There is some parallelism going on when loading more than 1 partition.
+ * The file name changes from run to run between 000000_0, 000001_0 and 000002_0.
+ * The data is correct but this causes ROW__ID.bucketId/file names to change.
+ */
+ runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'");
+
+ //verify data
+ List<String> rs = runStatementOnDriver("select a, b, p from T order by a,b,p");
+ Assert.assertEquals("reading imported data",
+ TestTxnCommands2.stringifyValues(data), rs);
+ //verify that we are indeed doing an Acid write (import)
+ rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME");
+ Assert.assertEquals(3, rs.size());
+ Assert.assertTrue(rs.get(0).contains("t/p=10/delta_0000001_0000001_0000/00000"));
+ Assert.assertTrue(rs.get(1).contains("t/p=11/delta_0000001_0000001_0000/00000"));
+ Assert.assertTrue(rs.get(2).contains("t/p=12/delta_0000001_0000001_0000/00000"));
+ }
+ @Test
+ public void testMM() throws Exception {
+ testMM(true, true);
+ }
+ @Test
+ public void testMMFlatSource() throws Exception {
+ testMM(true, false);
+ }
+ @Test
+ public void testMMCreate() throws Exception {
+ testMM(false, true);
+ }
+ @Ignore("in this case no transactional tables are involved")
+ @Test
+ public void testMMCreateFlatSource() throws Exception {
+ testMM(false, false);
+ }
+ private void testMM(boolean existingTable, boolean isSourceMM) throws Exception {
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+
+ int[][] data = {{1,2}, {3, 4}, {5, 6}};
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+
+ if(existingTable) {
+ runStatementOnDriver("create table T (a int, b int)");
+ }
+
+ runStatementOnDriver("create table Tstage (a int, b int)" +
+ (isSourceMM ? "" : " tblproperties('transactional'='false')"));
+ runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(data));
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+
+ runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'");
+
+ //verify data
+ List<String> rs = runStatementOnDriver("select a, b from T order by a, b");
+ Assert.assertEquals("reading imported data",
+ TestTxnCommands2.stringifyValues(data), rs);
+ //verify that we are indeed doing an Acid write (import)
+ rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME");
+ Assert.assertEquals(3, rs.size());
+ Assert.assertTrue(rs.get(0).endsWith("t/delta_0000001_0000001_0000/000000_0"));
+ Assert.assertTrue(rs.get(1).endsWith("t/delta_0000001_0000001_0000/000000_0"));
+ Assert.assertTrue(rs.get(2).endsWith("t/delta_0000001_0000001_0000/000000_0"));
+ }
+ private void checkResult(String[][] expectedResult, String query, boolean isVectorized,
+ String msg) throws Exception{
+ checkResult(expectedResult, query, isVectorized, msg, LOG);
+ }
+
+ /**
+ * This test will fail - MM export doesn't filter out aborted transaction data.
+ */
+ @Ignore()
+ @Test
+ public void testMMExportAborted() throws Exception {
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+ int[][] data = {{1, 2}, {3, 4}, {5, 6}};
+ int[][] dataAbort = {{10, 2}};
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ runStatementOnDriver("create table T (a int, b int)");
+ runStatementOnDriver("create table Tstage (a int, b int)");
+
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+ runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(dataAbort));
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+ runStatementOnDriver("insert into Tstage" + TestTxnCommands2.makeValuesClause(data));
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+
+ runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'");
+ //verify data
+ List<String> rs = runStatementOnDriver("select a, b from T order by a, b");
+ Assert.assertEquals("reading imported data",
+ TestTxnCommands2.stringifyValues(data), rs);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/699c5768/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 0fee075..ec8c150 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -448,7 +448,7 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
};
checkResult(expected, testQuery, isVectorized, "load data inpath");
}
- private void checkResult(String[][] expectedResult, String query, boolean isVectorized,
+ void checkResult(String[][] expectedResult, String query, boolean isVectorized,
String msg) throws Exception{
checkResult(expectedResult, query, isVectorized, msg, LOG);
}