You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by ha...@apache.org on 2016/01/27 18:00:22 UTC
hive git commit: HIVE-12897 : Improve dynamic partition loading
(Ashutosh Chauhan via Prasanth J)
Repository: hive
Updated Branches:
refs/heads/master ef2db2122 -> fe81a3760
HIVE-12897 : Improve dynamic partition loading (Ashutosh Chauhan via Prasanth J)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fe81a376
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fe81a376
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fe81a376
Branch: refs/heads/master
Commit: fe81a3760a87f3d8e9aa32dd51cfbb948e4f793a
Parents: ef2db21
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Thu Jan 21 14:07:19 2016 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed Jan 27 08:58:34 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 17 +++++++++--------
.../hadoop/hive/metastore/MetaStoreUtils.java | 2 +-
.../hadoop/hive/metastore/ObjectStore.java | 7 +++++--
.../hadoop/hive/ql/exec/FileSinkOperator.java | 8 ++++++++
.../apache/hadoop/hive/ql/metadata/Hive.java | 14 ++++----------
.../index/RewriteParseContextGenerator.java | 4 ++--
.../parse/ExplainSQRewriteSemanticAnalyzer.java | 2 --
.../hive/ql/parse/ExplainSemanticAnalyzer.java | 2 +-
.../hive/ql/plan/DynamicPartitionCtx.java | 20 +++++++++++++++++++-
.../dynamic_partitions_with_whitelist.q.out | 4 ++--
10 files changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 97fe7bc..74a8749 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -210,7 +210,8 @@ public class HiveConf extends Configuration {
public static final HiveConf.ConfVars[] metaConfVars = {
HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL,
HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL_DDL,
- HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT
+ HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT,
+ HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN
};
static {
@@ -1574,32 +1575,32 @@ public class HiveConf extends Configuration {
HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
"Number of aborted transactions involving a given table or partition that will trigger\n" +
"a major compaction."),
-
+
COMPACTOR_INITIATOR_FAILED_THRESHOLD("hive.compactor.initiator.failed.compacts.threshold", 2,
new RangeValidator(1, 20), "Number of consecutive compaction failures (per table/partition) " +
"after which automatic compactions will not be scheduled any more. Note that this must be less " +
"than hive.compactor.history.retention.failed."),
-
+
HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
"Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
-
+
COMPACTOR_HISTORY_RETENTION_SUCCEEDED("hive.compactor.history.retention.succeeded", 3,
new RangeValidator(0, 100), "Determines how many successful compaction records will be " +
"retained in compaction history for a given table/partition."),
-
+
COMPACTOR_HISTORY_RETENTION_FAILED("hive.compactor.history.retention.failed", 3,
new RangeValidator(0, 100), "Determines how many failed compaction records will be " +
"retained in compaction history for a given table/partition."),
-
+
COMPACTOR_HISTORY_RETENTION_ATTEMPTED("hive.compactor.history.retention.attempted", 2,
new RangeValidator(0, 100), "Determines how many attempted compaction records will be " +
"retained in compaction history for a given table/partition."),
-
+
COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m",
new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"),
-
+
HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index eee7f1b..c8859f3 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1618,7 +1618,7 @@ public class MetaStoreUtils {
}
- private static String getPartitionValWithInvalidCharacter(List<String> partVals,
+ public static String getPartitionValWithInvalidCharacter(List<String> partVals,
Pattern partitionValidationPattern) {
if (partitionValidationPattern == null) {
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index e044c73..b808728 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -290,7 +290,7 @@ public class ObjectStore implements RawStore, Configurable {
String partitionValidationRegex =
hiveConf.get(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.name());
- if (partitionValidationRegex != null && partitionValidationRegex.equals("")) {
+ if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
partitionValidationPattern = Pattern.compile(partitionValidationRegex);
} else {
partitionValidationPattern = null;
@@ -759,7 +759,7 @@ public class ObjectStore implements RawStore, Configurable {
String queryStr = "select name from org.apache.hadoop.hive.metastore.model.MDatabase";
Query query = null;
-
+
openTransaction();
try {
query = pm.newQuery(queryStr);
@@ -1054,14 +1054,17 @@ public class ObjectStore implements RawStore, Configurable {
return tbls;
}
+ @Override
public int getDatabaseCount() throws MetaException {
return getObjectCount("name", MDatabase.class.getName());
}
+ @Override
public int getPartitionCount() throws MetaException {
return getObjectCount("partitionName", MPartition.class.getName());
}
+ @Override
public int getTableCount() throws MetaException {
return getObjectCount("tableName", MTable.class.getName());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 3289cfc..14121b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -696,6 +697,13 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
}
+ String invalidPartitionVal;
+ if((invalidPartitionVal = MetaStoreUtils.getPartitionValWithInvalidCharacter(dpVals, dpCtx.getWhiteListPattern()))!=null) {
+ throw new HiveFatalException("Partition value '" + invalidPartitionVal +
+ "' contains a character not matched by whitelist pattern '" +
+ dpCtx.getWhiteListPattern().toString() + "'. " + "(configure with " +
+ HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname + ")");
+ }
fpaths = getDynOutPaths(dpVals, lbDirName);
// use SubStructObjectInspector to serialize the non-partitioning columns in the input row
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index efb50b2..50681c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1427,7 +1427,7 @@ public class Hive {
* @param isSrcLocal
* If the source directory is LOCAL
* @param isAcid true if this is an ACID operation
- * @throws JSONException
+ * @throws JSONException
*/
public Partition loadPartition(Path loadPath, Table tbl,
Map<String, String> partSpec, boolean replace,
@@ -1622,7 +1622,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
* @param txnId txnId, can be 0 unless isAcid == true
* @return partition map details (PartitionSpec and Partition)
* @throws HiveException
- * @throws JSONException
+ * @throws JSONException
*/
public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
String tableName, Map<String, String> partSpec, boolean replace,
@@ -1635,16 +1635,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
LinkedHashMap<Map<String, String>, Partition>();
FileSystem fs = loadPath.getFileSystem(conf);
- FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
+ FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
// Check for empty partitions
for (FileStatus s : leafStatus) {
- try {
- validatePartitionNameCharacters(
- Warehouse.getPartValuesFromPartName(s.getPath().getParent().toString()));
- } catch (MetaException e) {
- throw new HiveException(e);
- }
- validPartitions.add(s.getPath().getParent());
+ validPartitions.add(s.getPath());
}
int partsToLoad = validPartitions.size();
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
index 48105de..64f9734 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
@@ -102,9 +102,9 @@ public final class RewriteParseContextGenerator {
ASTNode ast, Context ctx) throws SemanticException {
QB qb = new QB(null, null, false);
ASTNode child = ast;
- ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+ ParseContext subPCtx = sem.getParseContext();
subPCtx.setContext(ctx);
- ((SemanticAnalyzer) sem).initParseCtx(subPCtx);
+ sem.initParseCtx(subPCtx);
LOG.info("Starting Sub-query Semantic Analysis");
sem.doPhase1(child, qb, sem.initPhase1Ctx(), null);
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
index 2c2339a..6f0f3a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
@@ -17,13 +17,11 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.ExplainSQRewriteTask;
-import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork;
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index c1e9ec1..e1e3eb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -79,7 +79,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
if (tasks == null) {
tasks = Collections.emptyList();
}
-
+
FetchTask fetchTask = sem.getFetchTask();
if (fetchTask != null) {
// Initialize fetch work such that operator tree will be constructed.
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 95d5635..e6ec3ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -21,10 +21,15 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
public class DynamicPartitionCtx implements Serializable {
@@ -44,12 +49,13 @@ public class DynamicPartitionCtx implements Serializable {
private List<String> dpNames; // dp column names
private String defaultPartName; // default partition name in case of null or empty value
private int maxPartsPerNode; // maximum dynamic partitions created per mapper/reducer
+ private Pattern whiteListPattern;
public DynamicPartitionCtx() {
}
public DynamicPartitionCtx(Table tbl, Map<String, String> partSpec, String defaultPartName,
- int maxParts) {
+ int maxParts) throws SemanticException {
this.partSpec = partSpec;
this.spNames = new ArrayList<String>();
this.dpNames = new ArrayList<String>();
@@ -71,6 +77,13 @@ public class DynamicPartitionCtx implements Serializable {
} else {
this.spPath = null;
}
+ String confVal;
+ try {
+ confVal = Hive.get().getMetaConf(ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname);
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+ this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
}
public DynamicPartitionCtx(DynamicPartitionCtx dp) {
@@ -84,6 +97,11 @@ public class DynamicPartitionCtx implements Serializable {
this.dpNames = dp.dpNames;
this.defaultPartName = dp.defaultPartName;
this.maxPartsPerNode = dp.maxPartsPerNode;
+ this.whiteListPattern = dp.whiteListPattern;
+ }
+
+ public Pattern getWhiteListPattern() {
+ return whiteListPattern;
}
public int getMaxPartitionsPerNode() {
http://git-wip-us.apache.org/repos/asf/hive/blob/fe81a376/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out b/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
index f069ae8..654d892 100644
--- a/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
+++ b/ql/src/test/results/clientnegative/dynamic_partitions_with_whitelist.q.out
@@ -32,5 +32,5 @@ PREHOOK: type: QUERY
PREHOOK: Input: default@source_table
PREHOOK: Input: default@source_table@ds=2008-04-08/hr=11
PREHOOK: Output: default@dest_table
-Failed with exception MetaException(message:Partition value 'val_129' contains a character not matched by whitelist pattern '[^9]*'. (configure with hive.metastore.partition.name.whitelist.pattern))
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask