Posted to commits@hive.apache.org by na...@apache.org on 2011/02/06 19:11:24 UTC

svn commit: r1067716 - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/

Author: namit
Date: Sun Feb  6 18:11:24 2011
New Revision: 1067716

URL: http://svn.apache.org/viewvc?rev=1067716&view=rev
Log:
HIVE-1961 Make Stats gathering more flexible with timeout and atomicity
(Ning Zhang via namit)


Modified:
    hive/trunk/CHANGES.txt
    hive/trunk/build-common.xml
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java

Modified: hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hive/trunk/CHANGES.txt?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/CHANGES.txt (original)
+++ hive/trunk/CHANGES.txt Sun Feb  6 18:11:24 2011
@@ -172,6 +172,9 @@ Trunk -  Unreleased
     HIVE-1956 Provide DFS initialization script for Hive
     (Bruno Mahe via namit)
 
+    HIVE-1961 Make Stats gathering more flexible with timeout and atomicity
+    (Ning Zhang via namit)
+
   IMPROVEMENTS
 
     HIVE-1235 use Ivy for fetching HBase dependencies (John Sichi via cws)

Modified: hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hive/trunk/build-common.xml?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/build-common.xml (original)
+++ hive/trunk/build-common.xml Sun Feb  6 18:11:24 2011
@@ -54,7 +54,7 @@
   <property name="test.timeout" value="18200000"/>
   <property name="test.junit.output.format" value="xml"/>
   <property name="test.junit.output.usefile" value="true"/>
-  <property name="minimr.query.files" value="input16_cc.q,scriptfile1.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q"/>
+  <property name="minimr.query.files" value="input16_cc.q,scriptfile1.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,merge_dynamic_partition2.q,merge_dynamic_partition3.q"/>
   <property name="test.silent" value="true"/>
   <property name="hadoopVersion" value="${hadoop.version.ant-internal}"/>
   <property name="test.serialize.qplan" value="false"/>

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sun Feb  6 18:11:24 2011
@@ -325,6 +325,10 @@ public class HiveConf extends Configurat
         ""), // default stats publisher if none of JDBC/HBase is specified
     HIVE_STATS_DEFAULT_AGGREGATOR("hive.stats.default.aggregator",
         ""), // default stats aggregator if none of JDBC/HBase is specified
+    HIVE_STATS_JDBC_TIMEOUT("hive.stats.jdbc.timeout",
+        30), // default timeout in sec for JDBC connection & SQL statements
+    HIVE_STATS_ATOMIC("hive.stats.atomic",
+        false), // whether to update metastore stats only if all stats are available
 
 
     // Concurrency
@@ -368,7 +372,7 @@ public class HiveConf extends Configurat
 
     HIVE_ERROR_ON_EMPTY_PARTITION("hive.error.on.empty.partition", false),
 
-    HIVE_INDEX_IGNORE_HDFS_LOC("hive.index.compact.file.ignore.hdfs", false), 
+    HIVE_INDEX_IGNORE_HDFS_LOC("hive.index.compact.file.ignore.hdfs", false),
     ;
 
 

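[Note] The two ConfVars added above are consumed elsewhere in this patch through the standard HiveConf accessors. A minimal sketch of how a caller might read them (the wrapper class and method here are hypothetical, added only for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class StatsConfExample {
      public static void describe(Configuration conf) {
        // Timeout (seconds) applied to the JDBC login and to each stats statement.
        int timeout = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
        // When true, metastore stats are updated only if every stat was gathered.
        boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
        System.out.println("stats timeout=" + timeout + "s, atomic=" + atomic);
      }
    }
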
Modified: hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml (original)
+++ hive/trunk/conf/hive-default.xml Sun Feb  6 18:11:24 2011
@@ -734,6 +734,30 @@
 </property>
 
 <property>
+  <name>hive.stats.default.publisher</name>
+  <value></value>
+  <description>The Java class (implementing the StatsPublisher interface) that is used by default if hive.stats.dbclass is not JDBC or HBase.</description>
+</property>
+
+<property>
+  <name>hive.stats.default.aggregator</name>
+  <value></value>
+  <description>The Java class (implementing the StatsAggregator interface) that is used by default if hive.stats.dbclass is not JDBC or HBase.</description>
+</property>
+
+<property>
+  <name>hive.stats.jdbc.timeout</name>
+  <value>30</value>
+  <description>Timeout value (number of seconds) used by JDBC connection and statements.</description>
+</property>
+
+<property>
+  <name>hive.stats.jdbc.atomic</name>
+  <value>false</value>
+  <description>If this is set to true then the metastore stats will be updated only if all types of stats (# of rows, # of files, # of bytes etc.) are available. Otherwise metastore stats are updated in a best effort fashion with whatever are available.</description>
+</property>
+
+<property>
   <name>hive.support.concurrency</name>
   <value>false</value>
   <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>

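[Note] The defaults documented above can also be overridden in hive-site.xml or per session. A small sketch of a programmatic override, assuming the plain Hadoop Configuration setters and the property names as registered in HiveConf.java (where the atomic flag is defined as hive.stats.atomic); the class name is hypothetical:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class StatsOverrideExample {
      public static HiveConf newConf() {
        HiveConf conf = new HiveConf();
        // Allow a slower stats DB: raise the JDBC login/statement timeout to 60 seconds.
        conf.setInt(HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT.varname, 60);
        // Require all stats (rows, files, bytes, ...) before the metastore is updated.
        conf.setBoolean(HiveConf.ConfVars.HIVE_STATS_ATOMIC.varname, true);
        return conf;
      }
    }
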
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sun Feb  6 18:11:24 2011
@@ -773,7 +773,12 @@ public class FileSinkOperator extends Te
     // Initializing a stats publisher
     StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc);
 
-    if (statsPublisher == null || !statsPublisher.connect(hconf)) {
+    if (statsPublisher == null) {
+      // just return, stats gathering should not block the main query
+      LOG.error("StatsPublishing error: StatsPublisher is not initialized.");
+      return;
+    }
+    if (!statsPublisher.connect(hconf)) {
       // just return, stats gathering should not block the main query
       LOG.error("StatsPublishing error: cannot connect to database");
       return;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Sun Feb  6 18:11:24 2011
@@ -226,6 +226,11 @@ public class StatsTask extends Task<Stat
   }
 
   private int aggregateStats() {
+
+    String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+    StatsFactory.setImplementation(statsImplementationClass, conf);
+    StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+
     try {
       // Stats setup:
       Warehouse wh = new Warehouse(conf);
@@ -233,19 +238,10 @@ public class StatsTask extends Task<Stat
       FileStatus[] fileStatus;
 
       // manufacture a StatsAggregator
-      StatsAggregator statsAggregator;
-      String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-      StatsFactory.setImplementation(statsImplementationClass, conf);
-      statsAggregator = StatsFactory.getStatsAggregator();
       if (!statsAggregator.connect(conf)) {
-        // this should not fail the whole job, return 0 so that the job won't fail.
-        console.printInfo("[WARNING] Could not update table/partition level stats.",
-            "StatsAggregator.connect() failed: stats class = " +
-            statsImplementationClass);
-        return 0;
+        throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
       }
 
-
       TableStatistics tblStats = new TableStatistics();
 
       //
@@ -287,6 +283,10 @@ public class StatsTask extends Task<Stat
         String rows = statsAggregator.aggregateStats(work.getAggKey(), StatsSetupConst.ROW_COUNT);
         if (rows != null) {
           tblStats.setNumRows(Long.parseLong(rows));
+        } else {
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
+            throw new HiveException("StatsAggregator failed to get numRows.");
+          }
         }
       } else {
         // Partitioned table:
@@ -304,6 +304,10 @@ public class StatsTask extends Task<Stat
           String rows = statsAggregator.aggregateStats(partitionID, StatsSetupConst.ROW_COUNT);
           if (rows != null) {
             newPartStats.setNumRows(Long.parseLong(rows));
+          } else {
+            if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
+              throw new HiveException("StatsAggregator failed to get numRows.");
+            }
           }
 
           fileSys = partn.getPartitionPath().getFileSystem(conf);
@@ -358,7 +362,6 @@ public class StatsTask extends Task<Stat
         }
       }
 
-      statsAggregator.closeConnection();
 
       //
       // write table stats to metastore
@@ -375,15 +378,16 @@ public class StatsTask extends Task<Stat
 
       console.printInfo("Table " + table.getTableName() + " stats: [" + tblStats.toString() + ']');
 
-      return 0;
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       // return 0 since StatsTask should not fail the whole job
       console.printInfo("[Warning] could not update stats.",
           "Failed with exception " + e.getMessage() + "\n"
           + StringUtils.stringifyException(e));
-      return 0;
+    } finally {
+      statsAggregator.closeConnection();
     }
+    // StatsTask always return 0 so that the whole job won't fail
+    return 0;
   }
 
   /**

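[Note] Taken together, the StatsTask hunks change aggregateStats() from "warn and return" to "connect inside the try block, fail fast on missing stats when hive.stats.atomic is set, and always close the aggregator in finally". A condensed sketch of the resulting control flow, assembled from the hunks above (simplified; the real method also walks partitions and computes file counts and sizes):

    private int aggregateStats() {
      String statsClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
      StatsFactory.setImplementation(statsClass, conf);
      StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
      try {
        if (!statsAggregator.connect(conf)) {
          throw new HiveException("StatsAggregator connect failed " + statsClass);
        }
        String rows = statsAggregator.aggregateStats(work.getAggKey(), StatsSetupConst.ROW_COUNT);
        if (rows == null && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
          // atomic mode: a missing stat aborts the update instead of writing partial numbers
          throw new HiveException("StatsAggregator failed to get numRows.");
        }
        // ... update table/partition stats in the metastore ...
      } catch (Exception e) {
        // stats gathering must not fail the query; log and fall through
        console.printInfo("[Warning] could not update stats.",
            "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
      } finally {
        statsAggregator.closeConnection();
      }
      return 0; // always succeed so the enclosing job is not failed
    }
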
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java Sun Feb  6 18:11:24 2011
@@ -37,14 +37,17 @@ public class JDBCStatsAggregator impleme
   private String connectionString;
   private Configuration hiveconf;
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
+  private int timeout = 30;
 
   public boolean connect(Configuration hiveconf) {
     try {
       this.hiveconf = hiveconf;
+      timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
       connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
       String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
       Class.forName(driver).newInstance();
-      DriverManager.setLoginTimeout(3); // stats should not block
+      // stats is non-blocking -- throw an exception when timeout
+      DriverManager.setLoginTimeout(timeout);
       conn = DriverManager.getConnection(connectionString);
       return true;
     } catch (Exception e) {
@@ -67,6 +70,7 @@ public class JDBCStatsAggregator impleme
     try {
       long retval = 0;
       Statement stmt = conn.createStatement();
+      stmt.setQueryTimeout(timeout);
       String select =
         "SELECT SUM" + "(" + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + ")" +
         " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +

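[Note] The configured timeout is now applied at two JDBC layers: DriverManager.setLoginTimeout() bounds connection setup and Statement.setQueryTimeout() bounds each query. A self-contained sketch of that pattern using plain java.sql (the embedded Derby URL and the 30-second value are only example assumptions, standing in for hive.stats.dbconnectionstring and hive.stats.jdbc.timeout):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class TimeoutJdbcExample {
      public static void main(String[] args) throws Exception {
        int timeout = 30; // seconds, e.g. the value of hive.stats.jdbc.timeout

        // Bound the time spent establishing the connection.
        DriverManager.setLoginTimeout(timeout);
        try (Connection conn =
            DriverManager.getConnection("jdbc:derby:;databaseName=TempStatsStore;create=true")) {
          try (Statement stmt = conn.createStatement()) {
            // Bound each statement; a slow stats query raises SQLException instead of hanging.
            stmt.setQueryTimeout(timeout);
            try (ResultSet rs = stmt.executeQuery("VALUES 1")) {
              while (rs.next()) {
                System.out.println(rs.getInt(1));
              }
            }
          }
        }
      }
    }
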
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java?rev=1067716&r1=1067715&r2=1067716&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java Sun Feb  6 18:11:24 2011
@@ -40,6 +40,7 @@ public class JDBCStatsPublisher implemen
   private Configuration hiveconf;
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
   private PreparedStatement selStmt, updStmt, insStmt;
+  private int timeout = 30; // default timeout in sec. for JDBC connection and statements
 
   public JDBCStatsPublisher() {
     selStmt = updStmt = insStmt = null;
@@ -49,9 +50,10 @@ public class JDBCStatsPublisher implemen
     try {
       this.hiveconf = hiveconf;
       connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+      timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
       String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
       Class.forName(driver).newInstance();
-      DriverManager.setLoginTimeout(3); // stats should not block
+      DriverManager.setLoginTimeout(timeout); // stats is non-blocking
       conn = DriverManager.getConnection(connectionString);
 
       // prepare the SELECT/UPDATE/INSERT statements
@@ -74,9 +76,9 @@ public class JDBCStatsPublisher implemen
       insStmt = conn.prepareStatement(insert);
 
       // make the statements non-blocking
-      selStmt.setQueryTimeout(5);
-      updStmt.setQueryTimeout(5);
-      insStmt.setQueryTimeout(5);
+      selStmt.setQueryTimeout(timeout);
+      updStmt.setQueryTimeout(timeout);
+      insStmt.setQueryTimeout(timeout);
 
       return true;
     } catch (Exception e) {
@@ -168,9 +170,11 @@ public class JDBCStatsPublisher implemen
       connectionString = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
       String driver = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
       Class.forName(driver).newInstance();
+      DriverManager.setLoginTimeout(timeout);
       conn = DriverManager.getConnection(connectionString);
 
       Statement stmt = conn.createStatement();
+      stmt.setQueryTimeout(timeout);
 
       // Check if the table exists
       DatabaseMetaData dbm = conn.getMetaData();