You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2011/05/24 21:17:46 UTC

svn commit: r1127229 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java

Author: nzhang
Date: Tue May 24 19:17:46 2011
New Revision: 1127229

URL: http://svn.apache.org/viewvc?rev=1127229&view=rev
Log:
HIVE-2144. reduce workload generated by JDBCStatsPublisher (Tomasz Nykiel via Ning Zhang)

Added:
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java?rev=1127229&r1=1127228&r2=1127229&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java Tue May 24 19:17:46 2011
@@ -24,6 +24,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.SQLRecoverableException;
 import java.sql.Statement;
 import java.util.Random;
@@ -42,7 +43,7 @@ public class JDBCStatsPublisher implemen
   private String connectionString;
   private Configuration hiveconf;
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
-  private PreparedStatement selStmt, updStmt, insStmt;
+  private PreparedStatement updStmt, insStmt;
   private int timeout; // default timeout in sec. for JDBC connection and statements
   // SQL comment that identifies where the SQL statement comes from
   private final String comment = "Hive stats publishing: " + this.getClass().getName();
@@ -70,19 +71,21 @@ public class JDBCStatsPublisher implemen
     }
 
     // prepare the SELECT/UPDATE/INSERT statements
-    String select =
-      "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
-      " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
 
     String update =
-      "UPDATE /* " + comment + " */ "+ JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-      " SET " +  JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + "= ? " +
-      " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+        "UPDATE /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+            " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + "= ? " +
+            " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ? " +
+            " AND ? > ( SELECT TEMP." + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME
+            + " FROM ( " +
+            " SELECT " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + " FROM "
+            + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+            " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME
+            + " = ?) TEMP )";
 
     String insert =
-      "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
-      " VALUES (?, ?)";
+        "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+            " VALUES (?, ?)";
 
     DriverManager.setLoginTimeout(timeout); // stats is non-blocking
 
@@ -95,25 +98,24 @@ public class JDBCStatsPublisher implemen
       }
     };
 
