Posted to commits@hive.apache.org by se...@apache.org on 2016/11/15 03:31:35 UTC
[12/13] hive git commit: HIVE-14990 : run all tests for MM tables and fix the issues that are found - more issues (Sergey Shelukhin)
HIVE-14990 : run all tests for MM tables and fix the issues that are found - more issues (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1155ed75
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1155ed75
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1155ed75
Branch: refs/heads/hive-14535
Commit: 1155ed756dd4cf0c73494ee146a47e9e0aa39575
Parents: 46e7657
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Nov 14 19:27:09 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Nov 14 19:27:09 2016 -0800
----------------------------------------------------------------------
.../apache/hadoop/hive/common/FileUtils.java | 6 +
.../hadoop/hive/common/ValidWriteIds.java | 39 ++++++-
.../hive/ql/exec/AbstractFileMergeOperator.java | 4 +-
.../apache/hadoop/hive/ql/exec/CopyTask.java | 1 +
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 3 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 2 +-
.../hadoop/hive/ql/exec/TableScanOperator.java | 4 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 51 ++++++---
.../hive/ql/io/CombineHiveInputFormat.java | 14 ++-
.../hadoop/hive/ql/io/merge/MergeFileWork.java | 7 +-
.../ql/io/rcfile/stats/PartialScanMapper.java | 5 +-
.../ql/io/rcfile/stats/PartialScanTask.java | 2 +-
.../ql/io/rcfile/stats/PartialScanWork.java | 6 +-
.../formatting/JsonMetaDataFormatter.java | 2 +-
.../formatting/TextMetaDataFormatter.java | 114 ++++++++++---------
.../hive/ql/optimizer/GenMRTableScan1.java | 4 +-
.../hive/ql/optimizer/GenMapRedUtils.java | 2 +-
.../physical/GenMRSkewJoinProcessor.java | 2 +-
.../physical/GenSparkSkewJoinProcessor.java | 4 +-
.../physical/LocalMapJoinProcFactory.java | 2 +-
.../physical/index/IndexWhereProcessor.java | 2 +-
.../ql/parse/AlterTablePartMergeFilesDesc.java | 9 ++
.../hive/ql/parse/DDLSemanticAnalyzer.java | 1 +
.../hadoop/hive/ql/parse/IndexUpdater.java | 40 ++++---
.../hive/ql/parse/ProcessAnalyzeTable.java | 4 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 22 ++--
.../parse/spark/SparkProcessAnalyzeTable.java | 4 +-
.../hadoop/hive/ql/plan/FileSinkDesc.java | 10 +-
.../hadoop/hive/ql/plan/PartitionDesc.java | 11 ++
.../apache/hadoop/hive/ql/plan/TableDesc.java | 7 ++
.../hive/ql/exec/TestFileSinkOperator.java | 3 +-
31 files changed, 250 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 95a553b..2660cce 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.io.HdfsUtils;
import org.apache.hadoop.hive.shims.HadoopShims;
@@ -517,6 +518,11 @@ public final class FileUtils {
return true;
}
+ public static boolean mkdir(FileSystem fs, Path f, Configuration conf) throws IOException {
+ boolean inheritPerms = HiveConf.getBoolVar(conf, ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+ return mkdir(fs, f, inheritPerms, conf);
+ }
+
/**
* Creates the directory and all necessary parent directories.
* @param fs FileSystem to use
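
The new FileUtils.mkdir overload simply resolves the warehouse inherit-permissions flag from the configuration before delegating to the existing mkdir(fs, f, inheritPerms, conf). A minimal caller sketch; the table path below is hypothetical and not part of this commit:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;

public class MkdirSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    FileSystem fs = FileSystem.get(conf);
    Path tableDir = new Path("/tmp/warehouse/some_table");  // hypothetical path
    // The overload reads HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS from conf itself,
    // then delegates to the existing mkdir(fs, f, inheritPerms, conf).
    boolean created = FileUtils.mkdir(fs, tableDir, conf);
    System.out.println("created: " + created);
  }
}
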
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index 88d6dfa..2ce4040 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -31,6 +31,7 @@ public class ValidWriteIds {
public static final ValidWriteIds NO_WRITE_IDS = new ValidWriteIds(-1, -1, false, null);
public static final String MM_PREFIX = "mm";
+ private static final String CURRENT_SUFFIX = ".current";
private final static Logger LOG = LoggerFactory.getLogger(ValidWriteIds.class);
@@ -53,9 +54,11 @@ public class ValidWriteIds {
}
public static ValidWriteIds createFromConf(Configuration conf, String fullTblName) {
- String idStr = conf.get(createConfKey(fullTblName), null);
+ String key = createConfKey(fullTblName);
+ String idStr = conf.get(key, null);
+ String current = conf.get(key + CURRENT_SUFFIX, null);
if (idStr == null || idStr.isEmpty()) return null;
- return new ValidWriteIds(idStr);
+ return new ValidWriteIds(idStr, current);
}
private static String createConfKey(String dbName, String tblName) {
@@ -66,7 +69,7 @@ public class ValidWriteIds {
return VALID_WRITEIDS_PREFIX + fullName;
}
- private ValidWriteIds(String src) {
+ private ValidWriteIds(String src, String current) {
// TODO: lifted from ACID config implementation... optimize if needed? e.g. ranges, base64
String[] values = src.split(":");
highWatermark = Long.parseLong(values[0]);
@@ -77,25 +80,48 @@ public class ValidWriteIds {
for(int i = 3; i < values.length; ++i) {
ids.add(Long.parseLong(values[i]));
}
+ if (current != null) {
+ long currentId = Long.parseLong(current);
+ if (areIdsValid) {
+ ids.add(currentId);
+ } else {
+ ids.remove(currentId);
+ }
+ }
+ } else if (current != null) {
+ long currentId = Long.parseLong(current);
+ areIdsValid = true;
+ ids = new HashSet<Long>();
+ ids.add(currentId);
} else {
areIdsValid = false;
ids = null;
}
}
+ public static void addCurrentToConf(
+ Configuration conf, String dbName, String tblName, long mmWriteId) {
+ String key = createConfKey(dbName, tblName) + CURRENT_SUFFIX;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting " + key + " => " + mmWriteId);
+ }
+ conf.set(key, Long.toString(mmWriteId));
+ }
+
public void addToConf(Configuration conf, String dbName, String tblName) {
if (source == null) {
source = toString();
}
+ String key = createConfKey(dbName, tblName);
if (LOG.isDebugEnabled()) {
- LOG.debug("Setting " + createConfKey(dbName, tblName) + " => " + source);
+ LOG.debug("Setting " + key + " => " + source
+ + " (old value was " + conf.get(key, null) + ")");
}
- conf.set(createConfKey(dbName, tblName), source);
+ conf.set(key, source);
}
public static void clearConf(HiveConf conf, String dbName, String tblName) {
if (LOG.isDebugEnabled()) {
- // TODO# remove
LOG.debug("Unsetting " + createConfKey(dbName, tblName));
}
conf.unset(createConfKey(dbName, tblName));
@@ -188,4 +214,5 @@ public class ValidWriteIds {
}
return writeId;
}
+
}
\ No newline at end of file
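
The new ".current" conf key lets the write ID of the statement currently being compiled be folded into the snapshot that createFromConf rebuilds: it is added to an explicit valid-id list, removed from an invalid-id list, or becomes the only valid id when no list is present. A minimal sketch of that merge rule, assuming the same areIdsValid semantics as the constructor above (the literal values are made up):

import java.util.HashSet;
import java.util.Set;

public class CurrentIdMergeSketch {
  // Mirrors the constructor logic in the diff; not a drop-in replacement.
  static Set<Long> merge(Set<Long> ids, boolean areIdsValid, Long currentId) {
    if (currentId == null) return ids;
    if (ids == null) {
      // No id list in the snapshot: the current write id becomes the only valid id
      // (the real constructor also flips areIdsValid to true here).
      Set<Long> result = new HashSet<>();
      result.add(currentId);
      return result;
    }
    if (areIdsValid) {
      ids.add(currentId);      // list enumerates valid ids: include the current one
    } else {
      ids.remove(currentId);   // list enumerates invalid ids: un-exclude the current one
    }
    return ids;
  }

  public static void main(String[] args) {
    Set<Long> invalid = new HashSet<>();
    invalid.add(7L);
    System.out.println(merge(invalid, false, 7L));  // [] -> write id 7 becomes readable
  }
}
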
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index dedbb78..1315b99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -123,7 +123,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
outPath = new Path(ttp, Utilities.toTempPath(taskId));
}
Utilities.LOG14535.info("Paths for merge " + taskId + ": tmp " + tmpPath + ", task "
- + taskTmpPath + ", final " + finalPath + ", out " + outPath, new Exception());
+ + taskTmpPath + ", final " + finalPath + ", out " + outPath);
}
/**
@@ -297,7 +297,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
// We don't expect missing buckets from mere (actually there should be no buckets),
// so just pass null as bucketing context. Union suffix should also be accounted for.
Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success,
- dpLevels, lbLevels, null, mmWriteId, reporter);
+ dpLevels, lbLevels, null, mmWriteId, reporter, false);
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index 9f89ea5..e8526f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -110,6 +110,7 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
// Note: initially copied from LoadSemanticAnalyzer.
private static FileStatus[] matchFilesOrDir(
FileSystem fs, Path path, boolean isSourceMm) throws IOException {
+ if (!fs.exists(path)) return null;
if (!isSourceMm) return matchFilesOneDir(fs, path, null);
// TODO: this doesn't handle list bucketing properly. Does the original exim do that?
FileStatus[] mmDirs = fs.listStatus(path, new ValidWriteIds.AnyIdDirFilter());
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 1e348c6..be65f49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -651,7 +651,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
// merge work only needs input and output.
MergeFileWork mergeWork = new MergeFileWork(mergeFilesDesc.getInputDir(),
- mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass().getName());
+ mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass().getName(),
+ mergeFilesDesc.getTableDesc());
LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>();
ArrayList<String> inputDirstr = new ArrayList<String>(1);
inputDirstr.add(mergeFilesDesc.getInputDir().toString());
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2864af4..5406de9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -1230,7 +1230,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
MissingBucketsContext mbc = new MissingBucketsContext(
conf.getTableInfo(), numBuckets, conf.getCompressed());
Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success,
- dpLevels, lbLevels, mbc, conf.getMmWriteId(), reporter);
+ dpLevels, lbLevels, mbc, conf.getMmWriteId(), reporter, conf.isMmCtas());
}
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 68477ca..bb0af7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -78,11 +78,11 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
private String schemaEvolutionColumns;
private String schemaEvolutionColumnsTypes;
- public TableDesc getTableDesc() {
+ public TableDesc getTableDescSkewJoin() {
return tableDesc;
}
- public void setTableDesc(TableDesc tableDesc) {
+ public void setTableDescSkewJoin(TableDesc tableDesc) {
this.tableDesc = tableDesc;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 8c9f622..1b7a8a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1113,6 +1113,9 @@ public final class Utilities {
if (orig.getName().indexOf(tmpPrefix) == 0) {
return orig;
}
+ if (orig.getName().contains("=1")) {
+ LOG.error("TODO# creating tmp path from " + orig, new Exception());
+ }
return new Path(orig.getParent(), tmpPrefix + orig.getName());
}
@@ -3312,8 +3315,8 @@ public final class Utilities {
if (op instanceof FileSinkOperator) {
FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+ if (fdesc.isMmTable()) continue; // No need to create for MM tables
Path tempDir = fdesc.getDirName();
-
if (tempDir != null) {
Path tempPath = Utilities.toTempPath(tempDir);
FileSystem fs = tempPath.getFileSystem(conf);
@@ -3970,7 +3973,7 @@ public final class Utilities {
public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long mmWriteId,
- Reporter reporter) throws IOException, HiveException {
+ Reporter reporter, boolean isMmCtas) throws IOException, HiveException {
FileSystem fs = specPath.getFileSystem(hconf);
Path manifestDir = getManifestDir(specPath, mmWriteId, unionSuffix);
if (!success) {
@@ -3982,20 +3985,30 @@ public final class Utilities {
Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")");
// TODO# may be wrong if there are no splits (empty insert/CTAS)
- FileStatus[] manifestFiles = fs.listStatus(manifestDir);
List<Path> manifests = new ArrayList<>();
- if (manifestFiles != null) {
- for (FileStatus status : manifestFiles) {
- Path path = status.getPath();
- if (path.getName().endsWith(MANIFEST_EXTENSION)) {
- Utilities.LOG14535.info("Reading manifest " + path);
- manifests.add(path);
+ if (fs.exists(manifestDir)) {
+ FileStatus[] manifestFiles = fs.listStatus(manifestDir);
+ if (manifestFiles != null) {
+ for (FileStatus status : manifestFiles) {
+ Path path = status.getPath();
+ if (path.getName().endsWith(MANIFEST_EXTENSION)) {
+ Utilities.LOG14535.info("Reading manifest " + path);
+ manifests.add(path);
+ }
}
}
+ } else {
+ Utilities.LOG14535.info("No manifests found - query produced no output");
+ manifestDir = null;
}
Utilities.LOG14535.info("Looking for files in: " + specPath);
ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+ if (isMmCtas && !fs.exists(specPath)) {
+ // TODO: do we also need to do this when creating an empty partition from select?
+ Utilities.LOG14535.info("Creating table directory for CTAS with no output at " + specPath);
+ FileUtils.mkdir(fs, specPath, hconf);
+ }
Path[] files = getMmDirectoryCandidates(
fs, specPath, dpLevels, lbLevels, filter, mmWriteId, hconf);
ArrayList<Path> mmDirectories = new ArrayList<>();
@@ -4019,15 +4032,17 @@ public final class Utilities {
}
}
- Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
- tryDelete(fs, manifestDir);
- if (unionSuffix != null) {
- // Also delete the parent directory if we are the last union FSOP to execute.
- manifestDir = manifestDir.getParent();
- FileStatus[] remainingFiles = fs.listStatus(manifestDir);
- if (remainingFiles == null || remainingFiles.length == 0) {
- Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
- tryDelete(fs, manifestDir);
+ if (manifestDir != null) {
+ Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
+ tryDelete(fs, manifestDir);
+ if (unionSuffix != null) {
+ // Also delete the parent directory if we are the last union FSOP to execute.
+ manifestDir = manifestDir.getParent();
+ FileStatus[] remainingFiles = fs.listStatus(manifestDir);
+ if (remainingFiles == null || remainingFiles.length == 0) {
+ Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
+ tryDelete(fs, manifestDir);
+ }
}
}
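
handleMmTableFinalPath now tolerates a manifest directory that was never created (a query that produced no splits) and, for an MM CTAS with no output, creates the empty table directory so the new table still exists on disk. A self-contained sketch of the exists-guarded manifest listing; the ".manifest" extension is assumed here for illustration:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ManifestGuardSketch {
  // Mirrors the new guard: only list the manifest dir if it exists, otherwise
  // treat the query as having produced no output.
  static List<Path> collectManifests(FileSystem fs, Path manifestDir) throws Exception {
    List<Path> manifests = new ArrayList<>();
    if (!fs.exists(manifestDir)) {
      return manifests;                       // no splits ran, nothing to commit
    }
    FileStatus[] files = fs.listStatus(manifestDir);
    if (files != null) {
      for (FileStatus status : files) {
        if (status.getPath().getName().endsWith(".manifest")) {  // extension assumed
          manifests.add(status.getPath());
        }
      }
    }
    return manifests;
  }

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    System.out.println(collectManifests(fs, new Path("/tmp/no-such-manifest-dir")));
  }
}
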
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index f0257ff..86397af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -111,12 +111,16 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
getInputFormatFromCache(inputFormatClass, conf);
boolean isAvoidSplitCombine = inputFormat instanceof AvoidSplitCombination &&
((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf);
+ TableDesc tbl = part.getTableDesc();
+ boolean isMmNonMerge = false;
+ if (tbl != null) {
+ isMmNonMerge = !isMerge && MetaStoreUtils.isInsertOnlyTable(tbl.getProperties());
+ } else {
+ // This would be the case for obscure tasks like truncate column (unsupported for MM).
+ Utilities.LOG14535.warn("Assuming not insert-only; no table in partition spec " + part);
+ }
- // Combined splits are not supported for MM tables right now.
- // However, the merge for MM always combines one directory and should ignore that it's MM.
- boolean isMmTableNonMerge = !isMerge
- && MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());
- if (isAvoidSplitCombine || isMmTableNonMerge) {
+ if (isAvoidSplitCombine || isMmNonMerge) {
//if (LOG.isDebugEnabled()) {
Utilities.LOG14535.info("The path [" + paths[i + start] +
"] is being parked for HiveInputFormat.getSplits");
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
index 94b9431..8d340df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
@@ -60,13 +60,13 @@ public class MergeFileWork extends MapWork {
private Class<? extends InputFormat> internalInputFormat;
public MergeFileWork(List<Path> inputPaths, Path outputDir,
- String srcTblInputFormat) {
- this(inputPaths, outputDir, false, srcTblInputFormat);
+ String srcTblInputFormat, TableDesc tbl) {
+ this(inputPaths, outputDir, false, srcTblInputFormat, tbl);
}
public MergeFileWork(List<Path> inputPaths, Path outputDir,
boolean hasDynamicPartitions,
- String srcTblInputFormat) {
+ String srcTblInputFormat, TableDesc tbl) {
this.inputPaths = inputPaths;
this.outputDir = outputDir;
this.hasDynamicPartitions = hasDynamicPartitions;
@@ -78,6 +78,7 @@ public class MergeFileWork extends MapWork {
this.internalInputFormat = RCFileBlockMergeInputFormat.class;
}
partDesc.setInputFileFormatClass(internalInputFormat);
+ partDesc.setTableDesc(tbl);
for (Path path : this.inputPaths) {
this.addPathToPartitionInfo(path, partDesc);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
index 09e4a47..9a6406d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
@@ -88,8 +88,9 @@ public class PartialScanMapper extends MapReduceBase implements
}
try {
- //CombineHiveInputFormat is set in PartialScanTask.
- RCFileKeyBufferWrapper key = (RCFileKeyBufferWrapper) ((CombineHiveKey) k).getKey();
+ //CombineHiveInputFormat may be set in PartialScanTask.
+ RCFileKeyBufferWrapper key = (RCFileKeyBufferWrapper)
+ ((k instanceof CombineHiveKey) ? ((CombineHiveKey) k).getKey() : k);
// calculate rawdatasize
KeyBuffer keyBuffer = key.getKeyBuffer();
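
Since PartialScanWork may now run without CombineHiveInputFormat, the key handed to the mapper can arrive either bare or wrapped in CombineHiveKey, and the change unwraps only when needed. A small unwrap sketch mirroring that check; the import locations are assumed to match the surrounding file:

import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
import org.apache.hadoop.hive.shims.CombineHiveKey;

public class KeyUnwrapSketch {
  // Accept the key whether or not the combining input format wrapped it.
  static RCFileKeyBufferWrapper unwrap(Object k) {
    Object raw = (k instanceof CombineHiveKey) ? ((CombineHiveKey) k).getKey() : k;
    return (RCFileKeyBufferWrapper) raw;
  }
}
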
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index dcd0e97..c8cd27d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -350,7 +350,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
}
QueryState queryState = new QueryState(new HiveConf(conf, PartialScanTask.class));
- PartialScanWork mergeWork = new PartialScanWork(inputPaths);
+ PartialScanWork mergeWork = new PartialScanWork(inputPaths, null);
DriverContext driverCxt = new DriverContext();
PartialScanTask taskExec = new PartialScanTask();
taskExec.initialize(queryState, null, driverCxt, new CompilationOpContext());
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
index 919cea0..d8ee7d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.mapred.Mapper;
/**
@@ -47,15 +48,18 @@ public class PartialScanWork extends MapWork implements Serializable {
private transient List<Path> inputPaths;
private String aggKey;
private String statsTmpDir;
+ private TableDesc tblDesc;
public PartialScanWork() {
}
- public PartialScanWork(List<Path> inputPaths) {
+ public PartialScanWork(List<Path> inputPaths, TableDesc tblDesc) {
super();
this.inputPaths = inputPaths;
+ this.tblDesc = tblDesc;
PartitionDesc partDesc = new PartitionDesc();
partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class);
+ partDesc.setTableDesc(tblDesc);
for(Path path: this.inputPaths) {
this.addPathToPartitionInfo(path, partDesc);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
index 3315806..68079c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
@@ -238,7 +238,7 @@ public class JsonMetaDataFormatter implements MetaDataFormatter {
* @param tblPath not NULL
* @throws IOException
*/
- // Duplicates logic in TextMetaDataFormatter
+ // Duplicates logic in TextMetaDataFormatter TODO: wtf?!!
private void putFileSystemsStats(MapBuilder builder, List<Path> locations,
HiveConf conf, Path tblPath)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
index b990bda..22908d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.metadata.formatting;
import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -280,28 +281,30 @@ class TextMetaDataFormatter implements MetaDataFormatter {
} catch (IOException e) {
throw new HiveException(e);
}
- }
+ }
+
+ private static class FileData {
+ public long totalFileSize = 0;
+ public long maxFileSize = 0;
+ public long minFileSize = Long.MAX_VALUE;
+ public long lastAccessTime = 0;
+ public long lastUpdateTime = 0;
+ public int numOfFiles = 0;
+ }
+ // TODO: why is this in text formatter? grrr
private void writeFileSystemStats(DataOutputStream outStream,
HiveConf conf,
List<Path> locations,
- Path tblPath, boolean partSpecified, int indent)
- throws IOException
- {
- long totalFileSize = 0;
- long maxFileSize = 0;
- long minFileSize = Long.MAX_VALUE;
- long lastAccessTime = 0;
- long lastUpdateTime = 0;
- int numOfFiles = 0;
-
+ Path tblPath, boolean partSpecified, int indent) throws IOException {
+ FileData fd = new FileData();
boolean unknown = false;
FileSystem fs = tblPath.getFileSystem(conf);
// in case all files in locations do not exist
try {
FileStatus tmpStatus = fs.getFileStatus(tblPath);
- lastAccessTime = tmpStatus.getAccessTime();
- lastUpdateTime = tmpStatus.getModificationTime();
+ fd.lastAccessTime = tmpStatus.getAccessTime();
+ fd.lastUpdateTime = tmpStatus.getModificationTime();
if (partSpecified) {
// check whether the part exists or not in fs
tmpStatus = fs.getFileStatus(locations.get(0));
@@ -316,42 +319,12 @@ class TextMetaDataFormatter implements MetaDataFormatter {
for (Path loc : locations) {
try {
FileStatus status = fs.getFileStatus(tblPath);
- FileStatus[] files = fs.listStatus(loc);
- long accessTime = status.getAccessTime();
- long updateTime = status.getModificationTime();
// no matter loc is the table location or part location, it must be a
// directory.
if (!status.isDir()) {
continue;
}
- if (accessTime > lastAccessTime) {
- lastAccessTime = accessTime;
- }
- if (updateTime > lastUpdateTime) {
- lastUpdateTime = updateTime;
- }
- for (FileStatus currentStatus : files) {
- if (currentStatus.isDir()) {
- continue;
- }
- numOfFiles++;
- long fileLen = currentStatus.getLen();
- totalFileSize += fileLen;
- if (fileLen > maxFileSize) {
- maxFileSize = fileLen;
- }
- if (fileLen < minFileSize) {
- minFileSize = fileLen;
- }
- accessTime = currentStatus.getAccessTime();
- updateTime = currentStatus.getModificationTime();
- if (accessTime > lastAccessTime) {
- lastAccessTime = accessTime;
- }
- if (updateTime > lastUpdateTime) {
- lastUpdateTime = updateTime;
- }
- }
+ processDir(status, fs, fd);
} catch (IOException e) {
// ignore
}
@@ -363,29 +336,29 @@ class TextMetaDataFormatter implements MetaDataFormatter {
outStream.write(Utilities.INDENT.getBytes("UTF-8"));
}
outStream.write("totalNumberFiles:".getBytes("UTF-8"));
- outStream.write((unknown ? unknownString : "" + numOfFiles).getBytes("UTF-8"));
+ outStream.write((unknown ? unknownString : "" + fd.numOfFiles).getBytes("UTF-8"));
outStream.write(terminator);
for (int k = 0; k < indent; k++) {
outStream.write(Utilities.INDENT.getBytes("UTF-8"));
}
outStream.write("totalFileSize:".getBytes("UTF-8"));
- outStream.write((unknown ? unknownString : "" + totalFileSize).getBytes("UTF-8"));
+ outStream.write((unknown ? unknownString : "" + fd.totalFileSize).getBytes("UTF-8"));
outStream.write(terminator);
for (int k = 0; k < indent; k++) {
outStream.write(Utilities.INDENT.getBytes("UTF-8"));
}
outStream.write("maxFileSize:".getBytes("UTF-8"));
- outStream.write((unknown ? unknownString : "" + maxFileSize).getBytes("UTF-8"));
+ outStream.write((unknown ? unknownString : "" + fd.maxFileSize).getBytes("UTF-8"));
outStream.write(terminator);
for (int k = 0; k < indent; k++) {
outStream.write(Utilities.INDENT.getBytes("UTF-8"));
}
outStream.write("minFileSize:".getBytes("UTF-8"));
- if (numOfFiles > 0) {
- outStream.write((unknown ? unknownString : "" + minFileSize).getBytes("UTF-8"));
+ if (fd.numOfFiles > 0) {
+ outStream.write((unknown ? unknownString : "" + fd.minFileSize).getBytes("UTF-8"));
} else {
outStream.write((unknown ? unknownString : "" + 0).getBytes("UTF-8"));
}
@@ -395,17 +368,52 @@ class TextMetaDataFormatter implements MetaDataFormatter {
outStream.write(Utilities.INDENT.getBytes("UTF-8"));
}
outStream.write("lastAccessTime:".getBytes("UTF-8"));
- outStream.writeBytes((unknown || lastAccessTime < 0) ? unknownString : ""
- + lastAccessTime);
+ outStream.writeBytes((unknown || fd.lastAccessTime < 0) ? unknownString : ""
+ + fd.lastAccessTime);
outStream.write(terminator);
for (int k = 0; k < indent; k++) {
outStream.write(Utilities.INDENT.getBytes("UTF-8"));
}
outStream.write("lastUpdateTime:".getBytes("UTF-8"));
- outStream.write((unknown ? unknownString : "" + lastUpdateTime).getBytes("UTF-8"));
+ outStream.write((unknown ? unknownString : "" + fd.lastUpdateTime).getBytes("UTF-8"));
outStream.write(terminator);
- }
+ }
+
+ private void processDir(FileStatus status, FileSystem fs, FileData fd) throws IOException {
+ long accessTime = status.getAccessTime();
+ long updateTime = status.getModificationTime();
+ if (accessTime > fd.lastAccessTime) {
+ fd.lastAccessTime = accessTime;
+ }
+ if (updateTime > fd.lastUpdateTime) {
+ fd.lastUpdateTime = updateTime;
+ }
+ FileStatus[] files = fs.listStatus(status.getPath());
+ for (FileStatus currentStatus : files) {
+ if (currentStatus.isDir()) {
+ processDir(currentStatus, fs, fd);
+ continue;
+ }
+ fd.numOfFiles++;
+ long fileLen = currentStatus.getLen();
+ fd.totalFileSize += fileLen;
+ if (fileLen > fd.maxFileSize) {
+ fd.maxFileSize = fileLen;
+ }
+ if (fileLen < fd.minFileSize) {
+ fd.minFileSize = fileLen;
+ }
+ accessTime = currentStatus.getAccessTime();
+ updateTime = currentStatus.getModificationTime();
+ if (accessTime > fd.lastAccessTime) {
+ fd.lastAccessTime = accessTime;
+ }
+ if (updateTime > fd.lastUpdateTime) {
+ fd.lastUpdateTime = updateTime;
+ }
+ }
+ }
/**
* Show the table partitions.
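
The per-location statistics loop is replaced by a FileData accumulator and a recursive processDir helper, so files nested in subdirectories (such as MM write-id directories) are counted too. A self-contained sketch of the same accumulation pattern over a local directory tree; this is illustrative only, not the formatter code:

import java.io.File;

public class FileStatsSketch {
  static class FileData {
    long totalFileSize = 0, maxFileSize = 0, minFileSize = Long.MAX_VALUE;
    long lastUpdateTime = 0;
    int numOfFiles = 0;
  }

  // Recursive walk mirroring processDir: directories recurse, files update the stats.
  static void processDir(File dir, FileData fd) {
    File[] children = dir.listFiles();
    if (children == null) return;
    for (File f : children) {
      if (f.isDirectory()) {
        processDir(f, fd);
        continue;
      }
      fd.numOfFiles++;
      long len = f.length();
      fd.totalFileSize += len;
      fd.maxFileSize = Math.max(fd.maxFileSize, len);
      fd.minFileSize = Math.min(fd.minFileSize, len);
      fd.lastUpdateTime = Math.max(fd.lastUpdateTime, f.lastModified());
    }
  }

  public static void main(String[] args) {
    FileData fd = new FileData();
    processDir(new File("."), fd);
    System.out.println(fd.numOfFiles + " files, " + fd.totalFileSize + " bytes");
  }
}
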
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
index 9297a0b..78d1e54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -195,7 +196,8 @@ public class GenMRTableScan1 implements NodeProcessor {
aggregationKey = aggregationKeyBuffer.toString();
// scan work
- PartialScanWork scanWork = new PartialScanWork(inputPaths);
+ PartialScanWork scanWork = new PartialScanWork(inputPaths,
+ Utilities.getTableDesc(op.getConf().getTableMetadata()));
scanWork.setMapperCannotSpanPartns(true);
scanWork.setAggKey(aggregationKey);
scanWork.setStatsTmpDir(op.getConf().getTmpStatsDir(), parseCtx.getConf());
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index c3228d3..5107a89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1598,7 +1598,7 @@ public final class GenMapRedUtils {
Utilities.LOG14535.info("creating mergefilework from " + inputDirs + " to " + finalName);
// create the merge file work
MergeFileWork work = new MergeFileWork(inputDirs, finalName,
- hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName());
+ hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName(), tblDesc);
LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>();
pathToAliases.put(inputDir, inputDirstr);
work.setMapperCannotSpanPartns(true);
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index ede4fcb..b1f4577 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -255,7 +255,7 @@ public final class GenMRSkewJoinProcessor {
Operator<? extends OperatorDesc> ts =
GenMapRedUtils.createTemporaryTableScanOperator(
joinOp.getCompilationOpContext(), rowSchemaList.get((byte)k));
- ((TableScanOperator)ts).setTableDesc(tableDescList.get((byte)k));
+ ((TableScanOperator)ts).setTableDescSkewJoin(tableDescList.get((byte)k));
parentOps[k] = ts;
}
Operator<? extends OperatorDesc> tblScan_op = parentOps[i];
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 405c3ca..38bb847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -231,7 +231,7 @@ public class GenSparkSkewJoinProcessor {
for (int k = 0; k < tags.length; k++) {
Operator<? extends OperatorDesc> ts = GenMapRedUtils.createTemporaryTableScanOperator(
joinOp.getCompilationOpContext(), rowSchemaList.get((byte) k));
- ((TableScanOperator) ts).setTableDesc(tableDescList.get((byte) k));
+ ((TableScanOperator) ts).setTableDescSkewJoin(tableDescList.get((byte) k));
parentOps[k] = ts;
}
@@ -362,7 +362,7 @@ public class GenSparkSkewJoinProcessor {
HashTableDummyDesc desc = new HashTableDummyDesc();
HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(
tableScan.getCompilationOpContext(), desc);
- dummyOp.getConf().setTbl(tableScan.getTableDesc());
+ dummyOp.getConf().setTbl(tableScan.getTableDescSkewJoin());
MapJoinOperator mapJoinOp = (MapJoinOperator) tableScan.getChildOperators().get(0);
mapJoinOp.replaceParent(tableScan, dummyOp);
List<Operator<? extends OperatorDesc>> mapJoinChildren =
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
index 9ca815c..af3175e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
@@ -209,7 +209,7 @@ public final class LocalMapJoinProcFactory {
if (parent.getSchema() == null) {
if (parent instanceof TableScanOperator) {
- tbl = ((TableScanOperator) parent).getTableDesc();
+ tbl = ((TableScanOperator) parent).getTableDescSkewJoin();
} else {
throw new SemanticException("Expected parent operator of type TableScanOperator." +
"Found " + parent.getClass().getName() + " instead.");
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
index 81e99fc..e036cd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
@@ -118,7 +118,7 @@ public class IndexWhereProcessor implements NodeProcessor {
// get potential reentrant index queries from each index
Map<Index, HiveIndexQueryContext> queryContexts = new HashMap<Index, HiveIndexQueryContext>();
// make sure we have an index on the table being scanned
- TableDesc tblDesc = operator.getTableDesc();
+ TableDesc tblDesc = operator.getTableDescSkewJoin();
Map<String, List<Index>> indexesByType = new HashMap<String, List<Index>>();
for (Index indexOnTable : indexes) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
index bdb872a..7670b86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -39,6 +40,7 @@ public class AlterTablePartMergeFilesDesc {
private List<Path> inputDir = new ArrayList<Path>();
private Path outputDir = null;
private Class<? extends InputFormat> inputFormatClass;
+ private TableDesc tableDesc;
public AlterTablePartMergeFilesDesc(String tableName,
HashMap<String, String> partSpec) {
@@ -102,4 +104,11 @@ public class AlterTablePartMergeFilesDesc {
this.inputFormatClass = inputFormatClass;
}
+ public void setTableDesc(TableDesc tableDesc) {
+ this.tableDesc = tableDesc;
+ }
+
+ public TableDesc getTableDesc() {
+ return tableDesc;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index e6a31e8..150db52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1603,6 +1603,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
if (MetaStoreUtils.isInsertOnlyTable(tblObj.getParameters())) {
throw new SemanticException("Merge is not supported for MM tables");
}
+ mergeDesc.setTableDesc(Utilities.getTableDesc(tblObj));
List<String> bucketCols = null;
Class<? extends InputFormat> inputFormatClass = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
index 653b657..d3b4da1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidWriteIds;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.ql.Driver;
@@ -43,6 +44,7 @@ import java.util.Set;
public class IndexUpdater {
private List<LoadTableDesc> loadTableWork;
private HiveConf conf;
+ private Configuration parentConf;
// Assumes one instance of this + single-threaded compilation for each query.
private Hive hive;
private List<Task<? extends Serializable>> tasks;
@@ -52,6 +54,7 @@ public class IndexUpdater {
public IndexUpdater(List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs, Configuration conf) {
this.loadTableWork = loadTableWork;
this.inputs = inputs;
+ this.parentConf = conf;
this.conf = new HiveConf(conf, IndexUpdater.class);
this.tasks = new LinkedList<Task<? extends Serializable>>();
}
@@ -60,6 +63,7 @@ public class IndexUpdater {
Configuration conf) {
this.loadTableWork = new LinkedList<LoadTableDesc>();
this.loadTableWork.add(loadTableWork);
+ this.parentConf = conf;
this.conf = new HiveConf(conf, IndexUpdater.class);
this.tasks = new LinkedList<Task<? extends Serializable>>();
this.inputs = inputs;
@@ -75,16 +79,15 @@ public class IndexUpdater {
Map<String, String> partSpec = ltd.getPartitionSpec();
if (partSpec == null || partSpec.size() == 0) {
//unpartitioned table, update whole index
- doIndexUpdate(tblIndexes);
+ doIndexUpdate(tblIndexes, ltd.getMmWriteId());
} else {
- doIndexUpdate(tblIndexes, partSpec);
+ doIndexUpdate(tblIndexes, partSpec, ltd.getMmWriteId());
}
}
return tasks;
}
- private void doIndexUpdate(List<Index> tblIndexes) throws HiveException {
- Driver driver = new Driver(this.conf);
+ private void doIndexUpdate(List<Index> tblIndexes, Long mmWriteId) throws HiveException {
for (Index idx : tblIndexes) {
StringBuilder sb = new StringBuilder();
sb.append("ALTER INDEX ");
@@ -93,23 +96,21 @@ public class IndexUpdater {
sb.append(idx.getDbName()).append('.');
sb.append(idx.getOrigTableName());
sb.append(" REBUILD");
- driver.compile(sb.toString(), false);
- tasks.addAll(driver.getPlan().getRootTasks());
- inputs.addAll(driver.getPlan().getInputs());
+ compileRebuild(sb.toString(), idx, mmWriteId);
}
}
private void doIndexUpdate(List<Index> tblIndexes, Map<String, String>
- partSpec) throws HiveException {
+ partSpec, Long mmWriteId) throws HiveException {
for (Index index : tblIndexes) {
if (containsPartition(index, partSpec)) {
- doIndexUpdate(index, partSpec);
+ doIndexUpdate(index, partSpec, mmWriteId);
}
}
}
- private void doIndexUpdate(Index index, Map<String, String> partSpec) throws
- HiveException {
+ private void doIndexUpdate(Index index, Map<String, String> partSpec, Long mmWriteId)
+ throws HiveException {
StringBuilder ps = new StringBuilder();
boolean first = true;
ps.append("(");
@@ -133,14 +134,25 @@ public class IndexUpdater {
sb.append(" PARTITION ");
sb.append(ps.toString());
sb.append(" REBUILD");
+ compileRebuild(sb.toString(), index, mmWriteId);
+ }
+
+ private void compileRebuild(String query, Index index, Long mmWriteId)
+ throws HiveException {
Driver driver = new Driver(this.conf);
- driver.compile(sb.toString(), false);
+ driver.compile(query, false);
+ if (mmWriteId != null) {
+ // TODO: this is rather fragile
+ ValidWriteIds.addCurrentToConf(
+ parentConf, index.getDbName(), index.getOrigTableName(), mmWriteId);
+ }
tasks.addAll(driver.getPlan().getRootTasks());
inputs.addAll(driver.getPlan().getInputs());
}
- private boolean containsPartition(Index index, Map<String, String> partSpec)
- throws HiveException {
+
+ private boolean containsPartition(Index index,
+ Map<String, String> partSpec) throws HiveException {
String[] qualified = Utilities.getDbTableName(index.getDbName(), index.getIndexTableName());
Table indexTable = hive.getTable(qualified[0], qualified[1]);
List<Partition> parts = hive.getPartitions(indexTable, partSpec);
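
The factored-out compileRebuild compiles the ALTER INDEX ... REBUILD statement and, when the parent statement carries an MM write ID, publishes it into the parent configuration via the new ValidWriteIds.addCurrentToConf so the rebuild's snapshot also covers the in-flight write (the TODO notes this is fragile). A minimal sketch of that conf hand-off, with made-up database and table names:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidWriteIds;

public class CurrentWriteIdConfSketch {
  public static void main(String[] args) {
    Configuration parentConf = new Configuration();
    long mmWriteId = 42L;                        // hypothetical in-flight write id
    // New helper from this commit: record the current statement's write id under
    // the table's conf key plus ".current" so createFromConf() treats it as valid.
    ValidWriteIds.addCurrentToConf(parentConf, "default", "idx_table", mmWriteId);
  }
}
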
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
index c13a404..41f471d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.lib.Node;
@@ -179,7 +180,8 @@ public class ProcessAnalyzeTable implements NodeProcessor {
aggregationKey = aggregationKeyBuffer.toString();
// scan work
- PartialScanWork scanWork = new PartialScanWork(inputPaths);
+ PartialScanWork scanWork = new PartialScanWork(inputPaths,
+ Utilities.getTableDesc(tableScan.getConf().getTableMetadata()));
scanWork.setMapperCannotSpanPartns(true);
scanWork.setAggKey(aggregationKey);
scanWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir(), parseContext.getConf());
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index dbeb8c6..0a196c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6698,6 +6698,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
List<FieldSchema> field_schemas = null;
CreateTableDesc tblDesc = qb.getTableDesc();
CreateViewDesc viewDesc = qb.getViewDesc();
+ boolean isCtas = false;
if (tblDesc != null) {
field_schemas = new ArrayList<FieldSchema>();
destTableIsTemporary = tblDesc.isTemporary();
@@ -6838,7 +6839,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
FileSinkDesc fileSinkDesc = createFileSinkDesc(table_desc, dest_part,
dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
- canBeMerged, mmWriteId);
+ canBeMerged, mmWriteId, isMmCtas);
if (isMmCtas) {
// Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
tableDesc.setWriter(fileSinkDesc);
@@ -6943,20 +6944,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
boolean destTableIsAcid, boolean destTableIsTemporary,
boolean destTableIsMaterialization, Path queryTmpdir,
SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
- RowSchema fsRS, boolean canBeMerged, Long mmWriteId) throws SemanticException {
- FileSinkDesc fileSinkDesc = new FileSinkDesc(
- queryTmpdir,
- table_desc,
- conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT),
- currentTableId,
- rsCtx.isMultiFileSpray(),
- canBeMerged,
- rsCtx.getNumFiles(),
- rsCtx.getTotalFiles(),
- rsCtx.getPartnCols(),
- dpCtx,
- dest_path,
- mmWriteId);
+ RowSchema fsRS, boolean canBeMerged, Long mmWriteId, boolean isMmCtas) throws SemanticException {
+ FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc,
+ conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
+ canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
+ dest_path, mmWriteId, isMmCtas);
fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
// If this is an insert, update, or delete on an ACID table then mark that so the
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
index 52186b4..b48735a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.lib.Node;
@@ -174,7 +175,8 @@ public class SparkProcessAnalyzeTable implements NodeProcessor {
aggregationKey = aggregationKeyBuffer.toString();
// scan work
- PartialScanWork scanWork = new PartialScanWork(inputPaths);
+ PartialScanWork scanWork = new PartialScanWork(inputPaths,
+ Utilities.getTableDesc(tableScan.getConf().getTableMetadata()));
scanWork.setMapperCannotSpanPartns(true);
scanWork.setAggKey(aggregationKey);
scanWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir(), parseContext.getConf());
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 1f84531..504a6ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -98,6 +98,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
private boolean isHiveServerQuery;
private Long mmWriteId;
private boolean isMerge;
+ private boolean isMmCtas;
public FileSinkDesc() {
}
@@ -109,7 +110,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
final boolean compressed, final int destTableId, final boolean multiFileSpray,
final boolean canBeMerged, final int numFiles, final int totalFiles,
final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
- Long mmWriteId) {
+ Long mmWriteId, boolean isMmCtas) {
this.dirName = dirName;
this.tableInfo = tableInfo;
@@ -124,6 +125,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
this.dpSortState = DPSortState.NONE;
this.destPath = destPath;
this.mmWriteId = mmWriteId;
+ this.isMmCtas = isMmCtas;
}
public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -145,7 +147,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
public Object clone() throws CloneNotSupportedException {
FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
- partitionCols, dpCtx, destPath, mmWriteId);
+ partitionCols, dpCtx, destPath, mmWriteId, isMmCtas);
ret.setCompressCodec(compressCodec);
ret.setCompressType(compressType);
ret.setGatherStats(gatherStats);
@@ -500,4 +502,8 @@ public class FileSinkDesc extends AbstractOperatorDesc {
public boolean isMerge() {
return isMerge;
}
+
+ public boolean isMmCtas() {
+ return isMmCtas;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 921461f..ee112d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -79,6 +79,8 @@ public class PartitionDesc implements Serializable, Cloneable {
public PartitionDesc() {
}
+ private final static org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(PartitionDesc.class);
+
public PartitionDesc(final TableDesc table, final LinkedHashMap<String, String> partSpec) {
this.tableDesc = table;
this.partSpec = partSpec;
@@ -325,4 +327,13 @@ public class PartitionDesc implements Serializable, Cloneable {
public VectorPartitionDesc getVectorPartitionDesc() {
return vectorPartitionDesc;
}
+
+ @Override
+ public String toString() {
+ return "PartitionDesc [tableDesc=" + tableDesc + ", partSpec=" + partSpec
+ + ", inputFileFormatClass=" + inputFileFormatClass
+ + ", outputFileFormatClass=" + outputFileFormatClass + ", properties="
+ + properties + ", baseFileName=" + baseFileName
+ + ", vectorPartitionDesc=" + vectorPartitionDesc + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 0a611f9..977b39f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -214,4 +214,11 @@ public class TableDesc implements Serializable, Cloneable {
jobProperties.equals(target.jobProperties));
return ret;
}
+
+ @Override
+ public String toString() {
+ return "TableDesc [inputFileFormatClass=" + inputFileFormatClass
+ + ", outputFileFormatClass=" + outputFileFormatClass + ", properties="
+ + properties + ", jobProperties=" + jobProperties + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 909114c..4938e2f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -285,7 +285,8 @@ public class TestFileSinkOperator {
partColMap.put(PARTCOL_NAME, null);
DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
//todo: does this need the finalDestination?
- desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, null);
+ desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
+ false, 1, 1, partCols, dpCtx, null, null, false);
} else {
desc = new FileSinkDesc(basePath, tableDesc, false);
}