You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/03/20 05:26:27 UTC
[impala] 01/03: IMPALA-9357: Fix race condition in alter_database event
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1d27b91a36f687f958b02738ba7899652b2cfec7
Author: Vihang Karajgaonkar <vi...@cloudera.com>
AuthorDate: Thu Feb 6 13:43:17 2020 -0800
IMPALA-9357: Fix race condition in alter_database event
The race condition is exposed intermittently on certain builds, which
causes the test_event_processing::test_self_events test to fail. This
happens because we check for self-event identifiers in the Db
object without taking a lock. When a DDL like 'comment on
database <db> is 'test'' is executed, it is possible that the event
processor thread is triggered as soon as the ALTER_DATABASE event is
generated. This may cause the event processor to fail the self-event
detection, since the self-event identifiers have not yet been added to
the Db object.
The fix adds a Db lock similar to the Table lock. Alter db operations
in CatalogOpExecutor now take db locks instead of metastoreDdlLock_,
which makes them consistent with the table locking protocol.
Testing:
1. Ran existing tests for events processor.
2. This test was failing frequently on centos6 (about 1 in 3 runs).
After the fix, I ran the test in a loop for 24 hrs (197 iterations) and
it didn't fail.
3. Ran core tests with CDP and CDH builds.
Change-Id: I472fd8a55740769ee5cdb84e48422a4ab39a8d1e
Reviewed-on: http://gerrit.cloudera.org:8080/15260
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../impala/catalog/CatalogServiceCatalog.java | 138 +++++++++++--------
fe/src/main/java/org/apache/impala/catalog/Db.java | 21 +--
.../org/apache/impala/catalog/HdfsPartition.java | 20 +--
.../main/java/org/apache/impala/catalog/Table.java | 14 +-
.../apache/impala/service/CatalogOpExecutor.java | 148 ++++++++++++++-------
tests/custom_cluster/test_concurrent_ddls.py | 34 +++--
tests/custom_cluster/test_event_processing.py | 11 +-
7 files changed, 241 insertions(+), 145 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index b66d5ee..fb7ef4d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -192,9 +191,9 @@ public class CatalogServiceCatalog extends Catalog {
private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
// Timeout for acquiring a table lock
// TODO: Make this configurable
- private static final long TBL_LOCK_TIMEOUT_MS = 7200000;
+ private static final long LOCK_RETRY_TIMEOUT_MS = 7200000;
// Time to sleep before retrying to acquire a table lock
- private static final int TBL_LOCK_RETRY_MS = 10;
+ private static final int LOCK_RETRY_DELAY_MS = 10;
private final TUniqueId catalogServiceId_;
@@ -402,8 +401,9 @@ public class CatalogServiceCatalog extends Catalog {
/**
* Tries to acquire versionLock_ and the lock of 'tbl' in that order. Returns true if it
- * successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
- * when the function returns. Returns false otherwise and no lock is held in this case.
+ * successfully acquires both within LOCK_RETRY_TIMEOUT_MS millisecs; both locks are
+ * held when the function returns. Returns false otherwise and no lock is held in
+ * this case.
*/
public boolean tryLockTable(Table tbl) {
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
@@ -423,12 +423,45 @@ public class CatalogServiceCatalog extends Catalog {
versionLock_.writeLock().unlock();
try {
// Sleep to avoid spinning and allow other operations to make progress.
- Thread.sleep(TBL_LOCK_RETRY_MS);
+ Thread.sleep(LOCK_RETRY_DELAY_MS);
} catch (InterruptedException e) {
// ignore
}
end = System.currentTimeMillis();
- } while (end - begin < TBL_LOCK_TIMEOUT_MS);
+ } while (end - begin < LOCK_RETRY_TIMEOUT_MS);
+ return false;
+ }
+ }
+
+ /**
+ * Similar to tryLock on a table, but works on a database object instead of Table.
+ * TODO: Refactor the code so that both table and db can be "lockable" using a single
+ * method.
+ */
+ public boolean tryLockDb(Db db) {
+ try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
+ "Attempting to lock database " + db.getName())) {
+ long begin = System.currentTimeMillis();
+ long end;
+ do {
+ versionLock_.writeLock().lock();
+ if (db.getLock().tryLock()) {
+ if (LOG.isTraceEnabled()) {
+ end = System.currentTimeMillis();
+ LOG.trace(String.format("Lock for db %s was acquired in %d msec",
+ db.getName(), end - begin));
+ }
+ return true;
+ }
+ versionLock_.writeLock().unlock();
+ try {
+ // Sleep to avoid spinning and allow other operations to make progress.
+ Thread.sleep(LOCK_RETRY_DELAY_MS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ end = System.currentTimeMillis();
+ } while (end - begin < LOCK_RETRY_TIMEOUT_MS);
return false;
}
}
@@ -781,42 +814,6 @@ public class CatalogServiceCatalog extends Catalog {
}
/**
- * Gets the list of versions for in-flight events for the given table. Applicable
- * only when external event processing is enabled.
- * @param dbName database name
- * @param tblName table name
- * @return List of previous version numbers for in-flight events on this table.
- * If table is not laoded returns a empty list. If event processing is disabled,
- * returns a empty list
- */
- public List<Long> getInFlightVersionsForEvents(String dbName, String tblName)
- throws DatabaseNotFoundException, TableNotFoundException {
- Preconditions.checkState(isEventProcessingActive(),
- "Event processing should be enabled before calling this method");
- List<Long> result = Collections.EMPTY_LIST;
- versionLock_.readLock().lock();
- try {
- Db db = getDb(dbName);
- if (db == null) {
- throw new DatabaseNotFoundException(
- String.format("Database %s not found", dbName));
- }
- if (tblName == null) {
- return db.getVersionsForInflightEvents();
- }
- Table tbl = getTable(dbName, tblName);
- if (tbl == null) {
- throw new TableNotFoundException(
- String.format("Table %s not found", new TableName(dbName, tblName)));
- }
- if (tbl instanceof IncompleteTable) return result;
- return tbl.getVersionsForInflightEvents();
- } finally {
- versionLock_.readLock().unlock();
- }
- }
-
- /**
* Evaluates if the information from an event (serviceId and versionNumber) matches to
* the catalog object. If there is match, the in-flight version for that object is
* removed and method returns true. If it does not match, returns false
@@ -830,20 +827,45 @@ public class CatalogServiceCatalog extends Catalog {
"Event processing should be enabled when calling this method");
long versionNumber = ctx.getVersionNumberFromEvent();
String serviceIdFromEvent = ctx.getServiceIdFromEvent();
+ LOG.debug("Input arguments for self-event evaluation: {} {}",versionNumber,
+ serviceIdFromEvent);
// no version info or service id in the event
- if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) return false;
+ if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
+ LOG.info("Not a self-event since the given version is {} and service id is {}",
+ versionNumber, serviceIdFromEvent);
+ return false;
+ }
// if the service id from event doesn't match with our service id this is not a
// self-event
- if (!getCatalogServiceId().equals(serviceIdFromEvent)) return false;
+ if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
+ LOG.info("Not a self-event because service id of this catalog {} does not match "
+ + "with one in event {}.", getCatalogServiceId(), serviceIdFromEvent);
+ return false;
+ }
Db db = getDb(ctx.getDbName());
if (db == null) {
throw new DatabaseNotFoundException("Database " + ctx.getDbName() + " not found");
}
// if the given tblName is null we look db's in-flight events
if (ctx.getTblName() == null) {
- return db.removeFromVersionsForInflightEvents(versionNumber);
+ //TODO use read/write locks for both table and db
+ if (!tryLockDb(db)) {
+ throw new CatalogException("Could not acquire lock on database object " +
+ db.getName());
+ }
+ versionLock_.writeLock().unlock();
+ try {
+ boolean removed = db.removeFromVersionsForInflightEvents(versionNumber);
+ if (!removed) {
+ LOG.info("Could not find version {} in the in-flight event list of database "
+ + "{}", versionNumber, db.getName());
+ }
+ return removed;
+ } finally {
+ db.getLock().unlock();
+ }
}
- Table tbl = getTable(ctx.getDbName(), ctx.getTblName());
+ Table tbl = db.getTable(ctx.getTblName());
if (tbl == null) {
throw new TableNotFoundException(
String.format("Table %s.%s not found", ctx.getDbName(), ctx.getTblName()));
@@ -859,7 +881,12 @@ public class CatalogServiceCatalog extends Catalog {
List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues();
// if the partitionKeyValues is null, we look for tbl's in-flight events
if (partitionKeyValues == null) {
- return tbl.removeFromVersionsForInflightEvents(versionNumber);
+ boolean removed = tbl.removeFromVersionsForInflightEvents(versionNumber);
+ if (!removed) {
+ LOG.info("Could not find version {} in in-flight event list of table {}",
+ versionNumber, tbl.getFullName());
+ }
+ return removed;
}
if (tbl instanceof HdfsTable) {
List<String> failingPartitions = new ArrayList<>();
@@ -872,8 +899,11 @@ public class CatalogServiceCatalog extends Catalog {
// should clean up the self-event state on the rest of the partitions
String partName = HdfsTable.constructPartitionName(partitionKeyValue);
if (hdfsPartition == null) {
- LOG.warn(String.format("Partition %s not found during self-event "
- + "evaluation for the table %s", partName, tbl.getFullName()));
+ LOG.info("Partition {} not found during self-event "
+ + "evaluation for the table {}", partName, tbl.getFullName());
+ } else {
+ LOG.info("Could not find {} in in-flight event list of the partition {} "
+ + "of table {}", versionNumber, partName, tbl.getFullName());
}
failingPartitions.add(partName);
}
@@ -895,11 +925,9 @@ public class CatalogServiceCatalog extends Catalog {
*/
public void addVersionsForInflightEvents(Table tbl, long versionNumber) {
if (!isEventProcessingActive()) return;
- // we generally don't take locks on Incomplete tables since they are atomically
- // replaced during load
- Preconditions.checkState(
- tbl instanceof IncompleteTable || tbl.getLock().isHeldByCurrentThread());
tbl.addToVersionsForInflightEvents(versionNumber);
+ LOG.info("Added catalog version {} in table's {} in-flight events",
+ versionNumber, tbl.getFullName());
}
/**
@@ -911,6 +939,8 @@ public class CatalogServiceCatalog extends Catalog {
*/
public void addVersionsForInflightEvents(Db db, long versionNumber) {
if (!isEventProcessingActive()) return;
+ LOG.info("Added catalog version {} in database's {} in-flight events",
+ versionNumber, db.getName());
db.addToVersionsForInflightEvents(versionNumber);
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 4bfd070..330227c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -21,10 +21,10 @@ import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.impala.analysis.ColumnDef;
@@ -100,6 +100,10 @@ public class Db extends CatalogObjectImpl implements FeDb {
// tracks the in-flight metastore events for this db
private final InFlightEvents inFlightEvents_ = new InFlightEvents();
+ // lock to make sure modifications to the Db object are atomically done along with
+ // its associated HMS operation (eg. alterDbOwner or commentOnDb)
+ private final ReentrantLock dbLock_ = new ReentrantLock();
+
public Db(String name, org.apache.hadoop.hive.metastore.api.Database msDb) {
setMetastoreDb(name, msDb);
tableCache_ = new CatalogObjectCache<>();
@@ -139,6 +143,8 @@ public class Db extends CatalogObjectImpl implements FeDb {
return msDb.getParameters().remove(k) != null;
}
+ public ReentrantLock getLock() { return dbLock_; }
+
@Override // FeDb
public boolean isSystemDb() { return isSystemDb_; }
@Override // FeDb
@@ -497,18 +503,14 @@ public class Db extends CatalogObjectImpl implements FeDb {
}
/**
- * Gets the current list of versions for in-flight events for this database
- */
- public List<Long> getVersionsForInflightEvents() {
- return inFlightEvents_.getAll();
- }
-
- /**
* Removes a given version from the collection of version numbers for in-flight events
* @param versionNumber version number to remove from the collection
* @return true if version was successfully removed, false if didn't exist
*/
public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+ Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
+ "removeFromVersionsForInflightEvents called without getting the db lock for "
+ + getName() + " database.");
return inFlightEvents_.remove(versionNumber);
}
@@ -520,6 +522,9 @@ public class Db extends CatalogObjectImpl implements FeDb {
* @param versionNumber version number to add
*/
public void addToVersionsForInflightEvents(long versionNumber) {
+ Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
+ "addToVersionsForInFlightEvents called without getting the db lock for "
+ + getName() + " database.");
if (!inFlightEvents_.add(versionNumber)) {
LOG.warn(String.format("Could not add version %s to the list of in-flight "
+ "events. This could cause unnecessary database %s invalidation when the "
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index d0664df..45a0a83 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -851,18 +851,14 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
}
/**
- * Gets the current list of versions for in-flight events for this partition
- */
- public List<Long> getVersionsForInflightEvents() {
- return inFlightEvents_.getAll();
- }
-
- /**
* Removes a given version from the in-flight events
* @param versionNumber version number to remove
* @return true if the versionNumber was removed, false if it didn't exist
*/
public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+ Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+ "removeFromVersionsForInflightEvents called without holding the table lock on "
+ + "partition " + getPartitionName() + " of table " + table_.getFullName());
return inFlightEvents_.remove(versionNumber);
}
@@ -871,6 +867,9 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
* @param versionNumber version number to add
*/
public void addToVersionsForInflightEvents(long versionNumber) {
+ Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+ "addToVersionsForInflightEvents called without holding the table lock on "
+ + "partition " + getPartitionName() + " of table " + table_.getFullName());
if (!inFlightEvents_.add(versionNumber)) {
LOG.warn(String.format("Could not add %s version to the partition %s of table %s. "
+ "This could cause unnecessary refresh of the partition when the event is"
@@ -881,17 +880,22 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
/**
* Adds the version from the given Partition parameters. No-op if the parameters does
- * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>
+ * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>. This is
+ * done to detect add partition events from this catalog which are generated when
+ * partitions are added or recovered.
*/
private void addInflightVersionsFromParameters() {
Preconditions.checkNotNull(hmsParameters_);
Preconditions.checkState(inFlightEvents_.size() == 0);
+ // we should not check for table lock being held here since there are certain code
+ // paths which call this method without holding the table lock (eg. getOrLoadTable())
if (!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
return;
}
inFlightEvents_.add(Long.parseLong(
hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
}
+
/**
* Marks this partition's metadata as "dirty" indicating that changes have been
* made and this partition's metadata should not be reused during the next
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 6107a12..db9fe3e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -794,18 +794,14 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
}
/**
- * Gets the current list of versions for in-flight events for this table
- */
- public List<Long> getVersionsForInflightEvents() {
- return inFlightEvents.getAll();
- }
-
- /**
* Removes a given version from the collection of version numbers for in-flight events
* @param versionNumber version number to remove from the collection
* @return true if version was successfully removed, false if didn't exist
*/
public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+ Preconditions.checkState(tableLock_.isHeldByCurrentThread(),
+ "removeFromVersionsForInFlightEvents called without taking the table lock on "
+ + getFullName());
return inFlightEvents.remove(versionNumber);
}
@@ -819,6 +815,10 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
* capacity
*/
public void addToVersionsForInflightEvents(long versionNumber) {
+ // we generally don't take locks on Incomplete tables since they are atomically
+ // replaced during load
+ Preconditions.checkState(
+ this instanceof IncompleteTable || tableLock_.isHeldByCurrentThread());
if (!inFlightEvents.add(versionNumber)) {
LOG.warn(String.format("Could not add %s version to the table %s. This could "
+ "cause unnecessary refresh of the table when the event is received by the "
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 92d8820..098f2e9 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -216,21 +216,27 @@ import org.slf4j.LoggerFactory;
* updates, DDL operations should not directly modify the HMS objects of the catalog
* objects but should operate on copies instead.
*
- * The CatalogOpExecutor uses table-level locking to protect table metadata during
- * concurrent modifications and is responsible for assigning a new catalog version when
- * a table is modified (e.g. alterTable()).
+ * The CatalogOpExecutor uses table-level or Db object level locking to protect table
+ * metadata or database metadata respectively during concurrent modifications and is
+ * responsible for assigning a new catalog version when a table/Db is modified
+ * (e.g. alterTable() or alterDb()).
*
* The following locking protocol is employed to ensure that modifying
- * the table metadata and assigning a new catalog version is performed atomically and
+ * the table/Db metadata and assigning a new catalog version is performed atomically and
* consistently in the presence of concurrent DDL operations. The following pattern
* ensures that the catalog lock is never held for a long period of time, preventing
- * other DDL operations from making progress. This pattern only applies to single-table
+ * other DDL operations from making progress. This pattern only applies to single-table/Db
* update operations and requires the use of fair table locks to prevent starvation.
+ * Additionally, this locking protocol is also followed in case of CREATE/DROP
+ * FUNCTION. In case of CREATE/DROP FUNCTION, we take the Db object lock since
+ * certain FUNCTION are stored in the HMS database parameters. Using this approach
+ * also makes sure that adding or removing functions from different databases do not
+ * block each other.
*
* DO {
* Acquire the catalog lock (see CatalogServiceCatalog.versionLock_)
- * Try to acquire a table lock
- * IF the table lock acquisition fails {
+ * Try to acquire a table/Db lock
+ * IF the table/Db lock acquisition fails {
* Release the catalog lock
* YIELD()
* ELSE
@@ -241,15 +247,15 @@ import org.slf4j.LoggerFactory;
*
* Increment and get a new catalog version
* Release the catalog lock
- * Modify table metadata
- * Release table lock
+ * Modify table/Db metadata
+ * Release table/Db lock
*
* Note: The getCatalogObjects() function is the only case where this locking pattern is
* not used since it accesses multiple catalog entities in order to compute a snapshot
* of catalog metadata.
*
- * Operations that CREATE/DROP catalog objects such as tables and databases employ the
- * following locking protocol:
+ * Operations that CREATE/DROP catalog objects such as tables and databases
+ * (except for functions, see above) employ the following locking protocol:
* 1. Acquire the metastoreDdlLock_
* 2. Update the Hive Metastore
* 3. Increment and get a new catalog version
@@ -257,6 +263,7 @@ import org.slf4j.LoggerFactory;
* 5. Grant/revoke owner privilege if authorization with ownership is enabled.
* 6. Release the metastoreDdlLock_
*
+ *
* It is imperative that other operations that need to hold both the catalog lock and
* table locks at the same time follow the same locking protocol and acquire these
* locks in that particular order. Also, operations that modify table metadata
@@ -1356,21 +1363,23 @@ public class CatalogOpExecutor {
}
boolean isPersistentJavaFn =
(fn.getBinaryType() == TFunctionBinaryType.JAVA) && fn.isPersistent();
- synchronized (metastoreDdlLock_) {
- Db db = catalog_.getDb(fn.dbName());
- if (db == null) {
- throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
- }
- // Get a new catalog version to assign to the database being altered. This is
- // needed for events processor as this method creates alter database events.
- long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
- addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+ Db db = catalog_.getDb(fn.dbName());
+ if (db == null) {
+ throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
+ }
+
+ tryLock(db, "creating function " + fn.getClass().getSimpleName());
+ // Get a new catalog version to assign to the database being altered. This is
+ // needed for events processor as this method creates alter database events.
+ long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+ catalog_.getLock().writeLock().unlock();
+ try {
// Search for existing functions with the same name or signature that would
// conflict with the function being added.
- for (Function function: db.getFunctions(fn.functionName())) {
+ for (Function function : db.getFunctions(fn.functionName())) {
if (isPersistentJavaFn || (function.isPersistent() &&
(function.getBinaryType() == TFunctionBinaryType.JAVA)) ||
- function.compare(fn, Function.CompareMode.IS_INDISTINGUISHABLE)) {
+ function.compare(fn, Function.CompareMode.IS_INDISTINGUISHABLE)) {
if (!params.if_not_exists) {
throw new CatalogException("Function " + fn.functionName() +
" already exists.");
@@ -1386,15 +1395,16 @@ public class CatalogOpExecutor {
// the corresponding Jar and add each signature to the catalog.
Preconditions.checkState(fn instanceof ScalarFunction);
org.apache.hadoop.hive.metastore.api.Function hiveFn =
- ((ScalarFunction)fn).toHiveFunction();
+ ((ScalarFunction) fn).toHiveFunction();
List<Function> funcs = FunctionUtils.extractFunctions(fn.dbName(), hiveFn,
BackendConfig.INSTANCE.getBackendCfg().local_library_path);
if (funcs.isEmpty()) {
throw new CatalogException(
- "No compatible function signatures found in class: " + hiveFn.getClassName());
+ "No compatible function signatures found in class: " + hiveFn
+ .getClassName());
}
if (addJavaFunctionToHms(fn.dbName(), hiveFn, params.if_not_exists)) {
- for (Function addedFn: funcs) {
+ for (Function addedFn : funcs) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Adding function: %s.%s", addedFn.dbName(),
addedFn.signatureString()));
@@ -1404,7 +1414,14 @@ public class CatalogOpExecutor {
}
}
} else {
+ //TODO(Vihang): addFunction method below directly updates the database
+ // parameters. If the applyAlterDatabase method below throws an exception,
+ // catalog might end up in a inconsistent state. Ideally, we should make a copy
+ // of hms Database object and then update the Db once the HMS operation succeeds
+ // similar to what happens in alterDatabaseSetOwner method.
if (catalog_.addFunction(fn)) {
+ addCatalogServiceIdentifiers(db.getMetaStoreDb(),
+ catalog_.getCatalogServiceId(), newCatalogVersion);
// Flush DB changes to metastore
applyAlterDatabase(db.getMetaStoreDb());
addedFunctions.add(fn.toTCatalogObject());
@@ -1421,6 +1438,8 @@ public class CatalogOpExecutor {
} else {
addSummary(resp, "Function already exists.");
}
+ } finally {
+ db.getLock().unlock();
}
}
@@ -2098,24 +2117,26 @@ public class CatalogOpExecutor {
private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
throws ImpalaException {
FunctionName fName = FunctionName.fromThrift(params.fn_name);
- synchronized (metastoreDdlLock_) {
- Db db = catalog_.getDb(fName.getDb());
- if (db == null) {
- if (!params.if_exists) {
- throw new CatalogException("Database: " + fName.getDb()
- + " does not exist.");
- }
- addSummary(resp, "Database does not exist.");
- return;
+ Db db = catalog_.getDb(fName.getDb());
+ if (db == null) {
+ if (!params.if_exists) {
+ throw new CatalogException("Database: " + fName.getDb()
+ + " does not exist.");
}
- // Get a new catalog version to assign to the database being altered. This is
- // needed for events processor as this method creates alter database events.
- long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
- addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+ addSummary(resp, "Database does not exist.");
+ return;
+ }
+
+ tryLock(db, "dropping function " + fName);
+ // Get a new catalog version to assign to the database being altered. This is
+ // needed for events processor as this method creates alter database events.
+ long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+ catalog_.getLock().writeLock().unlock();
+ try {
List<TCatalogObject> removedFunctions = Lists.newArrayList();
if (!params.isSetSignature()) {
dropJavaFunctionFromHms(fName.getDb(), fName.getFunction(), params.if_exists);
- for (Function fn: db.getFunctions(fName.getFunction())) {
+ for (Function fn : db.getFunctions(fName.getFunction())) {
if (fn.getBinaryType() != TFunctionBinaryType.JAVA
|| !fn.isPersistent()) {
continue;
@@ -2125,7 +2146,7 @@ public class CatalogOpExecutor {
}
} else {
ArrayList<Type> argTypes = Lists.newArrayList();
- for (TColumnType t: params.arg_types) {
+ for (TColumnType t : params.arg_types) {
argTypes.add(Type.fromThrift(t));
}
Function desc = new Function(fName, argTypes, Type.INVALID, false);
@@ -2136,6 +2157,8 @@ public class CatalogOpExecutor {
"Function: " + desc.signatureString() + " does not exist.");
}
} else {
+ addCatalogServiceIdentifiers(db.getMetaStoreDb(),
+ catalog_.getCatalogServiceId(), newCatalogVersion);
// Flush DB changes to metastore
applyAlterDatabase(db.getMetaStoreDb());
removedFunctions.add(fn.toTCatalogObject());
@@ -2152,6 +2175,8 @@ public class CatalogOpExecutor {
addSummary(resp, "Function does not exist.");
}
resp.result.setVersion(catalog_.getCatalogVersion());
+ } finally {
+ db.getLock().unlock();
}
}
@@ -4576,16 +4601,19 @@ public class CatalogOpExecutor {
}
private void alterCommentOnDb(String dbName, String comment, TDdlExecResponse response)
- throws ImpalaRuntimeException, CatalogException {
+ throws ImpalaRuntimeException, CatalogException, InternalException {
Db db = catalog_.getDb(dbName);
if (db == null) {
throw new CatalogException("Database: " + dbName + " does not exist.");
}
- synchronized (metastoreDdlLock_) {
- // Get a new catalog version to assign to the database being altered.
- long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
- addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+ tryLock(db, "altering the comment");
+ // Get a new catalog version to assign to the database being altered.
+ long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+ catalog_.getLock().writeLock().unlock();
+ try {
Database msDb = db.getMetaStoreDb().deepCopy();
+ addCatalogServiceIdentifiers(msDb, catalog_.getCatalogServiceId(),
+ newCatalogVersion);
msDb.setDescription(comment);
try {
applyAlterDatabase(msDb);
@@ -4597,6 +4625,8 @@ public class CatalogOpExecutor {
// now that HMS alter operation has succeeded, add this version to list of inflight
// events in catalog database if event processing is enabled
catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
+ } finally {
+ db.getLock().unlock();
}
addSummary(response, "Updated database.");
}
@@ -4623,11 +4653,14 @@ public class CatalogOpExecutor {
TDdlExecResponse response) throws ImpalaException {
Preconditions.checkNotNull(params.owner_name);
Preconditions.checkNotNull(params.owner_type);
- synchronized (metastoreDdlLock_) {
- // Get a new catalog version to assign to the database being altered.
- long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
- addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+ tryLock(db, "altering the owner");
+ // Get a new catalog version to assign to the database being altered.
+ long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+ catalog_.getLock().writeLock().unlock();
+ try {
Database msDb = db.getMetaStoreDb().deepCopy();
+ addCatalogServiceIdentifiers(msDb, catalog_.getCatalogServiceId(),
+ newCatalogVersion);
String originalOwnerName = msDb.getOwnerName();
PrincipalType originalOwnerType = msDb.getOwnerType();
msDb.setOwnerName(params.owner_name);
@@ -4647,6 +4680,8 @@ public class CatalogOpExecutor {
// now that HMS alter operation has succeeded, add this version to list of inflight
// events in catalog database if event processing is enabled
catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
+ } finally {
+ db.getLock().unlock();
}
addSummary(response, "Updated database.");
}
@@ -4656,9 +4691,9 @@ public class CatalogOpExecutor {
* No-op if event processing is disabled
*/
private void addCatalogServiceIdentifiers(
- Db db, String catalogServiceId, long newCatalogVersion) {
+ Database msDb, String catalogServiceId, long newCatalogVersion) {
if (!catalog_.isEventProcessingActive()) return;
- org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
+ Preconditions.checkNotNull(msDb);
msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
catalogServiceId);
msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
@@ -4775,6 +4810,17 @@ public class CatalogOpExecutor {
}
/**
+ * Try to lock the given Db in the catalog for the given operation. Throws
+ * InternalException if catalog is unable to lock the database.
+ */
+ private void tryLock(Db db, String operation) throws InternalException {
+ if (!catalog_.tryLockDb(db)) {
+ throw new InternalException(String.format("Error %s of database %s due to lock "
+ + "contention.", operation, db.getName()));
+ }
+ }
+
+ /**
* Commits ACID transaction with given transaction id.
* @param transactionId is the id of the transaction.
* @throws TransactionException
diff --git a/tests/custom_cluster/test_concurrent_ddls.py b/tests/custom_cluster/test_concurrent_ddls.py
index 37a498a..695045b 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -97,22 +97,32 @@ class TestConcurrentDdls(CustomClusterTestSuite):
def run_ddls(i):
tbl_name = db + ".test_" + str(i)
- for query_tmpl in [
+ # func_name = "f_" + str(i)
+ for query in [
+ # alter database operations
+ # TODO (IMPALA-9532): Uncomment the alter database operations
+ # "comment on database %s is 'test-concurrent-ddls'" % db,
+ # "alter database %s set owner user `test-user`" % db,
+ # "create function %s.%s() returns int location '%s/libTestUdfs.so' \
+ # symbol='NoArgs'" % (db, func_name, WAREHOUSE),
+ # "drop function if exists %s.%s()" % (db, func_name),
# Create a partitioned and unpartitioned table
- "create table %s (i int)",
- "create table %s_part (i int) partitioned by (j int)",
+ "create table %s (i int)" % tbl_name,
+ "create table %s_part (i int) partitioned by (j int)" % tbl_name,
# Below queries could fail if running with invalidate metadata concurrently
- "alter table %s_part add partition (j=1)",
- "alter table %s_part add partition (j=2)",
- "invalidate metadata %s_part",
- "refresh %s",
- "refresh %s_part",
- "insert overwrite table %s select int_col from functional.alltypestiny",
- "insert overwrite table %s_part partition(j=1) values (1), (2), (3), (4), (5)",
- "insert overwrite table %s_part partition(j=2) values (1), (2), (3), (4), (5)"
+ "alter table %s_part add partition (j=1)" % tbl_name,
+ "alter table %s_part add partition (j=2)" % tbl_name,
+ "invalidate metadata %s_part" % tbl_name,
+ "refresh %s" % tbl_name,
+ "refresh %s_part" % tbl_name,
+ "insert overwrite table %s select int_col from "
+ "functional.alltypestiny" % tbl_name,
+ "insert overwrite table %s_part partition(j=1) "
+ "values (1), (2), (3), (4), (5)" % tbl_name,
+ "insert overwrite table %s_part partition(j=2) "
+ "values (1), (2), (3), (4), (5)" % tbl_name
]:
try:
- query = query_tmpl % tbl_name
# TODO(IMPALA-9123): Timeout logic here does not work for DDLs since they are
# usually stuck in CREATED state and execute_async() won't return. We finally
# use timeout in pytest.mark.timeout() but it's not precise. We should find a
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index e9d5c4f..f39f55f 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -18,16 +18,13 @@
import random
import string
import pytest
-import json
-import time
-import requests
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
- SkipIfLocal, SkipIfHive2
+from tests.common.skip import SkipIfHive2
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.util.hive_utils import HiveDbWrapper
from tests.util.event_processor_utils import EventProcessorUtils
+from tests.util.filesystem_utils import WAREHOUSE
@SkipIfS3.hive
@@ -242,8 +239,12 @@ class TestEventProcessing(CustomClusterTestSuite):
self_event_test_queries = {
# Queries which will increment the self-events-skipped counter
True: [
+ # ALTER_DATABASE case
"comment on database {0} is 'self-event test database'".format(db_name),
"alter database {0} set owner user `test-user`".format(db_name),
+ "create function {0}.f() returns int location '{1}/libTestUdfs.so' "
+ "symbol='NoArgs'".format(db_name, WAREHOUSE),
+ "drop function {0}.f()".format(db_name),
# ALTER_TABLE case
"alter table {0}.{1} set TBLPROPERTIES ('k'='v')".format(db_name, tbl_name),
"alter table {0}.{1} ADD COLUMN c1 int".format(db_name, tbl_name),