You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/03/25 00:28:29 UTC
[3/3] hive git commit: HIVE-11388 - Allow ACID Compactor components
to run in multiple metastores (Eugene Koifman, reviewed by Wei Zheng)
HIVE-11388 - Allow ACID Compactor components to run in multiple metastores (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/17870823
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/17870823
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/17870823
Branch: refs/heads/branch-1
Commit: 178708231e09bb2c08aa05cf9979efd6d3cd542c
Parents: c829505
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Mar 24 16:22:21 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Mar 24 16:22:21 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/common/ServerUtils.java | 14 ++
.../deployers/config/hive/hive-site.mysql.xml | 24 ++-
.../hive/metastore/txn/CompactionInfo.java | 4 +
.../metastore/txn/CompactionTxnHandler.java | 7 +-
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 20 ++-
.../hadoop/hive/metastore/txn/TxnHandler.java | 169 ++++++++++++++++++-
.../hadoop/hive/metastore/txn/TxnStore.java | 33 +++-
.../hadoop/hive/metastore/txn/TxnUtils.java | 4 +-
.../metastore/txn/ValidCompactorTxnList.java | 2 +-
.../hive/metastore/txn/TestTxnHandler.java | 93 ++++++++++
.../ql/txn/AcidCompactionHistoryService.java | 7 +
.../hive/ql/txn/AcidHouseKeeperService.java | 7 +
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 62 ++++++-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 19 ++-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 7 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 6 +
16 files changed, 445 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index a284f18..4141770 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
/**
* ServerUtils (specific to HiveServer version 1)
*/
@@ -47,4 +50,15 @@ public class ServerUtils {
}
}
+ /**
+ * @return name of current host
+ */
+ public static String hostname() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ LOG.error("Unable to resolve my host name " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
----------------------------------------------------------------------
diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
index b6f1ab7..387da6c 100644
--- a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
+++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
@@ -62,13 +62,14 @@
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
+
<property>
<name>hive.compactor.initiator.on</name>
- <value>false</value>
+ <value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
- <value>2</value>
+ <value>5</value>
</property>
<property>
<name>hive.timedout.txn.reaper.start</name>
@@ -81,9 +82,24 @@
-->
<property>
<name>hive.timedout.txn.reaper.interval</name>
- <value>30s</value>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.history.reaper.interval</name>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.cleaner.run.interval</name>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.check.interval</name>
+ <value>1s</value>
+ </property>
+ <property>
+ <name>hive.compactor.delta.num.threshold</name>
+ <value>2</value>
</property>
-
<!--end ACID related properties-->
<!--
<property>
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 73255d2..bea1473 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.metastore.txn;
import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -39,6 +40,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
public boolean tooManyAborts = false;
/**
* {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
+ * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for precise definition.
+ * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and
+ * {@link ValidCompactorTxnList#highWatermark}
*/
public long highestTxnId;
byte[] metaInfo;
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index f7c738a..cdff357 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -161,6 +161,8 @@ class CompactionTxnHandler extends TxnHandler {
try {
Connection dbConn = null;
Statement stmt = null;
+ //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725)
+ Statement updStmt = null;
ResultSet rs = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -174,6 +176,7 @@ class CompactionTxnHandler extends TxnHandler {
dbConn.rollback();
return null;
}
+ updStmt = dbConn.createStatement();
do {
CompactionInfo info = new CompactionInfo();
info.id = rs.getLong(1);
@@ -187,7 +190,7 @@ class CompactionTxnHandler extends TxnHandler {
"cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
" AND cq_state='" + INITIATED_STATE + "'";
LOG.debug("Going to execute update <" + s + ">");
- int updCount = stmt.executeUpdate(s);
+ int updCount = updStmt.executeUpdate(s);
if(updCount == 1) {
dbConn.commit();
return info;
@@ -211,6 +214,7 @@ class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
+ closeStmt(updStmt);
close(rs, stmt, dbConn);
}
} catch (RetryException e) {
@@ -627,6 +631,7 @@ class CompactionTxnHandler extends TxnHandler {
/**
* Record the highest txn id that the {@code ci} compaction job will pay attention to.
+ * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids.
*/
public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
Connection dbConn = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 42415ac..56c9ed8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -68,7 +68,7 @@ public final class TxnDbUtil {
Connection conn = null;
Statement stmt = null;
try {
- conn = getConnection();
+ conn = getConnection(true);
stmt = conn.createStatement();
stmt.execute("CREATE TABLE TXNS (" +
" TXN_ID bigint PRIMARY KEY," +
@@ -140,8 +140,13 @@ public final class TxnDbUtil {
" CC_HIGHEST_TXN_ID bigint," +
" CC_META_INFO varchar(2048) for bit data," +
" CC_HADOOP_JOB_ID varchar(32))");
-
- conn.commit();
+
+ stmt.execute("CREATE TABLE AUX_TABLE (" +
+ " MT_KEY1 varchar(128) NOT NULL," +
+ " MT_KEY2 bigint NOT NULL," +
+ " MT_COMMENT varchar(255)," +
+ " PRIMARY KEY(MT_KEY1, MT_KEY2)" +
+ ")");
} catch (SQLException e) {
try {
conn.rollback();
@@ -166,7 +171,7 @@ public final class TxnDbUtil {
Connection conn = null;
Statement stmt = null;
try {
- conn = getConnection();
+ conn = getConnection(true);
stmt = conn.createStatement();
// We want to try these, whether they succeed or fail.
@@ -185,7 +190,7 @@ public final class TxnDbUtil {
dropTable(stmt, "COMPACTION_QUEUE");
dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
dropTable(stmt, "COMPLETED_COMPACTIONS");
- conn.commit();
+ dropTable(stmt, "AUX_TABLE");
} finally {
closeResources(conn, stmt, null);
}
@@ -249,6 +254,9 @@ public final class TxnDbUtil {
}
static Connection getConnection() throws Exception {
+ return getConnection(false);
+ }
+ static Connection getConnection(boolean isAutoCommit) throws Exception {
HiveConf conf = new HiveConf();
String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
@@ -260,7 +268,7 @@ public final class TxnDbUtil {
prop.setProperty("user", user);
prop.setProperty("password", passwd);
Connection conn = driver.connect(driverUrl, prop);
- conn.setAutoCommit(false);
+ conn.setAutoCommit(isAutoCommit);
return conn;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 9789371..a3b0751 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -23,6 +23,8 @@ import com.jolbox.bonecp.BoneCPDataSource;
import org.apache.commons.dbcp.ConnectionFactory;
import org.apache.commons.dbcp.DriverManagerConnectionFactory;
import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.commons.logging.Log;
@@ -45,6 +47,8 @@ import javax.sql.DataSource;
import java.io.IOException;
import java.sql.*;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -87,7 +91,7 @@ import java.util.concurrent.locks.ReentrantLock;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-abstract class TxnHandler implements TxnStore {
+abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
static final protected char INITIATED_STATE = 'i';
static final protected char WORKING_STATE = 'w';
@@ -139,6 +143,12 @@ abstract class TxnHandler implements TxnStore {
* Derby specific concurrency control
*/
private static final ReentrantLock derbyLock = new ReentrantLock(true);
+ /**
+ * must be static since even in UT there may be > 1 instance of TxnHandler
+ * (e.g. via Compactor services)
+ */
+ private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
+ private static final String hostname = ServerUtils.hostname();
// Private methods should never catch SQLException and then throw MetaException. The public
// methods depend on SQLException coming back so they can detect and handle deadlocks. Private
@@ -563,7 +573,7 @@ abstract class TxnHandler implements TxnStore {
* @throws MetaException
*/
private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
- String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? "AND TXN_STATE=" + quoteChar(txnState) : "");
+ String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
ResultSet rs = stmt.executeQuery(addForUpdateClause(query));
if(rs.next()) {
return rs;
@@ -1437,14 +1447,14 @@ abstract class TxnHandler implements TxnStore {
}
}
- void rollbackDBConn(Connection dbConn) {
+ static void rollbackDBConn(Connection dbConn) {
try {
if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
} catch (SQLException e) {
LOG.warn("Failed to rollback db connection " + getMessage(e));
}
}
- protected void closeDbConn(Connection dbConn) {
+ protected static void closeDbConn(Connection dbConn) {
try {
if (dbConn != null && !dbConn.isClosed()) {
dbConn.close();
@@ -1458,7 +1468,7 @@ abstract class TxnHandler implements TxnStore {
* Close statement instance.
* @param stmt statement instance.
*/
- protected void closeStmt(Statement stmt) {
+ protected static void closeStmt(Statement stmt) {
try {
if (stmt != null && !stmt.isClosed()) stmt.close();
} catch (SQLException e) {
@@ -1470,7 +1480,7 @@ abstract class TxnHandler implements TxnStore {
* Close the ResultSet.
* @param rs may be {@code null}
*/
- void close(ResultSet rs) {
+ static void close(ResultSet rs) {
try {
if (rs != null && !rs.isClosed()) {
rs.close();
@@ -1484,7 +1494,7 @@ abstract class TxnHandler implements TxnStore {
/**
* Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
*/
- void close(ResultSet rs, Statement stmt, Connection dbConn) {
+ static void close(ResultSet rs, Statement stmt, Connection dbConn) {
close(rs);
closeStmt(stmt);
closeDbConn(dbConn);
@@ -2635,6 +2645,40 @@ abstract class TxnHandler implements TxnStore {
}
return false;
}
+ private boolean isDuplicateKeyError(SQLException ex) {
+ switch (dbProduct) {
+ case DERBY:
+ if("23505".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case MYSQL:
+ if(ex.getErrorCode() == 1022 && "23000".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case SQLSERVER:
+ //2627 is unique constraint violation incl PK, 2601 - unique key
+ if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case ORACLE:
+ if(ex.getErrorCode() == 1 && "23000".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ case POSTGRES:
+ //http://www.postgresql.org/docs/8.1/static/errcodes-appendix.html
+ if("23505".equals(ex.getSQLState())) {
+ return true;
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected DB type: " + dbProduct + "; " + getMessage(ex));
+ }
+ return false;
+ }
private static String getMessage(SQLException ex) {
return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
}
@@ -2709,4 +2753,115 @@ abstract class TxnHandler implements TxnStore {
derbyLock.unlock();
}
}
+ @Override
+ public MutexAPI getMutexAPI() {
+ return this;
+ }
+
+ @Override
+ public LockHandle acquireLock(String key) throws MetaException {
+ /**
+ * The implementation here is a bit kludgey but done so that code exercised by unit tests
+ * (which run against Derby which has no support for select for update) is as similar to
+ * production code as possible.
+ * In particular, with Derby we always run in a single process with a single metastore and
+ * the absence of For Update is handled via a Semaphore. The latter would strictly speaking
+ * make the SQL statements below unnecessary (for Derby), but then they would not be tested.
+ */
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ try {
+ String sqlStmt = addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0");
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("About to execute SQL: " + sqlStmt);
+ }
+ rs = stmt.executeQuery(sqlStmt);
+ if (!rs.next()) {
+ close(rs);
+ try {
+ stmt.executeUpdate("insert into AUX_TABLE(MT_KEY1,MT_KEY2) values(" + quoteString(key) + ", 0)");
+ dbConn.commit();
+ } catch (SQLException ex) {
+ if (!isDuplicateKeyError(ex)) {
+ throw new RuntimeException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex), ex);
+ }
+ }
+ rs = stmt.executeQuery(sqlStmt);
+ if (!rs.next()) {
+ throw new IllegalStateException("Unable to lock " + quoteString(key) + ". Expected row in AUX_TABLE is missing.");
+ }
+ }
+ Semaphore derbySemaphore = null;
+ if(dbProduct == DatabaseProduct.DERBY) {
+ derbyKey2Lock.putIfAbsent(key, new Semaphore(1));
+ derbySemaphore = derbyKey2Lock.get(key);
+ derbySemaphore.acquire();
+ }
+ LOG.info(quoteString(key) + " locked by " + quoteString(TxnHandler.hostname));
+ //OK, so now we have a lock
+ return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore);
+ } catch (SQLException ex) {
+ rollbackDBConn(dbConn);
+ close(rs, stmt, dbConn);
+ checkRetryable(dbConn, ex, "acquireLock(" + key + ")");
+ throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex));
+ }
+ catch(InterruptedException ex) {
+ rollbackDBConn(dbConn);
+ close(rs, stmt, dbConn);
+ throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + ex.getMessage() + StringUtils.stringifyException(ex));
+ }
+ finally {
+ unlockInternal();
+ }
+ }
+ catch(RetryException ex) {
+ acquireLock(key);
+ }
+ throw new MetaException("This can't happen because checkRetryable() has a retry limit");
+ }
+ public void acquireLock(String key, LockHandle handle) {
+ //the idea is that this will use LockHandle.dbConn
+ throw new NotImplementedException();
+ }
+ private static final class LockHandleImpl implements LockHandle {
+ private final Connection dbConn;
+ private final Statement stmt;
+ private final ResultSet rs;
+ private final Semaphore derbySemaphore;
+ private final List<String> keys = new ArrayList<>();
+ LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key, Semaphore derbySemaphore) {
+ this.dbConn = conn;
+ this.stmt = stmt;
+ this.rs = rs;
+ this.derbySemaphore = derbySemaphore;
+ if(derbySemaphore != null) {
+ //otherwise it may later release permit acquired by someone else
+ assert derbySemaphore.availablePermits() == 0 : "Expected locked Semaphore";
+ }
+ keys.add(key);
+ }
+ void addKey(String key) {
+ //keys.add(key);
+ //would need a list of (stmt,rs) pairs - 1 for each key
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void releaseLocks() {
+ rollbackDBConn(dbConn);
+ close(rs, stmt, dbConn);
+ if(derbySemaphore != null) {
+ derbySemaphore.release();
+ }
+ for(String key : keys) {
+ LOG.info(quoteString(key) + " unlocked by " + quoteString(TxnHandler.hostname));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6fc6ed9..3aac11b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -74,6 +74,7 @@ import java.util.Set;
@InterfaceStability.Evolving
public interface TxnStore {
+ public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
// Compactor states (Should really be enum)
static final public String INITIATED_RESPONSE = "initiated";
static final public String WORKING_RESPONSE = "working";
@@ -355,10 +356,40 @@ public interface TxnStore {
*/
public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
-
@VisibleForTesting
public int numLocksInLockTable() throws SQLException, MetaException;
@VisibleForTesting
long setTimeout(long milliseconds);
+
+ public MutexAPI getMutexAPI();
+
+ /**
+ * This is primarily designed to provide coarse grained mutex support to operations running
+ * inside the Metastore (of which there could be several instances). The initial goal is to
+ * ensure that various sub-processes of the Compactor don't step on each other.
+ *
+ * In RDBMS world each {@code LockHandle} uses a java.sql.Connection so use it sparingly.
+ */
+ public static interface MutexAPI {
+ /**
+ * The {@code key} is the name of the lock. Will acquire an exclusive lock or block. It returns
+ * a handle which must be used to release the lock. Each invocation returns a new handle.
+ */
+ public LockHandle acquireLock(String key) throws MetaException;
+
+ /**
+ * Same as {@link #acquireLock(String)} but takes an already existing handle as input. This
+ * will associate the lock on {@code key} with the same handle. All locks associated with
+ * the same handle will be released together.
+ * @param handle not NULL
+ */
+ public void acquireLock(String key, LockHandle handle) throws MetaException;
+ public static interface LockHandle {
+ /**
+ * Releases all locks associated with this handle.
+ */
+ public void releaseLocks();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index f60e34b..4c14eef 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -75,8 +75,8 @@ public class TxnUtils {
int i = 0;
for (TxnInfo txn : txns.getOpen_txns()) {
if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
- exceptions[i++] = txn.getId();
- }
+ exceptions[i++] = txn.getId();//todo: only add Aborted
+ }//remove all exceptions < minOpenTxn
highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
return new ValidCompactorTxnList(exceptions, -1, highWater);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
index 648fd49..30bdfa7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
import java.util.Arrays;
/**
- * And implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
* For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
* is committed or aborted. Additionally it will return none if there are any open transactions
* below the max transaction given, since we don't want to compact above open transactions. For
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index b8cab71..6033c15 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.metastore.api.*;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.*;
+import org.apache.hadoop.util.StringUtils;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -33,6 +34,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
@@ -1214,6 +1216,97 @@ public class TestTxnHandler {
}
}
+ /**
+ * This cannot be run against Derby (thus in UT) but it can run against MySQL.
+ * 1. add to metastore/pom.xml
+ * <dependency>
+ * <groupId>mysql</groupId>
+ * <artifactId>mysql-connector-java</artifactId>
+ * <version>5.1.30</version>
+ * </dependency>
+ * 2. Hack in the c'tor of this class
+ * conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore");
+ * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive");
+ * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive");
+ * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
+ * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack()
+ *
+ */
+ @Ignore("multiple threads wedge Derby")
+ @Test
+ public void testMutexAPI() throws Exception {
+ final TxnStore.MutexAPI api = txnHandler.getMutexAPI();
+ final AtomicInteger stepTracker = new AtomicInteger(0);
+ /**
+ * counter = 0;
+ * Thread1 counter=1, lock, wait 4s, check counter(should be 2), counter=3, unlock
+ * Thread2 counter=2, lock (and block), inc counter, should be 4
+ */
+ Thread t1 = new Thread("MutexTest1") {
+ public void run() {
+ try {
+ stepTracker.incrementAndGet();//now 1
+ TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+ Thread.sleep(4000);
+ //stepTracker should now be 2 which indicates t2 has started
+ Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get());
+ stepTracker.incrementAndGet();//now 3
+ handle.releaseLocks();
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
+ }
+ }
+ };
+ t1.setDaemon(true);
+ ErrorHandle ueh1 = new ErrorHandle();
+ t1.setUncaughtExceptionHandler(ueh1);
+ Thread t2 = new Thread("MutexTest2") {
+ public void run() {
+ try {
+ stepTracker.incrementAndGet();//now 2
+ //this should block until t1 unlocks
+ TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+ stepTracker.incrementAndGet();//now 4
+ Assert.assertEquals(4, stepTracker.get());
+ handle.releaseLocks();
+ stepTracker.incrementAndGet();//now 5
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
+ }
+ }
+ };
+ t2.setDaemon(true);
+ ErrorHandle ueh2 = new ErrorHandle();
+ t2.setUncaughtExceptionHandler(ueh2);
+ t1.start();
+ try {
+ Thread.sleep(1000);
+ }
+ catch(InterruptedException ex) {
+ LOG.info("Sleep was interrupted");
+ }
+ t2.start();
+ t1.join(6000);//so that test doesn't block
+ t2.join(6000);
+
+ if(ueh1.error != null) {
+ Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false);
+ }
+ if (ueh2.error != null) {
+ Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false);
+ }
+ Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get());
+ }
+ private final static class ErrorHandle implements Thread.UncaughtExceptionHandler {
+ Throwable error = null;
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage());
+ error = e;
+ }
+ }
private void updateTxns(Connection conn) throws SQLException {
Statement stmt = conn.createStatement();
stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index 59c8fe4..5d9e7be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -61,7 +61,9 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
@Override
public void run() {
+ TxnStore.MutexAPI.LockHandle handle = null;
try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name());
long startTime = System.currentTimeMillis();
txnHandler.purgeCompactionHistory();
int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
catch(Throwable t) {
LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
}
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index de74a7b..f39df17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -61,7 +61,9 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
}
@Override
public void run() {
+ TxnStore.MutexAPI.LockHandle handle = null;
try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
long startTime = System.currentTimeMillis();
txnHandler.performTimeOuts();
int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
catch(Throwable t) {
LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t);
}
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 33580fd..1e6e8a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -72,11 +73,13 @@ public class Cleaner extends CompactorThread {
// and if so remembers that and then sets it to true at the end. We have to check here
// first to make sure we go through a complete iteration of the loop before resetting it.
boolean setLooped = !looped.get();
- long startedAt = System.currentTimeMillis();
+ TxnStore.MutexAPI.LockHandle handle = null;
+ long startedAt = -1;
// Make sure nothing escapes this run method and kills the metastore at large,
// so wrap it in a big catch Throwable statement.
try {
-
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+ startedAt = System.currentTimeMillis();
// First look for all the compactions that are waiting to be cleaned. If we have not
// seen an entry before, look for all the locks held on that table or partition and
// record them. We will then only clean the partition once all of those locks have been
@@ -86,6 +89,31 @@ public class Cleaner extends CompactorThread {
// done the compaction will read the more up to date version of the data (either in a
// newer delta or in a newer base).
List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+ {
+ /**
+ * Since there may be more than 1 instance of Cleaner running we may have state info
+ * for items which were cleaned by other instances. Here we remove them.
+ *
+ * In the long run if we add end_time to compaction_queue, then we can check that
+ * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case
+ * we know the lock owner is reading files created by this compaction or later.
+ * The advantage is that we don't have to store the locks.
+ */
+ Set<Long> currentToCleanSet = new HashSet<>();
+ for (CompactionInfo ci : toClean) {
+ currentToCleanSet.add(ci.id);
+ }
+ Set<Long> cleanPerformedByOthers = new HashSet<>();
+ for (long id : compactId2CompactInfoMap.keySet()) {
+ if (!currentToCleanSet.contains(id)) {
+ cleanPerformedByOthers.add(id);
+ }
+ }
+ for (long id : cleanPerformedByOthers) {
+ compactId2CompactInfoMap.remove(id);
+ compactId2LockMap.remove(id);
+ }
+ }
if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
@@ -119,6 +147,7 @@ public class Cleaner extends CompactorThread {
// Remember to remove this when we're out of the loop,
// we can't do it in the loop or we'll get a concurrent modification exception.
compactionsCleaned.add(queueEntry.getKey());
+ //Future thought: this may be expensive so consider having a thread pool run these in parallel
clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
} else {
// Remove the locks we didn't see so we don't look for them again next time
@@ -140,6 +169,11 @@ public class Cleaner extends CompactorThread {
LOG.error("Caught an exception in the main loop of compactor cleaner, " +
StringUtils.stringifyException(t));
}
+ finally {
+ if (handle != null) {
+ handle.releaseLocks();
+ }
+ }
if (setLooped) {
looped.set(true);
}
@@ -206,10 +240,24 @@ public class Cleaner extends CompactorThread {
StorageDescriptor sd = resolveStorageDescriptor(t, p);
final String location = sd.getLocation();
- // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
- // transactions. This assures that all deltas are treated as valid and all we return are
- // obsolete files.
- final ValidTxnList txnList = new ValidReadTxnList();
+ /**
+ * Each Compaction only compacts as far as the highest txn id such that all txns below it
+ * are resolved (i.e. not open). This is what "highestTxnId" tracks. This is only tracked
+ * since Hive 1.3.0/2.0 - thus may be 0. See ValidCompactorTxnList and uses for more info.
+ *
+ * We only want to clean up to the highestTxnId - otherwise we risk deleting deltas from
+ * under an active reader.
+ *
+ * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a
+ * clean request for D2.
+ * Cleaner checks existing locks and finds none.
+ * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction
+ * completes which creates D4.
+ * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
+ * unless ValidTxnList is "capped" at highestTxnId.
+ */
+ final ValidTxnList txnList = ci.highestTxnId > 0 ?
+ new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList();
if (runJobAsSelf(ci.runAs)) {
removeFiles(location, txnList);
@@ -249,7 +297,7 @@ public class Cleaner extends CompactorThread {
FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
for (Path dead : filesToDelete) {
- LOG.debug("Doing to delete path " + dead.toString());
+ LOG.debug("Going to delete path " + dead.toString());
fs.delete(dead, true);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 1898a4d..0e4ba06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -74,11 +74,15 @@ public class Initiator extends CompactorThread {
// much easier. The stop value is only for testing anyway and not used when called from
// HiveMetaStore.
do {
- long startedAt = System.currentTimeMillis();
+ long startedAt = -1;
+ TxnStore.MutexAPI.LockHandle handle = null;
// Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
// don't doom the entire thread.
- try {//todo: add method to only get current i.e. skip history - more efficient
+ try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
+ startedAt = System.currentTimeMillis();
+ //todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
ValidTxnList txns =
TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
@@ -114,6 +118,8 @@ public class Initiator extends CompactorThread {
// Check if we already have initiated or are working on a compaction for this partition
// or table. If so, skip it. If we are just waiting on cleaning we can still check,
// as it may be time to compact again even though we haven't cleaned.
+ //todo: this is not robust. You can easily run Alter Table to start a compaction between
+ //the time currentCompactions is generated and now
if (lookForCurrentCompactions(currentCompactions, ci)) {
LOG.debug("Found currently initiated or working compaction for " +
ci.getFullPartitionName() + " so we will not initiate another compaction");
@@ -134,7 +140,9 @@ public class Initiator extends CompactorThread {
}
StorageDescriptor sd = resolveStorageDescriptor(t, p);
String runAs = findUserToRunAs(sd.getLocation(), t);
-
+ /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
+ * Long term we should consider having a thread pool here and running checkForCompaction calls
+ * in parallel*/
CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
} catch (Throwable t) {
@@ -154,6 +162,11 @@ public class Initiator extends CompactorThread {
LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
StringUtils.stringifyException(t));
}
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
long elapsedTime = System.currentTimeMillis() - startedAt;
if (elapsedTime >= checkInterval || stop.get()) continue;
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6f8dc35..8394ec6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -229,7 +229,6 @@ public class TestTxnCommands2 {
}
Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true);
}
- @Ignore("alter table")
@Test
public void testAlterTable() throws Exception {
int[][] tableData = {{1,2}};
@@ -604,7 +603,13 @@ public class TestTxnCommands2 {
private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
int lastCount = houseKeeperService.getIsAliveCounter();
houseKeeperService.start(conf);
+ int maxIter = 10;
+ int iterCount = 0;
while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+ if(iterCount++ >= maxIter) {
+ //prevent test hangs
+ throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits");
+ }
try {
Thread.sleep(100);//make sure it has run at least once
}
http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 99705b4..b355dbe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -194,7 +194,13 @@ public class TestDbTxnManager {
private void runReaper() throws Exception {
int lastCount = houseKeeperService.getIsAliveCounter();
houseKeeperService.start(conf);
+ int maxIter = 10;
+ int iterCount = 0;
while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+ if(iterCount++ >= maxIter) {
+ //prevent test hangs
+ throw new IllegalStateException("Reaper didn't run after " + iterCount + " waits");
+ }
try {
Thread.sleep(100);//make sure it has run at least once
}