You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by px...@apache.org on 2016/05/24 04:11:42 UTC

[7/7] hive git commit: HIVE-13566: Auto-gather column stats - phase 1 (Pengcheng Xiong, reviewed by Ashutosh Chauhan)

HIVE-13566: Auto-gather column stats - phase 1 (Pengcheng Xiong, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec4b936e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec4b936e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec4b936e

Branch: refs/heads/master
Commit: ec4b936e66db559cd7226f66d416dad02864530f
Parents: 2ed4783
Author: Pengcheng Xiong <px...@apache.org>
Authored: Mon May 23 20:22:33 2016 -0700
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Mon May 23 20:22:33 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hive/beeline/BeeLine.java   |   14 +-
 .../java/org/apache/hive/beeline/Commands.java  |   72 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    4 +-
 .../hive/beeline/TestBeeLineWithArgs.java       |   24 +-
 jdbc/src/java/org/apache/hive/jdbc/Utils.java   |   54 +-
 .../apache/hadoop/hive/ql/exec/Operator.java    |    6 +-
 .../hadoop/hive/ql/exec/TextRecordReader.java   |    4 +-
 .../hadoop/hive/ql/exec/TextRecordWriter.java   |    4 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   |   24 -
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   |    3 -
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   10 -
 .../physical/GenMRSkewJoinProcessor.java        |   17 +-
 .../physical/GenSparkSkewJoinProcessor.java     |   18 +-
 .../physical/SparkMapJoinResolver.java          |    4 +-
 .../ql/parse/ColumnStatsAutoGatherContext.java  |  291 ++
 .../ql/parse/ColumnStatsSemanticAnalyzer.java   |   71 +-
 .../hadoop/hive/ql/parse/ParseContext.java      |   12 +
 .../hadoop/hive/ql/parse/QBParseInfo.java       |   20 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   71 +-
 .../hadoop/hive/ql/parse/TaskCompiler.java      |   63 +-
 .../ql/plan/ConditionalResolverSkewJoin.java    |   10 +-
 .../queries/clientpositive/autoColumnStats_1.q  |  192 ++
 .../queries/clientpositive/autoColumnStats_2.q  |  214 ++
 .../queries/clientpositive/autoColumnStats_3.q  |   67 +
 .../queries/clientpositive/autoColumnStats_4.q  |   20 +
 .../queries/clientpositive/autoColumnStats_5.q  |   47 +
 .../queries/clientpositive/autoColumnStats_6.q  |   41 +
 .../queries/clientpositive/autoColumnStats_7.q  |   19 +
 .../queries/clientpositive/autoColumnStats_8.q  |   27 +
 .../queries/clientpositive/autoColumnStats_9.q  |   22 +
 .../clientpositive/autoColumnStats_1.q.out      | 1379 +++++++++
 .../clientpositive/autoColumnStats_2.q.out      | 1500 ++++++++++
 .../clientpositive/autoColumnStats_3.q.out      |  420 +++
 .../clientpositive/autoColumnStats_4.q.out      |  260 ++
 .../clientpositive/autoColumnStats_5.q.out      |  664 +++++
 .../clientpositive/autoColumnStats_6.q.out      |  299 ++
 .../clientpositive/autoColumnStats_7.q.out      |  216 ++
 .../clientpositive/autoColumnStats_8.q.out      | 2624 ++++++++++++++++++
 .../clientpositive/autoColumnStats_9.q.out      |  268 ++
 39 files changed, 8845 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/beeline/src/java/org/apache/hive/beeline/BeeLine.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/BeeLine.java b/beeline/src/java/org/apache/hive/beeline/BeeLine.java
index 9138613..734eeb8 100644
--- a/beeline/src/java/org/apache/hive/beeline/BeeLine.java
+++ b/beeline/src/java/org/apache/hive/beeline/BeeLine.java
@@ -93,9 +93,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hive.beeline.cli.CliOptionsProcessor;
 
-import org.apache.hive.jdbc.Utils;
-import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
-
 /**
  * A console SQL shell with command completion.
  * <p>
@@ -142,6 +139,7 @@ public class BeeLine implements Closeable {
   private static final Options options = new Options();
 
   public static final String BEELINE_DEFAULT_JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
+  public static final String BEELINE_DEFAULT_JDBC_URL = "jdbc:hive2://";
   public static final String DEFAULT_DATABASE_NAME = "default";
 
   private static final String SCRIPT_OUTPUT_PREFIX = ">>>";
@@ -768,14 +766,6 @@ public class BeeLine implements Closeable {
     */
 
     if (url != null) {
-      if (user == null) {
-        user = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER);
-      }
-
-      if (pass == null) {
-        pass = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD);
-      }
-
       String com = constructCmd(url, user, pass, driver, false);
       String comForDebug = constructCmd(url, user, pass, driver, true);
       debug("issuing: " + comForDebug);
@@ -904,7 +894,7 @@ public class BeeLine implements Closeable {
   }
 
   private int embeddedConnect() {
-    if (!execCommandWithPrefix("!connect " + Utils.URL_PREFIX + " '' ''")) {
+    if (!execCommandWithPrefix("!connect " + BEELINE_DEFAULT_JDBC_URL + " '' ''")) {
       return ERRNO_OTHER;
     } else {
       return ERRNO_OK;

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/beeline/src/java/org/apache/hive/beeline/Commands.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java
index 3a204c0..80703ff 100644
--- a/beeline/src/java/org/apache/hive/beeline/Commands.java
+++ b/beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -61,8 +61,6 @@ import java.util.TreeSet;
 
 import org.apache.hadoop.hive.common.cli.ShellCmdExecutor;
 import org.apache.hive.jdbc.HiveStatement;
-import org.apache.hive.jdbc.Utils;
-import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
 
 
 public class Commands {
@@ -1316,41 +1314,18 @@ public class Commands {
     Properties props = new Properties();
     if (url != null) {
       String saveUrl = getUrlToUse(url);
-      props.setProperty(JdbcConnectionParams.PROPERTY_URL, url);
+      props.setProperty("url", saveUrl);
     }
-
-    String value = null;
     if (driver != null) {
-      props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, driver);
-    } else {
-      value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.PROPERTY_DRIVER);
-      if (value != null) {
-        props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, value);
-      }
+      props.setProperty("driver", driver);
     }
-
     if (user != null) {
-      props.setProperty(JdbcConnectionParams.AUTH_USER, user);
-    } else {
-      value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER);
-      if (value != null) {
-        props.setProperty(JdbcConnectionParams.AUTH_USER, value);
-      }
+      props.setProperty("user", user);
     }
-
     if (pass != null) {
-      props.setProperty(JdbcConnectionParams.AUTH_PASSWD, pass);
-    } else {
-      value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD);
-      if (value != null) {
-        props.setProperty(JdbcConnectionParams.AUTH_PASSWD, value);
-      }
+      props.setProperty("password", pass);
     }
 
-    value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_TYPE);
-    if (value != null) {
-      props.setProperty(JdbcConnectionParams.AUTH_TYPE, value);
-    }
     return connect(props);
   }
 
