You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/09/16 21:18:29 UTC
[11/50] [abbrv] hive git commit: HIVE-11645 : Add in-place updates
for dynamic partitions loading (Ashutosh Chauhan via Prasanth J)
HIVE-11645 : Add in-place updates for dynamic partitions loading (Ashutosh Chauhan via Prasanth J)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f4361bf3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f4361bf3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f4361bf3
Branch: refs/heads/spark
Commit: f4361bf30689c4767e966e11c610f7ead632415a
Parents: 9fe8802
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Thu Sep 10 14:52:43 2015 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Sep 10 14:52:43 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/MoveTask.java | 12 ++--
.../apache/hadoop/hive/ql/exec/StatsTask.java | 13 +++-
.../hadoop/hive/ql/exec/tez/InPlaceUpdates.java | 65 +++++++++++++++++++
.../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 66 ++------------------
.../apache/hadoop/hive/ql/metadata/Hive.java | 25 ++++++--
5 files changed, 106 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 50c4a96..a1f8973 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -361,7 +361,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
if (dps != null && dps.size() > 0) {
pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
}
-
+ console.printInfo(System.getProperty("line.separator"));
long startTime = System.currentTimeMillis();
// load the list of DP partitions and return the list of partition specs
// TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
@@ -381,8 +381,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
isSkewedStoredAsDirs(tbd),
work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
SessionState.get().getTxnMgr().getCurrentTxnId());
- console.printInfo("\t Time taken for load dynamic partitions : " +
- (System.currentTimeMillis() - startTime));
+
+ console.printInfo("\t Time taken to load dynamic partitions: " +
+ (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
throw new HiveException("This query creates no partitions." +
@@ -425,11 +426,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
table.getCols());
}
-
- console.printInfo("\tLoading partition " + entry.getKey());
+ LOG.info("\tLoading partition " + entry.getKey());
}
console.printInfo("\t Time taken for adding to write entity : " +
- (System.currentTimeMillis() - startTime));
+ (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
dc = null; // reset data container to prevent it being added again.
} else { // static partitions
List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 2a8167a..41ece04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -182,8 +183,10 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
db.alterTable(tableFullName, new Table(tTable));
-
- console.printInfo("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
+ if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+ console.printInfo("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
+ }
+ LOG.info("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
} else {
// Partitioned table:
// Need to get the old stats of the partition
@@ -215,7 +218,11 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
updates.add(new Partition(table, tPart));
- console.printInfo("Partition " + tableFullName + partn.getSpec() +
+ if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+ console.printInfo("Partition " + tableFullName + partn.getSpec() +
+ " stats: [" + toString(parameters) + ']');
+ }
+ LOG.info("Partition " + tableFullName + partn.getSpec() +
" stats: [" + toString(parameters) + ']');
}
if (!updates.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
new file mode 100644
index 0000000..6ecfe71
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.fusesource.jansi.Ansi.ansi;
+import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.isatty;
+
+import java.io.PrintStream;
+
+import jline.TerminalFactory;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.fusesource.jansi.Ansi;
+
+/**
+ * Console helpers for "in-place" progress output: lines that are overwritten using ANSI
+ * escape sequences instead of scrolling.
+ *
+ * <p>Call {@link #inPlaceEligible(HiveConf)} before using {@link #reprintLine} or
+ * {@link #rePositionCursor}; their escape sequences only render correctly on an ANSI terminal.
+ */
+public class InPlaceUpdates {
+
+  /** Narrowest terminal, in columns, on which in-place updates are shown. */
+  private static final int MIN_TERMINAL_WIDTH = 80;
+
+  /**
+   * Checks whether both stdout and stderr are attached to a Unix terminal.
+   *
+   * @return false on Windows, when stdout or stderr is not a tty, or when the jansi native
+   *     library backing {@code isatty} cannot be loaded; true otherwise
+   */
+  static boolean isUnixTerminal() {
+    String os = System.getProperty("os.name");
+    if (os.startsWith("Windows")) {
+      // we do not support Windows, we will revisit this if we really need it for windows.
+      return false;
+    }
+
+    // We must be on some unix variant; require both stdout and stderr to be terminals.
+    try {
+      // isatty system call will return 1 if the file descriptor is terminal else 0
+      if (isatty(STDOUT_FILENO) == 0) {
+        return false;
+      }
+      if (isatty(STDERR_FILENO) == 0) {
+        return false;
+      }
+    } catch (NoClassDefFoundError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    } catch (UnsatisfiedLinkError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Decides whether in-place progress updates should be rendered.
+   *
+   * @param conf configuration read for {@code TEZ_EXEC_INPLACE_PROGRESS}
+   * @return true only if in-place progress is enabled, the session console is not silent,
+   *     stdout/stderr are a Unix terminal, and the terminal is at least
+   *     80 columns wide
+   */
+  public static boolean inPlaceEligible(HiveConf conf) {
+    boolean inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
+
+    // Short-circuit order matters: the tty and terminal-width probes only run when in-place
+    // progress is enabled and the console is not silent.
+    return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal()
+        && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
+  }
+
+  /**
+   * Erases the line the cursor is on, prints {@code line} in its place, then starts a new line.
+   *
+   * @param out stream to write to; should be attached to an ANSI terminal
+   * @param line text to print, without a trailing newline
+   */
+  public static void reprintLine(PrintStream out, String line) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+  }
+
+  /**
+   * Moves the cursor up one row so the next {@link #reprintLine} call overwrites the line that
+   * was printed last.
+   *
+   * <p>The count is passed explicitly as 1: terminals also read {@code ESC[0A} as one row,
+   * but some jansi versions emit nothing for {@code cursorUp(0)}, which would silently turn
+   * in-place updates into scrolling output.
+   *
+   * @param ps stream to write to; should be attached to an ANSI terminal
+   */
+  public static void rePositionCursor(PrintStream ps) {
+    ps.print(ansi().cursorUp(1).toString());
+    ps.flush();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 1a4decf..1e1603b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -20,9 +20,6 @@ package org.apache.hadoop.hive.ql.exec.tez;
import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
import static org.fusesource.jansi.Ansi.ansi;
-import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.isatty;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -74,7 +71,7 @@ import jline.TerminalFactory;
public class TezJobMonitor {
private static final String CLASS_NAME = TezJobMonitor.class.getName();
- private static final int MIN_TERMINAL_WIDTH = 80;
+
private static final int COLUMN_1_WIDTH = 16;
private static final int SEPARATOR_WIDTH = 80;
@@ -156,42 +153,13 @@ public class TezJobMonitor {
}
}
- private static boolean isUnixTerminal() {
-
- String os = System.getProperty("os.name");
- if (os.startsWith("Windows")) {
- // we do not support Windows, we will revisit this if we really need it for windows.
- return false;
- }
-
- // We must be on some unix variant..
- // check if standard out is a terminal
- try {
- // isatty system call will return 1 if the file descriptor is terminal else 0
- if (isatty(STDOUT_FILENO) == 0) {
- return false;
- }
- if (isatty(STDERR_FILENO) == 0) {
- return false;
- }
- } catch (NoClassDefFoundError ignore) {
- // These errors happen if the JNI lib is not available for your platform.
- return false;
- } catch (UnsatisfiedLinkError ignore) {
- // These errors happen if the JNI lib is not available for your platform.
- return false;
- }
- return true;
- }
-
/**
* NOTE: Use this method only if isUnixTerminal is true.
* Erases the current line and prints the given line.
* @param line - line to print
*/
public void reprintLine(String line) {
- out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
- out.flush();
+ InPlaceUpdates.reprintLine(out, line);
lines++;
}
@@ -234,15 +202,6 @@ public class TezJobMonitor {
}
/**
- * NOTE: Use this method only if isUnixTerminal is true.
- * Gets the width of the terminal
- * @return - width of terminal
- */
- public int getTerminalWidth() {
- return TerminalFactory.get().getWidth();
- }
-
- /**
* monitorExecution handles status printing, failures during execution and final status retrieval.
*
* @param dagClient client that was used to kick off the job
@@ -265,26 +224,11 @@ public class TezJobMonitor {
Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
long startTime = 0;
- boolean isProfileEnabled = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
+ boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
Utilities.isPerfOrAboveLogging(conf);
- boolean inPlaceUpdates = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
- boolean wideTerminal = false;
- boolean isTerminal = inPlaceUpdates == true ? isUnixTerminal() : false;
-
- // we need at least 80 chars wide terminal to display in-place updates properly
- if (isTerminal) {
- if (getTerminalWidth() >= MIN_TERMINAL_WIDTH) {
- wideTerminal = true;
- }
- }
-
- boolean inPlaceEligible = false;
- if (inPlaceUpdates && isTerminal && wideTerminal && !console.getIsSilent()) {
- inPlaceEligible = true;
- }
+ boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf);
shutdownList.add(dagClient);
-
console.printInfo("\n");
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
@@ -470,7 +414,7 @@ public class TezJobMonitor {
DAGClient dagClient, HiveConf conf, DAG dag) {
/* Strings for headers and counters */
- String hiveCountersGroup = conf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+ String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
TezCounters hiveCounters = null;
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c449aee..c78e8f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.metadata;
import com.google.common.collect.Sets;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.FunctionTask;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
@@ -101,6 +103,7 @@ import org.apache.thrift.TException;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -124,7 +127,6 @@ import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
-
/**
* This class has functions that implement meta data/DDL operations using calls
* to the metastore.
@@ -1606,22 +1608,31 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
- if (validPartitions.size() == 0) {
+ int partsToLoad = validPartitions.size();
+ if (partsToLoad == 0) {
LOG.warn("No partition is generated by dynamic partitioning");
}
- if (validPartitions.size() > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
- throw new HiveException("Number of dynamic partitions created is " + validPartitions.size()
+ if (partsToLoad > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
+ throw new HiveException("Number of dynamic partitions created is " + partsToLoad
+ ", which is more than "
+ conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)
+". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
- + " to at least " + validPartitions.size() + '.');
+ + " to at least " + partsToLoad + '.');
}
Table tbl = getTable(tableName);
// for each dynamically created DP directory, construct a full partition spec
// and load the partition based on that
Iterator<Path> iter = validPartitions.iterator();
+ LOG.info("Going to load " + partsToLoad + " partitions.");
+ PrintStream ps = null;
+ boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+ && InPlaceUpdates.inPlaceEligible(conf);
+ if(inPlaceEligible) {
+ ps = SessionState.getConsole().getInfoStream();
+ }
+ int partitionsLoaded = 0;
while (iter.hasNext()) {
// get the dynamically created directory
Path partPath = iter.next();
@@ -1634,6 +1645,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
holdDDLTime, true, listBucketingEnabled, false, isAcid);
partitionsMap.put(fullPartSpec, newPartition);
+ if (inPlaceEligible) {
+ InPlaceUpdates.rePositionCursor(ps);
+ InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad +" partitions.");
+ }
LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
}
if (isAcid) {