Posted to commits@hive.apache.org by pr...@apache.org on 2015/09/14 10:22:33 UTC

[10/24] hive git commit: HIVE-11645 : Add in-place updates for dynamic partitions loading (Ashutosh Chauhan via Prasanth J)

HIVE-11645 : Add in-place updates for dynamic partitions loading (Ashutosh Chauhan via Prasanth J)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f4361bf3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f4361bf3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f4361bf3

Branch: refs/heads/llap
Commit: f4361bf30689c4767e966e11c610f7ead632415a
Parents: 9fe8802
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Thu Sep 10 14:52:43 2015 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Sep 10 14:52:43 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/MoveTask.java    | 12 ++--
 .../apache/hadoop/hive/ql/exec/StatsTask.java   | 13 +++-
 .../hadoop/hive/ql/exec/tez/InPlaceUpdates.java | 65 +++++++++++++++++++
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  | 66 ++------------------
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 25 ++++++--
 5 files changed, 106 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 50c4a96..a1f8973 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -361,7 +361,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             if (dps != null && dps.size() > 0) {
               pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
             }
-
+            console.printInfo(System.getProperty("line.separator"));
             long startTime = System.currentTimeMillis();
             // load the list of DP partitions and return the list of partition specs
             // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
@@ -381,8 +381,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
                 isSkewedStoredAsDirs(tbd),
                 work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
                 SessionState.get().getTxnMgr().getCurrentTxnId());
-            console.printInfo("\t Time taken for load dynamic partitions : "  +
-                (System.currentTimeMillis() - startTime));
+
+            console.printInfo("\t Time taken to load dynamic partitions: "  +
+                (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
 
             if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
               throw new HiveException("This query creates no partitions." +
@@ -425,11 +426,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
                 SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
                     table.getCols());
               }
-
-              console.printInfo("\tLoading partition " + entry.getKey());
+              LOG.info("\tLoading partition " + entry.getKey());
             }
             console.printInfo("\t Time taken for adding to write entity : " +
-                (System.currentTimeMillis() - startTime));
+                (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
             dc = null; // reset data container to prevent it being added again.
           } else { // static partitions
             List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
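
For reference, a minimal standalone sketch of the timing pattern introduced above, not part of the patch; the Thread.sleep is a hypothetical stand-in for the real loadDynamicPartitions(...) call. The key detail is dividing by the double literal 1000.0 so fractional seconds survive (an integer 1000 would truncate toward zero):

    public class LoadTimingSketch {
      public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        Thread.sleep(1234); // hypothetical stand-in for the dynamic partition load
        System.out.println("\t Time taken to load dynamic partitions: "
            + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
      }
    }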

http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 2a8167a..41ece04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -182,8 +183,10 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
         parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
 
         db.alterTable(tableFullName, new Table(tTable));
-
-        console.printInfo("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
+        if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+          console.printInfo("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
+        }
+        LOG.info("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
       } else {
         // Partitioned table:
         // Need to get the old stats of the partition
@@ -215,7 +218,11 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
           parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
           updates.add(new Partition(table, tPart));
 
-          console.printInfo("Partition " + tableFullName + partn.getSpec() +
+          if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+            console.printInfo("Partition " + tableFullName + partn.getSpec() +
+            " stats: [" + toString(parameters) + ']');
+          }
+          LOG.info("Partition " + tableFullName + partn.getSpec() +
               " stats: [" + toString(parameters) + ']');
         }
         if (!updates.isEmpty()) {
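
The intent of the StatsTask change is to keep per-table and per-partition stats lines off the console unless the Tez summary flag is set, while still recording them in the task log. A hedged sketch of that pattern as a hypothetical helper (StatsOutputSketch and printOrLog are illustrative names, not part of the patch):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class StatsOutputSketch {
      private static final Log LOG = LogFactory.getLog(StatsOutputSketch.class);

      static void printOrLog(HiveConf conf, SessionState.LogHelper console, String msg) {
        // Console output is gated on hive.tez.exec.print.summary ...
        if (conf.getBoolVar(HiveConf.ConfVars.TEZ_EXEC_SUMMARY)) {
          console.printInfo(msg);
        }
        // ... but the message is always recorded in the task log.
        LOG.info(msg);
      }
    }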

http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
new file mode 100644
index 0000000..6ecfe71
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.fusesource.jansi.Ansi.ansi;
+import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.isatty;
+
+import java.io.PrintStream;
+
+import jline.TerminalFactory;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.fusesource.jansi.Ansi;
+
+public class InPlaceUpdates {
+
+  private static final int MIN_TERMINAL_WIDTH = 80;
+
+  static boolean isUnixTerminal() {
+
+    String os = System.getProperty("os.name");
+    if (os.startsWith("Windows")) {
+      // We do not support Windows; we will revisit this if we really need it there.
+      return false;
+    }
+
+    // We must be on some Unix variant.
+    // Check that standard out and standard error are both terminals.
+    try {
+      // The isatty system call returns 1 if the file descriptor is a terminal, else 0.
+      if (isatty(STDOUT_FILENO) == 0) {
+        return false;
+      }
+      if (isatty(STDERR_FILENO) == 0) {
+        return false;
+      }
+    } catch (NoClassDefFoundError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    } catch (UnsatisfiedLinkError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    }
+    return true;
+  }
+
+  public static boolean inPlaceEligible(HiveConf conf) {
+    boolean inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
+
+    // We need a terminal at least 80 characters wide to display in-place updates properly.
+    return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal()
+      && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
+  }
+
+  public static void reprintLine(PrintStream out, String line) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+  }
+
+  public static void rePositionCursor(PrintStream ps) {
+    ps.print(ansi().cursorUp(0).toString());
+    ps.flush();
+  }
+}
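
A small usage sketch of the two primitives above, not part of the patch: each iteration moves the cursor back up and redraws the same line, so the countdown renders in place instead of scrolling. It assumes a real ANSI terminal; under a pipe or an IDE console the raw escape codes will show instead.

    import java.io.PrintStream;

    import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;

    public class ReprintDemo {
      public static void main(String[] args) throws InterruptedException {
        PrintStream ps = System.err;
        ps.println(); // reserve the line that will be overwritten
        for (int i = 5; i >= 1; i--) {
          // Move back up to the reserved line, then erase and redraw it.
          InPlaceUpdates.rePositionCursor(ps);
          InPlaceUpdates.reprintLine(ps, "T-minus " + i);
          Thread.sleep(500);
        }
      }
    }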

http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 1a4decf..1e1603b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -20,9 +20,6 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 import static org.fusesource.jansi.Ansi.ansi;
-import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.isatty;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -74,7 +71,7 @@ import jline.TerminalFactory;
 public class TezJobMonitor {
 
   private static final String CLASS_NAME = TezJobMonitor.class.getName();
-  private static final int MIN_TERMINAL_WIDTH = 80;
+
   private static final int COLUMN_1_WIDTH = 16;
   private static final int SEPARATOR_WIDTH = 80;
 
@@ -156,42 +153,13 @@ public class TezJobMonitor {
     }
   }
 
-  private static boolean isUnixTerminal() {
-
-    String os = System.getProperty("os.name");
-    if (os.startsWith("Windows")) {
-      // we do not support Windows, we will revisit this if we really need it for windows.
-      return false;
-    }
-
-    // We must be on some unix variant..
-    // check if standard out is a terminal
-    try {
-      // isatty system call will return 1 if the file descriptor is terminal else 0
-      if (isatty(STDOUT_FILENO) == 0) {
-        return false;
-      }
-      if (isatty(STDERR_FILENO) == 0) {
-        return false;
-      }
-    } catch (NoClassDefFoundError ignore) {
-      // These errors happen if the JNI lib is not available for your platform.
-      return false;
-    } catch (UnsatisfiedLinkError ignore) {
-      // These errors happen if the JNI lib is not available for your platform.
-      return false;
-    }
-    return true;
-  }
-
   /**
    * NOTE: Use this method only if isUnixTerminal is true.
    * Erases the current line and prints the given line.
    * @param line - line to print
    */
   public void reprintLine(String line) {
-    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
-    out.flush();
+    InPlaceUpdates.reprintLine(out, line);
     lines++;
   }
 
@@ -234,15 +202,6 @@ public class TezJobMonitor {
   }
 
   /**
-   * NOTE: Use this method only if isUnixTerminal is true.
-   * Gets the width of the terminal
-   * @return - width of terminal
-   */
-  public int getTerminalWidth() {
-    return TerminalFactory.get().getWidth();
-  }
-
-  /**
    * monitorExecution handles status printing, failures during execution and final status retrieval.
    *
    * @param dagClient client that was used to kick off the job
@@ -265,26 +224,11 @@ public class TezJobMonitor {
     Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
     Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
     long startTime = 0;
-    boolean isProfileEnabled = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
+    boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
       Utilities.isPerfOrAboveLogging(conf);
-    boolean inPlaceUpdates = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
-    boolean wideTerminal = false;
-    boolean isTerminal = inPlaceUpdates == true ? isUnixTerminal() : false;
-
-    // we need at least 80 chars wide terminal to display in-place updates properly
-    if (isTerminal) {
-      if (getTerminalWidth() >= MIN_TERMINAL_WIDTH) {
-        wideTerminal = true;
-      }
-    }
-
-    boolean inPlaceEligible = false;
-    if (inPlaceUpdates && isTerminal && wideTerminal && !console.getIsSilent()) {
-      inPlaceEligible = true;
-    }
 
+    boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf);
     shutdownList.add(dagClient);
-
     console.printInfo("\n");
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
@@ -470,7 +414,7 @@ public class TezJobMonitor {
       DAGClient dagClient, HiveConf conf, DAG dag) {
 
     /* Strings for headers and counters */
-    String hiveCountersGroup = conf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
     Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
     TezCounters hiveCounters = null;
     try {
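
Aside from delegating the terminal checks to InPlaceUpdates, the two one-line fixes above call the static HiveConf accessors statically instead of through an instance reference. A minimal sketch of the two equivalent ways to read the same setting (ConfAccessSketch is an illustrative name, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ConfAccessSketch {
      static boolean summaryEnabled(HiveConf conf) {
        // Static accessor, called statically: the reading matches the declaration.
        boolean viaStaticAccessor = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY);
        // Equivalent instance accessor, which avoids passing conf twice.
        boolean viaInstanceAccessor = conf.getBoolVar(HiveConf.ConfVars.TEZ_EXEC_SUMMARY);
        return viaStaticAccessor && viaInstanceAccessor;
      }
    }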

http://git-wip-us.apache.org/repos/asf/hive/blob/f4361bf3/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c449aee..c78e8f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.metadata;
 
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.FunctionTask;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
@@ -101,6 +103,7 @@ import org.apache.thrift.TException;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -124,7 +127,6 @@ import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
 import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
 import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
 
-
 /**
  * This class has functions that implement meta data/DDL operations using calls
  * to the metastore.
@@ -1606,22 +1608,31 @@ private void constructOneLBLocationMap(FileStatus fSta,
         }
       }
 
-      if (validPartitions.size() == 0) {
+      int partsToLoad = validPartitions.size();
+      if (partsToLoad == 0) {
         LOG.warn("No partition is generated by dynamic partitioning");
       }
 
-      if (validPartitions.size() > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
-        throw new HiveException("Number of dynamic partitions created is " + validPartitions.size()
+      if (partsToLoad > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
+        throw new HiveException("Number of dynamic partitions created is " + partsToLoad
             + ", which is more than "
             + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)
             +". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-            + " to at least " + validPartitions.size() + '.');
+            + " to at least " + partsToLoad + '.');
       }
 
       Table tbl = getTable(tableName);
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
       Iterator<Path> iter = validPartitions.iterator();
+      LOG.info("Going to load " + partsToLoad + " partitions.");
+      PrintStream ps = null;
+      boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+          && InPlaceUpdates.inPlaceEligible(conf);
+      if (inPlaceEligible) {
+        ps = SessionState.getConsole().getInfoStream();
+      }
+      int partitionsLoaded = 0;
       while (iter.hasNext()) {
         // get the dynamically created directory
         Path partPath = iter.next();
@@ -1634,6 +1645,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
         Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
             holdDDLTime, true, listBucketingEnabled, false, isAcid);
         partitionsMap.put(fullPartSpec, newPartition);
+        if (inPlaceEligible) {
+          InPlaceUpdates.rePositionCursor(ps);
+          InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad + " partitions.");
+        }
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
       if (isAcid) {
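
Putting the pieces together, a hedged sketch of the counter loop added above; loadOnePartition is a hypothetical stand-in for the real Hive.loadPartition(...) call, and the fs.trash.interval guard is copied from the patch (a trash-enabled load can presumably emit its own console output, which would garble in-place rendering):

    import java.io.PrintStream;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class DynamicPartitionProgressSketch {
      static void loadAll(HiveConf conf, int partsToLoad) {
        boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
            && InPlaceUpdates.inPlaceEligible(conf);
        PrintStream ps = inPlaceEligible ? SessionState.getConsole().getInfoStream() : null;
        int partitionsLoaded = 0;
        for (int i = 0; i < partsToLoad; i++) {
          loadOnePartition(i); // hypothetical stand-in for the per-partition load
          if (inPlaceEligible) {
            // Overwrite the previous progress line instead of appending a new one.
            InPlaceUpdates.rePositionCursor(ps);
            InPlaceUpdates.reprintLine(ps,
                "Loaded : " + ++partitionsLoaded + "/" + partsToLoad + " partitions.");
          }
        }
      }

      private static void loadOnePartition(int i) {
        // hypothetical unit of work
      }
    }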