You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this copy.
Posted to commits@hive.apache.org by nz...@apache.org on 2011/05/24 21:17:46 UTC
svn commit: r1127229 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java
Author: nzhang
Date: Tue May 24 19:17:46 2011
New Revision: 1127229
URL: http://svn.apache.org/viewvc?rev=1127229&view=rev
Log:
HIVE-2144. reduce workload generated by JDBCStatsPublisher (Tomasz Nykiel via Ning Zhang)
Added:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java?rev=1127229&r1=1127228&r2=1127229&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java Tue May 24 19:17:46 2011
@@ -24,6 +24,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLRecoverableException;
import java.sql.Statement;
import java.util.Random;
@@ -42,7 +43,7 @@ public class JDBCStatsPublisher implemen
private String connectionString;
private Configuration hiveconf;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
- private PreparedStatement selStmt, updStmt, insStmt;
+ private PreparedStatement updStmt, insStmt;
private int timeout; // default timeout in sec. for JDBC connection and statements
// SQL comment that identifies where the SQL statement comes from
private final String comment = "Hive stats publishing: " + this.getClass().getName();
@@ -70,19 +71,21 @@ public class JDBCStatsPublisher implemen
}
// prepare the SELECT/UPDATE/INSERT statements
- String select =
- "SELECT /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME +
- " FROM " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
String update =
- "UPDATE /* " + comment + " */ "+ JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + "= ? " +
- " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ?";
+ "UPDATE /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " SET " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + "= ? " +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " = ? " +
+ " AND ? > ( SELECT TEMP." + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME
+ + " FROM ( " +
+ " SELECT " + JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + " FROM "
+ + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " WHERE " + JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME
+ + " = ?) TEMP )";
String insert =
- "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
- " VALUES (?, ?)";
+ "INSERT INTO /* " + comment + " */ " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME +
+ " VALUES (?, ?)";
DriverManager.setLoginTimeout(timeout); // stats is non-blocking
@@ -95,25 +98,24 @@ public class JDBCStatsPublisher implemen
}
};
- for (int failures = 0; ; failures++) {
+ for (int failures = 0;; failures++) {
try {
conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries);
// prepare statements
- selStmt = Utilities.prepareWithRetry(conn, select, waitWindow, maxRetries);
updStmt = Utilities.prepareWithRetry(conn, update, waitWindow, maxRetries);
insStmt = Utilities.prepareWithRetry(conn, insert, waitWindow, maxRetries);
// set query timeout
- Utilities.executeWithRetry(setQueryTimeout, selStmt, waitWindow, maxRetries);
Utilities.executeWithRetry(setQueryTimeout, updStmt, waitWindow, maxRetries);
Utilities.executeWithRetry(setQueryTimeout, insStmt, waitWindow, maxRetries);
return true;
+
} catch (SQLRecoverableException e) {
if (failures >= maxRetries) {
LOG.error("Error during JDBC connection to " + connectionString + ". ", e);
- return false; // just return false without fail the task
+ return false; // just return false without fail the task
}
long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
try {
@@ -143,13 +145,6 @@ public class JDBCStatsPublisher implemen
}
LOG.info("Stats publishing for key " + fileID + ". Value = " + value);
- Utilities.SQLCommand<ResultSet> execQuery = new Utilities.SQLCommand<ResultSet>() {
- @Override
- public ResultSet run(PreparedStatement stmt) throws SQLException {
- return stmt.executeQuery();
- }
- };
-
Utilities.SQLCommand<Void> execUpdate = new Utilities.SQLCommand<Void>() {
@Override
public Void run(PreparedStatement stmt) throws SQLException {
@@ -158,45 +153,40 @@ public class JDBCStatsPublisher implemen
}
};
- for (int failures = 0; ; failures++) {
+ for (int failures = 0;; failures++) {
try {
+ insStmt.setString(1, fileID);
+ insStmt.setString(2, value);
+ Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries);
+ return true;
+ } catch (SQLIntegrityConstraintViolationException e) {
+
+ // We assume that the table used for partial statistics has a primary key declared on the
+ // "fileID". The exception will be thrown if two tasks report results for the same fileID.
+ // In such case, we either update the row, or abandon changes depending on which statistic
+ // is newer.
- // Check to see if a previous task (mapper attempt) had published a previous stat
- selStmt.setString(1, fileID);
- ResultSet result = Utilities.executeWithRetry(execQuery, selStmt, waitWindow, maxRetries);
-
- if (result.next()) {
- long currval = result.getLong(1);
- // Only update if the previous value is smaller (i.e. the previous attempt was a fail and
- // hopefully this attempt is a success (as it has a greater value).
- if (currval < Long.parseLong(value)) {
+ for (int updateFailures = 0;; updateFailures++) {
+ try {
updStmt.setString(1, value);
updStmt.setString(2, fileID);
+ updStmt.setString(3, value);
+ updStmt.setString(4, fileID);
Utilities.executeWithRetry(execUpdate, updStmt, waitWindow, maxRetries);
+ return true;
+ } catch (SQLRecoverableException ue) {
+ // need to start from scratch (connection)
+ if (!handleSQLRecoverableException(ue, updateFailures)) {
+ return false;
+ }
+ } catch (SQLException ue) {
+ LOG.error("Error during publishing statistics. ", e);
+ return false;
}
- } else {
- // No previous attempts.
- insStmt.setString(1, fileID);
- insStmt.setString(2, value);
- Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries);
}
- return true;
} catch (SQLRecoverableException e) {
// need to start from scratch (connection)
- if (failures >= maxRetries) {
- return false;
- }
- // close the current connection
- closeConnection();
- long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
- try {
- Thread.sleep(waitTime);
- } catch (InterruptedException iex) {
- }
- // get a new connection
- if (!connect(hiveconf)) {
- // if cannot reconnect, just fail because connect() already handles retries.
- LOG.error("Error during publishing aggregation. " + e);
+ if (!handleSQLRecoverableException(e, failures)) {
return false;
}
} catch (SQLException e) {
@@ -206,6 +196,26 @@ public class JDBCStatsPublisher implemen
}
}
+  /**
+   * Recovers from a recoverable SQL error raised while publishing statistics: closes the current
+   * connection, sleeps for a randomized back-off interval and then reconnects.
+   *
+   * @param e the recoverable exception that triggered the retry; used only for logging
+   * @param failures number of failed attempts so far; once it reaches maxRetries no further
+   *          retry is attempted
+   * @return true if a new connection was established and the caller should retry the statement,
+   *         false if publishing should be abandoned
+   */
+  private boolean handleSQLRecoverableException(Exception e, int failures) {
+    if (failures >= maxRetries) {
+      // Previously this path gave up silently; log so the dropped statistic is visible.
+      LOG.error("Error during publishing statistics. Giving up after " + failures
+          + " failed attempts. ", e);
+      return false;
+    }
+    // close the current connection
+    closeConnection();
+    long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
+    try {
+      Thread.sleep(waitTime);
+    } catch (InterruptedException iex) {
+      // Restore the interrupt status instead of swallowing it, and stop retrying: publishing is
+      // best-effort and returning false does not fail the task.
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while waiting to reconnect; abandoning stats publishing. ", iex);
+      return false;
+    }
+    // get a new connection
+    if (!connect(hiveconf)) {
+      // if cannot reconnect, just fail because connect() already handles retries.
+      // Pass the exception as a Throwable so its stack trace is logged.
+      LOG.error("Error during publishing statistics. ", e);
+      return false;
+    }
+    return true;
+  }
+
@Override
public boolean closeConnection() {
if (conn == null) {
@@ -218,20 +228,18 @@ public class JDBCStatsPublisher implemen
if (updStmt != null) {
updStmt.close();
}
- if (selStmt != null) {
- selStmt.close();
- }
conn.close();
// In case of derby, explicitly shutdown the database otherwise it reports error when
// trying to connect to the same JDBC connection string again.
- if(HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCLASS).equalsIgnoreCase("jdbc:derby")) {
+ if (HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCLASS).equalsIgnoreCase(
+ "jdbc:derby")) {
try {
- // The following closes the derby connection. It throws an exception that has to be caught and ignored.
+ // The following closes the derby connection. It throws an exception that has to be caught
+ // and ignored.
DriverManager.getConnection(connectionString + ";shutdown=true");
- }
- catch (Exception e) {
+ } catch (Exception e) {
// Do nothing because we know that an exception is thrown anyway.
}
}
@@ -265,9 +273,9 @@ public class JDBCStatsPublisher implemen
boolean tblExists = rs.next();
if (!tblExists) { // Table does not exist, create it
String createTable =
- "CREATE TABLE " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME + " (" +
- JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " VARCHAR(255), " +
- JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + " BIGINT)";
+ "CREATE TABLE " + JDBCStatsSetupConstants.PART_STAT_TABLE_NAME + " (" +
+ JDBCStatsSetupConstants.PART_STAT_ID_COLUMN_NAME + " VARCHAR(255) PRIMARY KEY, " +
+ JDBCStatsSetupConstants.PART_STAT_ROW_COUNT_COLUMN_NAME + " BIGINT )";
stmt.executeUpdate(createTable);
stmt.close();
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java?rev=1127229&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisher.java Tue May 24 19:17:46 2011
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * TestStatsPublisher.
+ *
+ * Exercises the StatsPublisher implementation selected by
+ * HiveConf.ConfVars.HIVESTATSDBCLASS together with the matching StatsAggregator: publishing,
+ * prefix aggregation, keeping the larger of repeated values for one key, and clean-up.
+ */
+public class TestStatsPublisher extends TestCase {
+
+  protected Configuration conf;
+  protected String statsImplementationClass;
+
+  public TestStatsPublisher(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void setUp() {
+    System.out.println("StatPublisher Test");
+    conf = new JobConf(TestStatsPublisher.class);
+
+    statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+    StatsFactory.setImplementation(statsImplementationClass, conf);
+  }
+
+  /** Instantiates, initializes and connects the configured stats publisher. */
+  private StatsPublisher connectPublisher() throws Exception {
+    StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
+    assertNotNull(statsPublisher);
+    assertTrue(statsPublisher.init(conf));
+    assertTrue(statsPublisher.connect(conf));
+    return statsPublisher;
+  }
+
+  /** Instantiates and connects the configured stats aggregator. */
+  private StatsAggregator connectAggregator() throws Exception {
+    StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+    assertNotNull(statsAggregator);
+    assertTrue(statsAggregator.connect(conf));
+    return statsAggregator;
+  }
+
+  /** Publishes a row count for the given key and asserts that publishing succeeded. */
+  private static void publishRowCount(StatsPublisher publisher, String key, String rowCount)
+      throws Exception {
+    assertTrue(publisher.publishStat(key, StatsSetupConst.ROW_COUNT, rowCount));
+  }
+
+  /** Aggregates the row counts under the given key prefix and asserts the expected total. */
+  private static void assertAggregatedRowCount(StatsAggregator aggregator, String prefix,
+      String expected) throws Exception {
+    assertEquals(expected, aggregator.aggregateStats(prefix, StatsSetupConst.ROW_COUNT));
+  }
+
+  /** Closes both connections and asserts that each close succeeded. */
+  private static void closeConnections(StatsPublisher publisher, StatsAggregator aggregator)
+      throws Exception {
+    assertTrue(publisher.closeConnection());
+    assertTrue(aggregator.closeConnection());
+  }
+
+  public void testStatsPublisherOneStat() throws Exception {
+    System.out.println("StatsPublisher - one stat published per key - aggregating matching key");
+
+    StatsPublisher statsPublisher = connectPublisher();
+    StatsAggregator statsAggregator = connectAggregator();
+
+    // publish stats
+    publishRowCount(statsPublisher, "file_00000", "200");
+    publishRowCount(statsPublisher, "file_00001", "400");
+
+    // aggregate existing stats for prefixes
+    assertAggregatedRowCount(statsAggregator, "file_00000", "200");
+    assertAggregatedRowCount(statsAggregator, "file_00001", "400");
+
+    closeConnections(statsPublisher, statsAggregator);
+
+    System.out.println("StatsPublisher - one stat published per key - aggregating matching key - OK");
+  }
+
+  public void testStatsPublisher() throws Exception {
+    System.out.println("StatsPublisher - basic functionality");
+
+    StatsPublisher statsPublisher = connectPublisher();
+    StatsAggregator statsAggregator = connectAggregator();
+
+    // publish stats
+    publishRowCount(statsPublisher, "file_00000_a", "200");
+    publishRowCount(statsPublisher, "file_00000_b", "300");
+    publishRowCount(statsPublisher, "file_00001_a", "400");
+    publishRowCount(statsPublisher, "file_00001_b", "500");
+
+    // aggregate existing stats for prefixes
+    assertAggregatedRowCount(statsAggregator, "file_00000", "500");
+    assertAggregatedRowCount(statsAggregator, "file_00001", "900");
+
+    // aggregate non-existent stats
+    assertAggregatedRowCount(statsAggregator, "file_00002", "0");
+
+    closeConnections(statsPublisher, statsAggregator);
+
+    System.out.println("StatsPublisher - basic functionality - OK");
+  }
+
+  public void testStatsPublisherMultipleUpdates() throws Exception {
+    System.out.println("StatsPublisher - multiple updates");
+
+    StatsPublisher statsPublisher = connectPublisher();
+    StatsAggregator statsAggregator = connectAggregator();
+
+    // publish stats
+    publishRowCount(statsPublisher, "file_00000_a", "200");
+    publishRowCount(statsPublisher, "file_00000_b", "300");
+    publishRowCount(statsPublisher, "file_00001_a", "400");
+    publishRowCount(statsPublisher, "file_00001_b", "500");
+
+    // repetitive update - should not change the stored value - as the published values are
+    // smaller than the current ones
+    publishRowCount(statsPublisher, "file_00000_a", "100");
+    publishRowCount(statsPublisher, "file_00000_b", "150");
+
+    // should change the stored value - the published values are greater than the current values
+    publishRowCount(statsPublisher, "file_00001_a", "500");
+    publishRowCount(statsPublisher, "file_00001_b", "600");
+
+    // aggregate stats
+    assertAggregatedRowCount(statsAggregator, "file_00000", "500");
+    assertAggregatedRowCount(statsAggregator, "file_00001", "1100");
+
+    closeConnections(statsPublisher, statsAggregator);
+
+    System.out.println("StatsPublisher - multiple updates - OK");
+  }
+
+  public void testStatsPublisherAfterAggregationCleanUp() throws Exception {
+    System.out.println("StatsPublisher - clean-up after aggregation");
+
+    StatsPublisher statsPublisher = connectPublisher();
+    StatsAggregator statsAggregator = connectAggregator();
+
+    // publish stats
+    publishRowCount(statsPublisher, "file_00000_a", "200");
+    publishRowCount(statsPublisher, "file_00000_b", "300");
+    publishRowCount(statsPublisher, "file_00001_a", "400");
+    publishRowCount(statsPublisher, "file_00001_b", "500");
+
+    // aggregate stats
+    assertAggregatedRowCount(statsAggregator, "file_00000", "500");
+    assertAggregatedRowCount(statsAggregator, "file_00001", "900");
+
+    // now the table should be empty
+    assertAggregatedRowCount(statsAggregator, "file_00000", "0");
+    assertAggregatedRowCount(statsAggregator, "file_00001", "0");
+
+    closeConnections(statsPublisher, statsAggregator);
+
+    System.out.println("StatsPublisher - clean-up after aggregation - OK");
+  }
+
+  public void testStatsAggregatorCleanUp() throws Exception {
+    System.out.println("StatsAggregator - clean-up");
+
+    StatsPublisher statsPublisher = connectPublisher();
+    StatsAggregator statsAggregator = connectAggregator();
+
+    // publish stats
+    publishRowCount(statsPublisher, "file_00000_a", "200");
+    publishRowCount(statsPublisher, "file_00000_b", "300");
+
+    // cleanUp (closes the connection)
+    assertTrue(statsAggregator.cleanUp("file_00000"));
+
+    // now the connection should be closed (aggregator will report an error)
+    assertNull(statsAggregator.aggregateStats("file_00000", StatsSetupConst.ROW_COUNT));
+
+    closeConnections(statsPublisher, statsAggregator);
+
+    System.out.println("StatsAggregator - clean-up - OK");
+  }
+
+}