You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2011/02/06 19:11:24 UTC
svn commit: r1067716 - in /hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/
Author: namit
Date: Sun Feb 6 18:11:24 2011
New Revision: 1067716
URL: http://svn.apache.org/viewvc?rev=1067716&view=rev
Log:
HIVE-1961 Make Stats gathering more flexible with timeout and atomicity
(Ning Zhang via namit)
Modified:
hive/trunk/CHANGES.txt
hive/trunk/build-common.xml
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
Modified: hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hive/trunk/CHANGES.txt?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/CHANGES.txt (original)
+++ hive/trunk/CHANGES.txt Sun Feb 6 18:11:24 2011
@@ -172,6 +172,9 @@ Trunk - Unreleased
HIVE-1956 Provide DFS initialization script for Hive
(Bruno Mahe via namit)
+ HIVE-1961 Make Stats gathering more flexible with timeout and atomicity
+ (Ning Zhang via namit)
+
IMPROVEMENTS
HIVE-1235 use Ivy for fetching HBase dependencies (John Sichi via cws)
Modified: hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hive/trunk/build-common.xml?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/build-common.xml (original)
+++ hive/trunk/build-common.xml Sun Feb 6 18:11:24 2011
@@ -54,7 +54,7 @@
<property name="test.timeout" value="18200000"/>
<property name="test.junit.output.format" value="xml"/>
<property name="test.junit.output.usefile" value="true"/>
- <property name="minimr.query.files" value="input16_cc.q,scriptfile1.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q"/>
+ <property name="minimr.query.files" value="input16_cc.q,scriptfile1.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,merge_dynamic_partition2.q,merge_dynamic_partition3.q"/>
<property name="test.silent" value="true"/>
<property name="hadoopVersion" value="${hadoop.version.ant-internal}"/>
<property name="test.serialize.qplan" value="false"/>
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sun Feb 6 18:11:24 2011
@@ -325,6 +325,10 @@ public class HiveConf extends Configurat
""), // default stats publisher if none of JDBC/HBase is specified
HIVE_STATS_DEFAULT_AGGREGATOR("hive.stats.default.aggregator",
""), // default stats aggregator if none of JDBC/HBase is specified
+ HIVE_STATS_JDBC_TIMEOUT("hive.stats.jdbc.timeout",
+ 30), // default timeout in sec for JDBC connection & SQL statements
+ HIVE_STATS_ATOMIC("hive.stats.atomic",
+ false), // whether to update metastore stats only if all stats are available
// Concurrency
@@ -368,7 +372,7 @@ public class HiveConf extends Configurat
HIVE_ERROR_ON_EMPTY_PARTITION("hive.error.on.empty.partition", false),
- HIVE_INDEX_IGNORE_HDFS_LOC("hive.index.compact.file.ignore.hdfs", false),
+ HIVE_INDEX_IGNORE_HDFS_LOC("hive.index.compact.file.ignore.hdfs", false),
;
Modified: hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml (original)
+++ hive/trunk/conf/hive-default.xml Sun Feb 6 18:11:24 2011
@@ -734,6 +734,30 @@
</property>
<property>
+ <name>hive.stats.default.publisher</name>
+ <value></value>
+ <description>The Java class (implementing the StatsPublisher interface) that is used by default if hive.stats.dbclass is not JDBC or HBase.</description>
+</property>
+
+<property>
+ <name>hive.stats.default.aggregator</name>
+ <value></value>
+ <description>The Java class (implementing the StatsAggregator interface) that is used by default if hive.stats.dbclass is not JDBC or HBase.</description>
+</property>
+
+<property>
+ <name>hive.stats.jdbc.timeout</name>
+ <value>30</value>
+ <description>Timeout value (number of seconds) used by JDBC connection and statements.</description>
+</property>
+
+<property>
+ <name>hive.stats.atomic</name>
+ <value>false</value>
+ <description>If this is set to true then the metastore stats will be updated only if all types of stats (# of rows, # of files, # of bytes etc.) are available. Otherwise metastore stats are updated in a best effort fashion with whatever is available.</description>
+</property>
+
+<property>
<name>hive.support.concurrency</name>
<value>false</value>
<description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sun Feb 6 18:11:24 2011
@@ -773,7 +773,12 @@ public class FileSinkOperator extends Te
// Initializing a stats publisher
StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc);
- if (statsPublisher == null || !statsPublisher.connect(hconf)) {
+ if (statsPublisher == null) {
+ // just return, stats gathering should not block the main query
+ LOG.error("StatsPublishing error: StatsPublisher is not initialized.");
+ return;
+ }
+ if (!statsPublisher.connect(hconf)) {
// just return, stats gathering should not block the main query
LOG.error("StatsPublishing error: cannot connect to database");
return;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Sun Feb 6 18:11:24 2011
@@ -226,6 +226,11 @@ public class StatsTask extends Task<Stat
}
private int aggregateStats() {
+
+ String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+ StatsFactory.setImplementation(statsImplementationClass, conf);
+ StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+
try {
// Stats setup:
Warehouse wh = new Warehouse(conf);
@@ -233,19 +238,10 @@ public class StatsTask extends Task<Stat
FileStatus[] fileStatus;
// manufacture a StatsAggregator
- StatsAggregator statsAggregator;
- String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- StatsFactory.setImplementation(statsImplementationClass, conf);
- statsAggregator = StatsFactory.getStatsAggregator();
if (!statsAggregator.connect(conf)) {
- // this should not fail the whole job, return 0 so that the job won't fail.
- console.printInfo("[WARNING] Could not update table/partition level stats.",
- "StatsAggregator.connect() failed: stats class = " +
- statsImplementationClass);
- return 0;
+ throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
}
-
TableStatistics tblStats = new TableStatistics();
//
@@ -287,6 +283,10 @@ public class StatsTask extends Task<Stat
String rows = statsAggregator.aggregateStats(work.getAggKey(), StatsSetupConst.ROW_COUNT);
if (rows != null) {
tblStats.setNumRows(Long.parseLong(rows));
+ } else {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
+ throw new HiveException("StatsAggregator failed to get numRows.");
+ }
}
} else {
// Partitioned table:
@@ -304,6 +304,10 @@ public class StatsTask extends Task<Stat
String rows = statsAggregator.aggregateStats(partitionID, StatsSetupConst.ROW_COUNT);
if (rows != null) {
newPartStats.setNumRows(Long.parseLong(rows));
+ } else {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
+ throw new HiveException("StatsAggregator failed to get numRows.");
+ }
}
fileSys = partn.getPartitionPath().getFileSystem(conf);
@@ -358,7 +362,6 @@ public class StatsTask extends Task<Stat
}
}
- statsAggregator.closeConnection();
//
// write table stats to metastore
@@ -375,15 +378,16 @@ public class StatsTask extends Task<Stat
console.printInfo("Table " + table.getTableName() + " stats: [" + tblStats.toString() + ']');
- return 0;
- }
- catch (Exception e) {
+ } catch (Exception e) {
// return 0 since StatsTask should not fail the whole job
console.printInfo("[Warning] could not update stats.",
"Failed with exception " + e.getMessage() + "\n"
+ StringUtils.stringifyException(e));
- return 0;
+ } finally {
+ statsAggregator.closeConnection();
}
+ // StatsTask always returns 0 so that the whole job won't fail
+ return 0;
}
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java Sun Feb 6 18:11:24 2011
@@ -37,14 +37,17 @@ public class JDBCStatsAggregator impleme
private String connectionString;
private Configuration hiveconf;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
+ private int timeout = 30;
public boolean connect(Configuration hiveconf) {
try {
this.hiveconf = hiveconf;
+ timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
- DriverManager.setLoginTimeout(3); // stats should not block
+ // stats gathering is non-blocking -- an exception is thrown on timeout
+ DriverManager.setLoginTimeout(timeout);
conn = DriverManager.getConnection(connectionString);
return true;
} catch (Exception e) {
@@ -67,6 +70,7 @@ public class JDBCStatsAggregator impleme
try {
long retval = 0;
Statement stmt = conn.createStatement();
+ stmt.setQueryTimeout(timeout);
String select =
"SELECT SUM" + "(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
" FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java Sun Feb 6 18:11:24 2011
@@ -40,6 +40,7 @@ public class JDBCStatsPublisher implemen
private Configuration hiveconf;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
private PreparedStatement selStmt, updStmt, insStmt;
+ private int timeout = 30; // default timeout in sec. for JDBC connection and statements
public JDBCStatsPublisher() {
selStmt = updStmt = insStmt = null;
@@ -49,9 +50,10 @@ public class JDBCStatsPublisher implemen
try {
this.hiveconf = hiveconf;
connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+ timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
- DriverManager.setLoginTimeout(3); // stats should not block
+ DriverManager.setLoginTimeout(timeout); // stats is non-blocking
conn = DriverManager.getConnection(connectionString);
// prepare the SELECT/UPDATE/INSERT statements
@@ -74,9 +76,9 @@ public class JDBCStatsPublisher implemen
insStmt = conn.prepareStatement(insert);
// make the statements non-blocking
- selStmt.setQueryTimeout(5);
- updStmt.setQueryTimeout(5);
- insStmt.setQueryTimeout(5);
+ selStmt.setQueryTimeout(timeout);
+ updStmt.setQueryTimeout(timeout);
+ insStmt.setQueryTimeout(timeout);
return true;
} catch (Exception e) {
@@ -168,9 +170,11 @@ public class JDBCStatsPublisher implemen
connectionString = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
String driver = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
Class.forName(driver).newInstance();
+ DriverManager.setLoginTimeout(timeout);
conn = DriverManager.getConnection(connectionString);
Statement stmt = conn.createStatement();
+ stmt.setQueryTimeout(timeout);
// Check if the table exists
DatabaseMetaData dbm = conn.getMetaData();