@@ -1403,25 +1378,26 @@ public class Commands {
 
   public boolean connect(Properties props) throws IOException {
     String url = getProperty(props, new String[] {
-        JdbcConnectionParams.PROPERTY_URL,
+        "url",
         "javax.jdo.option.ConnectionURL",
         "ConnectionURL",
     });
     String driver = getProperty(props, new String[] {
-        JdbcConnectionParams.PROPERTY_DRIVER,
+        "driver",
         "javax.jdo.option.ConnectionDriverName",
         "ConnectionDriverName",
     });
     String username = getProperty(props, new String[] {
-        JdbcConnectionParams.AUTH_USER,
+        "user",
         "javax.jdo.option.ConnectionUserName",
         "ConnectionUserName",
     });
     String password = getProperty(props, new String[] {
-        JdbcConnectionParams.AUTH_PASSWD,
+        "password",
         "javax.jdo.option.ConnectionPassword",
         "ConnectionPassword",
     });
+    String auth = getProperty(props, new String[] {"auth"});
 
     if (url == null || url.length() == 0) {
       return beeLine.error("Property \"url\" is required");
@@ -1432,25 +1408,23 @@ public class Commands {
       }
     }
 
-    String auth = getProperty(props, new String[] {JdbcConnectionParams.AUTH_TYPE});
+    beeLine.info("Connecting to " + url);
+
+    if (username == null) {
+      username = beeLine.getConsoleReader().readLine("Enter username for " + url + ": ");
+    }
+    props.setProperty("user", username);
+    if (password == null) {
+      password = beeLine.getConsoleReader().readLine("Enter password for " + url + ": ",
+          new Character('*'));
+    }
+    props.setProperty("password", password);
+
     if (auth == null) {
       auth = beeLine.getOpts().getAuthType();
-      if (auth != null) {
-        props.setProperty(JdbcConnectionParams.AUTH_TYPE, auth);
-      }
     }
-
-    beeLine.info("Connecting to " + url);
-    if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PRINCIPAL) == null) {
-      if (username == null) {
-        username = beeLine.getConsoleReader().readLine("Enter username for " + url + ": ");
-      }
-      props.setProperty(JdbcConnectionParams.AUTH_USER, username);
-      if (password == null) {
-        password = beeLine.getConsoleReader().readLine("Enter password for " + url + ": ",
-          new Character('*'));
-      }
-      props.setProperty(JdbcConnectionParams.AUTH_PASSWD, password);
+    if (auth != null) {
+      props.setProperty("auth", auth);
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c0843b9..ed20069 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1519,7 +1519,9 @@ public class HiveConf extends Configuration {
 
     // Statistics
     HIVESTATSAUTOGATHER("hive.stats.autogather", true,
-        "A flag to gather statistics automatically during the INSERT OVERWRITE command."),
+        "A flag to gather statistics (only basic) automatically during the INSERT OVERWRITE command."),
+    HIVESTATSCOLAUTOGATHER("hive.stats.column.autogather", false,
+        "A flag to gather column statistics automatically."),
     HIVESTATSDBCLASS("hive.stats.dbclass", "fs", new PatternSet("custom", "fs"),
         "The storage that stores temporary Hive statistics. In filesystem based statistics collection ('fs'), \n" +
         "each task writes statistics it has collected in a file on the filesystem, which will be aggregated \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
index ecfeddb..f9909ad 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
@@ -39,7 +39,6 @@ import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.jdbc.Utils;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -176,7 +175,6 @@ public class TestBeeLineWithArgs {
 
     // Put the script content in a temp file
     File scriptFile = File.createTempFile(this.getClass().getSimpleName(), "temp");
-    System.out.println("script file is " + scriptFile.getAbsolutePath());
     scriptFile.deleteOnExit();
     PrintStream os = new PrintStream(new FileOutputStream(scriptFile));
     os.print(scriptText);
@@ -657,7 +655,7 @@ public class TestBeeLineWithArgs {
 
   @Test
   public void testEmbeddedBeelineConnection() throws Throwable{
-    String embeddedJdbcURL = Utils.URL_PREFIX+"/Default";
+    String embeddedJdbcURL = BeeLine.BEELINE_DEFAULT_JDBC_URL+"/Default";
     List<String> argList = getBaseArgs(embeddedJdbcURL);
 	  argList.add("--hivevar");
     argList.add("DUMMY_TBL=embedded_table");
@@ -772,7 +770,7 @@ public class TestBeeLineWithArgs {
 
   @Test
   public void testEmbeddedBeelineOutputs() throws Throwable{
-    String embeddedJdbcURL = Utils.URL_PREFIX+"/Default";
+    String embeddedJdbcURL = BeeLine.BEELINE_DEFAULT_JDBC_URL+"/Default";
     List<String> argList = getBaseArgs(embeddedJdbcURL);
     // Set to non-zk lock manager to avoid trying to connect to zookeeper
     final String SCRIPT_TEXT =
@@ -845,22 +843,4 @@ public class TestBeeLineWithArgs {
 
   }
 
-  /**
-   * Attempt to execute a simple script file with the usage of user & password variables in URL.
-   * Test for presence of an expected pattern
-   * in the output (stdout or stderr), fail if not found
-   * Print PASSED or FAILED
-   */
-  @Test
-  public void testConnectionWithURLParams() throws Throwable {
-    final String EXPECTED_PATTERN = " hivetest ";
-    List<String> argList = new ArrayList<String>();
-    argList.add("-d");
-    argList.add(BeeLine.BEELINE_DEFAULT_JDBC_DRIVER);
-    argList.add("-u");
-    argList.add(miniHS2.getBaseJdbcURL() + ";user=hivetest;password=hive");
-    String SCRIPT_TEXT = "select current_user();";
-
-    testScriptFile( SCRIPT_TEXT, EXPECTED_PATTERN, true, argList);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/jdbc/src/java/org/apache/hive/jdbc/Utils.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 7ea6309..42181d7 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -37,12 +37,12 @@ import org.apache.http.cookie.Cookie;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Utils {
+class Utils {
   static final Logger LOG = LoggerFactory.getLogger(Utils.class.getName());
   /**
     * The required prefix for the connection URL.
     */
-  public static final String URL_PREFIX = "jdbc:hive2://";
+  static final String URL_PREFIX = "jdbc:hive2://";
 
   /**
     * If host is provided, without a port.
@@ -63,7 +63,7 @@ public class Utils {
   static final String HIVE_SERVER2_RETRY_TRUE = "true";
   static final String HIVE_SERVER2_RETRY_FALSE = "false";
 
-  public static class JdbcConnectionParams {
+  static class JdbcConnectionParams {
     // Note on client side parameter naming convention:
     // Prefer using a shorter camelCase param name instead of using the same name as the
     // corresponding
@@ -76,33 +76,31 @@ public class Utils {
     // Retry setting
     static final String RETRIES = "retries";
 
-    public static final String AUTH_TYPE = "auth";
+    static final String AUTH_TYPE = "auth";
     // We're deprecating this variable's name.
-    public static final String AUTH_QOP_DEPRECATED = "sasl.qop";
-    public static final String AUTH_QOP = "saslQop";
-    public static final String AUTH_SIMPLE = "noSasl";
-    public static final String AUTH_TOKEN = "delegationToken";
-    public static final String AUTH_USER = "user";
-    public static final String AUTH_PRINCIPAL = "principal";
-    public static final String AUTH_PASSWD = "password";
-    public static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
-    public static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
-    public static final String ANONYMOUS_USER = "anonymous";
-    public static final String ANONYMOUS_PASSWD = "anonymous";
-    public static final String USE_SSL = "ssl";
-    public static final String SSL_TRUST_STORE = "sslTrustStore";
-    public static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
+    static final String AUTH_QOP_DEPRECATED = "sasl.qop";
+    static final String AUTH_QOP = "saslQop";
+    static final String AUTH_SIMPLE = "noSasl";
+    static final String AUTH_TOKEN = "delegationToken";
+    static final String AUTH_USER = "user";
+    static final String AUTH_PRINCIPAL = "principal";
+    static final String AUTH_PASSWD = "password";
+    static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
+    static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
+    static final String ANONYMOUS_USER = "anonymous";
+    static final String ANONYMOUS_PASSWD = "anonymous";
+    static final String USE_SSL = "ssl";
+    static final String SSL_TRUST_STORE = "sslTrustStore";
+    static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
     // We're deprecating the name and placement of this in the parsed map (from hive conf vars to
     // hive session vars).
     static final String TRANSPORT_MODE_DEPRECATED = "hive.server2.transport.mode";
-    public static final String TRANSPORT_MODE = "transportMode";
+    static final String TRANSPORT_MODE = "transportMode";
     // We're deprecating the name and placement of this in the parsed map (from hive conf vars to
     // hive session vars).
     static final String HTTP_PATH_DEPRECATED = "hive.server2.thrift.http.path";
-    public static final String HTTP_PATH = "httpPath";
-    public static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
-    public static final String PROPERTY_DRIVER        = "driver";
-    public static final String PROPERTY_URL           = "url";
+    static final String HTTP_PATH = "httpPath";
+    static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
     // Don't use dynamic service discovery
     static final String SERVICE_DISCOVERY_MODE_NONE = "none";
     // Use ZooKeeper for indirection while using dynamic service discovery
@@ -633,14 +631,4 @@ public class Utils {
     }
     return true;
   }
-
-  public static String parsePropertyFromUrl(final String url, final String key) {
-    String[] tokens = url.split(";");
-    for (String token : tokens) {
-      if (token.trim().startsWith(key.trim() + "=")) {
-        return token.trim().substring((key.trim() + "=").length());
-      }
-    }
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 00552a8..636f079 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -406,11 +406,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
       }
     }
     if (asyncEx != null) {
-      if (asyncEx instanceof Exception) {
-        throw new HiveException("Async initialization failed", asyncEx);
-      } else {
-        throw (Error) asyncEx;
-      }
+      throw new HiveException("Async initialization failed", asyncEx);
     }
     completeInitializationOp(os);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
index 47ab9c2..8319f11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
@@ -40,14 +40,12 @@ public class TextRecordReader implements RecordReader {
   private InputStream in;
   private Text row;
   private Configuration conf;
-  private boolean escape;
 
   public void initialize(InputStream in, Configuration conf, Properties tbl)
       throws IOException {
     lineReader = new LineReader(in, conf);
     this.in = in;
     this.conf = conf;
-    escape = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE);
   }
 
   public Writable createRow() throws IOException {
@@ -62,7 +60,7 @@ public class TextRecordReader implements RecordReader {
 
     int bytesConsumed = lineReader.readLine((Text) row);
 
-    if (escape) {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE)) {
       return HiveUtils.unescapeText((Text) row);
     }
     return bytesConsumed;

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
index f15458d..10b4594 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
@@ -35,20 +35,18 @@ public class TextRecordWriter implements RecordWriter {
 
   private OutputStream out;
   private Configuration conf;
-  private boolean escape;
 
   public void initialize(OutputStream out, Configuration conf)
       throws IOException {
     this.out = out;
     this.conf = conf;
-    escape = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE);
   }
 
   public void write(Writable row) throws IOException {
     Text text = (Text) row;
     Text escapeText = text;
 
-    if (escape) {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE)) {
       escapeText = HiveUtils.escapeText(text);
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 8144c3b..7082931 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec;
 import java.util.ArrayList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.beans.DefaultPersistenceDelegate;
 import java.beans.Encoder;
 import java.beans.Expression;
@@ -97,7 +96,6 @@ import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -196,7 +194,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoException;
 import com.google.common.base.Preconditions;
 
 /**
@@ -439,7 +436,6 @@ public final class Utilities {
             throw new RuntimeException("Unknown work type: " + name);
           }
         }
-
         gWorkMap.get(conf).put(path, gWork);
       } else if (LOG.isDebugEnabled()) {
         LOG.debug("Found plan in cache for name: " + name);
@@ -450,16 +446,6 @@ public final class Utilities {
       LOG.debug("File not found: " + fnf.getMessage());
       LOG.info("No plan file found: "+path);
       return null;
-    } catch (KryoException ke) {
-      Throwable cnfThrowable = findClassNotFoundException(ke);
-      if (LlapProxy.isDaemon() && (cnfThrowable != null)) {
-        LOG.error("Missing class \"" + cnfThrowable.getMessage() + "\". If this is a UDF and you " +
-            "are running LLAP, you may need to regenerate the llap startup script and restart " +
-            "llap with jars for your udf.", cnfThrowable);
-        throw new RuntimeException("Cannot find \"" + cnfThrowable.getMessage() + "\" You may" +
-           " need to regenerate the LLAP startup script and restart llap daemons.", cnfThrowable);
-      }
-      throw new RuntimeException(ke);
     } catch (Exception e) {
       String msg = "Failed to load plan: " + path + ": " + e;
       LOG.error(msg, e);
@@ -474,16 +460,6 @@ public final class Utilities {
     }
   }
 
-  private static Throwable findClassNotFoundException(Throwable ke) {
-    while (ke != null) {
-      if (ke instanceof ClassNotFoundException) {
-        return ke;
-      }
-      ke = ke.getCause();
-    }
-    return null;
-  }
-
   public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
     try {
       Graph stageGraph = plan.getQueryPlan().getStageGraph();

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index a33b6e2..c560f37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -39,7 +39,6 @@ import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 /**
@@ -179,8 +178,6 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     } finally {
       if (originalThrowable != null && originalThrowable instanceof Error) {
         LOG.error(StringUtils.stringifyException(originalThrowable));
-        getContext().reportFailure(TaskFailureType.FATAL, originalThrowable,
-            "Cannot recover from this error");
         throw new RuntimeException(originalThrowable);
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index d9f58f2..3fa1233 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2860,16 +2860,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
           if (destIsSubDir) {
             FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
 
-            if (inheritPerms) {
-              try {
-                HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, false);
-              } catch (IOException e) {
-                String msg = "Error setting permission of file " + destf;
-                LOG.error(msg);
-                throw new HiveException(msg, e);
-              }
-            }
-
             List<Future<Void>> futures = new LinkedList<>();
             final ExecutorService pool = Executors.newFixedThreadPool(
                 conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index 9fbbd4c..f41fa4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -117,12 +117,6 @@ public final class GenMRSkewJoinProcessor {
     }
 
     List<Task<? extends Serializable>> children = currTask.getChildTasks();
-    if (children != null && children.size() > 1) {
-      throw new SemanticException("Should not happened");
-    }
-
-    Task<? extends Serializable> child =
-        children != null && children.size() == 1 ? children.get(0) : null;
 
     Path baseTmpDir = parseCtx.getContext().getMRTmpPath();
 
@@ -347,13 +341,14 @@ public final class GenMRSkewJoinProcessor {
           tsk.addDependentTask(oldChild);
         }
       }
-    }
-    if (child != null) {
-      currTask.removeDependentTask(child);
-      listTasks.add(child);
+      currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>());
+      for (Task<? extends Serializable> oldChild : children) {
+        oldChild.getParentTasks().remove(currTask);
+      }
+      listTasks.addAll(children);
     }
     ConditionalResolverSkewJoinCtx context =
-        new ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, child);
+        new ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, children);
 
     ConditionalWork cndWork = new ConditionalWork(listWorks);
     ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 11ec07a..ded9231 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -93,9 +93,6 @@ public class GenSparkSkewJoinProcessor {
 
     List<Task<? extends Serializable>> children = currTask.getChildTasks();
 
-    Task<? extends Serializable> child =
-        children != null && children.size() == 1 ? children.get(0) : null;
-
     Path baseTmpDir = parseCtx.getContext().getMRTmpPath();
 
     JoinDesc joinDescriptor = joinOp.getConf();
@@ -334,14 +331,17 @@ public class GenSparkSkewJoinProcessor {
           tsk.addDependentTask(oldChild);
         }
       }
-    }
-    if (child != null) {
-      currTask.removeDependentTask(child);
-      listTasks.add(child);
-      listWorks.add(child.getWork());
+      currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>());
+      for (Task<? extends Serializable> oldChild : children) {
+        oldChild.getParentTasks().remove(currTask);
+      }
+      listTasks.addAll(children);
+      for (Task<? extends Serializable> oldChild : children) {
+        listWorks.add(oldChild.getWork());
+      }
     }
     ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx context =
-        new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, child);
+        new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, children);
 
     ConditionalWork cndWork = new ConditionalWork(listWorks);
     ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
index 8e56263..a3ec990 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -411,7 +411,9 @@ public class SparkMapJoinResolver implements PhysicalPlanResolver {
         context.setDirToTaskMap(newbigKeysDirToTaskMap);
         // update no skew task
         if (context.getNoSkewTask() != null && context.getNoSkewTask().equals(originalTask)) {
-          context.setNoSkewTask(newTask);
+          List<Task<? extends Serializable>> noSkewTask = new ArrayList<>();
+          noSkewTask.add(newTask);
+          context.setNoSkewTask(noSkewTask);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
new file mode 100644
index 0000000..15a47dc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * ColumnStatsAutoGatherContext: This is passed to the compiler when both
+ * HIVESTATSAUTOGATHER and HIVESTATSCOLAUTOGATHER are set to true during an
+ * INSERT OVERWRITE or INSERT INTO command.
+ **/
+
+public class ColumnStatsAutoGatherContext {
+
+  public AnalyzeRewriteContext analyzeRewrite;
+  private final List<LoadFileDesc> loadFileWork = new ArrayList<>();
+  private final SemanticAnalyzer sa;
+  private final HiveConf conf;
+  private final Operator<? extends OperatorDesc> op;
+  private final List<FieldSchema> columns;
+  private final List<FieldSchema> partitionColumns;
+  private boolean isInsertInto;
+  private Table tbl;
+  private Map<String, String> partSpec;
+  
+  public ColumnStatsAutoGatherContext(SemanticAnalyzer sa, HiveConf conf,
+      Operator<? extends OperatorDesc> op, Table tbl, Map<String, String> partSpec,
+      boolean isInsertInto) throws SemanticException {
+    super();
+    this.sa = sa;
+    this.conf = conf;
+    this.op = op;
+    this.tbl = tbl;
+    this.partSpec = partSpec;
+    this.isInsertInto = isInsertInto;
+    columns = tbl.getCols();
+    partitionColumns = tbl.getPartCols();
+  }
+
+  public List<LoadFileDesc> getLoadFileWork() {
+    return loadFileWork;
+  }
+
+  public AnalyzeRewriteContext getAnalyzeRewrite() {
+    return analyzeRewrite;
+  }
+
+  public void setAnalyzeRewrite(AnalyzeRewriteContext analyzeRewrite) {
+    this.analyzeRewrite = analyzeRewrite;
+  }
+
+  public void insertAnalyzePipeline() throws SemanticException{
+    // 1. Generate the statement of analyze table [tablename] compute statistics for columns
+    // In non-partitioned table case, it will generate TS-SEL-GBY-RS-GBY-SEL-FS operator
+    // In static-partitioned table case, it will generate TS-FIL(partitionKey)-SEL-GBY(partitionKey)-RS-GBY-SEL-FS operator
+    // In dynamic-partitioned table case, it will generate TS-SEL-GBY(partitionKey)-RS-GBY-SEL-FS operator
+    // However, we do not need to specify the partition-spec because (1) the data is going to be inserted to that specific partition
+    // (2) we can compose the static/dynamic partition using a select operator in replaceSelectOperatorProcess..
+    String analyzeCommand = "analyze table `" + tbl.getDbName() + "`.`" + tbl.getTableName() + "`"
+        + " compute statistics for columns ";
+
+    // 2. Based on the statement, generate the selectOperator
+    Operator<?> selOp = null;
+    try {
+      selOp = genSelOpForAnalyze(analyzeCommand);
+    } catch (IOException | ParseException e) {
+      throw new SemanticException(e);
+    }
+
+    // 3. attach this SEL to the operator right before FS
+    op.getChildOperators().add(selOp);
+    selOp.getParentOperators().clear();
+    selOp.getParentOperators().add(op);
+
+    // 4. address the colExp, colList, etc for the SEL
+    try {
+      replaceSelectOperatorProcess((SelectOperator)selOp, op);
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+  }
+  
+  /**
+   * Compiles the analyze command on its own and returns the first SEL below the single TS
+   * of that plan; throws a SemanticException if there is not exactly one TS or no SEL.
+   */
+  @SuppressWarnings("rawtypes")
+  private Operator genSelOpForAnalyze(String analyzeCommand) throws IOException, ParseException, SemanticException{
+    //0. initialization
+    Context ctx = new Context(conf);
+    ParseDriver pd = new ParseDriver();
+    ASTNode tree = pd.parse(analyzeCommand, ctx);
+    tree = ParseUtils.findRootNonNullToken(tree);
+    //1. get the ColumnStatsSemanticAnalyzer
+    BaseSemanticAnalyzer baseSem = SemanticAnalyzerFactory.get(new QueryState(conf), tree);
+    ColumnStatsSemanticAnalyzer colSem = (ColumnStatsSemanticAnalyzer) baseSem;
+    //2. get the rewritten AST
+    ASTNode ast = colSem.rewriteAST(tree, this);
+    baseSem = SemanticAnalyzerFactory.get(new QueryState(conf), ast);
+    SemanticAnalyzer sem = (SemanticAnalyzer) baseSem;
+    QB qb = new QB(null, null, false);
+    ParseContext subPCtx = sem.getParseContext();
+    subPCtx.setContext(ctx);
+    sem.initParseCtx(subPCtx);
+    sem.doPhase1(ast, qb, sem.initPhase1Ctx(), null);
+    // This will trigger new calls to metastore to collect metadata (TODO: cache the information)
+    sem.getMetaData(qb);
+    sem.genPlan(qb);
+    //3. populate the load file work so that ColumnStatsTask can work
+    loadFileWork.addAll(sem.getLoadFileWork());
+    //4. because there is only one TS for analyze statement, we can get it.
+    if (sem.topOps.values().size() != 1) {
+      throw new SemanticException(
+          "ColumnStatsAutoGatherContext is expecting exactly one TS, but finds "
+              + sem.topOps.values().size());
+    }
+    Operator<?> operator = sem.topOps.values().iterator().next();
+    //5. get the first SEL after TS; fail cleanly instead of walking past a leaf operator
+    while(!(operator instanceof SelectOperator)){
+      if (operator.getChildOperators() == null || operator.getChildOperators().isEmpty()) {
+        throw new SemanticException("ColumnStatsAutoGatherContext cannot find a SEL after TS");
+      }
+      operator = operator.getChildOperators().get(0);
+    }
+    return operator;
+  }
+
+  /**
+   * Rewires the SEL of the analyze plan to read from the insert plan: rebuilds its
+   * SelectDesc, column expression map and signature with one entry per table column
+   * followed by one entry per partition column.
+   * @param operator : the select operator in the analyze statement
+   * @param input : the operator right before FS in the insert overwrite statement
+   * @throws HiveException e.g. if a static partition column is found after a dynamic one
+   */
+  private void replaceSelectOperatorProcess(SelectOperator operator, Operator<? extends OperatorDesc> input)
+      throws HiveException {
+    RowSchema selRS = operator.getSchema();
+    ArrayList<ColumnInfo> signature = new ArrayList<>();
+    OpParseContext inputCtx = sa.opParseCtx.get(input);
+    RowResolver inputRR = inputCtx.getRowResolver();
+    // columns produced by input; named so that it is not confused with the field this.columns
+    ArrayList<ColumnInfo> inputColumns = inputRR.getColumnInfos();
+    ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>();
+    ArrayList<String> columnNames = new ArrayList<String>();
+    Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
+    // the column positions in the operator should be like this
+    // <----non-partition columns---->|<--static partition columns-->|<--dynamic partition columns-->
+    //        ExprNodeColumnDesc      |      ExprNodeConstantDesc    |     ExprNodeColumnDesc
+    //           from input           |         generate itself      |        from input
+
+    // 1. deal with non-partition columns
+    for (int i = 0; i < this.columns.size(); i++) {
+      ColumnInfo col = inputColumns.get(i);
+      ExprNodeDesc exprNodeDesc = new ExprNodeColumnDesc(col);
+      colList.add(exprNodeDesc);
+      String internalName = selRS.getColumnNames().get(i);
+      columnNames.add(internalName);
+      columnExprMap.put(internalName, exprNodeDesc);
+      signature.add(selRS.getSignature().get(i));
+    }
+    // if there is any partition column (in static partition or dynamic partition or mixed case)
+    int dynamicPartBegin = -1;
+    for (int i = 0; i < partitionColumns.size(); i++) {
+      ExprNodeDesc exprNodeDesc = null;
+      String partColName = partitionColumns.get(i).getName();
+      // 2. deal with static partition columns
+      if (partSpec != null && partSpec.containsKey(partColName)
+          && partSpec.get(partColName) != null) {
+        // dynamicPartBegin is -1 until a dynamic column is seen, so >= 0 means one came before
+        if (dynamicPartBegin >= 0) {
+          throw new SemanticException(
+              "Dynamic partition columns should not come before static partition columns.");
+        }
+        exprNodeDesc = new ExprNodeConstantDesc(partSpec.get(partColName));
+        TypeInfo srcType = exprNodeDesc.getTypeInfo();
+        TypeInfo destType = selRS.getSignature().get(this.columns.size() + i).getType();
+        if (!srcType.equals(destType)) {
+          // This may be possible when srcType is string but destType is integer
+          exprNodeDesc = ParseUtils.createConversionCast(exprNodeDesc, (PrimitiveTypeInfo) destType);
+        }
+      }
+      // 3. dynamic partition columns
+      else {
+        dynamicPartBegin++;
+        ColumnInfo col = inputColumns.get(this.columns.size() + dynamicPartBegin);
+        TypeInfo srcType = col.getType();
+        TypeInfo destType = selRS.getSignature().get(this.columns.size() + i).getType();
+        exprNodeDesc = new ExprNodeColumnDesc(col);
+        if (!srcType.equals(destType)) {
+          exprNodeDesc = ParseUtils.createConversionCast(exprNodeDesc, (PrimitiveTypeInfo) destType);
+        }
+      }
+      colList.add(exprNodeDesc);
+      String internalName = selRS.getColumnNames().get(this.columns.size() + i);
+      columnNames.add(internalName);
+      columnExprMap.put(internalName, exprNodeDesc);
+      signature.add(selRS.getSignature().get(this.columns.size() + i));
+    }
+    operator.setConf(new SelectDesc(colList, columnNames));
+    operator.setColumnExprMap(columnExprMap);
+    selRS.setSignature(signature);
+    operator.setSchema(selRS);
+  }
+
+  public String getCompleteName() {
+    return tbl.getDbName() + "." + tbl.getTableName();
+  }
+
+  public boolean isInsertInto() {
+    return isInsertInto;
+  }
+
+  /**
+   * @return false if any column of curr is virtual, non-primitive or of an unsupported type
+   */
+  public static boolean canRunAutogatherStats(Operator<?> curr) {
+    for (ColumnInfo cinfo : curr.getSchema().getSignature()) {
+      if (cinfo.getIsVirtualCol()
+          || cinfo.getObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE) {
+        return false;
+      }
+      switch (((PrimitiveTypeInfo) cinfo.getType()).getPrimitiveCategory()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case TIMESTAMP:
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+      case BINARY:
+      case DECIMAL:
+        // TODO: Support case DATE:
+        break;
+      default:
+        return false;
+      }
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
index 3b6cbce..d3aef41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
@@ -110,11 +110,18 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
       partValsSpecified += partSpec.get(partKey) == null ? 0 : 1;
     }
     try {
-      if ((partValsSpecified == tbl.getPartitionKeys().size()) && (db.getPartition(tbl, partSpec, false, null, false) == null)) {
-        throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + " : " + partSpec);
+      // for static partition, it may not exist when HIVESTATSCOLAUTOGATHER is
+      // set to true
+      if (!conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)) {
+        if ((partValsSpecified == tbl.getPartitionKeys().size())
+            && (db.getPartition(tbl, partSpec, false, null, false) == null)) {
+          throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg()
+              + " : " + partSpec);
+        }
       }
     } catch (HiveException he) {
-      throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + " : " + partSpec);
+      throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + " : "
+          + partSpec);
     }
 
     // User might have only specified partial list of partition keys, in which case add other partition keys in partSpec
@@ -157,7 +164,7 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
         } else {
           groupByClause.append(",");
         }
-        groupByClause.append(fs.getName());
+      groupByClause.append("`" + fs.getName() + "`");
     }
 
     // attach the predicate and group by to the return clause
@@ -235,12 +242,12 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
 
     if (isPartitionStats) {
       for (FieldSchema fs : tbl.getPartCols()) {
-        rewrittenQueryBuilder.append(" , " + fs.getName());
+        rewrittenQueryBuilder.append(" , `" + fs.getName() + "`");
       }
     }
-    rewrittenQueryBuilder.append(" from ");
+    rewrittenQueryBuilder.append(" from `");
     rewrittenQueryBuilder.append(tbl.getDbName());
-    rewrittenQueryBuilder.append(".");
+    rewrittenQueryBuilder.append("`.");
     rewrittenQueryBuilder.append("`" + tbl.getTableName() + "`");
     isRewritten = true;
 
@@ -378,4 +385,54 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
       analyzeInternal(originalTree);
     }
   }
+
+  /**
+   * Rewrites an analyze statement into the query that computes its column stats and
+   * records the table name, stats level, column names and types on the given context.
+   * @param ast
+   *          is the original analyze ast
+   * @param context
+   *          is the auto gather context that receives the AnalyzeRewriteContext
+   * @return the ast of the rewritten query
+   * @throws SemanticException e.g. when the number of bit vectors cannot be determined
+   */
+  public ASTNode rewriteAST(ASTNode ast, ColumnStatsAutoGatherContext context)
+      throws SemanticException {
+    tbl = AnalyzeCommandUtils.getTable(ast, this);
+    colNames = getColumnName(ast);
+    // Save away the original AST
+    originalTree = ast;
+    boolean isPartitionStats = AnalyzeCommandUtils.isPartitionLevelStats(ast);
+    Map<String, String> partSpec = null;
+    checkForPartitionColumns(colNames,
+        Utilities.getColumnNamesFromFieldSchema(tbl.getPartitionKeys()));
+    validateSpecifiedColumnNames(colNames);
+    if (conf.getBoolVar(ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS) && tbl.isPartitioned()) {
+      isPartitionStats = true;
+    }
+    if (isPartitionStats) {
+      isTableLevel = false;
+      partSpec = AnalyzeCommandUtils.getPartKeyValuePairsFromAST(tbl, ast, conf);
+      handlePartialPartitionSpec(partSpec);
+    } else {
+      isTableLevel = true;
+    }
+    colType = getColumnTypes(colNames);
+    int numBitVectors = 0;
+    try {
+      numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
+    } catch (Exception e) {
+      throw new SemanticException(e.getMessage(), e);
+    }
+    rewrittenQuery = genRewrittenQuery(colNames, numBitVectors, partSpec, isPartitionStats);
+    rewrittenTree = genRewrittenTree(rewrittenQuery);
+
+    AnalyzeRewriteContext rewriteCtx = new AnalyzeRewriteContext();
+    rewriteCtx.setTableName(tbl.getDbName() + "." + tbl.getTableName());
+    rewriteCtx.setTblLvl(isTableLevel);
+    rewriteCtx.setColName(colNames);
+    rewriteCtx.setColType(colType);
+    context.setAnalyzeRewrite(rewriteCtx);
+    return rewrittenTree;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 96ef20d..b2125ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -80,6 +80,7 @@ public class ParseContext {
   private HashMap<String, SplitSample> nameToSplitSample;
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
+  private List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts;
   private Context ctx;
   private QueryState queryState;
   private HiveConf conf;
@@ -166,6 +167,7 @@ public class ParseContext {
       Set<JoinOperator> joinOps,
       Set<SMBMapJoinOperator> smbMapJoinOps,
       List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
+      List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts,
       Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
       UnionProcContext uCtx, List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer,
       Map<String, PrunedPartitionList> prunedPartitions,
@@ -188,6 +190,7 @@ public class ParseContext {
     this.smbMapJoinOps = smbMapJoinOps;
     this.loadFileWork = loadFileWork;
     this.loadTableWork = loadTableWork;
+    this.columnStatsAutoGatherContexts = columnStatsAutoGatherContexts;
     this.topOps = topOps;
     this.ctx = ctx;
     this.idToTableNameMap = idToTableNameMap;
@@ -608,4 +611,13 @@ public class ParseContext {
   public Map<String, Table> getTabNameToTabObject() {
     return tabNameToTabObject;
   }
+
+  public List<ColumnStatsAutoGatherContext> getColumnStatsAutoGatherContexts() {
+    return columnStatsAutoGatherContexts;
+  }
+
+  public void setColumnStatsAutoGatherContexts(
+      List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts) {
+    this.columnStatsAutoGatherContexts = columnStatsAutoGatherContexts;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index 3a226e7..3a0402e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -63,7 +63,9 @@ public class QBParseInfo {
   private final Set<String> destCubes;
   private final Set<String> destGroupingSets;
   private final Map<String, ASTNode> destToHaving;
-  private final HashSet<String> insertIntoTables;
+  // insertIntoTables/insertOverwriteTables map a table's fullName to its ast;
+  private final Map<String, ASTNode> insertIntoTables;
+  private final Map<String, ASTNode> insertOverwriteTables;
 
   private boolean isAnalyzeCommand; // used for the analyze command (statistics)
   private boolean isNoScanAnalyzeCommand; // used for the analyze command (statistics) (noscan)
@@ -133,7 +135,8 @@ public class QBParseInfo {
     destToSortby = new HashMap<String, ASTNode>();
     destToOrderby = new HashMap<String, ASTNode>();
     destToLimit = new HashMap<String, SimpleEntry<Integer, Integer>>();
-    insertIntoTables = new HashSet<String>();
+    insertIntoTables = new HashMap<String, ASTNode>();
+    insertOverwriteTables = new HashMap<String, ASTNode>();
     destRollups = new HashSet<String>();
     destCubes = new HashSet<String>();
     destGroupingSets = new HashSet<String>();
@@ -174,13 +177,13 @@ public class QBParseInfo {
     }
   }
 
-  public void addInsertIntoTable(String fullName) {
-    insertIntoTables.add(fullName.toLowerCase());
+  public void addInsertIntoTable(String fullName, ASTNode ast) {
+    insertIntoTables.put(fullName.toLowerCase(), ast);
   }
 
   public boolean isInsertIntoTable(String dbName, String table) {
     String fullName = dbName + "." + table;
-    return insertIntoTables.contains(fullName.toLowerCase());
+    return insertIntoTables.containsKey(fullName.toLowerCase());
   }
 
   /**
@@ -189,7 +192,7 @@ public class QBParseInfo {
    * @return
    */
   public boolean isInsertIntoTable(String fullTableName) {
-    return insertIntoTables.contains(fullTableName.toLowerCase());
+    return insertIntoTables.containsKey(fullTableName.toLowerCase());
   }
 
   public HashMap<String, ASTNode> getAggregationExprsForClause(String clause) {
@@ -636,6 +639,11 @@ public class QBParseInfo {
   public void setPartialScanAnalyzeCommand(boolean isPartialScanAnalyzeCommand) {
     this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand;
   }
+
+  public Map<String, ASTNode> getInsertOverwriteTables() {
+    return insertOverwriteTables;
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 7162c08..6937308 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -129,6 +130,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverterPostProc;
 import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec.SpecType;
 import org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
@@ -182,6 +184,7 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -259,6 +262,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   protected LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
+  private List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts;
   private final Map<JoinOperator, QBJoinTree> joinContext;
   private final Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
   private final HashMap<TableScanOperator, Table> topToTable;
@@ -353,6 +357,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     topOps = new LinkedHashMap<String, TableScanOperator>();
     loadTableWork = new ArrayList<LoadTableDesc>();
     loadFileWork = new ArrayList<LoadFileDesc>();
+    columnStatsAutoGatherContexts = new ArrayList<ColumnStatsAutoGatherContext>();
     opParseCtx = new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
     joinContext = new HashMap<JoinOperator, QBJoinTree>();
     smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
@@ -390,6 +395,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     tabNameToTabObject.clear();
     loadTableWork.clear();
     loadFileWork.clear();
+    columnStatsAutoGatherContexts.clear();
     topOps.clear();
     destTableId = 1;
     idToTableNameMap.clear();
@@ -448,7 +454,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return new ParseContext(queryState, opToPartPruner, opToPartList, topOps,
         new HashSet<JoinOperator>(joinContext.keySet()),
         new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
-        loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+        loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
         opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
@@ -1401,18 +1407,25 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       case HiveParser.TOK_INSERT_INTO:
         String currentDatabase = SessionState.get().getCurrentDatabase();
         String tab_name = getUnescapedName((ASTNode) ast.getChild(0).getChild(0), currentDatabase);
-        qbp.addInsertIntoTable(tab_name);
+        qbp.addInsertIntoTable(tab_name, ast);
 
       case HiveParser.TOK_DESTINATION:
         ctx_1.dest = "insclause-" + ctx_1.nextNum;
         ctx_1.nextNum++;
         boolean isTmpFileDest = false;
         if (ast.getChildCount() > 0 && ast.getChild(0) instanceof ASTNode) {
-          ASTNode ch = (ASTNode)ast.getChild(0);
-          if (ch.getToken().getType() == HiveParser.TOK_DIR
-              && ch.getChildCount() > 0 && ch.getChild(0) instanceof ASTNode) {
-            ch = (ASTNode)ch.getChild(0);
+          ASTNode ch = (ASTNode) ast.getChild(0);
+          if (ch.getToken().getType() == HiveParser.TOK_DIR && ch.getChildCount() > 0
+              && ch.getChild(0) instanceof ASTNode) {
+            ch = (ASTNode) ch.getChild(0);
             isTmpFileDest = ch.getToken().getType() == HiveParser.TOK_TMP_FILE;
+          } else {
+            if (ast.getToken().getType() == HiveParser.TOK_DESTINATION
+                && ast.getChild(0).getType() == HiveParser.TOK_TAB) {
+              String fullTableName = getUnescapedName((ASTNode) ast.getChild(0).getChild(0),
+                  SessionState.get().getCurrentDatabase());
+              qbp.getInsertOverwriteTables().put(fullTableName, ast);
+            }
           }
         }
 
@@ -6516,6 +6529,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     DynamicPartitionCtx dpCtx = null;
     LoadTableDesc ltd = null;
     ListBucketingCtx lbCtx = null;
+    Map<String, String> partSpec = null;
 
     switch (dest_type.intValue()) {
     case QBMetaData.DEST_TABLE: {
@@ -6531,7 +6545,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
             ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
       }
 
-      Map<String, String> partSpec = qbm.getPartSpecForAlias(dest);
+      partSpec = qbm.getPartSpecForAlias(dest);
       dest_path = dest_tab.getPath();
 
       // If the query here is an INSERT_INTO and the target is an immutable table,
@@ -6875,6 +6889,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
+ 
     inputRR = opParseCtx.get(input).getRowResolver();
 
     ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
@@ -7004,9 +7019,41 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     FileSinkOperator fso = (FileSinkOperator) output;
     fso.getConf().setTable(dest_tab);
     fsopToTable.put(fso, dest_tab);
+    // the following code is used to collect column stats when both
+    // HIVESTATSAUTOGATHER and HIVESTATSCOLAUTOGATHER are set to true
+    // and it is an insert overwrite or insert into table
+    if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
+        && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
+        && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
+      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
+        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
+            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
+      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
+        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
+            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
+
+      }
+    }
     return output;
   }
 
+  /** Hooks the column stats pipeline of table_desc's table onto curr and records its context. */
+  private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
+      Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException {
+    Table table = null;
+    try {
+      table = db.getTable(table_desc.getTableName());
+    } catch (HiveException e) {
+      throw new SemanticException(e.getMessage(), e);
+    }
+    LOG.info("Generate an operator pipleline to autogather column stats for table "
+        + table_desc.getTableName() + " in query " + ctx.getCmd());
+    ColumnStatsAutoGatherContext columnStatsAutoGatherContext =
+        new ColumnStatsAutoGatherContext(this, conf, curr, table, partSpec, isInsertInto);
+    columnStatsAutoGatherContext.insertAnalyzePipeline();
+    columnStatsAutoGatherContexts.add(columnStatsAutoGatherContext);
+  }
+
   String fixCtasColumnName(String colName) {
     return colName;
   }
@@ -10689,7 +10736,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,
         new HashSet<JoinOperator>(joinContext.keySet()),
         new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
-        loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+        loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
         viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
@@ -12895,4 +12942,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         String.format("Warning: %s", msg));
   }
 
+  public List<LoadFileDesc> getLoadFileWork() {
+    return loadFileWork;
+  }
+
+  public void setLoadFileWork(List<LoadFileDesc> loadFileWork) {
+    this.loadFileWork = loadFileWork;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 4049f40..4b34ebf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -20,15 +20,19 @@ package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -69,6 +73,8 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
 import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
 
+import akka.util.Collections;
+
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
 
@@ -251,15 +257,6 @@ public abstract class TaskCompiler {
 
     generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs);
 
-    /*
-     * If the query was the result of analyze table column compute statistics rewrite, create
-     * a column stats task instead of a fetch task to persist stats to the metastore.
-     */
-    if (isCStats) {
-      genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadTableWork, loadFileWork,
-            rootTasks, outerQueryLimit);
-    }
-
     // For each task, set the key descriptor for the reducer
     for (Task<? extends Serializable> rootTask : rootTasks) {
       GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
@@ -273,6 +270,35 @@ public abstract class TaskCompiler {
 
     optimizeTaskPlan(rootTasks, pCtx, ctx);
 
+    /*
+     * For an analyze table column compute statistics rewrite, or for each insert that auto-gathers
+     * column stats, create a column stats task (not a fetch task) to persist stats to the metastore.
+     */
+    if (isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()) {
+      Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? extends Serializable>>();
+      getLeafTasks(rootTasks, leafTasks);
+      if (isCStats) {
+        genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0);
+      } else {
+        for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx
+            .getColumnStatsAutoGatherContexts()) {
+          if (!columnStatsAutoGatherContext.isInsertInto()) {
+            genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
+                columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, 0);
+          } else {
+            int numBitVector;
+            try {
+              numBitVector = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
+            } catch (Exception e) {
+              throw new SemanticException(e.getMessage());
+            }
+            genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
+                columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, numBitVector);
+          }
+        }
+      }
+    }
+
     decideExecMode(rootTasks, ctx, globalLimitCtx);
 
     if (pCtx.getQueryProperties().isCTAS() && !pCtx.getCreateTable().isMaterialization()) {
@@ -355,8 +381,9 @@ public abstract class TaskCompiler {
    * @param qb
    */
   @SuppressWarnings("unchecked")
-  protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite, List<LoadTableDesc> loadTableWork,
-      List<LoadFileDesc> loadFileWork, List<Task<? extends Serializable>> rootTasks, int outerQueryLimit) {
+  protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite,
+      List<LoadFileDesc> loadFileWork, Set<Task<? extends Serializable>> leafTasks,
+      int outerQueryLimit, int numBitVector) {
     ColumnStatsTask cStatsTask = null;
     ColumnStatsWork cStatsWork = null;
     FetchWork fetch = null;
@@ -385,12 +412,12 @@ public abstract class TaskCompiler {
     fetch = new FetchWork(loadFileWork.get(0).getSourcePath(), resultTab, outerQueryLimit);
 
     ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName,
-        colName, colType, isTblLevel);
+        colName, colType, isTblLevel, numBitVector);
     cStatsWork = new ColumnStatsWork(fetch, cStatsDesc);
     cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
-    // This is a column stats task. According to the semantic, there should be
-    // only one MR task in the rootTask.
-    rootTasks.get(0).addDependentTask(cStatsTask);
+    for (Task<? extends Serializable> tsk : leafTasks) {
+      tsk.addDependentTask(cStatsTask);
+    }
   }
 
 
@@ -398,7 +425,7 @@ public abstract class TaskCompiler {
    * Find all leaf tasks of the list of root tasks.
    */
   protected void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
-      HashSet<Task<? extends Serializable>> leaves) {
+      Set<Task<? extends Serializable>> leaves) {
 
     for (Task<? extends Serializable> root : rootTasks) {
       getLeafTasks(root, leaves);
@@ -406,7 +433,7 @@ public abstract class TaskCompiler {
   }
 
   private void getLeafTasks(Task<? extends Serializable> task,
-      HashSet<Task<? extends Serializable>> leaves) {
+      Set<Task<? extends Serializable>> leaves) {
     if (task.getDependentTasks() == null) {
       if (!leaves.contains(task)) {
         leaves.add(task);
@@ -453,7 +480,7 @@ public abstract class TaskCompiler {
     ParseContext clone = new ParseContext(queryState,
         pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(),
         pCtx.getJoinOps(), pCtx.getSmbMapJoinOps(),
-        pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getContext(),
+        pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getColumnStatsAutoGatherContexts(), pCtx.getContext(),
         pCtx.getIdToTableNameMap(), pCtx.getDestTableId(), pCtx.getUCtx(),
         pCtx.getListMapJoinOpsNoReducer(),
         pCtx.getPrunedPartitions(), pCtx.getTabNameToTabObject(), pCtx.getOpToSamplePruner(), pCtx.getGlobalLimitCtx(),

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
index 9934fdf..778d6f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
@@ -51,7 +51,7 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali
     // this map stores mapping from "big key dir" to its corresponding mapjoin
     // task.
     private HashMap<Path, Task<? extends Serializable>> dirToTaskMap;
-    private Task<? extends Serializable> noSkewTask;
+    private List<Task<? extends Serializable>> noSkewTask;
 
     /**
      * For serialization use only.
@@ -61,7 +61,7 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali
 
     public ConditionalResolverSkewJoinCtx(
         HashMap<Path, Task<? extends Serializable>> dirToTaskMap,
-        Task<? extends Serializable> noSkewTask) {
+        List<Task<? extends Serializable>> noSkewTask) {
       super();
       this.dirToTaskMap = dirToTaskMap;
       this.noSkewTask = noSkewTask;
@@ -76,11 +76,11 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali
       this.dirToTaskMap = dirToTaskMap;
     }
 
-    public Task<? extends Serializable> getNoSkewTask() {
+    public List<Task<? extends Serializable>> getNoSkewTask() {
       return noSkewTask;
     }
 
-    public void setNoSkewTask(Task<? extends Serializable> noSkewTask) {
+    public void setNoSkewTask(List<Task<? extends Serializable>> noSkewTask) {
       this.noSkewTask = noSkewTask;
     }
   }
@@ -121,7 +121,7 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali
       e.printStackTrace();
     }
     if (resTsks.isEmpty() && ctx.getNoSkewTask() != null) {
-      resTsks.add(ctx.getNoSkewTask());
+      resTsks.addAll(ctx.getNoSkewTask());
     }
     return resTsks;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/test/queries/clientpositive/autoColumnStats_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_1.q b/ql/src/test/queries/clientpositive/autoColumnStats_1.q
new file mode 100644
index 0000000..bb7252a
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_1.q
@@ -0,0 +1,192 @@
+set hive.stats.column.autogather=true;
+set hive.stats.fetch.column.stats=true;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.auto.convert.join=true;
+set hive.join.emit.interval=2;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
+set hive.optimize.bucketingsorting=false;
+
+drop table src_multi1;
+
+create table src_multi1 like src;
+
+insert overwrite table src_multi1 select * from src;
+
+explain extended select * from src_multi1;
+
+describe formatted src_multi1;
+
+drop table a;
+drop table b;
+create table a like src;
+create table b like src;
+
+from src
+insert overwrite table a select *
+insert overwrite table b select *;
+
+describe formatted a;
+describe formatted b;
+
+drop table a;
+drop table b;
+create table a like src;
+create table b like src;
+
+from src
+insert overwrite table a select *
+insert into table b select *;
+
+describe formatted a;
+describe formatted b;
+
+
+drop table src_multi2;
+
+create table src_multi2 like src;
+
+insert overwrite table src_multi2 select subq.key, src.value from (select * from src union select * from src1)subq join src on subq.key=src.key;
+
+describe formatted src_multi2;
+
+
+drop table nzhang_part14;
+
+create table if not exists nzhang_part14 (key string)
+  partitioned by (value string);
+
+insert overwrite table nzhang_part14 partition(value) 
+select key, value from (
+  select * from (select 'k1' as key, cast(null as string) as value from src limit 2)a 
+  union all
+  select * from (select 'k2' as key, '' as value from src limit 2)b
+  union all 
+  select * from (select 'k3' as key, ' ' as value from src limit 2)c
+) T;
+
+explain select key from nzhang_part14;
+
+
+drop table src5;
+
+create table src5 as select key, value from src limit 5; 
+
+insert overwrite table nzhang_part14 partition(value)
+select key, value from src5; 
+
+explain select key from nzhang_part14;
+
+
+create table alter5 ( col1 string ) partitioned by (dt string);
+
+alter table alter5 add partition (dt='a') location 'parta';
+
+describe formatted alter5 partition (dt='a');
+
+insert overwrite table alter5 partition (dt='a') select key from src ;
+
+describe formatted alter5 partition (dt='a');
+
+explain select * from alter5 where dt='a';
+
+
+drop table src_stat_part;
+create table src_stat_part(key string, value string) partitioned by (partitionId int);
+
+insert overwrite table src_stat_part partition (partitionId=1)
+select * from src1 limit 5;
+
+describe formatted src_stat_part PARTITION(partitionId=1);
+
+insert overwrite table src_stat_part partition (partitionId=2)
+select * from src1;
+
+describe formatted src_stat_part PARTITION(partitionId=2);
+
+drop table srcbucket_mapjoin;
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+drop table tab_part;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+drop table srcbucket_mapjoin_part;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+insert overwrite table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+describe formatted tab_part partition (ds='2008-04-08');
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+describe formatted tab partition (ds='2008-04-08');
+
+drop table nzhang_part14;
+
+create table if not exists nzhang_part14 (key string, value string)
+  partitioned by (ds string, hr string);
+
+describe formatted nzhang_part14;
+
+insert overwrite table nzhang_part14 partition(ds, hr) 
+select key, value, ds, hr from (
+  select * from (select 'k1' as key, cast(null as string) as value, '1' as ds, '2' as hr from src limit 2)a 
+  union all
+  select * from (select 'k2' as key, '' as value, '1' as ds, '3' as hr from src limit 2)b
+  union all 
+  select * from (select 'k3' as key, ' ' as value, '2' as ds, '1' as hr from src limit 2)c
+) T;
+
+desc formatted nzhang_part14 partition(ds='1', hr='3');
+
+
+INSERT OVERWRITE TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr)
+SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10;
+
+desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12');
+
+
+drop table nzhang_part14;
+create table if not exists nzhang_part14 (key string, value string)
+partitioned by (ds string, hr string);
+
+INSERT OVERWRITE TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr)
+SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10;
+
+desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12');
+
+drop table a;
+create table a (key string, value string)
+partitioned by (ds string, hr string);
+
+drop table b;
+create table b (key string, value string)
+partitioned by (ds string, hr string);
+
+drop table c;
+create table c (key string, value string)
+partitioned by (ds string, hr string);
+
+
+FROM srcpart 
+INSERT OVERWRITE TABLE a PARTITION (ds='2010-03-11', hr) SELECT key, value, hr WHERE ds is not null and hr>10
+INSERT OVERWRITE TABLE b PARTITION (ds='2010-04-11', hr) SELECT key, value, hr WHERE ds is not null and hr>11
+INSERT OVERWRITE TABLE c PARTITION (ds='2010-05-11', hr) SELECT key, value, hr WHERE hr>0;
+
+explain select key from a;
+explain select value from b;
+explain select key from b;
+explain select value from c;
+explain select key from c;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/test/queries/clientpositive/autoColumnStats_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_2.q b/ql/src/test/queries/clientpositive/autoColumnStats_2.q
new file mode 100644
index 0000000..c1abcb1
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_2.q
@@ -0,0 +1,214 @@
+set hive.stats.column.autogather=true;
+set hive.stats.fetch.column.stats=true;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.auto.convert.join=true;
+set hive.join.emit.interval=2;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
+set hive.optimize.bucketingsorting=false;
+
+drop table src_multi1;
+
+create table src_multi1 like src;
+
+insert into table src_multi1 select * from src;
+
+explain extended select * from src_multi1;
+
+describe formatted src_multi1;
+
+drop table a;
+drop table b;
+create table a like src;
+create table b like src;
+
+from src
+insert into table a select *
+insert into table b select *;
+
+describe formatted a key;
+describe formatted b key;
+
+from src
+insert overwrite table a select *
+insert into table b select *;
+
+describe formatted a;
+describe formatted b;
+
+describe formatted b key;
+describe formatted b value;
+
+insert into table b select NULL, NULL from src limit 10;
+
+describe formatted b key;
+describe formatted b value;
+
+insert into table b(value) select key+100000 from src limit 10;
+
+describe formatted b key;
+describe formatted b value;
+
+drop table src_multi2;
+
+create table src_multi2 like src;
+
+insert into table src_multi2 select subq.key, src.value from (select * from src union select * from src1)subq join src on subq.key=src.key;
+
+describe formatted src_multi2;
+
+
+drop table nzhang_part14;
+
+create table if not exists nzhang_part14 (key string)
+  partitioned by (value string);
+
+insert into table nzhang_part14 partition(value) 
+select key, value from (
+  select * from (select 'k1' as key, cast(null as string) as value from src limit 2)a 
+  union all
+  select * from (select 'k2' as key, '' as value from src limit 2)b
+  union all 
+  select * from (select 'k3' as key, ' ' as value from src limit 2)c
+) T;
+
+explain select key from nzhang_part14;
+
+
+drop table src5;
+
+create table src5 as select key, value from src limit 5; 
+
+insert into table nzhang_part14 partition(value)
+select key, value from src5; 
+
+explain select key from nzhang_part14;
+
+drop table alter5;
+
+create table alter5 ( col1 string ) partitioned by (dt string);
+
+alter table alter5 add partition (dt='a');
+
+describe formatted alter5 partition (dt='a');
+
+insert into table alter5 partition (dt='a') select key from src ;
+
+describe formatted alter5 partition (dt='a');
+
+explain select * from alter5 where dt='a';
+
+drop table alter5;
+
+create table alter5 ( col1 string ) partitioned by (dt string);
+
+alter table alter5 add partition (dt='a') location 'parta';
+
+describe formatted alter5 partition (dt='a');
+
+insert into table alter5 partition (dt='a') select key from src ;
+
+describe formatted alter5 partition (dt='a');
+
+explain select * from alter5 where dt='a';
+
+
+drop table src_stat_part;
+create table src_stat_part(key string, value string) partitioned by (partitionId int);
+
+insert into table src_stat_part partition (partitionId=1)
+select * from src1 limit 5;
+
+describe formatted src_stat_part PARTITION(partitionId=1);
+
+insert into table src_stat_part partition (partitionId=2)
+select * from src1;
+
+describe formatted src_stat_part PARTITION(partitionId=2);
+
+drop table srcbucket_mapjoin;
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+drop table tab_part;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+drop table srcbucket_mapjoin_part;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+insert into table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+describe formatted tab_part partition (ds='2008-04-08');
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert into table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+describe formatted tab partition (ds='2008-04-08');
+
+drop table nzhang_part14;
+
+create table if not exists nzhang_part14 (key string, value string)
+  partitioned by (ds string, hr string);
+
+describe formatted nzhang_part14;
+
+insert into table nzhang_part14 partition(ds, hr) 
+select key, value, ds, hr from (
+  select * from (select 'k1' as key, cast(null as string) as value, '1' as ds, '2' as hr from src limit 2)a 
+  union all
+  select * from (select 'k2' as key, '' as value, '1' as ds, '3' as hr from src limit 2)b
+  union all 
+  select * from (select 'k3' as key, ' ' as value, '2' as ds, '1' as hr from src limit 2)c
+) T;
+
+desc formatted nzhang_part14 partition(ds='1', hr='3');
+
+
+INSERT into TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr)
+SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10;
+
+desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12');
+
+
+drop table nzhang_part14;
+create table if not exists nzhang_part14 (key string, value string)
+partitioned by (ds string, hr string);
+
+INSERT into TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr)
+SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10;
+
+desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12');
+
+drop table a;
+create table a (key string, value string)
+partitioned by (ds string, hr string);
+
+drop table b;
+create table b (key string, value string)
+partitioned by (ds string, hr string);
+
+drop table c;
+create table c (key string, value string)
+partitioned by (ds string, hr string);
+
+
+FROM srcpart 
+INSERT into TABLE a PARTITION (ds='2010-03-11', hr) SELECT key, value, hr WHERE ds is not null and hr>10
+INSERT into TABLE b PARTITION (ds='2010-04-11', hr) SELECT key, value, hr WHERE ds is not null and hr>11
+INSERT into TABLE c PARTITION (ds='2010-05-11', hr) SELECT key, value, hr WHERE hr>0;
+
+explain select key from a;
+explain select value from b;
+explain select key from b;
+explain select value from c;
+explain select key from c;
+