Posted to commits@hive.apache.org by se...@apache.org on 2018/05/07 20:21:03 UTC
hive git commit: HIVE-17657 : export/import for MM tables is broken (Sergey Shelukhin, reviewed by Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master 0fd74db20 -> 7ebcdeb95
HIVE-17657 : export/import for MM tables is broken (Sergey Shelukhin, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7ebcdeb9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7ebcdeb9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7ebcdeb9
Branch: refs/heads/master
Commit: 7ebcdeb951fca86c1fb4920553ff9f2d3687627a
Parents: 0fd74db
Author: sergey <se...@apache.org>
Authored: Mon May 7 13:20:51 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon May 7 13:20:51 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/CopyTask.java | 49 ++------
.../apache/hadoop/hive/ql/exec/ExportTask.java | 10 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 3 +
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 6 +-
.../hive/ql/parse/ExportSemanticAnalyzer.java | 45 ++++---
.../hive/ql/parse/ImportSemanticAnalyzer.java | 34 ++----
.../hive/ql/parse/SemanticAnalyzerFactory.java | 2 +-
.../ql/parse/repl/dump/PartitionExport.java | 9 +-
.../hive/ql/parse/repl/dump/TableExport.java | 11 +-
.../ql/parse/repl/dump/io/FileOperations.java | 84 +++++++++++--
.../apache/hadoop/hive/ql/plan/CopyWork.java | 18 ---
.../apache/hadoop/hive/ql/plan/ExportWork.java | 60 +++++----
.../apache/hadoop/hive/ql/TestTxnCommands.java | 122 +++++++++++++++++--
.../org/apache/hadoop/hive/ql/TestTxnExIm.java | 9 +-
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 3 +-
ql/src/test/queries/clientpositive/mm_exim.q | 4 +-
.../results/clientpositive/llap/mm_exim.q.out | 66 +++++++++-
17 files changed, 382 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
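Overview of the fix, before the per-file diffs: export of insert-only transactional (MM) tables used to rely on CopyWork's skip-source-MM-dirs flag, which is removed here. Instead, a new ExportWork.MmContext is created at analysis time and threaded through ExportTask, TableExport and PartitionExport into FileOperations, which resolves the table's valid base/delta directories from the write-id list and exports only their contents. The fragment below is a condensed sketch stitched together from the constructor calls changed in this commit; it is not standalone code.

// Analysis time (ExportSemanticAnalyzer; ReplDumpTask does the same for repl dumps):
MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : ts.tableHandle); // null unless insert-only
ExportWork exportWork = new ExportWork(exportRootDirName, ts, replicationSpec,
    ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx);

// Execution time (ExportTask):
TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(),
    work.getReplicationSpec(), db, null, conf, work.getMmContext());
tableExport.write(); // eventually calls new FileOperations(..., mmCtx).export(replicationSpec)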
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index b0ec5ab..1a8e5e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -63,14 +63,25 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
protected int copyOnePath(Path fromPath, Path toPath) {
FileSystem dstFs = null;
try {
- Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} " + fromPath);
+ Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} ", fromPath, toPath);
console.printInfo("Copying data from " + fromPath.toString(), " to "
+ toPath.toString());
FileSystem srcFs = fromPath.getFileSystem(conf);
dstFs = toPath.getFileSystem(conf);
- FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs());
+ FileStatus[] srcs = srcFs.globStatus(fromPath, new EximPathFilter());
+
+ // TODO: this is very brittle given that Hive supports nested directories in the tables.
+ // The caller should pass a flag explicitly telling us if the directories in the
+ // input are data, or parent of data. For now, retain this for backward compat.
+ if (srcs != null && srcs.length == 1 && srcs[0].isDirectory()
+ /*&& srcs[0].getPath().getName().equals(EximUtil.DATA_PATH_NAME) - still broken for partitions*/) {
+ Utilities.FILE_OP_LOGGER.debug(
+ "Recursing into a single child directory {}", srcs[0].getPath().getName());
+ srcs = srcFs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+ }
+
if (srcs == null || srcs.length == 0) {
if (work.isErrorOnSrcEmpty()) {
console.printError("No files matching path: " + fromPath.toString());
@@ -107,40 +118,6 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
}
}
- // Note: initially copied from LoadSemanticAnalyzer.
- private static FileStatus[] matchFilesOrDir(
- FileSystem fs, Path path, boolean isSourceMm) throws IOException {
- if (!fs.exists(path)) return null;
- if (!isSourceMm) return matchFilesOneDir(fs, path, null);
- // Note: this doesn't handle list bucketing properly; neither does the original code.
- FileStatus[] mmDirs = fs.listStatus(path, new AcidUtils.AnyIdDirFilter());
- if (mmDirs == null || mmDirs.length == 0) return null;
- List<FileStatus> allFiles = new ArrayList<FileStatus>();
- for (FileStatus mmDir : mmDirs) {
- if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- Utilities.FILE_OP_LOGGER.trace("Found source MM directory " + mmDir.getPath());
- }
- matchFilesOneDir(fs, mmDir.getPath(), allFiles);
- }
- return allFiles.toArray(new FileStatus[allFiles.size()]);
- }
-
- private static FileStatus[] matchFilesOneDir(
- FileSystem fs, Path path, List<FileStatus> result) throws IOException {
- FileStatus[] srcs = fs.globStatus(path, new EximPathFilter());
- if (srcs != null && srcs.length == 1) {
- if (srcs[0].isDirectory()) {
- srcs = fs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
- }
- }
- if (result != null && srcs != null) {
- for (int i = 0; i < srcs.length; ++i) {
- result.add(srcs[i]);
- }
- }
- return srcs;
- }
-
private static final class EximPathFilter implements PathFilter {
@Override
public boolean accept(Path p) {
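The rewritten copyOnePath drops the MM-specific matchFilesOrDir/matchFilesOneDir helpers: it globs the source with EximPathFilter and, if that yields exactly one directory, descends one level and copies its non-hidden children (see the TODO above about nested directories). Below is a minimal standalone sketch of that selection rule using java.nio.file instead of Hadoop's FileSystem so it can run without a cluster; the glob step is simplified to an existence check and all names are illustrative, not commit code.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CopySourceSelectorSketch {
  // Mirrors the new CopyTask behavior: resolve the source path; if it is a single
  // directory, copy its non-hidden children instead of the directory itself.
  static List<Path> selectSources(Path from) throws IOException {
    List<Path> srcs = new ArrayList<>();
    if (Files.exists(from)) {
      srcs.add(from); // stand-in for srcFs.globStatus(fromPath, new EximPathFilter())
    }
    if (srcs.size() == 1 && Files.isDirectory(srcs.get(0))) {
      try (Stream<Path> children = Files.list(srcs.get(0))) {
        // stand-in for FileUtils.HIDDEN_FILES_PATH_FILTER: skip "." and "_" prefixes
        srcs = children
            .filter(p -> !p.getFileName().toString().startsWith(".")
                && !p.getFileName().toString().startsWith("_"))
            .collect(Collectors.toList());
      }
    }
    return srcs;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(selectSources(Paths.get(args.length > 0 ? args[0] : ".")));
  }
}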
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
index aba6591..119d792 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -47,15 +47,13 @@ public class ExportTask extends Task<ExportWork> implements Serializable {
protected int execute(DriverContext driverContext) {
try {
// Also creates the root directory
- TableExport.Paths exportPaths =
- new TableExport.Paths(work.getAstRepresentationForErrorMsg(), work.getExportRootDir(),
- conf, false);
+ TableExport.Paths exportPaths = new TableExport.Paths(
+ work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false);
Hive db = getHive();
LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir());
work.acidPostProcess(db);
- TableExport tableExport = new TableExport(
- exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf
- );
+ TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(),
+ work.getReplicationSpec(), db, null, conf, work.getMmContext());
if (!tableExport.write()) {
throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 31846a3..406bea0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
import java.beans.DefaultPersistenceDelegate;
import java.beans.Encoder;
import java.beans.Expression;
@@ -72,6 +73,7 @@ import java.util.regex.Pattern;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.codec.binary.Base64;
@@ -136,6 +138,7 @@ import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 88d352b..ccdf04a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -288,7 +289,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
}
- new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write();
+ MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
+ new TableExport(
+ exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf, mmCtx).write();
+
replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
} catch (InvalidTableException te) {
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index d3c62a2..4a366a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -19,12 +19,17 @@
package org.apache.hadoop.hive.ql.parse;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -32,15 +37,14 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
import org.apache.hadoop.hive.ql.plan.ExportWork;
-
-import javax.annotation.Nullable;
-import java.util.Set;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
/**
* ExportSemanticAnalyzer.
*
*/
public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
+ private boolean isMmExport = false;
ExportSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
@@ -48,7 +52,9 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
@Override
public void analyzeInternal(ASTNode ast) throws SemanticException {
- rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs));
+ Task<ExportWork> task = analyzeExport(ast, null, db, conf, inputs, outputs);
+ isMmExport = task.getWork().getMmContext() != null;
+ rootTasks.add(task);
}
/**
* @param acidTableName - table name in db.table format; not NULL if exporting Acid table
@@ -80,12 +86,10 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
try {
ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true);
- } catch (SemanticException sme){
- if ((replicationSpec.isInReplicationScope()) &&
- ((sme.getCause() instanceof InvalidTableException)
- || (sme instanceof Table.ValidationFailureSemanticException)
- )
- ){
+ } catch (SemanticException sme) {
+ if (!replicationSpec.isInReplicationScope()) throw sme;
+ if ((sme.getCause() instanceof InvalidTableException)
+ || (sme instanceof Table.ValidationFailureSemanticException)) {
// If we're in replication scope, it's possible that we're running the export long after
// the table was dropped, so the table not existing currently or being a different kind of
// table is not an error - it simply means we should no-op, and let a future export
@@ -101,15 +105,26 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
// All parsing is done, we're now good to start the export process
TableExport.Paths exportPaths =
new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false);
- TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf);
- TableExport.AuthEntities authEntities = tableExport.getAuthEntities();
+ // Note: this tableExport is actually never used other than for auth, and another one is
+ // created when the task is executed. So, we don't care about the correct MM state here.
+ TableExport.AuthEntities authEntities = new TableExport(
+ exportPaths, ts, replicationSpec, db, null, conf, null).getAuthEntities();
inputs.addAll(authEntities.inputs);
outputs.addAll(authEntities.outputs);
String exportRootDirName = tmpPath;
+ MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : ts.tableHandle);
+
+ Utilities.FILE_OP_LOGGER.debug("Exporting table {}: MM context {}",
+ ts == null ? null : ts.tableName, mmCtx);
// Configure export work
- ExportWork exportWork =
- new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName);
+ ExportWork exportWork = new ExportWork(exportRootDirName, ts, replicationSpec,
+ ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx);
// Create an export task and add it as a root task
- return TaskFactory.get(exportWork);
+ return TaskFactory.get(exportWork);
+ }
+
+ @Override
+ public boolean hasTransactionalInQuery() {
+ return isMmExport; // Full ACID export goes thru UpdateDelete analyzer.
}
}
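For context on hasTransactionalInQuery(): EXPORT of a full ACID table is now rewritten by UpdateDeleteSemanticAnalyzer (see the SemanticAnalyzerFactory hunk further down), while an MM export stays in this analyzer and merely advertises itself as transactional so the query gets the usual transactional handling. A toy illustration of that split, not Hive code:

final class ExportRoutingSketch {
  enum ExportKind { NON_TRANSACTIONAL, INSERT_ONLY_MM, FULL_ACID }

  static String analyzerFor(ExportKind kind) {
    if (kind == ExportKind.FULL_ACID) {
      return "UpdateDeleteSemanticAnalyzer";   // SemanticAnalyzerFactory: isAcidExport(tree)
    }
    // ExportSemanticAnalyzer handles the rest; hasTransactionalInQuery() is true only for MM.
    return "ExportSemanticAnalyzer";
  }

  public static void main(String[] args) {
    for (ExportKind k : ExportKind.values()) {
      System.out.println(k + " -> " + analyzerFor(k));
    }
  }
}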
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index e6a7012..e597872 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -382,7 +382,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
- Long writeId, int stmtId, boolean isSourceMm) {
+ Long writeId, int stmtId) {
assert table != null;
assert table.getParameters() != null;
Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
@@ -423,9 +423,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf());
} else {
- CopyWork cw = new CopyWork(dataPath, destPath, false);
- cw.setSkipSourceMmDirs(isSourceMm);
- copyTask = TaskFactory.get(cw);
+ copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
}
LoadTableDesc loadTableWork = new LoadTableDesc(
@@ -480,7 +478,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
- EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm)
+ EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
throws MetaException, IOException, HiveException {
AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
@@ -517,9 +515,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
copyTask = ReplCopyTask.getLoadCopyTask(
replicationSpec, new Path(srcLocation), destPath, x.getConf());
} else {
- CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
- cw.setSkipSourceMmDirs(isSourceMm);
- copyTask = TaskFactory.get(cw);
+ copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
}
Task<?> addPartTask = TaskFactory.get(
@@ -830,8 +826,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
throws HiveException, IOException, MetaException {
- final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());
-
if (table != null) {
if (table.isPartitioned()) {
x.getLOG().debug("table partitioned");
@@ -841,7 +835,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
} else {
throw new SemanticException(
ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -853,8 +847,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
Path tgtPath = new Path(table.getDataLocation().toString());
FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
- loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId,
- isSourceMm);
+ loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId);
}
// Set this to read because we can't overwrite any existing partitions
x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
@@ -873,7 +866,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (isPartitioned(tblDesc)) {
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
- replicationSpec, x, writeId, stmtId, isSourceMm));
+ replicationSpec, x, writeId, stmtId));
}
} else {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@ -891,7 +884,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG());
t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x,
- writeId, stmtId, isSourceMm));
+ writeId, stmtId));
}
}
x.getTasks().add(t);
@@ -923,7 +916,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
throws HiveException, URISyntaxException, IOException, MetaException {
Task<?> dropTblTask = null;
- final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());
WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
// Normally, on import, trying to create a table or a partition in a db that does not yet exist
@@ -1007,14 +999,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
addPartitionDesc.setReplicationSpec(replicationSpec);
t.addDependentTask(
- addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
+ addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
if (updatedMetadata != null) {
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
}
}
} else {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
- t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm));
+ t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId));
}
}
@@ -1037,7 +1029,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
if (updatedMetadata != null) {
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
}
@@ -1054,7 +1046,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (replicationSpec.allowReplacementInto(ptn.getParameters())){
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
} else {
x.getTasks().add(alterSinglePartition(
fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
@@ -1080,7 +1072,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (!replicationSpec.isMetadataOnly()) {
// repl-imports are replace-into unless the event is insert-into
loadTable(fromURI, table, replicationSpec.isReplace(), table.getDataLocation(),
- replicationSpec, x, writeId, stmtId, isSourceMm);
+ replicationSpec, x, writeId, stmtId);
} else {
x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 8200463..70295da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -215,7 +215,7 @@ public final class SemanticAnalyzerFactory {
case HiveParser.TOK_LOAD:
return new LoadSemanticAnalyzer(queryState);
case HiveParser.TOK_EXPORT:
- if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) {
+ if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) {
return new UpdateDeleteSemanticAnalyzer(queryState);
}
return new ExportSemanticAnalyzer(queryState);
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 70eb750..d73fc4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +44,8 @@ import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths;
* it has a blocking queue that stores partitions to be dumped via a producer thread.
* it has a worker thread pool that reads of the queue to perform the various tasks.
*/
+// TODO: this object is created once to call one method and then immediately destroyed.
+// So it's basically just a roundabout way to pass arguments to a static method. Simplify?
class PartitionExport {
private final Paths paths;
private final PartitionIterable partitionIterable;
@@ -50,16 +53,18 @@ class PartitionExport {
private final HiveConf hiveConf;
private final int nThreads;
private final SessionState callersSession;
+ private final MmContext mmCtx;
private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
private BlockingQueue<Partition> queue;
PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser,
- HiveConf hiveConf) {
+ HiveConf hiveConf, MmContext mmCtx) {
this.paths = paths;
this.partitionIterable = partitionIterable;
this.distCpDoAsUser = distCpDoAsUser;
this.hiveConf = hiveConf;
+ this.mmCtx = mmCtx;
this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
this.queue = new ArrayBlockingQueue<>(2 * nThreads);
this.callersSession = SessionState.get();
@@ -106,7 +111,7 @@ class PartitionExport {
List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(),
forReplicationSpec, hiveConf);
Path rootDataDumpDir = paths.partitionExportDir(partitionName);
- new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf)
+ new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx)
.export(forReplicationSpec);
LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index d0aeee5..2a3986a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity;
+// TODO: this object is created once to call one method and then immediately destroyed.
+// So it's basically just a roundabout way to pass arguments to a static method. Simplify?
public class TableExport {
private static final Logger logger = LoggerFactory.getLogger(TableExport.class);
@@ -59,9 +62,10 @@ public class TableExport {
private final String distCpDoAsUser;
private final HiveConf conf;
private final Paths paths;
+ private final MmContext mmCtx;
public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db,
- String distCpDoAsUser, HiveConf conf) {
+ String distCpDoAsUser, HiveConf conf, MmContext mmCtx) {
this.tableSpec = (tableSpec != null
&& tableSpec.tableHandle.isTemporary()
&& replicationSpec.isInReplicationScope())
@@ -76,6 +80,7 @@ public class TableExport {
this.distCpDoAsUser = distCpDoAsUser;
this.conf = conf;
this.paths = paths;
+ this.mmCtx = mmCtx;
}
public boolean write() throws SemanticException {
@@ -147,13 +152,13 @@ public class TableExport {
throw new IllegalStateException("partitions cannot be null for partitionTable :"
+ tableSpec.tableName);
}
- new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec);
+ new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec);
} else {
List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
replicationSpec, conf);
// this is the data copy
- new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf)
+ new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
.export(replicationSpec);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index 690498f..b61a945 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -20,24 +20,31 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.LoginException;
+
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
+//TODO: this object is created once to call one method and then immediately destroyed.
+//So it's basically just a roundabout way to pass arguments to a static method. Simplify?
public class FileOperations {
private static Logger logger = LoggerFactory.getLogger(FileOperations.class);
private final List<Path> dataPathList;
@@ -45,13 +52,15 @@ public class FileOperations {
private final String distCpDoAsUser;
private HiveConf hiveConf;
private final FileSystem dataFileSystem, exportFileSystem;
+ private final MmContext mmCtx;
- public FileOperations(List<Path> dataPathList, Path exportRootDataDir,
- String distCpDoAsUser, HiveConf hiveConf) throws IOException {
+ public FileOperations(List<Path> dataPathList, Path exportRootDataDir, String distCpDoAsUser,
+ HiveConf hiveConf, MmContext mmCtx) throws IOException {
this.dataPathList = dataPathList;
this.exportRootDataDir = exportRootDataDir;
this.distCpDoAsUser = distCpDoAsUser;
this.hiveConf = hiveConf;
+ this.mmCtx = mmCtx;
if ((dataPathList != null) && !dataPathList.isEmpty()) {
dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
} else {
@@ -72,17 +81,59 @@ public class FileOperations {
* This writes the actual data in the exportRootDataDir from the source.
*/
private void copyFiles() throws IOException, LoginException {
- for (Path dataPath : dataPathList) {
- FileStatus[] fileStatuses =
- LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataPath);
- List<Path> srcPaths = new ArrayList<>();
- for (FileStatus fileStatus : fileStatuses) {
- srcPaths.add(fileStatus.getPath());
+ if (mmCtx == null) {
+ for (Path dataPath : dataPathList) {
+ copyOneDataPath(dataPath, exportRootDataDir);
}
- new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths);
+ } else {
+ copyMmPath();
+ }
+ }
+
+ private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException {
+ FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath);
+ List<Path> srcPaths = new ArrayList<>();
+ for (FileStatus fileStatus : fileStatuses) {
+ srcPaths.add(fileStatus.getPath());
}
+
+ new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths);
}
+ private void copyMmPath() throws LoginException, IOException {
+ assert dataPathList.size() == 1;
+ ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName());
+ Path fromPath = dataFileSystem.makeQualified(dataPathList.get(0));
+ List<Path> validPaths = getMmValidPaths(ids, fromPath);
+ String fromPathStr = fromPath.toString();
+ if (!fromPathStr.endsWith(Path.SEPARATOR)) {
+ fromPathStr += Path.SEPARATOR;
+ }
+ for (Path validPath : validPaths) {
+ // Export valid directories with a modified name so they don't look like bases/deltas.
+ // We could also dump the delta contents all together and rename the files if names collide.
+ String mmChildPath = "export_old_" + validPath.toString().substring(fromPathStr.length());
+ Path destPath = new Path(exportRootDataDir, mmChildPath);
+ exportFileSystem.mkdirs(destPath);
+ copyOneDataPath(validPath, destPath);
+ }
+ }
+
+ private List<Path> getMmValidPaths(ValidWriteIdList ids, Path fromPath) throws IOException {
+ Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", fromPath);
+ AcidUtils.Directory acidState = AcidUtils.getAcidState(fromPath, hiveConf, ids);
+ List<Path> validPaths = new ArrayList<>();
+ Path base = acidState.getBaseDirectory();
+ if (base != null) {
+ validPaths.add(base);
+ }
+ for (ParsedDelta pd : acidState.getCurrentDirectories()) {
+ validPaths.add(pd.getPath());
+ }
+ return validPaths;
+ }
+
+
/**
* This needs the root data directory to which the data needs to be exported to.
* The data export here is a list of files either in table/partition that are written to the _files
@@ -90,8 +141,19 @@ public class FileOperations {
*/
private void exportFilesAsList() throws SemanticException, IOException {
try (BufferedWriter writer = writer()) {
- for (Path dataPath : dataPathList) {
- writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+ if (mmCtx != null) {
+ assert dataPathList.size() == 1;
+ Path dataPath = dataPathList.get(0);
+ ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(
+ hiveConf, mmCtx.getFqTableName());
+ List<Path> validPaths = getMmValidPaths(ids, dataPath);
+ for (Path mmPath : validPaths) {
+ writeFilesList(listFilesInDir(mmPath), writer, AcidUtils.getAcidSubDir(dataPath));
+ }
+ } else {
+ for (Path dataPath : dataPathList) {
+ writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+ }
}
}
}
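copyMmPath/getMmValidPaths above use AcidUtils.getAcidState against the table's valid write-id list to pick the base directory plus the current deltas, then export each one under an "export_old_" prefix so the target side never sees raw base_/delta_ names. The standalone sketch below approximates that selection with plain directory-name parsing against a single high-watermark write id; it is deliberately simplified (the real code also excludes aborted write ids and deltas already compacted into the base), and all names are illustrative.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class MmExportPathSketch {
  private static final Pattern DELTA = Pattern.compile("delta_(\\d+)_(\\d+)(_\\d+)?");
  private static final Pattern BASE = Pattern.compile("base_(\\d+)");

  // Keep bases and deltas whose write ids are visible under the high watermark.
  static List<String> validDirs(List<String> dirNames, long highWatermark) {
    List<String> valid = new ArrayList<>();
    for (String name : dirNames) {
      Matcher b = BASE.matcher(name);
      Matcher d = DELTA.matcher(name);
      if (b.matches() && Long.parseLong(b.group(1)) <= highWatermark) {
        valid.add(name);
      } else if (d.matches() && Long.parseLong(d.group(2)) <= highWatermark) {
        valid.add(name);
      }
    }
    return valid;
  }

  // Mirrors the renaming in copyMmPath: exported directories get a neutral name.
  static String exportName(String validDirName) {
    return "export_old_" + validDirName;
  }

  public static void main(String[] args) {
    List<String> dirs = Arrays.asList("delta_0000001_0000001_0000",
        "delta_0000002_0000002_0000", "delta_0000003_0000003_0000");
    for (String dir : validDirs(dirs, 2)) {
      System.out.println(exportName(dir)); // prints the first two, renamed; delta 3 is filtered out
    }
  }
}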
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index c0e4a43..018983f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -33,15 +33,10 @@ public class CopyWork implements Serializable {
private Path[] fromPath;
private Path[] toPath;
private boolean errorOnSrcEmpty;
- private boolean isSkipMmDirs = false;
public CopyWork() {
}
- public CopyWork(final Path fromPath, final Path toPath) {
- this(fromPath, toPath, true);
- }
-
public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
this(new Path[] { fromPath }, new Path[] { toPath });
this.setErrorOnSrcEmpty(errorOnSrcEmpty);
@@ -92,17 +87,4 @@ public class CopyWork implements Serializable {
public boolean isErrorOnSrcEmpty() {
return errorOnSrcEmpty;
}
-
- /**
- * Whether the copy should ignore MM directories in the source, and copy their content to
- * destination directly, rather than copying the directories themselves.
- * */
- public void setSkipSourceMmDirs(boolean isMm) {
- this.isSkipMmDirs = isMm;
- }
-
- public boolean doSkipSourceMmDirs() {
- return isSkipMmDirs ;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
index 72ce798..d91569e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
@@ -17,39 +17,65 @@
*/
package org.apache.hadoop.hive.ql.plan;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
-
@Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT,
Explain.Level.EXTENDED })
public class ExportWork implements Serializable {
- private Logger LOG = LoggerFactory.getLogger(ExportWork.class);
+ private static Logger LOG = LoggerFactory.getLogger(ExportWork.class);
private static final long serialVersionUID = 1L;
+ public final static class MmContext {
+ private final String fqTableName;
+
+ private MmContext(String fqTableName) {
+ this.fqTableName = fqTableName;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + fqTableName + "]";
+ }
+
+ public static MmContext createIfNeeded(Table t) {
+ if (t == null) return null;
+ if (!AcidUtils.isInsertOnlyTable(t.getParameters())) return null;
+ return new MmContext(AcidUtils.getFullTableName(t.getDbName(), t.getTableName()));
+ }
+
+ public String getFqTableName() {
+ return fqTableName;
+ }
+ }
+
private final String exportRootDirName;
private TableSpec tableSpec;
private ReplicationSpec replicationSpec;
private String astRepresentationForErrorMsg;
- private String qualifiedTableName;
+ private String acidFqTableName;
+ private final MmContext mmContext;
/**
- * @param qualifiedTable if exporting Acid table, this is temp table - null otherwise
+ * @param acidFqTableName if exporting Acid table, this is temp table - null otherwise
*/
public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec,
- String astRepresentationForErrorMsg, String qualifiedTable) {
+ String astRepresentationForErrorMsg, String acidFqTableName, MmContext mmContext) {
this.exportRootDirName = exportRootDirName;
this.tableSpec = tableSpec;
this.replicationSpec = replicationSpec;
this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
- this.qualifiedTableName = qualifiedTable;
+ this.mmContext = mmContext;
+ this.acidFqTableName = acidFqTableName;
}
public String getExportRootDir() {
@@ -60,24 +86,16 @@ public class ExportWork implements Serializable {
return tableSpec;
}
- public void setTableSpec(TableSpec tableSpec) {
- this.tableSpec = tableSpec;
- }
-
public ReplicationSpec getReplicationSpec() {
return replicationSpec;
}
- public void setReplicationSpec(ReplicationSpec replicationSpec) {
- this.replicationSpec = replicationSpec;
- }
-
public String getAstRepresentationForErrorMsg() {
return astRepresentationForErrorMsg;
}
- public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) {
- this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+ public MmContext getMmContext() {
+ return mmContext;
}
/**
@@ -88,10 +106,10 @@ public class ExportWork implements Serializable {
* for more info.
*/
public void acidPostProcess(Hive db) throws HiveException {
- if(qualifiedTableName != null) {
- LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName +
+ if (acidFqTableName != null) {
+ LOG.info("Swapping export of " + tableSpec.tableName + " to " + acidFqTableName +
" using partSpec=" + tableSpec.partSpec);
- tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true);
+ tableSpec = new TableSpec(db, acidFqTableName, tableSpec.partSpec, true);
}
}
}
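MmContext itself is intentionally minimal: it exists only when the exported table is insert-only transactional, and it carries just the fully qualified table name that FileOperations later feeds to AcidUtils.getTableValidWriteIdList. A small usage fragment assembled from the calls visible in this diff (it assumes Hive's classes on the classpath and a live Table handle, so it is not standalone):

import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;

class MmContextUsageSketch {
  // Returns the name to use for the write-id lookup, or null for non-MM tables.
  static String fqNameOrNull(Table t) {
    MmContext mmCtx = MmContext.createIfNeeded(t); // null unless insert-only ACID
    // This is the name FileOperations passes to AcidUtils.getTableValidWriteIdList(conf, ...)
    return mmCtx == null ? null : mmCtx.getFqTableName();
  }
}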
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 6faba42..3e2784b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -17,9 +17,24 @@
*/
package org.apache.hadoop.hive.ql;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
@@ -47,13 +62,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
/**
* The LockManager is not ready, but for no-concurrency straight-line path we can
* test AC=true, and AC=false with commit/rollback/exception and test resulting data.
@@ -152,6 +160,106 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1);
}
+ @Test
+ public void testMmExim() throws Exception {
+ String tableName = "mm_table", importName = tableName + "_import";
+ runStatementOnDriver("drop table if exists " + tableName);
+ runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " +
+ "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+ tableName));
+
+ // Regular insert: export some MM deltas, then import into a new table.
+ int[][] rows1 = {{1,2},{3,4}};
+ runStatementOnDriver(String.format("insert into %s (a,b) %s",
+ tableName, makeValuesClause(rows1)));
+ runStatementOnDriver(String.format("insert into %s (a,b) %s",
+ tableName, makeValuesClause(rows1)));
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+ org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", tableName);
+ FileSystem fs = FileSystem.get(hiveConf);
+ Path exportPath = new Path(table.getSd().getLocation() + "_export");
+ fs.delete(exportPath, true);
+ runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath));
+ List<String> paths = listPathsRecursive(fs, exportPath);
+ verifyMmExportPaths(paths, 2);
+ runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath));
+ org.apache.hadoop.hive.metastore.api.Table imported = msClient.getTable("default", importName);
+ Assert.assertEquals(imported.toString(), "insert_only",
+ imported.getParameters().get("transactional_properties"));
+ Path importPath = new Path(imported.getSd().getLocation());
+ FileStatus[] stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter);
+ Assert.assertEquals(Arrays.toString(stat), 1, stat.length);
+ assertIsDelta(stat[0]);
+ List<String> allData = stringifyValues(rows1);
+ allData.addAll(stringifyValues(rows1));
+ allData.sort(null);
+ Collections.sort(allData);
+ List<String> rs = runStatementOnDriver(
+ String.format("select a,b from %s order by a,b", importName));
+ Assert.assertEquals("After import: " + rs, allData, rs);
+ runStatementOnDriver("drop table if exists " + importName);
+
+ // Do insert overwrite to create some invalid deltas, and import into a non-MM table.
+ int[][] rows2 = {{5,6},{7,8}};
+ runStatementOnDriver(String.format("insert overwrite table %s %s",
+ tableName, makeValuesClause(rows2)));
+ fs.delete(exportPath, true);
+ runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath));
+ paths = listPathsRecursive(fs, exportPath);
+ verifyMmExportPaths(paths, 1);
+ runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " +
+ "TBLPROPERTIES ('transactional'='false')", importName));
+ runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath));
+ imported = msClient.getTable("default", importName);
+ Assert.assertNull(imported.toString(), imported.getParameters().get("transactional"));
+ Assert.assertNull(imported.toString(),
+ imported.getParameters().get("transactional_properties"));
+ importPath = new Path(imported.getSd().getLocation());
+ stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter);
+ allData = stringifyValues(rows2);
+ Collections.sort(allData);
+ rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", importName));
+ Assert.assertEquals("After import: " + rs, allData, rs);
+ runStatementOnDriver("drop table if exists " + importName);
+ runStatementOnDriver("drop table if exists " + tableName);
+ msClient.close();
+ }
+
+ private void assertIsDelta(FileStatus stat) {
+ Assert.assertTrue(stat.toString(),
+ stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX));
+ }
+
+ private void verifyMmExportPaths(List<String> paths, int deltasOrBases) {
+ // 1 file, 1 dir for each, for now. Plus export "data" dir.
+ // This could be changed to a flat file list later.
+ Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size());
+ // No confusing directories in export.
+ for (String path : paths) {
+ Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX));
+ Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX));
+ }
+ }
+
+ private List<String> listPathsRecursive(FileSystem fs, Path path) throws IOException {
+ List<String> paths = new ArrayList<>();
+ LinkedList<Path> queue = new LinkedList<>();
+ queue.add(path);
+ while (!queue.isEmpty()) {
+ Path next = queue.pollFirst();
+ FileStatus[] stats = fs.listStatus(next, AcidUtils.hiddenFileFilter);
+ for (FileStatus stat : stats) {
+ Path child = stat.getPath();
+ paths.add(child.toString());
+ if (stat.isDirectory()) {
+ queue.add(child);
+ }
+ }
+ }
+ return paths;
+ }
+
+
/**
* add tests for all transitions - AC=t, AC=t, AC=f, commit (for example)
* @throws Exception
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 6daac1b..4f1d384 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -477,6 +477,7 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
}
private void testMM(boolean existingTable, boolean isSourceMM) throws Exception {
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+ hiveConf.setBoolean("mapred.input.dir.recursive", true);
int[][] data = {{1,2}, {3, 4}, {5, 6}};
runStatementOnDriver("drop table if exists T");
@@ -500,9 +501,10 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
//verify that we are indeed doing an Acid write (import)
rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME");
Assert.assertEquals(3, rs.size());
- Assert.assertTrue(rs.get(0).endsWith("t/delta_0000001_0000001_0000/000000_0"));
- Assert.assertTrue(rs.get(1).endsWith("t/delta_0000001_0000001_0000/000000_0"));
- Assert.assertTrue(rs.get(2).endsWith("t/delta_0000001_0000001_0000/000000_0"));
+ for (String s : rs) {
+ Assert.assertTrue(s, s.contains("/delta_0000001_0000001_0000/"));
+ Assert.assertTrue(s, s.endsWith("/000000_0"));
+ }
}
private void checkResult(String[][] expectedResult, String query, boolean isVectorized,
String msg) throws Exception{
@@ -516,6 +518,7 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
@Test
public void testMMExportAborted() throws Exception {
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+ hiveConf.setBoolean("mapred.input.dir.recursive", true);
int[][] data = {{1, 2}, {3, 4}, {5, 6}};
int[][] dataAbort = {{10, 2}};
runStatementOnDriver("drop table if exists T");
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index a2adb96..a88a570 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -207,7 +207,8 @@ public abstract class TxnCommandsBaseForTests {
void checkExpected(List<String> rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) {
LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
logResult(LOG, rs);
- Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size());
+ Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs,
+ expected.length, rs.size());
//verify data and layout
for(int i = 0; i < expected.length; i++) {
Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/queries/clientpositive/mm_exim.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_exim.q b/ql/src/test/queries/clientpositive/mm_exim.q
index c47342b..a2b6e08 100644
--- a/ql/src/test/queries/clientpositive/mm_exim.q
+++ b/ql/src/test/queries/clientpositive/mm_exim.q
@@ -59,13 +59,13 @@ drop table import1_mm;
drop table import2_mm;
import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart';
-desc import2_mm;
+desc formatted import2_mm;
select * from import2_mm order by key, p;
drop table import2_mm;
drop table import3_mm;
import table import3_mm from 'ql/test/data/exports/intermmediate_part';
-desc import3_mm;
+desc formatted import3_mm;
select * from import3_mm order by key, p;
drop table import3_mm;
http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/results/clientpositive/llap/mm_exim.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_exim.q.out b/ql/src/test/results/clientpositive/llap/mm_exim.q.out
index 1f40754..c683de3 100644
--- a/ql/src/test/results/clientpositive/llap/mm_exim.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_exim.q.out
@@ -292,14 +292,42 @@ POSTHOOK: type: IMPORT
#### A masked pattern was here ####
POSTHOOK: Output: database:default
POSTHOOK: Output: default@import2_mm
-PREHOOK: query: desc import2_mm
+PREHOOK: query: desc formatted import2_mm
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@import2_mm
-POSTHOOK: query: desc import2_mm
+POSTHOOK: query: desc formatted import2_mm
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@import2_mm
+# col_name data_type comment
key int
p int
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Retention: 0
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+ bucketing_version 2
+ numFiles 3
+ numRows 6
+ rawDataSize 37
+ totalSize 43
+ transactional true
+ transactional_properties insert_only
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
PREHOOK: query: select * from import2_mm order by key, p
PREHOOK: type: QUERY
PREHOOK: Input: default@import2_mm
@@ -338,18 +366,46 @@ POSTHOOK: Output: default@import3_mm
POSTHOOK: Output: default@import3_mm@p=455
POSTHOOK: Output: default@import3_mm@p=456
POSTHOOK: Output: default@import3_mm@p=457
-PREHOOK: query: desc import3_mm
+PREHOOK: query: desc formatted import3_mm
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@import3_mm
-POSTHOOK: query: desc import3_mm
+POSTHOOK: query: desc formatted import3_mm
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@import3_mm
+# col_name data_type comment
key int
-p int
# Partition Information
# col_name data_type comment
p int
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Retention: 0
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+ bucketing_version 2
+ numFiles 3
+ numPartitions 3
+ numRows 6
+ rawDataSize 13
+ totalSize 19
+ transactional true
+ transactional_properties insert_only
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
PREHOOK: query: select * from import3_mm order by key, p
PREHOOK: type: QUERY
PREHOOK: Input: default@import3_mm