Posted to commits@hive.apache.org by se...@apache.org on 2016/10/28 02:08:49 UTC
hive git commit: HIVE-15027 : make sure export takes MM information into account (Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/hive-14535 e083d33ac -> 2e602596f
HIVE-15027 : make sure export takes MM information into account (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2e602596
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2e602596
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2e602596
Branch: refs/heads/hive-14535
Commit: 2e602596f7af6c302fd23628d4337673ca38be86
Parents: e083d33
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Oct 27 19:08:33 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Oct 27 19:08:33 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/metastore/ObjectStore.java | 1 -
.../apache/hadoop/hive/ql/exec/CopyTask.java | 15 +++--
.../apache/hadoop/hive/ql/exec/Utilities.java | 30 +++++++++
.../hive/ql/io/CombineHiveInputFormat.java | 29 ++++++---
.../hive/ql/parse/ExportSemanticAnalyzer.java | 66 +++++++++++++++++---
.../hive/ql/parse/ImportSemanticAnalyzer.java | 4 +-
.../ql/plan/ConditionalResolverMergeFiles.java | 1 +
.../apache/hadoop/hive/ql/plan/CopyWork.java | 53 ++++++++++++----
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 10 +++
9 files changed, 171 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index a1b3a09..8ad7059 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -636,7 +636,6 @@ public class ObjectStore implements RawStore, Configurable {
transactionStatus = TXN_STATUS.COMMITED;
try {
- LOG.error("TODO# grrrrr");
currentTransaction.commit();
} catch (Exception ex) {
Throwable candidate = ex;
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index a8a44bc..9f89ea5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -53,19 +53,24 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
@Override
public int execute(DriverContext driverContext) {
+ Path[] from = work.getFromPaths(), to = work.getToPaths();
+ for (int i = 0; i < from.length; ++i) {
+ int result = copyOnePath(from[i], to[i]);
+ if (result != 0) return result;
+ }
+ return 0;
+ }
+
+ protected int copyOnePath(Path fromPath, Path toPath) {
FileSystem dstFs = null;
- Path toPath = null;
try {
- Path fromPath = work.getFromPath();
- toPath = work.getToPath();
-
console.printInfo("Copying data from " + fromPath.toString(), " to "
+ toPath.toString());
FileSystem srcFs = fromPath.getFileSystem(conf);
dstFs = toPath.getFileSystem(conf);
- FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.isSourceMm());
+ FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs());
if (srcs == null || srcs.length == 0) {
if (work.isErrorOnSrcEmpty()) {
console.printError("No files matching path: " + fromPath.toString());
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6774d4d..8e506aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4077,4 +4077,34 @@ public final class Utilities {
}
}
+ /**
+ * @return the complete list of valid MM directories under a table/partition path; null
+ * if the entire directory is valid (has no uncommitted/temporary files).
+ */
+ public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf,
+ ValidWriteIds ids, int lbLevels) throws IOException {
+ Utilities.LOG14535.info("Looking for valid MM paths under " + path);
+ // NULL means this directory is entirely valid.
+ List<Path> result = null;
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus[] children = (lbLevels == 0) ? fs.listStatus(path)
+ : fs.globStatus(new Path(path, StringUtils.repeat("*" + Path.SEPARATOR, lbLevels) + "*"));
+ for (int i = 0; i < children.length; ++i) {
+ FileStatus file = children[i];
+ Path childPath = file.getPath();
+ Long writeId = ValidWriteIds.extractWriteId(childPath);
+ if (!file.isDirectory() || writeId == null || !ids.isValid(writeId)) {
+ Utilities.LOG14535.info("Skipping path " + childPath);
+ if (result == null) {
+ result = new ArrayList<>(children.length - 1);
+ for (int j = 0; j < i; ++j) {
+ result.add(children[j].getPath());
+ }
+ }
+ } else if (result != null) {
+ result.add(childPath);
+ }
+ }
+ return result;
+ }
}
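For illustration, how a caller would typically consume this helper, mirroring the export path below (table name and location are hypothetical; a null return means the whole directory is valid and can be copied as-is):

  ValidWriteIds ids = db.getValidWriteIdsForTable("default", "t");
  List<Path> valid = Utilities.getValidMmDirectoriesFromTableOrPart(
      new Path("/warehouse/t"), conf, ids, /* lbLevels */ 0);
  if (valid == null) {
    // Every child directory carries a committed write ID; copy the path wholesale.
  } else {
    // Copy only the committed directories, e.g. via the new CopyWork(Path[], Path[]).
  }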
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index cc1de11..15d6b9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.io;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -85,20 +87,22 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
private final int start;
private final int length;
private final JobConf conf;
+ private final boolean isMerge;
- public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
+ public CheckNonCombinablePathCallable(
+ Path[] paths, int start, int length, JobConf conf, boolean isMerge) {
this.paths = paths;
this.start = start;
this.length = length;
this.conf = conf;
+ this.isMerge = isMerge;
}
@Override
public Set<Integer> call() throws Exception {
Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
for (int i = 0; i < length; i++) {
- PartitionDesc part =
- HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+ PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
pathToPartitionInfo, paths[i + start],
IOPrepareCache.get().allocatePartitionDescMap());
// Use HiveInputFormat if any of the paths is not splittable
@@ -107,12 +111,16 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
getInputFormatFromCache(inputFormatClass, conf);
boolean isAvoidSplitCombine = inputFormat instanceof AvoidSplitCombination &&
((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf);
- boolean isMmTable = MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());
- if (isAvoidSplitCombine || isMmTable) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The path [" + paths[i + start] +
+
+ // Combined splits are not supported for MM tables right now.
+ // However, the merge for MM always combines one directory and should ignore that it's MM.
+ boolean isMmTableNonMerge = !isMerge
+ && MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());
+ if (isAvoidSplitCombine || isMmTableNonMerge) {
+ //if (LOG.isDebugEnabled()) {
+ Utilities.LOG14535.info("The path [" + paths[i + start] +
"] is being parked for HiveInputFormat.getSplits");
- }
+ //}
nonCombinablePathIndices.add(i + start);
}
}
@@ -467,11 +475,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
try {
+ boolean isMerge = mrwork.isMergeFromResolver();
for (int i = 0; i < numThreads; i++) {
int start = i * numPathPerThread;
int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
- futureList.add(executor.submit(
- new CheckNonCombinablePathCallable(paths, start, length, job)));
+ futureList.add(executor.submit(new CheckNonCombinablePathCallable(
+ paths, start, length, job, isMerge)));
}
Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
for (Future<Set<Integer>> future : futureList) {
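For context, the isMerge flag travels from the merge resolver to these callables: ConditionalResolverMergeFiles marks the MapWork (see the hunk further below), and getSplits reads the mark before fanning out the combinability checks. Condensed from the hunks in this commit:

  // ConditionalResolverMergeFiles:
  mWork.setIsMergeFromResolver(true);
  // CombineHiveInputFormat.getSplits:
  boolean isMerge = mrwork.isMergeFromResolver();
  // ...and in each callable, the MM check is skipped for merges:
  boolean isMmTableNonMerge = !isMerge
      && MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());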
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 475f2c9..12dea9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -18,6 +18,16 @@
package org.apache.hadoop.hive.ql.parse;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import org.apache.hadoop.hive.common.ValidWriteIds;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
@@ -171,29 +181,69 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
.getMsg("Exception while writing out the local file"), e);
}
- if (!(replicationSpec.isMetadataOnly() || (ts == null))) {
+ if (replicationSpec.isMetadataOnly() || (ts == null)) return;
+
+ try {
Path parentPath = new Path(toURI);
+ boolean isMmTable = MetaStoreUtils.isInsertOnlyTable(ts.tableHandle.getParameters());
+ Utilities.LOG14535.info("Exporting table " + ts.tableName + " / "
+ + ts.tableHandle.getTableName() + ": " + isMmTable);
+
+ int lbLevels = isMmTable && ts.tableHandle.isStoredAsSubDirectories()
+ ? ts.tableHandle.getSkewedColNames().size() : 0;
+ ValidWriteIds ids = isMmTable ? db.getValidWriteIdsForTable(
+ ts.tableHandle.getDbName(), ts.tableHandle.getTableName()) : null;
if (ts.tableHandle.isPartitioned()) {
for (Partition partition : partitions) {
Path fromPath = partition.getDataLocation();
Path toPartPath = new Path(parentPath, partition.getName());
- Task<? extends Serializable> rTask = TaskFactory.get(
- new CopyWork(fromPath, toPartPath, false),
- conf);
- rootTasks.add(rTask);
+ CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toPartPath);
+ rootTasks.add(TaskFactory.get(cw, conf));
inputs.add(new ReadEntity(partition));
}
} else {
Path fromPath = ts.tableHandle.getDataLocation();
Path toDataPath = new Path(parentPath, "data");
- Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
- fromPath, toDataPath, false), conf);
- rootTasks.add(rTask);
+ CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath);
+ rootTasks.add(TaskFactory.get(cw, conf));
inputs.add(new ReadEntity(ts.tableHandle));
}
outputs.add(toWriteEntity(parentPath));
+ } catch (HiveException | IOException ex) {
+ throw new SemanticException(ex);
}
+ }
+ private CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWriteIds ids,
+ Path fromPath, Path toDataPath) throws IOException {
+ List<Path> validPaths = null;
+ if (isMmTable) {
+ fromPath = fromPath.getFileSystem(conf).makeQualified(fromPath);
+ validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(fromPath, conf, ids, lbLevels);
+ }
+ if (validPaths == null) {
+ return new CopyWork(fromPath, toDataPath, false); // Not MM, or no need to skip anything.
+ } else {
+ return createCopyWorkForValidPaths(fromPath, toDataPath, validPaths);
+ }
}
+ private CopyWork createCopyWorkForValidPaths(
+ Path fromPath, Path toPartPath, List<Path> validPaths) {
+ Path[] from = new Path[validPaths.size()], to = new Path[validPaths.size()];
+ int i = 0;
+ String fromPathStr = fromPath.toString();
+ if (!fromPathStr.endsWith(Path.SEPARATOR)) {
+ fromPathStr += "/";
+ }
+ for (Path validPath : validPaths) {
+ from[i] = validPath;
+ // TODO: assumes the results are already qualified.
+ to[i] = new Path(toPartPath, validPath.toString().substring(fromPathStr.length()));
+ Utilities.LOG14535.info("Will copy " + from[i] + " to " + to[i]
+ + " based on dest " + toPartPath + ", from " + fromPathStr + ", subpath " + validPath);
+ ++i;
+ }
+ return new CopyWork(from, to);
+ }
}
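A worked example of the rebasing in createCopyWorkForValidPaths, with hypothetical locations and directory names (fromPath is qualified before the scan, and fromPathStr is guaranteed to end in a separator):

  // fromPath   = hdfs://nn/warehouse/t        -> fromPathStr = "hdfs://nn/warehouse/t/"
  // validPath  = hdfs://nn/warehouse/t/mm_5   (an MM directory name, illustrative)
  // toPartPath = /export/t
  String sub = validPath.toString().substring(fromPathStr.length()); // "mm_5"
  Path dest  = new Path(toPartPath, sub);                            // /export/t/mm_5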
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 87b85c8..8aa076f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -347,7 +347,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
+ mmWriteId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
CopyWork cv = new CopyWork(dataPath, destPath, false);
- cv.setIsSourceMm(isSourceMm);
+ cv.setSkipSourceMmDirs(isSourceMm);
LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, mmWriteId);
MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false);
@@ -411,7 +411,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
+ srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
+ mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
- cw.setIsSourceMm(isSourceMm);
+ cw.setSkipSourceMmDirs(isSourceMm);
DDLWork dw = new DDLWork(getInputs(), getOutputs(), addPartitionDesc);
LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
partSpec.getPartSpec(), true, mmWriteId);
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 4635f18..7fcb1ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -330,6 +330,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
mWork.setMinSplitSize(targetSize);
mWork.setMinSplitSizePerNode(targetSize);
mWork.setMinSplitSizePerRack(targetSize);
+ mWork.setIsMergeFromResolver(true);
}
private static class AverageSize {
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 2e484ba..c08911f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -30,10 +30,10 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
@Explain(displayName = "Copy", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public class CopyWork implements Serializable {
private static final long serialVersionUID = 1L;
- private Path fromPath;
- private Path toPath;
+ private Path[] fromPath;
+ private Path[] toPath;
private boolean errorOnSrcEmpty;
- private boolean isMm = false;
+ private boolean isSkipMmDirs = false;
public CopyWork() {
}
@@ -43,18 +43,45 @@ public class CopyWork implements Serializable {
}
public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
+ this(new Path[] { fromPath }, new Path[] { toPath });
+ this.setErrorOnSrcEmpty(errorOnSrcEmpty);
+ }
+
+ public CopyWork(final Path[] fromPath, final Path[] toPath) {
+ if (fromPath.length != toPath.length) {
+ throw new RuntimeException(
+ "Cannot copy " + fromPath.length + " paths into " + toPath.length + " paths");
+ }
this.fromPath = fromPath;
this.toPath = toPath;
- this.setErrorOnSrcEmpty(errorOnSrcEmpty);
}
-
+
+ // Keep backward compat in explain for single-file copy tasks.
@Explain(displayName = "source", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
- public Path getFromPath() {
- return fromPath;
+ public Path getFromPathExplain() {
+ return (fromPath == null || fromPath.length > 1) ? null : fromPath[0];
}
@Explain(displayName = "destination", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
- public Path getToPath() {
+ public Path getToPathExplain() {
+ return (toPath == null || toPath.length > 1) ? null : toPath[0];
+ }
+
+ @Explain(displayName = "sources", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public Path[] getFromPathsExplain() {
+ return (fromPath != null && fromPath.length > 1) ? fromPath : null;
+ }
+
+ @Explain(displayName = "destinations", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public Path[] getToPathsExplain() {
+ return (toPath != null && toPath.length > 1) ? toPath : null;
+ }
+
+ public Path[] getFromPaths() {
+ return fromPath;
+ }
+
+ public Path[] getToPaths() {
return toPath;
}
@@ -66,12 +93,14 @@ public class CopyWork implements Serializable {
return errorOnSrcEmpty;
}
- public void setIsSourceMm(boolean isMm) {
- this.isMm = isMm;
+ /** Whether the copy should ignore MM directories in the source, and copy their content to
+ * destination directly, rather than copying the directories themselves. */
+ public void setSkipSourceMmDirs(boolean isMm) {
+ this.isSkipMmDirs = isMm;
}
- public boolean isSourceMm() {
- return isMm ;
+ public boolean doSkipSourceMmDirs() {
+ return isSkipMmDirs ;
}
}
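To illustrate the explain split above: single-path plans keep rendering under the old "source"/"destination" labels, multi-path plans under the new plural ones, and each accessor returns null for the case the other handles (assumed values, following the accessor logic in this hunk):

  CopyWork single = new CopyWork(src, dst, false);
  single.getFromPathExplain();  // src    -> shown as "source"
  single.getFromPathsExplain(); // null   -> "sources" row suppressed

  CopyWork multi = new CopyWork(new Path[] { a, b }, new Path[] { c, d });
  multi.getFromPathExplain();   // null   -> "source" row suppressed
  multi.getFromPathsExplain();  // {a, b} -> shown as "sources"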
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 1be4d84..5a81a62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -148,6 +148,8 @@ public class MapWork extends BaseWork {
/** Whether LLAP IO will be used for inputs. */
private String llapIoDesc;
+ private boolean isMergeFromResolver;
+
public MapWork() {}
public MapWork(String name) {
@@ -718,4 +720,12 @@ public class MapWork extends BaseWork {
public VectorizedRowBatch getVectorizedRowBatch() {
return vectorizedRowBatch;
}
+
+ public void setIsMergeFromResolver(boolean b) {
+ this.isMergeFromResolver = b;
+ }
+
+ public boolean isMergeFromResolver() {
+ return this.isMergeFromResolver;
+ }
}