-    for (int failures = 0; ; failures++) {
+    for (int failures = 0;; failures++) {
       try {
         conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries);
 
         // prepare statements
-        selStmt = Utilities.prepareWithRetry(conn, select, waitWindow, maxRetries);
         updStmt = Utilities.prepareWithRetry(conn, update, waitWindow, maxRetries);
         insStmt = Utilities.prepareWithRetry(conn, insert, waitWindow, maxRetries);
 
         // set query timeout
-        Utilities.executeWithRetry(setQueryTimeout, selStmt, waitWindow, maxRetries);
         Utilities.executeWithRetry(setQueryTimeout, updStmt, waitWindow, maxRetries);
         Utilities.executeWithRetry(setQueryTimeout, insStmt, waitWindow, maxRetries);
 
         return true;
+
       } catch (SQLRecoverableException e) {
         if (failures >= maxRetries) {
           LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
-          return false;  // just return false without fail the task
+          return false; // just return false without fail the task
         }
         long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
         try {
@@ -143,13 +145,6 @@ public class JDBCStatsPublisher implemen
     }
     LOG.info("Stats publishing for key " + fileID + ". Value = " + value);
 
-    Utilities.SQLCommand<ResultSet> execQuery = new Utilities.SQLCommand<ResultSet>() {
-      @Override
-      public ResultSet run(PreparedStatement stmt) throws SQLException {
-        return stmt.executeQuery();
-      }
-    };
-
     Utilities.SQLCommand<Void> execUpdate = new Utilities.SQLCommand<Void>() {
       @Override
       public Void run(PreparedStatement stmt) throws SQLException {
@@ -158,45 +153,40 @@ public class JDBCStatsPublisher implemen
       }
     };
 
-    for (int failures = 0; ; failures++) {
+    for (int failures = 0;; failures++) {
       try {
+        insStmt.setString(1, fileID);
+        insStmt.setString(2, value);
+        Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries);
+        return true;
+      } catch (SQLIntegrityConstraintViolationException e) {
+
+        // We assume that the table used for partial statistics has a primary key declared on the
+        // "fileID". The exception will be thrown if two tasks report results for the same fileID.
+        // In that case we either update the row or abandon the change, depending on which statistic
+        // is larger (the UPDATE only overwrites a stored value smaller than the new one).
 
-        // Check to see if a previous task (mapper attempt) had published a previous stat
-        selStmt.setString(1, fileID);
-        ResultSet result = Utilities.executeWithRetry(execQuery, selStmt, waitWindow, maxRetries);
-
-        if (result.next()) {
-          long currval = result.getLong(1);
-          // Only update if the previous value is smaller (i.e. the previous attempt was a fail and
-          // hopefully this attempt is a success (as it has a greater value).
-          if (currval < Long.parseLong(value)) {
+        for (int updateFailures = 0;; updateFailures++) {
+          try {
             updStmt.setString(1, value);
             updStmt.setString(2, fileID);
+            updStmt.setString(3, value);
+            updStmt.setString(4, fileID);
             Utilities.executeWithRetry(execUpdate, updStmt, waitWindow, maxRetries);
+            return true;
+          } catch (SQLRecoverableException ue) {
+            // need to start from scratch (connection)
+            if (!handleSQLRecoverableException(ue, updateFailures)) {
+              return false;
+            }
+          } catch (SQLException ue) {
+            LOG.error("Error during publishing statistics. ", e);
+            return false;
           }
-        } else {
-          // No previous attempts.
-          insStmt.setString(1, fileID);
-          insStmt.setString(2, value);
-          Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries);
         }
-        return true;
       } catch (SQLRecoverableException e) {
         // need to start from scratch (connection)
-        if (failures >= maxRetries) {
-          return false;
-        }
-        // close the current connection
-        closeConnection();
-        long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
-        try {
-          Thread.sleep(waitTime);
-        } catch (InterruptedException iex) {
-        }
-        // get a new connection
-        if (!connect(hiveconf)) {
-          // if cannot reconnect, just fail because connect() already handles retries.
-          LOG.error("Error during publishing aggregation. " + e);
+        if (!handleSQLRecoverableException(e, failures)) {
           return false;
         }
       } catch (SQLException e) {
@@ -206,6 +196,26 @@ public class JDBCStatsPublisher implemen
     }
   }
 
+  private boolean handleSQLRecoverableException(Exception e, int failures) {
+    if (failures >= maxRetries) {
+      return false;
+    }
+    // close the current connection
+    closeConnection();
+    long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+    try {
+      Thread.sleep(waitTime);
+    } catch (InterruptedException iex) {
+    }
+    // get a new connection
+    if (!connect(hiveconf)) {
+      // if cannot reconnect, just fail because connect() already handles retries.
+      LOG.error("Error during publishing aggregation. " + e);
+      return false;
+    }
+    return true;
+  }
+
   @Override
   public boolean closeConnection() {
     if (conn == null) {
@@ -218,20 +228,18 @@ public class JDBCStatsPublisher implemen
       if (updStmt != null) {
         updStmt.close();
       }
-      if (selStmt != null) {
-        selStmt.close();
-      }
 
       conn.close();
 
       // In case of derby, explicitly shutdown the database otherwise it reports error when
       // trying to connect to the same JDBC connection string again.
-      if(HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCLASS).equalsIgnoreCase("jdbc:derby")) {
+      if (HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCLASS).equalsIgnoreCase(
+          "jdbc:derby")) {
         try {
-          // The following closes the derby connection. It throws an exception that has to be caught and ignored.
+          // The following closes the derby connection. It throws an exception that has to be caught
+          // and ignored.
           DriverManager.getConnection(connectionString + ";shutdown=true");
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
           // Do nothing because we know that an exception is thrown anyway.
         }
       }
@@ -265,9 +273,9 @@ public class JDBCStatsPublisher implemen
       boolean tblExists = rs.next();
       if (!tblExists) { // Table does not exist, create it
         String createTable =
-          "CREATE TABLE " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME + " (" +
-          JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " VARCHAR(255), " +
-          JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + " BIGINT)";
+            "CREATE TABLE " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME + " (" +
+                JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " VARCHAR(255) PRIMARY KEY, " +
+                JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + " BIGINT )";
 
         stmt.executeUpdate(createTable);
         stmt.close();

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java?rev=1127229&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java Tue May 24 19:17:46 2011
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * TestStatsPublisher: exercises StatsPublisher together with StatsAggregator.
+ *
+ */
+public class TestStatsPublisher extends TestCase {
+
+  protected Configuration conf;
+  protected String statsImplementationClass;
+
+  public TestStatsPublisher(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void setUp() {
+    System.out.println("StatPublisher Test");
+    conf = new JobConf(TestStatsPublisher.class);
+
+    statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+    StatsFactory.setImplementation(statsImplementationClass, conf);
+  }
+
+  public void testStatsPublisherOneStat() throws Throwable {
+    try {
+      System.out.println("StatsPublisher - one stat published per key - aggregating matching key");
+
+      // instantiate stats publisher
+      StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
+      assertNotNull(statsPublisher);
+      assertTrue(statsPublisher.init(conf));
+      assertTrue(statsPublisher.connect(conf));
+
+      // instantiate stats aggregator
+      StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+      assertNotNull(statsAggregator);
+      assertTrue(statsAggregator.connect(conf));
+
+      // publish stats
+      assertTrue(statsPublisher.publishStat("file_00000", StatsSetupConst.ROW_COUNT, "200"));
+      assertTrue(statsPublisher.publishStat("file_00001", StatsSetupConst.ROW_COUNT, "400"));
+
+      // aggregate existing stats for prefixes
+      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
+      assertEquals("200", rows0);
+      String rows1 = statsAggregator.aggregateStats("file_00001", StatsSetupConst.ROW_COUNT);
+      assertEquals("400", rows1);
+
+      // close connections
+      assertTrue(statsPublisher.closeConnection());
+      assertTrue(statsAggregator.closeConnection());
+
+      System.out.println("StatsPublisher - one stat published per key - aggregating matching key - OK");
+    } catch (Throwable e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+
+  public void testStatsPublisher() throws Throwable {
+    try {
+      System.out.println("StatsPublisher - basic functionality");
+
+      // instantiate stats publisher
+      StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
+      assertNotNull(statsPublisher);
+      assertTrue(statsPublisher.init(conf));
+      assertTrue(statsPublisher.connect(conf));
+
+      // instantiate stats aggregator
+      StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+      assertNotNull(statsAggregator);
+      assertTrue(statsAggregator.connect(conf));
+
+      // publish stats
+      assertTrue(statsPublisher.publishStat("file_00000_a", StatsSetupConst.ROW_COUNT, "200"));
+      assertTrue(statsPublisher.publishStat("file_00000_b", StatsSetupConst.ROW_COUNT, "300"));
+      assertTrue(statsPublisher.publishStat("file_00001_a", StatsSetupConst.ROW_COUNT, "400"));
+      assertTrue(statsPublisher.publishStat("file_00001_b", StatsSetupConst.ROW_COUNT, "500"));
+
+      // aggregate existing stats for prefixes
+      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
+      assertEquals("500", rows0);
+      String rows1 = statsAggregator.aggregateStats("file_00001", StatsSetupConst.ROW_COUNT);
+      assertEquals("900", rows1);
+
+      // aggregate non-existent stats
+      String rowsX = statsAggregator.aggregateStats("file_00002", StatsSetupConst.ROW_COUNT);
+      assertEquals("0", rowsX);
+
+      // close connections
+      assertTrue(statsPublisher.closeConnection());
+      assertTrue(statsAggregator.closeConnection());
+
+      System.out.println("StatsPublisher - basic functionality - OK");
+    } catch (Throwable e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  public void testStatsPublisherMultipleUpdates() throws Throwable {
+    try {
+      System.out.println("StatsPublisher - multiple updates");
+
+      // instantiate stats publisher
+      StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
+      assertNotNull(statsPublisher);
+      assertTrue(statsPublisher.init(conf));
+      assertTrue(statsPublisher.connect(conf));
+
+      // instantiate stats aggregator
+      StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+      assertNotNull(statsAggregator);
+      assertTrue(statsAggregator.connect(conf));
+
+      // publish stats
+      assertTrue(statsPublisher.publishStat("file_00000_a", StatsSetupConst.ROW_COUNT, "200"));
+      assertTrue(statsPublisher.publishStat("file_00000_b", StatsSetupConst.ROW_COUNT, "300"));
+      assertTrue(statsPublisher.publishStat("file_00001_a", StatsSetupConst.ROW_COUNT, "400"));
+      assertTrue(statsPublisher.publishStat("file_00001_b", StatsSetupConst.ROW_COUNT, "500"));
+
+      // repetitive update - should not change the stored value - as the published values are
+      // smaller than the current ones
+      assertTrue(statsPublisher.publishStat("file_00000_a", StatsSetupConst.ROW_COUNT, "100"));
+      assertTrue(statsPublisher.publishStat("file_00000_b", StatsSetupConst.ROW_COUNT, "150"));
+
+      // should change the stored value - the published values are greater than the current values
+      assertTrue(statsPublisher.publishStat("file_00001_a", StatsSetupConst.ROW_COUNT, "500"));
+      assertTrue(statsPublisher.publishStat("file_00001_b", StatsSetupConst.ROW_COUNT, "600"));
+
+      // aggregate stats
+      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
+      assertEquals("500", rows0);
+      String rows1 = statsAggregator.aggregateStats("file_00001", StatsSetupConst.ROW_COUNT);
+      assertEquals("1100", rows1);
+
+      // close connections
+      assertTrue(statsPublisher.closeConnection());
+      assertTrue(statsAggregator.closeConnection());
+
+      System.out.println("StatsPublisher - multiple updates - OK");
+    } catch (Throwable e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  public void testStatsPublisherAfterAggregationCleanUp() throws Throwable {
+    try {
+      System.out.println("StatsPublisher - clean-up after aggregation");
+
+      // instantiate stats publisher
+      StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
+      assertNotNull(statsPublisher);
+      assertTrue(statsPublisher.init(conf));
+      assertTrue(statsPublisher.connect(conf));
+
+      // instantiate stats aggregator
+      StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+      assertNotNull(statsAggregator);
+      assertTrue(statsAggregator.connect(conf));
+
+      // publish stats
+      assertTrue(statsPublisher.publishStat("file_00000_a", StatsSetupConst.ROW_COUNT, "200"));
+      assertTrue(statsPublisher.publishStat("file_00000_b", StatsSetupConst.ROW_COUNT, "300"));
+      assertTrue(statsPublisher.publishStat("file_00001_a", StatsSetupConst.ROW_COUNT, "400"));
+      assertTrue(statsPublisher.publishStat("file_00001_b", StatsSetupConst.ROW_COUNT, "500"));
+
+      // aggregate stats
+      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
+      assertEquals("500", rows0);
+      String rows1 = statsAggregator.aggregateStats("file_00001", StatsSetupConst.ROW_COUNT);
+      assertEquals("900", rows1);
+
+      // now the table should be empty
+      rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
+      assertEquals("0", rows0);
+      rows1 = statsAggregator.aggregateStats("file_00001", StatsSetupConst.ROW_COUNT);
+      assertEquals("0", rows1);
+
+      // close connections
+      assertTrue(statsPublisher.closeConnection());
+      assertTrue(statsAggregator.closeConnection());
+
+      System.out.println("StatsPublisher - clean-up after aggregation - OK");
+    } catch (Throwable e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  public void testStatsAggregatorCleanUp() throws Throwable {
+    try {
+      System.out.println("StatsAggregator - clean-up");
+
+      // instantiate stats publisher
+      StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
+      assertNotNull(statsPublisher);
+      assertTrue(statsPublisher.init(conf));
+      assertTrue(statsPublisher.connect(conf));
+
+      // instantiate stats aggregator
+      StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+      assertNotNull(statsAggregator);
+      assertTrue(statsAggregator.connect(conf));
+
+      // publish stats
+      assertTrue(statsPublisher.publishStat("file_00000_a", StatsSetupConst.ROW_COUNT, "200"));
+      assertTrue(statsPublisher.publishStat("file_00000_b", StatsSetupConst.ROW_COUNT, "300"));
+
+      // cleanUp (closes the connection)
+      assertTrue(statsAggregator.cleanUp("file_00000"));
+
+      // now the connection should be closed (aggregator will report an error)
+      String rows0 = statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT);
+      assertNull(rows0);
+
+      // close connections
+      assertTrue(statsPublisher.closeConnection());
+      assertTrue(statsAggregator.closeConnection());
+
+      System.out.println("StatsAggregator - clean-up - OK");
+    } catch (Throwable e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+}