Posted to commits@hive.apache.org by px...@apache.org on 2016/05/24 04:11:42 UTC
[7/7] hive git commit: HIVE-13566: Auto-gather column stats - phase 1 (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
HIVE-13566: Auto-gather column stats - phase 1 (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec4b936e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec4b936e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec4b936e
Branch: refs/heads/master
Commit: ec4b936e66db559cd7226f66d416dad02864530f
Parents: 2ed4783
Author: Pengcheng Xiong <px...@apache.org>
Authored: Mon May 23 20:22:33 2016 -0700
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Mon May 23 20:22:33 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hive/beeline/BeeLine.java | 14 +-
.../java/org/apache/hive/beeline/Commands.java | 72 +-
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +-
.../hive/beeline/TestBeeLineWithArgs.java | 24 +-
jdbc/src/java/org/apache/hive/jdbc/Utils.java | 54 +-
.../apache/hadoop/hive/ql/exec/Operator.java | 6 +-
.../hadoop/hive/ql/exec/TextRecordReader.java | 4 +-
.../hadoop/hive/ql/exec/TextRecordWriter.java | 4 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 24 -
.../hadoop/hive/ql/exec/tez/TezProcessor.java | 3 -
.../apache/hadoop/hive/ql/metadata/Hive.java | 10 -
.../physical/GenMRSkewJoinProcessor.java | 17 +-
.../physical/GenSparkSkewJoinProcessor.java | 18 +-
.../physical/SparkMapJoinResolver.java | 4 +-
.../ql/parse/ColumnStatsAutoGatherContext.java | 291 ++
.../ql/parse/ColumnStatsSemanticAnalyzer.java | 71 +-
.../hadoop/hive/ql/parse/ParseContext.java | 12 +
.../hadoop/hive/ql/parse/QBParseInfo.java | 20 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 71 +-
.../hadoop/hive/ql/parse/TaskCompiler.java | 63 +-
.../ql/plan/ConditionalResolverSkewJoin.java | 10 +-
.../queries/clientpositive/autoColumnStats_1.q | 192 ++
.../queries/clientpositive/autoColumnStats_2.q | 214 ++
.../queries/clientpositive/autoColumnStats_3.q | 67 +
.../queries/clientpositive/autoColumnStats_4.q | 20 +
.../queries/clientpositive/autoColumnStats_5.q | 47 +
.../queries/clientpositive/autoColumnStats_6.q | 41 +
.../queries/clientpositive/autoColumnStats_7.q | 19 +
.../queries/clientpositive/autoColumnStats_8.q | 27 +
.../queries/clientpositive/autoColumnStats_9.q | 22 +
.../clientpositive/autoColumnStats_1.q.out | 1379 +++++++++
.../clientpositive/autoColumnStats_2.q.out | 1500 ++++++++++
.../clientpositive/autoColumnStats_3.q.out | 420 +++
.../clientpositive/autoColumnStats_4.q.out | 260 ++
.../clientpositive/autoColumnStats_5.q.out | 664 +++++
.../clientpositive/autoColumnStats_6.q.out | 299 ++
.../clientpositive/autoColumnStats_7.q.out | 216 ++
.../clientpositive/autoColumnStats_8.q.out | 2624 ++++++++++++++++++
.../clientpositive/autoColumnStats_9.q.out | 268 ++
39 files changed, 8845 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/beeline/src/java/org/apache/hive/beeline/BeeLine.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/BeeLine.java b/beeline/src/java/org/apache/hive/beeline/BeeLine.java
index 9138613..734eeb8 100644
--- a/beeline/src/java/org/apache/hive/beeline/BeeLine.java
+++ b/beeline/src/java/org/apache/hive/beeline/BeeLine.java
@@ -93,9 +93,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.io.IOUtils;
import org.apache.hive.beeline.cli.CliOptionsProcessor;
-import org.apache.hive.jdbc.Utils;
-import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
-
/**
* A console SQL shell with command completion.
* <p>
@@ -142,6 +139,7 @@ public class BeeLine implements Closeable {
private static final Options options = new Options();
public static final String BEELINE_DEFAULT_JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
+ public static final String BEELINE_DEFAULT_JDBC_URL = "jdbc:hive2://";
public static final String DEFAULT_DATABASE_NAME = "default";
private static final String SCRIPT_OUTPUT_PREFIX = ">>>";
@@ -768,14 +766,6 @@ public class BeeLine implements Closeable {
*/
if (url != null) {
- if (user == null) {
- user = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER);
- }
-
- if (pass == null) {
- pass = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD);
- }
-
String com = constructCmd(url, user, pass, driver, false);
String comForDebug = constructCmd(url, user, pass, driver, true);
debug("issuing: " + comForDebug);
@@ -904,7 +894,7 @@ public class BeeLine implements Closeable {
}
private int embeddedConnect() {
- if (!execCommandWithPrefix("!connect " + Utils.URL_PREFIX + " '' ''")) {
+ if (!execCommandWithPrefix("!connect " + BEELINE_DEFAULT_JDBC_URL + " '' ''")) {
return ERRNO_OTHER;
} else {
return ERRNO_OK;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/beeline/src/java/org/apache/hive/beeline/Commands.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java
index 3a204c0..80703ff 100644
--- a/beeline/src/java/org/apache/hive/beeline/Commands.java
+++ b/beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -61,8 +61,6 @@ import java.util.TreeSet;
import org.apache.hadoop.hive.common.cli.ShellCmdExecutor;
import org.apache.hive.jdbc.HiveStatement;
-import org.apache.hive.jdbc.Utils;
-import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
public class Commands {
@@ -1316,41 +1314,18 @@ public class Commands {
Properties props = new Properties();
if (url != null) {
String saveUrl = getUrlToUse(url);
- props.setProperty(JdbcConnectionParams.PROPERTY_URL, url);
+ props.setProperty("url", saveUrl);
}
-
- String value = null;
if (driver != null) {
- props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, driver);
- } else {
- value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.PROPERTY_DRIVER);
- if (value != null) {
- props.setProperty(JdbcConnectionParams.PROPERTY_DRIVER, value);
- }
+ props.setProperty("driver", driver);
}
-
if (user != null) {
- props.setProperty(JdbcConnectionParams.AUTH_USER, user);
- } else {
- value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_USER);
- if (value != null) {
- props.setProperty(JdbcConnectionParams.AUTH_USER, value);
- }
+ props.setProperty("user", user);
}
-
if (pass != null) {
- props.setProperty(JdbcConnectionParams.AUTH_PASSWD, pass);
- } else {
- value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PASSWD);
- if (value != null) {
- props.setProperty(JdbcConnectionParams.AUTH_PASSWD, value);
- }
+ props.setProperty("password", pass);
}
- value = Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_TYPE);
- if (value != null) {
- props.setProperty(JdbcConnectionParams.AUTH_TYPE, value);
- }
return connect(props);
}
@@ -1403,25 +1378,26 @@ public class Commands {
public boolean connect(Properties props) throws IOException {
String url = getProperty(props, new String[] {
- JdbcConnectionParams.PROPERTY_URL,
+ "url",
"javax.jdo.option.ConnectionURL",
"ConnectionURL",
});
String driver = getProperty(props, new String[] {
- JdbcConnectionParams.PROPERTY_DRIVER,
+ "driver",
"javax.jdo.option.ConnectionDriverName",
"ConnectionDriverName",
});
String username = getProperty(props, new String[] {
- JdbcConnectionParams.AUTH_USER,
+ "user",
"javax.jdo.option.ConnectionUserName",
"ConnectionUserName",
});
String password = getProperty(props, new String[] {
- JdbcConnectionParams.AUTH_PASSWD,
+ "password",
"javax.jdo.option.ConnectionPassword",
"ConnectionPassword",
});
+ String auth = getProperty(props, new String[] {"auth"});
if (url == null || url.length() == 0) {
return beeLine.error("Property \"url\" is required");
@@ -1432,25 +1408,23 @@ public class Commands {
}
}
- String auth = getProperty(props, new String[] {JdbcConnectionParams.AUTH_TYPE});
+ beeLine.info("Connecting to " + url);
+
+ if (username == null) {
+ username = beeLine.getConsoleReader().readLine("Enter username for " + url + ": ");
+ }
+ props.setProperty("user", username);
+ if (password == null) {
+ password = beeLine.getConsoleReader().readLine("Enter password for " + url + ": ",
+ new Character('*'));
+ }
+ props.setProperty("password", password);
+
if (auth == null) {
auth = beeLine.getOpts().getAuthType();
- if (auth != null) {
- props.setProperty(JdbcConnectionParams.AUTH_TYPE, auth);
- }
}
-
- beeLine.info("Connecting to " + url);
- if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PRINCIPAL) == null) {
- if (username == null) {
- username = beeLine.getConsoleReader().readLine("Enter username for " + url + ": ");
- }
- props.setProperty(JdbcConnectionParams.AUTH_USER, username);
- if (password == null) {
- password = beeLine.getConsoleReader().readLine("Enter password for " + url + ": ",
- new Character('*'));
- }
- props.setProperty(JdbcConnectionParams.AUTH_PASSWD, password);
+ if (auth != null) {
+ props.setProperty("auth", auth);
}
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c0843b9..ed20069 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1519,7 +1519,9 @@ public class HiveConf extends Configuration {
// Statistics
HIVESTATSAUTOGATHER("hive.stats.autogather", true,
- "A flag to gather statistics automatically during the INSERT OVERWRITE command."),
+ "A flag to gather statistics (only basic) automatically during the INSERT OVERWRITE command."),
+ HIVESTATSCOLAUTOGATHER("hive.stats.column.autogather", false,
+ "A flag to gather column statistics automatically."),
HIVESTATSDBCLASS("hive.stats.dbclass", "fs", new PatternSet("custom", "fs"),
"The storage that stores temporary Hive statistics. In filesystem based statistics collection ('fs'), \n" +
"each task writes statistics it has collected in a file on the filesystem, which will be aggregated \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
index ecfeddb..f9909ad 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
@@ -39,7 +39,6 @@ import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.jdbc.Utils;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -176,7 +175,6 @@ public class TestBeeLineWithArgs {
// Put the script content in a temp file
File scriptFile = File.createTempFile(this.getClass().getSimpleName(), "temp");
- System.out.println("script file is " + scriptFile.getAbsolutePath());
scriptFile.deleteOnExit();
PrintStream os = new PrintStream(new FileOutputStream(scriptFile));
os.print(scriptText);
@@ -657,7 +655,7 @@ public class TestBeeLineWithArgs {
@Test
public void testEmbeddedBeelineConnection() throws Throwable{
- String embeddedJdbcURL = Utils.URL_PREFIX+"/Default";
+ String embeddedJdbcURL = BeeLine.BEELINE_DEFAULT_JDBC_URL+"/Default";
List<String> argList = getBaseArgs(embeddedJdbcURL);
argList.add("--hivevar");
argList.add("DUMMY_TBL=embedded_table");
@@ -772,7 +770,7 @@ public class TestBeeLineWithArgs {
@Test
public void testEmbeddedBeelineOutputs() throws Throwable{
- String embeddedJdbcURL = Utils.URL_PREFIX+"/Default";
+ String embeddedJdbcURL = BeeLine.BEELINE_DEFAULT_JDBC_URL+"/Default";
List<String> argList = getBaseArgs(embeddedJdbcURL);
// Set to non-zk lock manager to avoid trying to connect to zookeeper
final String SCRIPT_TEXT =
@@ -845,22 +843,4 @@ public class TestBeeLineWithArgs {
}
- /**
- * Attempt to execute a simple script file with the usage of user & password variables in URL.
- * Test for presence of an expected pattern
- * in the output (stdout or stderr), fail if not found
- * Print PASSED or FAILED
- */
- @Test
- public void testConnectionWithURLParams() throws Throwable {
- final String EXPECTED_PATTERN = " hivetest ";
- List<String> argList = new ArrayList<String>();
- argList.add("-d");
- argList.add(BeeLine.BEELINE_DEFAULT_JDBC_DRIVER);
- argList.add("-u");
- argList.add(miniHS2.getBaseJdbcURL() + ";user=hivetest;password=hive");
- String SCRIPT_TEXT = "select current_user();";
-
- testScriptFile( SCRIPT_TEXT, EXPECTED_PATTERN, true, argList);
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/jdbc/src/java/org/apache/hive/jdbc/Utils.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 7ea6309..42181d7 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -37,12 +37,12 @@ import org.apache.http.cookie.Cookie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Utils {
+class Utils {
static final Logger LOG = LoggerFactory.getLogger(Utils.class.getName());
/**
* The required prefix for the connection URL.
*/
- public static final String URL_PREFIX = "jdbc:hive2://";
+ static final String URL_PREFIX = "jdbc:hive2://";
/**
* If host is provided, without a port.
@@ -63,7 +63,7 @@ public class Utils {
static final String HIVE_SERVER2_RETRY_TRUE = "true";
static final String HIVE_SERVER2_RETRY_FALSE = "false";
- public static class JdbcConnectionParams {
+ static class JdbcConnectionParams {
// Note on client side parameter naming convention:
// Prefer using a shorter camelCase param name instead of using the same name as the
// corresponding
@@ -76,33 +76,31 @@ public class Utils {
// Retry setting
static final String RETRIES = "retries";
- public static final String AUTH_TYPE = "auth";
+ static final String AUTH_TYPE = "auth";
// We're deprecating this variable's name.
- public static final String AUTH_QOP_DEPRECATED = "sasl.qop";
- public static final String AUTH_QOP = "saslQop";
- public static final String AUTH_SIMPLE = "noSasl";
- public static final String AUTH_TOKEN = "delegationToken";
- public static final String AUTH_USER = "user";
- public static final String AUTH_PRINCIPAL = "principal";
- public static final String AUTH_PASSWD = "password";
- public static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
- public static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
- public static final String ANONYMOUS_USER = "anonymous";
- public static final String ANONYMOUS_PASSWD = "anonymous";
- public static final String USE_SSL = "ssl";
- public static final String SSL_TRUST_STORE = "sslTrustStore";
- public static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
+ static final String AUTH_QOP_DEPRECATED = "sasl.qop";
+ static final String AUTH_QOP = "saslQop";
+ static final String AUTH_SIMPLE = "noSasl";
+ static final String AUTH_TOKEN = "delegationToken";
+ static final String AUTH_USER = "user";
+ static final String AUTH_PRINCIPAL = "principal";
+ static final String AUTH_PASSWD = "password";
+ static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
+ static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
+ static final String ANONYMOUS_USER = "anonymous";
+ static final String ANONYMOUS_PASSWD = "anonymous";
+ static final String USE_SSL = "ssl";
+ static final String SSL_TRUST_STORE = "sslTrustStore";
+ static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
// We're deprecating the name and placement of this in the parsed map (from hive conf vars to
// hive session vars).
static final String TRANSPORT_MODE_DEPRECATED = "hive.server2.transport.mode";
- public static final String TRANSPORT_MODE = "transportMode";
+ static final String TRANSPORT_MODE = "transportMode";
// We're deprecating the name and placement of this in the parsed map (from hive conf vars to
// hive session vars).
static final String HTTP_PATH_DEPRECATED = "hive.server2.thrift.http.path";
- public static final String HTTP_PATH = "httpPath";
- public static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
- public static final String PROPERTY_DRIVER = "driver";
- public static final String PROPERTY_URL = "url";
+ static final String HTTP_PATH = "httpPath";
+ static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
// Don't use dynamic service discovery
static final String SERVICE_DISCOVERY_MODE_NONE = "none";
// Use ZooKeeper for indirection while using dynamic service discovery
@@ -633,14 +631,4 @@ public class Utils {
}
return true;
}
-
- public static String parsePropertyFromUrl(final String url, final String key) {
- String[] tokens = url.split(";");
- for (String token : tokens) {
- if (token.trim().startsWith(key.trim() + "=")) {
- return token.trim().substring((key.trim() + "=").length());
- }
- }
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 00552a8..636f079 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -406,11 +406,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
}
}
if (asyncEx != null) {
- if (asyncEx instanceof Exception) {
- throw new HiveException("Async initialization failed", asyncEx);
- } else {
- throw (Error) asyncEx;
- }
+ throw new HiveException("Async initialization failed", asyncEx);
}
completeInitializationOp(os);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
index 47ab9c2..8319f11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
@@ -40,14 +40,12 @@ public class TextRecordReader implements RecordReader {
private InputStream in;
private Text row;
private Configuration conf;
- private boolean escape;
public void initialize(InputStream in, Configuration conf, Properties tbl)
throws IOException {
lineReader = new LineReader(in, conf);
this.in = in;
this.conf = conf;
- escape = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE);
}
public Writable createRow() throws IOException {
@@ -62,7 +60,7 @@ public class TextRecordReader implements RecordReader {
int bytesConsumed = lineReader.readLine((Text) row);
- if (escape) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE)) {
return HiveUtils.unescapeText((Text) row);
}
return bytesConsumed;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
index f15458d..10b4594 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
@@ -35,20 +35,18 @@ public class TextRecordWriter implements RecordWriter {
private OutputStream out;
private Configuration conf;
- private boolean escape;
public void initialize(OutputStream out, Configuration conf)
throws IOException {
this.out = out;
this.conf = conf;
- escape = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE);
}
public void write(Writable row) throws IOException {
Text text = (Text) row;
Text escapeText = text;
- if (escape) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPE)) {
escapeText = HiveUtils.escapeText(text);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 8144c3b..7082931 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec;
import java.util.ArrayList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import java.beans.DefaultPersistenceDelegate;
import java.beans.Encoder;
import java.beans.Expression;
@@ -97,7 +96,6 @@ import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -196,7 +194,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoException;
import com.google.common.base.Preconditions;
/**
@@ -439,7 +436,6 @@ public final class Utilities {
throw new RuntimeException("Unknown work type: " + name);
}
}
-
gWorkMap.get(conf).put(path, gWork);
} else if (LOG.isDebugEnabled()) {
LOG.debug("Found plan in cache for name: " + name);
@@ -450,16 +446,6 @@ public final class Utilities {
LOG.debug("File not found: " + fnf.getMessage());
LOG.info("No plan file found: "+path);
return null;
- } catch (KryoException ke) {
- Throwable cnfThrowable = findClassNotFoundException(ke);
- if (LlapProxy.isDaemon() && (cnfThrowable != null)) {
- LOG.error("Missing class \"" + cnfThrowable.getMessage() + "\". If this is a UDF and you " +
- "are running LLAP, you may need to regenerate the llap startup script and restart " +
- "llap with jars for your udf.", cnfThrowable);
- throw new RuntimeException("Cannot find \"" + cnfThrowable.getMessage() + "\" You may" +
- " need to regenerate the LLAP startup script and restart llap daemons.", cnfThrowable);
- }
- throw new RuntimeException(ke);
} catch (Exception e) {
String msg = "Failed to load plan: " + path + ": " + e;
LOG.error(msg, e);
@@ -474,16 +460,6 @@ public final class Utilities {
}
}
- private static Throwable findClassNotFoundException(Throwable ke) {
- while (ke != null) {
- if (ke instanceof ClassNotFoundException) {
- return ke;
- }
- ke = ke.getCause();
- }
- return null;
- }
-
public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
try {
Graph stageGraph = plan.getQueryPlan().getStageGraph();
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index a33b6e2..c560f37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -39,7 +39,6 @@ import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.library.api.KeyValueWriter;
/**
@@ -179,8 +178,6 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
} finally {
if (originalThrowable != null && originalThrowable instanceof Error) {
LOG.error(StringUtils.stringifyException(originalThrowable));
- getContext().reportFailure(TaskFailureType.FATAL, originalThrowable,
- "Cannot recover from this error");
throw new RuntimeException(originalThrowable);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index d9f58f2..3fa1233 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2860,16 +2860,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (destIsSubDir) {
FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
- if (inheritPerms) {
- try {
- HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, false);
- } catch (IOException e) {
- String msg = "Error setting permission of file " + destf;
- LOG.error(msg);
- throw new HiveException(msg, e);
- }
- }
-
List<Future<Void>> futures = new LinkedList<>();
final ExecutorService pool = Executors.newFixedThreadPool(
conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index 9fbbd4c..f41fa4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -117,12 +117,6 @@ public final class GenMRSkewJoinProcessor {
}
List<Task<? extends Serializable>> children = currTask.getChildTasks();
- if (children != null && children.size() > 1) {
- throw new SemanticException("Should not happened");
- }
-
- Task<? extends Serializable> child =
- children != null && children.size() == 1 ? children.get(0) : null;
Path baseTmpDir = parseCtx.getContext().getMRTmpPath();
@@ -347,13 +341,14 @@ public final class GenMRSkewJoinProcessor {
tsk.addDependentTask(oldChild);
}
}
- }
- if (child != null) {
- currTask.removeDependentTask(child);
- listTasks.add(child);
+ currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>());
+ for (Task<? extends Serializable> oldChild : children) {
+ oldChild.getParentTasks().remove(currTask);
+ }
+ listTasks.addAll(children);
}
ConditionalResolverSkewJoinCtx context =
- new ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, child);
+ new ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, children);
ConditionalWork cndWork = new ConditionalWork(listWorks);
ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 11ec07a..ded9231 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -93,9 +93,6 @@ public class GenSparkSkewJoinProcessor {
List<Task<? extends Serializable>> children = currTask.getChildTasks();
- Task<? extends Serializable> child =
- children != null && children.size() == 1 ? children.get(0) : null;
-
Path baseTmpDir = parseCtx.getContext().getMRTmpPath();
JoinDesc joinDescriptor = joinOp.getConf();
@@ -334,14 +331,17 @@ public class GenSparkSkewJoinProcessor {
tsk.addDependentTask(oldChild);
}
}
- }
- if (child != null) {
- currTask.removeDependentTask(child);
- listTasks.add(child);
- listWorks.add(child.getWork());
+ currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>());
+ for (Task<? extends Serializable> oldChild : children) {
+ oldChild.getParentTasks().remove(currTask);
+ }
+ listTasks.addAll(children);
+ for (Task<? extends Serializable> oldChild : children) {
+ listWorks.add(oldChild.getWork());
+ }
}
ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx context =
- new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, child);
+ new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, children);
ConditionalWork cndWork = new ConditionalWork(listWorks);
ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
index 8e56263..a3ec990 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -411,7 +411,9 @@ public class SparkMapJoinResolver implements PhysicalPlanResolver {
context.setDirToTaskMap(newbigKeysDirToTaskMap);
// update no skew task
if (context.getNoSkewTask() != null && context.getNoSkewTask().equals(originalTask)) {
- context.setNoSkewTask(newTask);
+ List<Task<? extends Serializable>> noSkewTask = new ArrayList<>();
+ noSkewTask.add(newTask);
+ context.setNoSkewTask(noSkewTask);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
new file mode 100644
index 0000000..15a47dc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * ColumnStatsAutoGatherContext: passed to the compiler when
+ * hive.stats.column.autogather (together with hive.stats.autogather) is enabled,
+ * so that column statistics are gathered automatically during an INSERT
+ * OVERWRITE or INSERT INTO command.
+ */
+
+public class ColumnStatsAutoGatherContext {
+
+ public AnalyzeRewriteContext analyzeRewrite;
+ private final List<LoadFileDesc> loadFileWork = new ArrayList<>();
+ private final SemanticAnalyzer sa;
+ private final HiveConf conf;
+ private final Operator<? extends OperatorDesc> op;
+ private final List<FieldSchema> columns;
+ private final List<FieldSchema> partitionColumns;
+ private boolean isInsertInto;
+ private Table tbl;
+ private Map<String, String> partSpec;
+
+ public ColumnStatsAutoGatherContext(SemanticAnalyzer sa, HiveConf conf,
+ Operator<? extends OperatorDesc> op, Table tbl, Map<String, String> partSpec,
+ boolean isInsertInto) throws SemanticException {
+ super();
+ this.sa = sa;
+ this.conf = conf;
+ this.op = op;
+ this.tbl = tbl;
+ this.partSpec = partSpec;
+ this.isInsertInto = isInsertInto;
+ columns = tbl.getCols();
+ partitionColumns = tbl.getPartCols();
+ }
+
+ public List<LoadFileDesc> getLoadFileWork() {
+ return loadFileWork;
+ }
+
+ public AnalyzeRewriteContext getAnalyzeRewrite() {
+ return analyzeRewrite;
+ }
+
+ public void setAnalyzeRewrite(AnalyzeRewriteContext analyzeRewrite) {
+ this.analyzeRewrite = analyzeRewrite;
+ }
+
+ public void insertAnalyzePipeline() throws SemanticException{
+ // 1. Generate the statement "analyze table [tablename] compute statistics for columns"
+ // In the non-partitioned table case, it generates a TS-SEL-GBY-RS-GBY-SEL-FS operator tree
+ // In the static-partitioned table case, it generates a TS-FIL(partitionKey)-SEL-GBY(partitionKey)-RS-GBY-SEL-FS operator tree
+ // In the dynamic-partitioned table case, it generates a TS-SEL-GBY(partitionKey)-RS-GBY-SEL-FS operator tree
+ // However, we do not need to specify the partition-spec because (1) the data is going to be inserted into that specific partition and
+ // (2) we can compose the static/dynamic partition using a select operator in replaceSelectOperatorProcess().
+ String analyzeCommand = "analyze table `" + tbl.getDbName() + "`.`" + tbl.getTableName() + "`"
+ + " compute statistics for columns ";
+
+ // 2. Based on the statement, generate the selectOperator
+ Operator<?> selOp = null;
+ try {
+ selOp = genSelOpForAnalyze(analyzeCommand);
+ } catch (IOException | ParseException e) {
+ throw new SemanticException(e);
+ }
+
+ // 3. attach this SEL to the operator right before FS
+ op.getChildOperators().add(selOp);
+ selOp.getParentOperators().clear();
+ selOp.getParentOperators().add(op);
+
+ // 4. address the colExp, colList, etc for the SEL
+ try {
+ replaceSelectOperatorProcess((SelectOperator)selOp, op);
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private Operator genSelOpForAnalyze(String analyzeCommand) throws IOException, ParseException, SemanticException{
+ //0. initialization
+ Context ctx = new Context(conf);
+ ParseDriver pd = new ParseDriver();
+ ASTNode tree = pd.parse(analyzeCommand, ctx);
+ tree = ParseUtils.findRootNonNullToken(tree);
+
+ //1. get the ColumnStatsSemanticAnalyzer
+ BaseSemanticAnalyzer baseSem = SemanticAnalyzerFactory.get(new QueryState(conf), tree);
+ ColumnStatsSemanticAnalyzer colSem = (ColumnStatsSemanticAnalyzer) baseSem;
+
+ //2. get the rewritten AST
+ ASTNode ast = colSem.rewriteAST(tree, this);
+ baseSem = SemanticAnalyzerFactory.get(new QueryState(conf), ast);
+ SemanticAnalyzer sem = (SemanticAnalyzer) baseSem;
+ QB qb = new QB(null, null, false);
+ ASTNode child = ast;
+ ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+ subPCtx.setContext(ctx);
+ ((SemanticAnalyzer) sem).initParseCtx(subPCtx);
+ sem.doPhase1(child, qb, sem.initPhase1Ctx(), null);
+ // This will trigger new calls to metastore to collect metadata
+ // TODO: cache the information from the metastore
+ sem.getMetaData(qb);
+ Operator<?> operator = sem.genPlan(qb);
+
+ //3. populate the load file work so that ColumnStatsTask can work
+ loadFileWork.addAll(sem.getLoadFileWork());
+
+ //4. because there is only one TS for analyze statement, we can get it.
+ if (sem.topOps.values().size() != 1) {
+ throw new SemanticException(
+ "ColumnStatsAutoGatherContext is expecting exactly one TS, but finds "
+ + sem.topOps.values().size());
+ }
+ operator = sem.topOps.values().iterator().next();
+
+ //5. get the first SEL after TS
+ while(!(operator instanceof SelectOperator)){
+ operator = operator.getChildOperators().get(0);
+ }
+ return operator;
+ }
+
+ /**
+ * @param operator : the select operator in the analyze statement
+ * @param input : the operator right before FS in the insert overwrite statement
+ * @throws HiveException
+ */
+ private void replaceSelectOperatorProcess(SelectOperator operator, Operator<? extends OperatorDesc> input)
+ throws HiveException {
+ RowSchema selRS = operator.getSchema();
+ ArrayList<ColumnInfo> signature = new ArrayList<>();
+ OpParseContext inputCtx = sa.opParseCtx.get(input);
+ RowResolver inputRR = inputCtx.getRowResolver();
+ ArrayList<ColumnInfo> columns = inputRR.getColumnInfos();
+ ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>();
+ ArrayList<String> columnNames = new ArrayList<String>();
+ Map<String, ExprNodeDesc> columnExprMap =
+ new HashMap<String, ExprNodeDesc>();
+ // the column positions in the operator should be like this
+ // <----non-partition columns---->|<--static partition columns-->|<--dynamic partition columns-->
+ // ExprNodeColumnDesc | ExprNodeConstantDesc | ExprNodeColumnDesc
+ // from input | generate itself | from input
+ // |
+
+ // 1. deal with non-partition columns
+ for (int i = 0; i < this.columns.size(); i++) {
+ ColumnInfo col = columns.get(i);
+ ExprNodeDesc exprNodeDesc = new ExprNodeColumnDesc(col);
+ colList.add(exprNodeDesc);
+ String internalName = selRS.getColumnNames().get(i);
+ columnNames.add(internalName);
+ columnExprMap.put(internalName, exprNodeDesc);
+ signature.add(selRS.getSignature().get(i));
+ }
+ // if there is any partition column (in static partition or dynamic
+ // partition or mixed case)
+ int dynamicPartBegin = -1;
+ for (int i = 0; i < partitionColumns.size(); i++) {
+ ExprNodeDesc exprNodeDesc = null;
+ String partColName = partitionColumns.get(i).getName();
+ // 2. deal with static partition columns
+ if (partSpec != null && partSpec.containsKey(partColName)
+ && partSpec.get(partColName) != null) {
+ if (dynamicPartBegin > 0) {
+ throw new SemanticException(
+ "Dynamic partition columns should not come before static partition columns.");
+ }
+ exprNodeDesc = new ExprNodeConstantDesc(partSpec.get(partColName));
+ TypeInfo srcType = exprNodeDesc.getTypeInfo();
+ TypeInfo destType = selRS.getSignature().get(this.columns.size() + i).getType();
+ if (!srcType.equals(destType)) {
+ // This may be possible when srcType is string but destType is integer
+ exprNodeDesc = ParseUtils
+ .createConversionCast(exprNodeDesc, (PrimitiveTypeInfo) destType);
+ }
+ }
+ // 3. dynamic partition columns
+ else {
+ dynamicPartBegin++;
+ ColumnInfo col = columns.get(this.columns.size() + dynamicPartBegin);
+ TypeInfo srcType = col.getType();
+ TypeInfo destType = selRS.getSignature().get(this.columns.size() + i).getType();
+ exprNodeDesc = new ExprNodeColumnDesc(col);
+ if (!srcType.equals(destType)) {
+ exprNodeDesc = ParseUtils
+ .createConversionCast(exprNodeDesc, (PrimitiveTypeInfo) destType);
+ }
+ }
+ colList.add(exprNodeDesc);
+ String internalName = selRS.getColumnNames().get(this.columns.size() + i);
+ columnNames.add(internalName);
+ columnExprMap.put(internalName, exprNodeDesc);
+ signature.add(selRS.getSignature().get(this.columns.size() + i));
+ }
+ operator.setConf(new SelectDesc(colList, columnNames));
+ operator.setColumnExprMap(columnExprMap);
+ selRS.setSignature(signature);
+ operator.setSchema(selRS);
+ }
+
+ public String getCompleteName() {
+ return tbl.getDbName() + "." + tbl.getTableName();
+ }
+
+ public boolean isInsertInto() {
+ return isInsertInto;
+ }
+
+ public static boolean canRunAutogatherStats(Operator curr) {
+ // check the ObjectInspector
+ for (ColumnInfo cinfo : curr.getSchema().getSignature()) {
+ if (cinfo.getIsVirtualCol()) {
+ return false;
+ } else if (cinfo.getObjectInspector().getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ return false;
+ } else {
+ switch (((PrimitiveTypeInfo) cinfo.getType()).getPrimitiveCategory()) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case TIMESTAMP:
+ case FLOAT:
+ case DOUBLE:
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case DECIMAL:
+ // TODO: Support case DATE:
+ break;
+ default:
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+}
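
For orientation, a minimal sketch of how the class above is driven; the SemanticAnalyzer change later in this patch does essentially this, where sa is the enclosing SemanticAnalyzer, curr is the operator feeding the FileSinkOperator, table is the resolved destination Table, and partSpec is the static partition spec (or null):

    ColumnStatsAutoGatherContext statsCtx =
        new ColumnStatsAutoGatherContext(sa, conf, curr, table, partSpec, isInsertInto);
    statsCtx.insertAnalyzePipeline();              // splice the analyze SEL after curr and rewire parent/child links
    columnStatsAutoGatherContexts.add(statsCtx);   // carried through ParseContext so the compiler can emit the column-stats work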
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
index 3b6cbce..d3aef41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
@@ -110,11 +110,18 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
partValsSpecified += partSpec.get(partKey) == null ? 0 : 1;
}
try {
- if ((partValsSpecified == tbl.getPartitionKeys().size()) && (db.getPartition(tbl, partSpec, false, null, false) == null)) {
- throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + " : " + partSpec);
+ // for static partition, it may not exist when HIVESTATSCOLAUTOGATHER is
+ // set to true
+ if (!conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)) {
+ if ((partValsSpecified == tbl.getPartitionKeys().size())
+ && (db.getPartition(tbl, partSpec, false, null, false) == null)) {
+ throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg()
+ + " : " + partSpec);
+ }
}
} catch (HiveException he) {
- throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + " : " + partSpec);
+ throw new SemanticException(ErrorMsg.COLUMNSTATSCOLLECTOR_INVALID_PARTITION.getMsg() + " : "
+ + partSpec);
}
// User might have only specified partial list of partition keys, in which case add other partition keys in partSpec
@@ -157,7 +164,7 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
} else {
groupByClause.append(",");
}
- groupByClause.append(fs.getName());
+ groupByClause.append("`" + fs.getName() + "`");
}
// attach the predicate and group by to the return clause
@@ -235,12 +242,12 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
if (isPartitionStats) {
for (FieldSchema fs : tbl.getPartCols()) {
- rewrittenQueryBuilder.append(" , " + fs.getName());
+ rewrittenQueryBuilder.append(" , `" + fs.getName() + "`");
}
}
- rewrittenQueryBuilder.append(" from ");
+ rewrittenQueryBuilder.append(" from `");
rewrittenQueryBuilder.append(tbl.getDbName());
- rewrittenQueryBuilder.append(".");
+ rewrittenQueryBuilder.append("`.");
rewrittenQueryBuilder.append("`" + tbl.getTableName() + "`");
isRewritten = true;
@@ -378,4 +385,54 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
analyzeInternal(originalTree);
}
}
+
+ /**
+ * @param ast
+ * the original analyze AST
+ * @param context
+ * the ColumnStatsAutoGatherContext of the insert statement that triggers this rewrite
+ * @return the rewritten AST of the column-stats query
+ * @throws SemanticException
+ */
+ public ASTNode rewriteAST(ASTNode ast, ColumnStatsAutoGatherContext context)
+ throws SemanticException {
+ tbl = AnalyzeCommandUtils.getTable(ast, this);
+ colNames = getColumnName(ast);
+ // Save away the original AST
+ originalTree = ast;
+ boolean isPartitionStats = AnalyzeCommandUtils.isPartitionLevelStats(ast);
+ Map<String, String> partSpec = null;
+ checkForPartitionColumns(colNames,
+ Utilities.getColumnNamesFromFieldSchema(tbl.getPartitionKeys()));
+ validateSpecifiedColumnNames(colNames);
+ if (conf.getBoolVar(ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS) && tbl.isPartitioned()) {
+ isPartitionStats = true;
+ }
+
+ if (isPartitionStats) {
+ isTableLevel = false;
+ partSpec = AnalyzeCommandUtils.getPartKeyValuePairsFromAST(tbl, ast, conf);
+ handlePartialPartitionSpec(partSpec);
+ } else {
+ isTableLevel = true;
+ }
+ colType = getColumnTypes(colNames);
+ int numBitVectors = 0;
+ try {
+ numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
+ } catch (Exception e) {
+ throw new SemanticException(e.getMessage());
+ }
+ rewrittenQuery = genRewrittenQuery(colNames, numBitVectors, partSpec, isPartitionStats);
+ rewrittenTree = genRewrittenTree(rewrittenQuery);
+
+ context.analyzeRewrite = new AnalyzeRewriteContext();
+ context.analyzeRewrite.setTableName(tbl.getDbName() + "." + tbl.getTableName());
+ context.analyzeRewrite.setTblLvl(isTableLevel);
+ context.analyzeRewrite.setColName(colNames);
+ context.analyzeRewrite.setColType(colType);
+ return rewrittenTree;
+ }
}
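
The backtick quoting added above keeps the auto-generated column-stats query parseable when a database, table, or partition column name needs escaping (a keyword or a character outside the plain identifier set). A small illustration of the changed group-by fragment, assuming a hypothetical partition column named date:

    String partColName = "date";                    // hypothetical column name that collides with a keyword
    StringBuilder groupByClause = new StringBuilder("group by ");
    groupByClause.append("`" + partColName + "`");  // previously appended the bare name
    // yields: group by `date`, which still parses where "date" is treated as a keyword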
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 96ef20d..b2125ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -80,6 +80,7 @@ public class ParseContext {
private HashMap<String, SplitSample> nameToSplitSample;
private List<LoadTableDesc> loadTableWork;
private List<LoadFileDesc> loadFileWork;
+ private List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts;
private Context ctx;
private QueryState queryState;
private HiveConf conf;
@@ -166,6 +167,7 @@ public class ParseContext {
Set<JoinOperator> joinOps,
Set<SMBMapJoinOperator> smbMapJoinOps,
List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
+ List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts,
Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
UnionProcContext uCtx, List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer,
Map<String, PrunedPartitionList> prunedPartitions,
@@ -188,6 +190,7 @@ public class ParseContext {
this.smbMapJoinOps = smbMapJoinOps;
this.loadFileWork = loadFileWork;
this.loadTableWork = loadTableWork;
+ this.columnStatsAutoGatherContexts = columnStatsAutoGatherContexts;
this.topOps = topOps;
this.ctx = ctx;
this.idToTableNameMap = idToTableNameMap;
@@ -608,4 +611,13 @@ public class ParseContext {
public Map<String, Table> getTabNameToTabObject() {
return tabNameToTabObject;
}
+
+ public List<ColumnStatsAutoGatherContext> getColumnStatsAutoGatherContexts() {
+ return columnStatsAutoGatherContexts;
+ }
+
+ public void setColumnStatsAutoGatherContexts(
+ List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts) {
+ this.columnStatsAutoGatherContexts = columnStatsAutoGatherContexts;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index 3a226e7..3a0402e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -63,7 +63,9 @@ public class QBParseInfo {
private final Set<String> destCubes;
private final Set<String> destGroupingSets;
private final Map<String, ASTNode> destToHaving;
- private final HashSet<String> insertIntoTables;
+ // insertIntoTables/insertOverwriteTables map a table's fullName to its ast;
+ private final Map<String, ASTNode> insertIntoTables;
+ private final Map<String, ASTNode> insertOverwriteTables;
private boolean isAnalyzeCommand; // used for the analyze command (statistics)
private boolean isNoScanAnalyzeCommand; // used for the analyze command (statistics) (noscan)
@@ -133,7 +135,8 @@ public class QBParseInfo {
destToSortby = new HashMap<String, ASTNode>();
destToOrderby = new HashMap<String, ASTNode>();
destToLimit = new HashMap<String, SimpleEntry<Integer, Integer>>();
- insertIntoTables = new HashSet<String>();
+ insertIntoTables = new HashMap<String, ASTNode>();
+ insertOverwriteTables = new HashMap<String, ASTNode>();
destRollups = new HashSet<String>();
destCubes = new HashSet<String>();
destGroupingSets = new HashSet<String>();
@@ -174,13 +177,13 @@ public class QBParseInfo {
}
}
- public void addInsertIntoTable(String fullName) {
- insertIntoTables.add(fullName.toLowerCase());
+ public void addInsertIntoTable(String fullName, ASTNode ast) {
+ insertIntoTables.put(fullName.toLowerCase(), ast);
}
public boolean isInsertIntoTable(String dbName, String table) {
String fullName = dbName + "." + table;
- return insertIntoTables.contains(fullName.toLowerCase());
+ return insertIntoTables.containsKey(fullName.toLowerCase());
}
/**
@@ -189,7 +192,7 @@ public class QBParseInfo {
* @return
*/
public boolean isInsertIntoTable(String fullTableName) {
- return insertIntoTables.contains(fullTableName.toLowerCase());
+ return insertIntoTables.containsKey(fullTableName.toLowerCase());
}
public HashMap<String, ASTNode> getAggregationExprsForClause(String clause) {
@@ -636,6 +639,11 @@ public class QBParseInfo {
public void setPartialScanAnalyzeCommand(boolean isPartialScanAnalyzeCommand) {
this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand;
}
+
+ public Map<String, ASTNode> getInsertOverwriteTables() {
+ return insertOverwriteTables;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 7162c08..6937308 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -129,6 +130,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverterPostProc;
import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec.SpecType;
import org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
@@ -182,6 +184,7 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -259,6 +262,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
protected LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
private List<LoadTableDesc> loadTableWork;
private List<LoadFileDesc> loadFileWork;
+ private List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts;
private final Map<JoinOperator, QBJoinTree> joinContext;
private final Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
private final HashMap<TableScanOperator, Table> topToTable;
@@ -353,6 +357,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
topOps = new LinkedHashMap<String, TableScanOperator>();
loadTableWork = new ArrayList<LoadTableDesc>();
loadFileWork = new ArrayList<LoadFileDesc>();
+ columnStatsAutoGatherContexts = new ArrayList<ColumnStatsAutoGatherContext>();
opParseCtx = new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
joinContext = new HashMap<JoinOperator, QBJoinTree>();
smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
@@ -390,6 +395,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
tabNameToTabObject.clear();
loadTableWork.clear();
loadFileWork.clear();
+ columnStatsAutoGatherContexts.clear();
topOps.clear();
destTableId = 1;
idToTableNameMap.clear();
@@ -448,7 +454,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return new ParseContext(queryState, opToPartPruner, opToPartList, topOps,
new HashSet<JoinOperator>(joinContext.keySet()),
new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
- loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
@@ -1401,18 +1407,25 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
case HiveParser.TOK_INSERT_INTO:
String currentDatabase = SessionState.get().getCurrentDatabase();
String tab_name = getUnescapedName((ASTNode) ast.getChild(0).getChild(0), currentDatabase);
- qbp.addInsertIntoTable(tab_name);
+ qbp.addInsertIntoTable(tab_name, ast);
case HiveParser.TOK_DESTINATION:
ctx_1.dest = "insclause-" + ctx_1.nextNum;
ctx_1.nextNum++;
boolean isTmpFileDest = false;
if (ast.getChildCount() > 0 && ast.getChild(0) instanceof ASTNode) {
- ASTNode ch = (ASTNode)ast.getChild(0);
- if (ch.getToken().getType() == HiveParser.TOK_DIR
- && ch.getChildCount() > 0 && ch.getChild(0) instanceof ASTNode) {
- ch = (ASTNode)ch.getChild(0);
+ ASTNode ch = (ASTNode) ast.getChild(0);
+ if (ch.getToken().getType() == HiveParser.TOK_DIR && ch.getChildCount() > 0
+ && ch.getChild(0) instanceof ASTNode) {
+ ch = (ASTNode) ch.getChild(0);
isTmpFileDest = ch.getToken().getType() == HiveParser.TOK_TMP_FILE;
+ } else {
+ if (ast.getToken().getType() == HiveParser.TOK_DESTINATION
+ && ast.getChild(0).getType() == HiveParser.TOK_TAB) {
+ String fullTableName = getUnescapedName((ASTNode) ast.getChild(0).getChild(0),
+ SessionState.get().getCurrentDatabase());
+ qbp.getInsertOverwriteTables().put(fullTableName, ast);
+ }
}
}
@@ -6516,6 +6529,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
DynamicPartitionCtx dpCtx = null;
LoadTableDesc ltd = null;
ListBucketingCtx lbCtx = null;
+ Map<String, String> partSpec = null;
switch (dest_type.intValue()) {
case QBMetaData.DEST_TABLE: {
@@ -6531,7 +6545,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
}
- Map<String, String> partSpec = qbm.getPartSpecForAlias(dest);
+ partSpec = qbm.getPartSpecForAlias(dest);
dest_path = dest_tab.getPath();
// If the query here is an INSERT_INTO and the target is an immutable table,
@@ -6875,6 +6889,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
+
inputRR = opParseCtx.get(input).getRowResolver();
ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
@@ -7004,9 +7019,41 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
FileSinkOperator fso = (FileSinkOperator) output;
fso.getConf().setTable(dest_tab);
fsopToTable.put(fso, dest_tab);
+ // The following code collects column stats when hive.stats.autogather and
+ // hive.stats.column.autogather are both true and the statement is an
+ // INSERT OVERWRITE or INSERT INTO a table or partition.
+ if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
+ && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
+ && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
+ if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
+ genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
+ .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
+ } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
+ genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
+ .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
+
+ }
+ }
return output;
}
+ private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
+ Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException {
+ String tableName = table_desc.getTableName();
+ Table table = null;
+ try {
+ table = db.getTable(tableName);
+ } catch (HiveException e) {
+ throw new SemanticException(e.getMessage());
+ }
+ LOG.info("Generate an operator pipleline to autogather column stats for table " + tableName
+ + " in query " + ctx.getCmd());
+ ColumnStatsAutoGatherContext columnStatsAutoGatherContext =
+     new ColumnStatsAutoGatherContext(this, conf, curr, table, partSpec, isInsertInto);
+ columnStatsAutoGatherContext.insertAnalyzePipeline();
+ columnStatsAutoGatherContexts.add(columnStatsAutoGatherContext);
+ }
+
String fixCtasColumnName(String colName) {
return colName;
}
@@ -10689,7 +10736,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,
new HashSet<JoinOperator>(joinContext.keySet()),
new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
- loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
@@ -12895,4 +12942,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
String.format("Warning: %s", msg));
}
+ public List<LoadFileDesc> getLoadFileWork() {
+ return loadFileWork;
+ }
+
+ public void setLoadFileWork(List<LoadFileDesc> loadFileWork) {
+ this.loadFileWork = loadFileWork;
+ }
+
}
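The SemanticAnalyzer changes above hook the auto-gather work in at the file sink that writes the insert's output: whenever a qualifying INSERT is compiled with both stats settings on, an extra analyze-style branch is generated over the rows being written. A minimal HiveQL sketch of the effect, using hypothetical table names (t, src_t) that are not part of the patch:

set hive.stats.autogather=true;
set hive.stats.column.autogather=true;

-- Hypothetical tables, for illustration only.
insert overwrite table t
select a, b from src_t;

-- With both settings on, the plan for the insert above is extended with a branch
-- roughly equivalent to the column-stats rewrite of
--   analyze table t compute statistics for columns;
-- so column statistics reach the metastore without a separate ANALYZE run.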
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 4049f40..4b34ebf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -20,15 +20,19 @@ package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -69,6 +73,8 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
@@ -251,15 +257,6 @@ public abstract class TaskCompiler {
generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs);
- /*
- * If the query was the result of analyze table column compute statistics rewrite, create
- * a column stats task instead of a fetch task to persist stats to the metastore.
- */
- if (isCStats) {
- genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadTableWork, loadFileWork,
- rootTasks, outerQueryLimit);
- }
-
// For each task, set the key descriptor for the reducer
for (Task<? extends Serializable> rootTask : rootTasks) {
GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
@@ -273,6 +270,35 @@ public abstract class TaskCompiler {
optimizeTaskPlan(rootTasks, pCtx, ctx);
+ /*
+ * If the query is the result of an ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS
+ * rewrite, or if column stats are being auto-gathered for this insert, create a
+ * column stats task (instead of a fetch task) to persist the stats to the metastore.
+ */
+ if (isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()) {
+ Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? extends Serializable>>();
+ getLeafTasks(rootTasks, leafTasks);
+ if (isCStats) {
+ genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0);
+ } else {
+ for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx
+ .getColumnStatsAutoGatherContexts()) {
+ if (!columnStatsAutoGatherContext.isInsertInto()) {
+ genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
+ columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, 0);
+ } else {
+ int numBitVector;
+ try {
+ numBitVector = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
+ } catch (Exception e) {
+ throw new SemanticException(e.getMessage());
+ }
+ genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
+ columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, numBitVector);
+ }
+ }
+ }
+ }
+
decideExecMode(rootTasks, ctx, globalLimitCtx);
if (pCtx.getQueryProperties().isCTAS() && !pCtx.getCreateTable().isMaterialization()) {
@@ -355,8 +381,9 @@ public abstract class TaskCompiler {
* @param qb
*/
@SuppressWarnings("unchecked")
- protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite, List<LoadTableDesc> loadTableWork,
- List<LoadFileDesc> loadFileWork, List<Task<? extends Serializable>> rootTasks, int outerQueryLimit) {
+ protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite,
+ List<LoadFileDesc> loadFileWork, Set<Task<? extends Serializable>> leafTasks,
+ int outerQueryLimit, int numBitVector) {
ColumnStatsTask cStatsTask = null;
ColumnStatsWork cStatsWork = null;
FetchWork fetch = null;
@@ -385,12 +412,12 @@ public abstract class TaskCompiler {
fetch = new FetchWork(loadFileWork.get(0).getSourcePath(), resultTab, outerQueryLimit);
ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName,
- colName, colType, isTblLevel);
+ colName, colType, isTblLevel, numBitVector);
cStatsWork = new ColumnStatsWork(fetch, cStatsDesc);
cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
- // This is a column stats task. According to the semantic, there should be
- // only one MR task in the rootTask.
- rootTasks.get(0).addDependentTask(cStatsTask);
+ for (Task<? extends Serializable> tsk : leafTasks) {
+ tsk.addDependentTask(cStatsTask);
+ }
}
@@ -398,7 +425,7 @@ public abstract class TaskCompiler {
* Find all leaf tasks of the list of root tasks.
*/
protected void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
- HashSet<Task<? extends Serializable>> leaves) {
+ Set<Task<? extends Serializable>> leaves) {
for (Task<? extends Serializable> root : rootTasks) {
getLeafTasks(root, leaves);
@@ -406,7 +433,7 @@ public abstract class TaskCompiler {
}
private void getLeafTasks(Task<? extends Serializable> task,
- HashSet<Task<? extends Serializable>> leaves) {
+ Set<Task<? extends Serializable>> leaves) {
if (task.getDependentTasks() == null) {
if (!leaves.contains(task)) {
leaves.add(task);
@@ -453,7 +480,7 @@ public abstract class TaskCompiler {
ParseContext clone = new ParseContext(queryState,
pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(),
pCtx.getJoinOps(), pCtx.getSmbMapJoinOps(),
- pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getContext(),
+ pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getColumnStatsAutoGatherContexts(), pCtx.getContext(),
pCtx.getIdToTableNameMap(), pCtx.getDestTableId(), pCtx.getUCtx(),
pCtx.getListMapJoinOpsNoReducer(),
pCtx.getPrunedPartitions(), pCtx.getTabNameToTabObject(), pCtx.getOpToSamplePruner(), pCtx.getGlobalLimitCtx(),
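The TaskCompiler change is needed because an auto-gathering plan can end in more than one leaf task, so the ColumnStatsTask is now chained after every leaf instead of after rootTasks.get(0). A hedged HiveQL sketch of a query that produces several leaves (hypothetical table names, echoing the multi-insert pattern in the new tests):

from src_t
insert overwrite table t1 select a, b
insert into table t2 select a, b;

-- Each destination of the multi-insert ends in its own move/load leaf task, and
-- each of those leaves now gets its own dependent column-stats task.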
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
index 9934fdf..778d6f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
@@ -51,7 +51,7 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali
// this map stores mapping from "big key dir" to its corresponding mapjoin
// task.
private HashMap<Path, Task<? extends Serializable>> dirToTaskMap;
- private Task<? extends Serializable> noSkewTask;
+ private List<Task<? extends Serializable>> noSkewTask;
/**
* For serialization use only.
@@ -61,7 +61,7 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali
public ConditionalResolverSkewJoinCtx(
HashMap<Path, Task<? extends Serializable>> dirToTaskMap,
- Task<? extends Serializable> noSkewTask) {
+ List<Task<? extends Serializable>> noSkewTask) {
super();
this.dirToTaskMap = dirToTaskMap;
this.noSkewTask = noSkewTask;
@@ -76,11 +76,11 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali
this.dirToTaskMap = dirToTaskMap;
}
- public Task<? extends Serializable> getNoSkewTask() {
+ public List<Task<? extends Serializable>> getNoSkewTask() {
return noSkewTask;
}
- public void setNoSkewTask(Task<? extends Serializable> noSkewTask) {
+ public void setNoSkewTask(List<Task<? extends Serializable>> noSkewTask) {
this.noSkewTask = noSkewTask;
}
}
@@ -121,7 +121,7 @@ public class ConditionalResolverSkewJoin implements ConditionalResolver, Seriali
e.printStackTrace();
}
if (resTsks.isEmpty() && ctx.getNoSkewTask() != null) {
- resTsks.add(ctx.getNoSkewTask());
+ resTsks.addAll(ctx.getNoSkewTask());
}
return resTsks;
}
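The ConditionalResolverSkewJoin change generalizes the no-skew fall-back from a single task to a list, since the follow-up work after the join can now include more than one task (for example the original follow-up task plus the auto-gathered column-stats work). A hedged HiveQL sketch of a query that goes through this resolver (table names are illustrative only):

set hive.optimize.skewjoin=true;
set hive.stats.column.autogather=true;

insert overwrite table t_out
select a.key, b.value
from t_a a join t_b b on a.key = b.key;

-- If no skewed keys are detected at runtime, the conditional task now resolves to
-- the full list of no-skew follow-up tasks rather than to a single task.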
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/test/queries/clientpositive/autoColumnStats_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_1.q b/ql/src/test/queries/clientpositive/autoColumnStats_1.q
new file mode 100644
index 0000000..bb7252a
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_1.q
@@ -0,0 +1,192 @@
+set hive.stats.column.autogather=true;
+set hive.stats.fetch.column.stats=true;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.auto.convert.join=true;
+set hive.join.emit.interval=2;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
+set hive.optimize.bucketingsorting=false;
+
+drop table src_multi1;
+
+create table src_multi1 like src;
+
+insert overwrite table src_multi1 select * from src;
+
+explain extended select * from src_multi1;
+
+describe formatted src_multi1;
+
+drop table a;
+drop table b;
+create table a like src;
+create table b like src;
+
+from src
+insert overwrite table a select *
+insert overwrite table b select *;
+
+describe formatted a;
+describe formatted b;
+
+drop table a;
+drop table b;
+create table a like src;
+create table b like src;
+
+from src
+insert overwrite table a select *
+insert into table b select *;
+
+describe formatted a;
+describe formatted b;
+
+
+drop table src_multi2;
+
+create table src_multi2 like src;
+
+insert overwrite table src_multi2 select subq.key, src.value from (select * from src union select * from src1)subq join src on subq.key=src.key;
+
+describe formatted src_multi2;
+
+
+drop table nzhang_part14;
+
+create table if not exists nzhang_part14 (key string)
+ partitioned by (value string);
+
+insert overwrite table nzhang_part14 partition(value)
+select key, value from (
+ select * from (select 'k1' as key, cast(null as string) as value from src limit 2)a
+ union all
+ select * from (select 'k2' as key, '' as value from src limit 2)b
+ union all
+ select * from (select 'k3' as key, ' ' as value from src limit 2)c
+) T;
+
+explain select key from nzhang_part14;
+
+
+drop table src5;
+
+create table src5 as select key, value from src limit 5;
+
+insert overwrite table nzhang_part14 partition(value)
+select key, value from src5;
+
+explain select key from nzhang_part14;
+
+
+create table alter5 ( col1 string ) partitioned by (dt string);
+
+alter table alter5 add partition (dt='a') location 'parta';
+
+describe formatted alter5 partition (dt='a');
+
+insert overwrite table alter5 partition (dt='a') select key from src ;
+
+describe formatted alter5 partition (dt='a');
+
+explain select * from alter5 where dt='a';
+
+
+drop table src_stat_part;
+create table src_stat_part(key string, value string) partitioned by (partitionId int);
+
+insert overwrite table src_stat_part partition (partitionId=1)
+select * from src1 limit 5;
+
+describe formatted src_stat_part PARTITION(partitionId=1);
+
+insert overwrite table src_stat_part partition (partitionId=2)
+select * from src1;
+
+describe formatted src_stat_part PARTITION(partitionId=2);
+
+drop table srcbucket_mapjoin;
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+drop table tab_part;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+drop table srcbucket_mapjoin_part;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+insert overwrite table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+describe formatted tab_part partition (ds='2008-04-08');
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+describe formatted tab partition (ds='2008-04-08');
+
+drop table nzhang_part14;
+
+create table if not exists nzhang_part14 (key string, value string)
+ partitioned by (ds string, hr string);
+
+describe formatted nzhang_part14;
+
+insert overwrite table nzhang_part14 partition(ds, hr)
+select key, value, ds, hr from (
+ select * from (select 'k1' as key, cast(null as string) as value, '1' as ds, '2' as hr from src limit 2)a
+ union all
+ select * from (select 'k2' as key, '' as value, '1' as ds, '3' as hr from src limit 2)b
+ union all
+ select * from (select 'k3' as key, ' ' as value, '2' as ds, '1' as hr from src limit 2)c
+) T;
+
+desc formatted nzhang_part14 partition(ds='1', hr='3');
+
+
+INSERT OVERWRITE TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr)
+SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10;
+
+desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12');
+
+
+drop table nzhang_part14;
+create table if not exists nzhang_part14 (key string, value string)
+partitioned by (ds string, hr string);
+
+INSERT OVERWRITE TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr)
+SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10;
+
+desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12');
+
+drop table a;
+create table a (key string, value string)
+partitioned by (ds string, hr string);
+
+drop table b;
+create table b (key string, value string)
+partitioned by (ds string, hr string);
+
+drop table c;
+create table c (key string, value string)
+partitioned by (ds string, hr string);
+
+
+FROM srcpart
+INSERT OVERWRITE TABLE a PARTITION (ds='2010-03-11', hr) SELECT key, value, hr WHERE ds is not null and hr>10
+INSERT OVERWRITE TABLE b PARTITION (ds='2010-04-11', hr) SELECT key, value, hr WHERE ds is not null and hr>11
+INSERT OVERWRITE TABLE c PARTITION (ds='2010-05-11', hr) SELECT key, value, hr WHERE hr>0;
+
+explain select key from a;
+explain select value from b;
+explain select key from b;
+explain select value from c;
+explain select key from c;
+
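As a quick check that the inserts in autoColumnStats_1.q actually populated per-column statistics, the column-level DESCRIBE FORMATTED form (the same one autoColumnStats_2.q uses below) can be run afterwards; a usage sketch, with the exact output varying by Hive version:

describe formatted src_multi1 key;
describe formatted src_multi1 value;

-- Both columns should show non-empty statistics (distinct count, number of nulls,
-- average/max length) even though no explicit ANALYZE statement was issued.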
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4b936e/ql/src/test/queries/clientpositive/autoColumnStats_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_2.q b/ql/src/test/queries/clientpositive/autoColumnStats_2.q
new file mode 100644
index 0000000..c1abcb1
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_2.q
@@ -0,0 +1,214 @@
+set hive.stats.column.autogather=true;
+set hive.stats.fetch.column.stats=true;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.auto.convert.join=true;
+set hive.join.emit.interval=2;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
+set hive.optimize.bucketingsorting=false;
+
+drop table src_multi1;
+
+create table src_multi1 like src;
+
+insert into table src_multi1 select * from src;
+
+explain extended select * from src_multi1;
+
+describe formatted src_multi1;
+
+drop table a;
+drop table b;
+create table a like src;
+create table b like src;
+
+from src
+insert into table a select *
+insert into table b select *;
+
+describe formatted a key;
+describe formatted b key;
+
+from src
+insert overwrite table a select *
+insert into table b select *;
+
+describe formatted a;
+describe formatted b;
+
+describe formatted b key;
+describe formatted b value;
+
+insert into table b select NULL, NULL from src limit 10;
+
+describe formatted b key;
+describe formatted b value;
+
+insert into table b(value) select key+100000 from src limit 10;
+
+describe formatted b key;
+describe formatted b value;
+
+drop table src_multi2;
+
+create table src_multi2 like src;
+
+insert into table src_multi2 select subq.key, src.value from (select * from src union select * from src1)subq join src on subq.key=src.key;
+
+describe formatted src_multi2;
+
+
+drop table nzhang_part14;
+
+create table if not exists nzhang_part14 (key string)
+ partitioned by (value string);
+
+insert into table nzhang_part14 partition(value)
+select key, value from (
+ select * from (select 'k1' as key, cast(null as string) as value from src limit 2)a
+ union all
+ select * from (select 'k2' as key, '' as value from src limit 2)b
+ union all
+ select * from (select 'k3' as key, ' ' as value from src limit 2)c
+) T;
+
+explain select key from nzhang_part14;
+
+
+drop table src5;
+
+create table src5 as select key, value from src limit 5;
+
+insert into table nzhang_part14 partition(value)
+select key, value from src5;
+
+explain select key from nzhang_part14;
+
+drop table alter5;
+
+create table alter5 ( col1 string ) partitioned by (dt string);
+
+alter table alter5 add partition (dt='a');
+
+describe formatted alter5 partition (dt='a');
+
+insert into table alter5 partition (dt='a') select key from src ;
+
+describe formatted alter5 partition (dt='a');
+
+explain select * from alter5 where dt='a';
+
+drop table alter5;
+
+create table alter5 ( col1 string ) partitioned by (dt string);
+
+alter table alter5 add partition (dt='a') location 'parta';
+
+describe formatted alter5 partition (dt='a');
+
+insert into table alter5 partition (dt='a') select key from src ;
+
+describe formatted alter5 partition (dt='a');
+
+explain select * from alter5 where dt='a';
+
+
+drop table src_stat_part;
+create table src_stat_part(key string, value string) partitioned by (partitionId int);
+
+insert into table src_stat_part partition (partitionId=1)
+select * from src1 limit 5;
+
+describe formatted src_stat_part PARTITION(partitionId=1);
+
+insert into table src_stat_part partition (partitionId=2)
+select * from src1;
+
+describe formatted src_stat_part PARTITION(partitionId=2);
+
+drop table srcbucket_mapjoin;
+CREATE TABLE srcbucket_mapjoin(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+drop table tab_part;
+CREATE TABLE tab_part (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+drop table srcbucket_mapjoin_part;
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin partition(ds='2008-04-08');
+
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+insert into table tab_part partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+
+describe formatted tab_part partition (ds='2008-04-08');
+
+CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+insert into table tab partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin;
+
+describe formatted tab partition (ds='2008-04-08');
+
+drop table nzhang_part14;
+
+create table if not exists nzhang_part14 (key string, value string)
+ partitioned by (ds string, hr string);
+
+describe formatted nzhang_part14;
+
+insert into table nzhang_part14 partition(ds, hr)
+select key, value, ds, hr from (
+ select * from (select 'k1' as key, cast(null as string) as value, '1' as ds, '2' as hr from src limit 2)a
+ union all
+ select * from (select 'k2' as key, '' as value, '1' as ds, '3' as hr from src limit 2)b
+ union all
+ select * from (select 'k3' as key, ' ' as value, '2' as ds, '1' as hr from src limit 2)c
+) T;
+
+desc formatted nzhang_part14 partition(ds='1', hr='3');
+
+
+INSERT into TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr)
+SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10;
+
+desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12');
+
+
+drop table nzhang_part14;
+create table if not exists nzhang_part14 (key string, value string)
+partitioned by (ds string, hr string);
+
+INSERT into TABLE nzhang_part14 PARTITION (ds='2010-03-03', hr)
+SELECT key, value, hr FROM srcpart WHERE ds is not null and hr>10;
+
+desc formatted nzhang_part14 PARTITION(ds='2010-03-03', hr='12');
+
+drop table a;
+create table a (key string, value string)
+partitioned by (ds string, hr string);
+
+drop table b;
+create table b (key string, value string)
+partitioned by (ds string, hr string);
+
+drop table c;
+create table c (key string, value string)
+partitioned by (ds string, hr string);
+
+
+FROM srcpart
+INSERT into TABLE a PARTITION (ds='2010-03-11', hr) SELECT key, value, hr WHERE ds is not null and hr>10
+INSERT into TABLE b PARTITION (ds='2010-04-11', hr) SELECT key, value, hr WHERE ds is not null and hr>11
+INSERT into TABLE c PARTITION (ds='2010-05-11', hr) SELECT key, value, hr WHERE hr>0;
+
+explain select key from a;
+explain select value from b;
+explain select key from b;
+explain select value from c;
+explain select key from c;
+