You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/16 23:04:20 UTC
[2/4] impala git commit: IMPALA-5058: Improve the concurrency of
DDL/DML operations
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 4c959b2..0e2e8b9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -29,6 +30,7 @@ import org.apache.thrift.TException;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.ImpalaException;
+import org.apache.impala.util.PatternMatcher;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TDataSource;
@@ -173,23 +175,31 @@ public class ImpaladCatalog extends Catalog {
}
}
- // Now remove all objects from the catalog. Removing a database before removing
- // its child tables/functions is fine. If that happens, the removal of the child
- // object will be a no-op.
+ // Now remove all objects from the catalog. First remove low-level objects (tables,
+ // functions and privileges) and then the top-level objects (databases and roles).
for (TCatalogObject catalogObject: req.getRemoved_objects()) {
- removeCatalogObject(catalogObject, newCatalogVersion);
+ if (!isTopLevelCatalogObject(catalogObject)) {
+ removeCatalogObject(catalogObject);
+ }
}
+ for (TCatalogObject catalogObject: req.getRemoved_objects()) {
+ if (isTopLevelCatalogObject(catalogObject)) {
+ removeCatalogObject(catalogObject);
+ }
+ }
+
+
lastSyncedCatalogVersion_ = newCatalogVersion;
// Cleanup old entries in the log.
catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_);
isReady_.set(true);
-
// Notify all the threads waiting on a catalog update.
synchronized (catalogUpdateEventNotifier_) {
catalogUpdateEventNotifier_.notifyAll();
}
- return new TUpdateCatalogCacheResponse(catalogServiceId_);
+ return new TUpdateCatalogCacheResponse(catalogServiceId_,
+ CatalogObjectVersionQueue.INSTANCE.getMinimumVersion());
}
/**
@@ -319,24 +329,10 @@ public class ImpaladCatalog extends Catalog {
/**
* Removes the matching TCatalogObject from the catalog, if one exists and its
* catalog version is < the catalog version of this drop operation.
- * Note that drop operations that come from statestore heartbeats always have a
- * version of 0. To determine the drop version for statestore updates,
- * the catalog version from the current update is used. This is okay because there
- * can never be a catalog update from the statestore that contains a drop
- * and an addition of the same object. For more details on how drop
- * versioning works, see CatalogServerCatalog.java
*/
- private void removeCatalogObject(TCatalogObject catalogObject,
- long currentCatalogUpdateVersion) {
- // The TCatalogObject associated with a drop operation from a state store
- // heartbeat will always have a version of zero. Because no update from
- // the state store can contain both a drop and an addition of the same object,
- // we can assume the drop version is the current catalog version of this update.
- // If the TCatalogObject contains a version that != 0, it indicates the drop
- // came from a direct update.
- long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
- currentCatalogUpdateVersion : catalogObject.getCatalog_version();
-
+ private void removeCatalogObject(TCatalogObject catalogObject) {
+ Preconditions.checkState(catalogObject.getCatalog_version() != 0);
+ long dropCatalogVersion = catalogObject.getCatalog_version();
switch(catalogObject.getType()) {
case DATABASE:
removeDb(catalogObject.getDb(), dropCatalogVersion);
@@ -360,7 +356,7 @@ public class ImpaladCatalog extends Catalog {
case HDFS_CACHE_POOL:
HdfsCachePool existingItem =
hdfsCachePools_.get(catalogObject.getCache_pool().getPool_name());
- if (existingItem.getCatalogVersion() > catalogObject.getCatalog_version()) {
+ if (existingItem.getCatalogVersion() <= catalogObject.getCatalog_version()) {
hdfsCachePools_.remove(catalogObject.getCache_pool().getPool_name());
}
break;
@@ -381,6 +377,15 @@ public class ImpaladCatalog extends Catalog {
Db newDb = Db.fromTDatabase(thriftDb, this);
newDb.setCatalogVersion(catalogVersion);
addDb(newDb);
+ if (existingDb != null) {
+ CatalogObjectVersionQueue.INSTANCE.updateVersions(
+ existingDb.getCatalogVersion(), catalogVersion);
+ CatalogObjectVersionQueue.INSTANCE.removeAll(existingDb.getTables());
+ CatalogObjectVersionQueue.INSTANCE.removeAll(
+ existingDb.getFunctions(null, new PatternMatcher()));
+ } else {
+ CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion);
+ }
}
}
@@ -414,6 +419,12 @@ public class ImpaladCatalog extends Catalog {
if (existingFn == null ||
existingFn.getCatalogVersion() < catalogVersion) {
db.addFunction(function);
+ if (existingFn != null) {
+ CatalogObjectVersionQueue.INSTANCE.updateVersions(
+ existingFn.getCatalogVersion(), catalogVersion);
+ } else {
+ CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion);
+ }
}
}
@@ -431,6 +442,11 @@ public class ImpaladCatalog extends Catalog {
Db db = getDb(thriftDb.getDb_name());
if (db != null && db.getCatalogVersion() < dropCatalogVersion) {
removeDb(db.getName());
+ CatalogObjectVersionQueue.INSTANCE.removeVersion(
+ db.getCatalogVersion());
+ CatalogObjectVersionQueue.INSTANCE.removeAll(db.getTables());
+ CatalogObjectVersionQueue.INSTANCE.removeAll(
+ db.getFunctions(null, new PatternMatcher()));
}
}
@@ -455,6 +471,8 @@ public class ImpaladCatalog extends Catalog {
Function fn = db.getFunction(thriftFn.getSignature());
if (fn != null && fn.getCatalogVersion() < dropCatalogVersion) {
db.removeFunction(thriftFn.getSignature());
+ CatalogObjectVersionQueue.INSTANCE.removeVersion(
+ fn.getCatalogVersion());
}
}
@@ -463,6 +481,7 @@ public class ImpaladCatalog extends Catalog {
// version of the drop, remove the function.
if (existingRole != null && existingRole.getCatalogVersion() < dropCatalogVersion) {
authPolicy_.removeRole(thriftRole.getRole_name());
+ CatalogObjectVersionQueue.INSTANCE.removeAll(existingRole.getPrivileges());
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Role.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Role.java b/fe/src/main/java/org/apache/impala/catalog/Role.java
index 0b89866..b45ff22 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Role.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Role.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TRole;
import com.google.common.base.Preconditions;
@@ -30,11 +31,10 @@ import com.google.common.collect.Sets;
/**
* Represents a role in an authorization policy. This class is thread safe.
*/
-public class Role implements CatalogObject {
+public class Role extends CatalogObjectImpl {
private final TRole role_;
// The last role ID assigned, starts at 0.
private static AtomicInteger roleId_ = new AtomicInteger(0);
- private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
private final CatalogObjectCache<RolePrivilege> rolePrivileges_ =
new CatalogObjectCache<RolePrivilege>();
@@ -134,11 +134,12 @@ public class Role implements CatalogObject {
public String getName() { return role_.getRole_name(); }
public int getId() { return role_.getRole_id(); }
@Override
- public synchronized long getCatalogVersion() { return catalogVersion_; }
- @Override
- public synchronized void setCatalogVersion(long newVersion) {
- catalogVersion_ = newVersion;
+ public String getUniqueName() { return "ROLE:" + getName().toLowerCase(); }
+
+ public TCatalogObject toTCatalogObject() {
+ TCatalogObject catalogObject =
+ new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+ catalogObject.setRole(toThrift());
+ return catalogObject;
}
- @Override
- public boolean isLoaded() { return true; }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
index 87277af..ef3717c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
+++ b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.log4j.Logger;
+import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TPrivilege;
import org.apache.impala.thrift.TPrivilegeLevel;
@@ -33,16 +34,14 @@ import com.google.common.collect.Lists;
* Represents a privilege that has been granted to a role in an authorization policy.
* This class is thread safe.
*/
-public class RolePrivilege implements CatalogObject {
+public class RolePrivilege extends CatalogObjectImpl {
private static final Logger LOG = Logger.getLogger(AuthorizationPolicy.class);
// These Joiners are used to build role names. For simplicity, the role name we
// use can also be sent to the Sentry library to perform authorization checks
// so we build them in the same format.
private static final Joiner AUTHORIZABLE_JOINER = Joiner.on("->");
private static final Joiner KV_JOINER = Joiner.on("=");
-
private final TPrivilege privilege_;
- private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
private RolePrivilege(TPrivilege privilege) {
privilege_ = privilege;
@@ -132,13 +131,16 @@ public class RolePrivilege implements CatalogObject {
public String getName() { return privilege_.getPrivilege_name(); }
public int getRoleId() { return privilege_.getRole_id(); }
@Override
- public synchronized long getCatalogVersion() { return catalogVersion_; }
- @Override
- public synchronized void setCatalogVersion(long newVersion) {
- catalogVersion_ = newVersion;
+ public String getUniqueName() {
+ return "PRIVILEGE:" + getName().toLowerCase() + "." + Integer.toString(getRoleId());
+ }
+
+ public TCatalogObject toTCatalogObject() {
+ TCatalogObject catalogObject =
+ new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+ catalogObject.setPrivilege(toThrift());
+ return catalogObject;
}
- @Override
- public boolean isLoaded() { return true; }
// The time this role was created. Used to quickly check if the same privilege
// was dropped and re-created. Assumes a role will not be created + dropped + created
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 23fa7a4..50fe953 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -56,13 +56,9 @@ import com.google.common.collect.Maps;
* is more general than Hive's CLUSTER BY ... INTO BUCKETS clause (which partitions
* a key range into a fixed number of buckets).
*/
-public abstract class Table implements CatalogObject {
+public abstract class Table extends CatalogObjectImpl {
private static final Logger LOG = Logger.getLogger(Table.class);
-
- // Catalog version assigned to this table
- private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
protected org.apache.hadoop.hive.metastore.api.Table msTable_;
-
protected final Db db_;
protected final String name_;
protected final String owner_;
@@ -358,13 +354,21 @@ public abstract class Table implements CatalogObject {
}
public TCatalogObject toTCatalogObject() {
- TCatalogObject catalogObject = new TCatalogObject();
- catalogObject.setType(getCatalogObjectType());
- catalogObject.setCatalog_version(getCatalogVersion());
+ TCatalogObject catalogObject =
+ new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
catalogObject.setTable(toThrift());
return catalogObject;
}
+ public TCatalogObject toMinimalTCatalogObject() {
+ TCatalogObject catalogObject =
+ new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+ catalogObject.setTable(new TTable());
+ catalogObject.getTable().setDb_name(getDb().getName());
+ catalogObject.getTable().setTbl_name(getName());
+ return catalogObject;
+ }
+
/**
* Gets the ColumnType from the given FieldSchema by using Impala's SqlParser.
* Throws a TableLoadingException if the FieldSchema could not be parsed.
@@ -396,6 +400,8 @@ public abstract class Table implements CatalogObject {
public TableName getTableName() {
return new TableName(db_ != null ? db_.getName() : null, name_);
}
+ @Override
+ public String getUniqueName() { return "TABLE:" + getFullName(); }
public ArrayList<Column> getColumns() { return colsByPos_; }
@@ -490,17 +496,6 @@ public abstract class Table implements CatalogObject {
public TTableStats getTTableStats() { return tableStats_; }
public ArrayType getType() { return type_; }
- @Override
- public long getCatalogVersion() { return catalogVersion_; }
-
- @Override
- public void setCatalogVersion(long catalogVersion) {
- catalogVersion_ = catalogVersion;
- }
-
- @Override
- public boolean isLoaded() { return true; }
-
public static boolean isExternalTable(
org.apache.hadoop.hive.metastore.api.Table msTbl) {
return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
new file mode 100644
index 0000000..9d23c4f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+// A log of topic update information for each catalog object. An entry is added to
+// the log when a catalog object is processed (added/removed/skipped) in a topic
+// update and it is replaced every time the catalog object is processed in a
+// topic update.
+//
+// To prevent the log from growing indefinitely, the oldest entries
+// (in terms of last topic update that processed the associated catalog objects) are
+// garbage collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates. That will cause
+// entries of deleted catalog objects or entries of objects that haven't been processed
+// by the catalog for at least TOPIC_UPDATE_LOG_GC_FREQUENCY updates to be removed from
+// the log.
+public class TopicUpdateLog {
+ private static final Logger LOG = Logger.getLogger(TopicUpdateLog.class);
+ // Frequency at which the entries of the topic update log are garbage collected.
+ // An entry may survive for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.
+ private final static int TOPIC_UPDATE_LOG_GC_FREQUENCY = 1000;
+
+ // Number of topic updates left to trigger a gc of topic update log entries.
+ private int numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+
+ // In the next gc cycle of topic update log entries, all the entries that were last
+ // added to a topic update with version less than or equal to
+ // 'oldestTopicUpdateToGc_' are removed from the update log.
+ private long oldestTopicUpdateToGc_ = -1;
+
+ // Represents an entry in the topic update log. A topic update log entry is
+ // associated with a catalog object and stores information about the last topic update
+ // that processed that object.
+ public static class Entry {
+ // Number of times the entry has skipped a topic update.
+ private final int numSkippedUpdates_;
+ // Last version of the corresponding catalog object that was added to a topic
+ // update. -1 if the object was never added to a topic update.
+ private final long lastSentVersion_;
+ // Version of the last topic update to include the corresponding catalog object.
+ // -1 if the object was never added to a topic update.
+ private final long lastSentTopicUpdate_;
+
+ Entry() {
+ numSkippedUpdates_ = 0;
+ lastSentVersion_ = -1;
+ lastSentTopicUpdate_ = -1;
+ }
+
+ Entry(int numSkippedUpdates, long lastSentVersion, long lastSentCatalogUpdate) {
+ numSkippedUpdates_ = numSkippedUpdates;
+ lastSentVersion_ = lastSentVersion;
+ lastSentTopicUpdate_ = lastSentCatalogUpdate;
+ }
+
+ public int getNumSkippedTopicUpdates() { return numSkippedUpdates_; }
+ public long getLastSentVersion() { return lastSentVersion_; }
+ public long getLastSentCatalogUpdate() { return lastSentTopicUpdate_; }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this.getClass() != other.getClass()) return false;
+ Entry entry = (Entry) other;
+ return numSkippedUpdates_ == entry.getNumSkippedTopicUpdates()
+ && lastSentVersion_ == entry.getLastSentVersion()
+ && lastSentTopicUpdate_ == entry.getLastSentCatalogUpdate();
+ }
+ }
+
+ // Entries in the topic update log stored as a map of catalog object keys to
+ // Entry objects.
+ private final ConcurrentHashMap<String, Entry> topicLogEntries_ =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Garbage-collects topic update log entries. These are entries that haven't been
+ * added to any of the last TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates.
+ */
+ public void garbageCollectUpdateLogEntries(long lastTopicUpdateVersion) {
+ if (oldestTopicUpdateToGc_ == -1) {
+ oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
+ return;
+ }
+ if (numTopicUpdatesToGc_ == 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Topic update log GC started.");
+ }
+ Preconditions.checkState(oldestTopicUpdateToGc_ > 0);
+ int numEntriesRemoved = 0;
+ for (Map.Entry<String, Entry> entry:
+ topicLogEntries_.entrySet()) {
+ if (entry.getValue().getLastSentVersion() == -1) continue;
+ if (entry.getValue().getLastSentCatalogUpdate() <= oldestTopicUpdateToGc_) {
+ if (topicLogEntries_.remove(entry.getKey(), entry.getValue())) {
+ ++numEntriesRemoved;
+ }
+ }
+ }
+ numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+ oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Topic update log GC finished. Removed " + numEntriesRemoved +
+ " entries.");
+ }
+ } else {
+ --numTopicUpdatesToGc_;
+ }
+ }
+
+ public void add(String catalogObjectKey, Entry logEntry) {
+ Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey));
+ Preconditions.checkNotNull(logEntry);
+ topicLogEntries_.put(catalogObjectKey, logEntry);
+ }
+
+ public Entry get(String catalogObjectKey) {
+ Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey));
+ return topicLogEntries_.get(catalogObjectKey);
+ }
+
+  /**
+   * Returns the topic update log entry stored under 'catalogObjectKey'. If the log has
+   * no entry for that key, a newly constructed Entry is returned instead; this method
+   * does not insert it into the log. The key must be neither null nor empty.
+   */
+  public Entry getOrCreateLogEntry(String catalogObjectKey) {
+    Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey));
+    Entry existing = topicLogEntries_.get(catalogObjectKey);
+    return existing != null ? existing : new Entry();
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index b0ed45f..295956c 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -179,7 +179,7 @@ import com.google.common.math.LongMath;
* update operations and requires the use of fair table locks to prevent starvation.
*
* DO {
- * Acquire the catalog lock (see CatalogServiceCatalog.catalogLock_)
+ * Acquire the catalog lock (see CatalogServiceCatalog.versionLock_)
* Try to acquire a table lock
* IF the table lock acquisition fails {
* Release the catalog lock
@@ -326,6 +326,15 @@ public class CatalogOpExecutor {
ddlRequest.ddl_type);
}
+ // If SYNC_DDL is set, determine the version of the catalog update that contains the
+ // results of this DDL operation and store it in the response. That version is
+ // returned to the requesting impalad, which will wait until the catalog update has
+ // been broadcast to all the coordinators.
+ if (ddlRequest.isSync_ddl()) {
+ response.getResult().setVersion(
+ catalog_.waitForSyncDdlVersion(response.getResult()));
+ }
+
// At this point, the operation is considered successful. If any errors occurred
// during execution, this function will throw an exception and the CatalogServer
// will handle setting a bad status code.
@@ -909,12 +918,15 @@ public class CatalogOpExecutor {
String dbName = params.getDb();
Preconditions.checkState(dbName != null && !dbName.isEmpty(),
"Null or empty database name passed as argument to Catalog.createDatabase");
- if (params.if_not_exists && catalog_.getDb(dbName) != null) {
+ Db existingDb = catalog_.getDb(dbName);
+ if (params.if_not_exists && existingDb != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping database creation because " + dbName + " already exists "
+ "and IF NOT EXISTS was specified.");
}
- resp.getResult().setVersion(catalog_.getCatalogVersion());
+ Preconditions.checkNotNull(existingDb);
+ resp.getResult().addToUpdated_catalog_objects(existingDb.toTCatalogObject());
+ resp.getResult().setVersion(existingDb.getCatalogVersion());
return;
}
org.apache.hadoop.hive.metastore.api.Database db =
@@ -960,11 +972,7 @@ public class CatalogOpExecutor {
}
Preconditions.checkNotNull(newDb);
- TCatalogObject thriftDb = new TCatalogObject(
- TCatalogObjectType.DATABASE, Catalog.INITIAL_CATALOG_VERSION);
- thriftDb.setDb(newDb.toThrift());
- thriftDb.setCatalog_version(newDb.getCatalogVersion());
- resp.result.addToUpdated_catalog_objects(thriftDb);
+ resp.result.addToUpdated_catalog_objects(newDb.toTCatalogObject());
}
resp.result.setVersion(newDb.getCatalogVersion());
}
@@ -1038,22 +1046,18 @@ public class CatalogOpExecutor {
throws ImpalaException {
if (LOG.isTraceEnabled()) { LOG.trace("Adding DATA SOURCE: " + params.toString()); }
DataSource dataSource = DataSource.fromThrift(params.getData_source());
- if (catalog_.getDataSource(dataSource.getName()) != null) {
+ DataSource existingDataSource = catalog_.getDataSource(dataSource.getName());
+ if (existingDataSource != null) {
if (!params.if_not_exists) {
throw new ImpalaRuntimeException("Data source " + dataSource.getName() +
" already exists.");
}
- // The user specified IF NOT EXISTS and the data source exists, just
- // return the current catalog version.
- resp.result.setVersion(catalog_.getCatalogVersion());
+ resp.result.addToUpdated_catalog_objects(existingDataSource.toTCatalogObject());
+ resp.result.setVersion(existingDataSource.getCatalogVersion());
return;
}
catalog_.addDataSource(dataSource);
- TCatalogObject addedObject = new TCatalogObject();
- addedObject.setType(TCatalogObjectType.DATA_SOURCE);
- addedObject.setData_source(dataSource.toThrift());
- addedObject.setCatalog_version(dataSource.getCatalogVersion());
- resp.result.addToUpdated_catalog_objects(addedObject);
+ resp.result.addToUpdated_catalog_objects(dataSource.toTCatalogObject());
resp.result.setVersion(dataSource.getCatalogVersion());
}
@@ -1070,11 +1074,7 @@ public class CatalogOpExecutor {
resp.result.setVersion(catalog_.getCatalogVersion());
return;
}
- TCatalogObject removedObject = new TCatalogObject();
- removedObject.setType(TCatalogObjectType.DATA_SOURCE);
- removedObject.setData_source(dataSource.toThrift());
- removedObject.setCatalog_version(dataSource.getCatalogVersion());
- resp.result.addToRemoved_catalog_objects(removedObject);
+ resp.result.addToRemoved_catalog_objects(dataSource.toTCatalogObject());
resp.result.setVersion(dataSource.getCatalogVersion());
}
@@ -1229,7 +1229,7 @@ public class CatalogOpExecutor {
throw new CatalogException("Database " + db.getName() + " is not empty");
}
- TCatalogObject removedObject = new TCatalogObject();
+ TCatalogObject removedObject = null;
synchronized (metastoreDdlLock_) {
// Remove all the Kudu tables of 'db' from the Kudu storage engine.
if (db != null && params.cascade) dropTablesFromKudu(db);
@@ -1251,11 +1251,9 @@ public class CatalogOpExecutor {
for (String tableName: removedDb.getAllTableNames()) {
uncacheTable(removedDb.getTable(tableName));
}
- removedObject.setCatalog_version(removedDb.getCatalogVersion());
+ removedObject = removedDb.toTCatalogObject();
}
- removedObject.setType(TCatalogObjectType.DATABASE);
- removedObject.setDb(new TDatabase());
- removedObject.getDb().setDb_name(params.getDb());
+ Preconditions.checkNotNull(removedObject);
resp.result.setVersion(removedObject.getCatalog_version());
resp.result.addToRemoved_catalog_objects(removedObject);
}
@@ -1525,12 +1523,17 @@ public class CatalogOpExecutor {
Preconditions.checkState(params.getColumns() != null,
"Null column list given as argument to Catalog.createTable");
- if (params.if_not_exists &&
- catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
+ Table existingTbl = catalog_.getTable(tableName.getDb(), tableName.getTbl(), false);
+ if (params.if_not_exists && existingTbl != null) {
LOG.trace(String.format("Skipping table creation because %s already exists and " +
"IF NOT EXISTS was specified.", tableName));
- response.getResult().setVersion(catalog_.getCatalogVersion());
- return false;
+ existingTbl.getLock().lock();
+ try {
+ addTableToCatalogUpdate(existingTbl, response.getResult());
+ return false;
+ } finally {
+ existingTbl.getLock().unlock();
+ }
}
org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
LOG.trace(String.format("Creating table %s", tableName));
@@ -1736,12 +1739,17 @@ public class CatalogOpExecutor {
Preconditions.checkState(tblName != null && tblName.isFullyQualified());
Preconditions.checkState(srcTblName != null && srcTblName.isFullyQualified());
- if (params.if_not_exists &&
- catalog_.containsTable(tblName.getDb(), tblName.getTbl())) {
+ Table existingTbl = catalog_.getTable(tblName.getDb(), tblName.getTbl(), false);
+ if (params.if_not_exists && existingTbl != null) {
LOG.trace(String.format("Skipping table creation because %s already exists and " +
"IF NOT EXISTS was specified.", tblName));
- response.getResult().setVersion(catalog_.getCatalogVersion());
- return;
+ existingTbl.getLock().lock();
+ try {
+ addTableToCatalogUpdate(existingTbl, response.getResult());
+ return;
+ } finally {
+ existingTbl.getLock().unlock();
+ }
}
Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
org.apache.hadoop.hive.metastore.api.Table tbl =
@@ -2185,8 +2193,9 @@ public class CatalogOpExecutor {
}
// Rename the table in the Catalog and get the resulting catalog object.
// ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
- Table newTable = catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
- if (newTable == null) {
+ Pair<Table, Table> result =
+ catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
+ if (result.first == null || result.second == null) {
// The rename succeeded in the HMS but failed in the catalog cache. The cache is in
// an inconsistent state, but can likely be fixed by running "invalidate metadata".
throw new ImpalaRuntimeException(String.format(
@@ -2196,14 +2205,9 @@ public class CatalogOpExecutor {
newTableName.toString()));
}
- TCatalogObject addedObject = newTable.toTCatalogObject();
- TCatalogObject removedObject = new TCatalogObject();
- removedObject.setType(TCatalogObjectType.TABLE);
- removedObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
- removedObject.setCatalog_version(addedObject.getCatalog_version());
- response.result.addToRemoved_catalog_objects(removedObject);
- response.result.addToUpdated_catalog_objects(addedObject);
- response.result.setVersion(addedObject.getCatalog_version());
+ response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
+ response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
+ response.result.setVersion(result.second.getCatalogVersion());
}
/**
@@ -2851,28 +2855,18 @@ public class CatalogOpExecutor {
Preconditions.checkNotNull(rolePrivileges);
List<TCatalogObject> updatedPrivs = Lists.newArrayList();
for (RolePrivilege rolePriv: rolePrivileges) {
- TCatalogObject catalogObject = new TCatalogObject();
- catalogObject.setType(rolePriv.getCatalogObjectType());
- catalogObject.setPrivilege(rolePriv.toThrift());
- catalogObject.setCatalog_version(rolePriv.getCatalogVersion());
- updatedPrivs.add(catalogObject);
- }
-
- // TODO: Currently we only support sending back 1 catalog object in a "direct DDL"
- // response. If multiple privileges have been updated, just send back the
- // catalog version so subscribers can wait for the statestore heartbeat that
- // contains all updates (see IMPALA-5571).
- if (updatedPrivs.size() == 1) {
+ updatedPrivs.add(rolePriv.toTCatalogObject());
+ }
+
+ if (!updatedPrivs.isEmpty()) {
// If this is a REVOKE statement with hasGrantOpt, only the GRANT OPTION is revoked
- // from the privilege.
+ // from the privileges. Otherwise the privileges are removed from the catalog.
if (grantRevokePrivParams.isIs_grant() ||
privileges.get(0).isHas_grant_opt()) {
resp.result.setUpdated_catalog_objects(updatedPrivs);
} else {
resp.result.setRemoved_catalog_objects(updatedPrivs);
}
- resp.result.setVersion(updatedPrivs.get(0).getCatalog_version());
- } else if (updatedPrivs.size() > 1) {
resp.result.setVersion(
updatedPrivs.get(updatedPrivs.size() - 1).getCatalog_version());
}
@@ -3027,6 +3021,9 @@ public class CatalogOpExecutor {
resp.result.setUpdated_catalog_objects(addedFuncs);
resp.result.setRemoved_catalog_objects(removedFuncs);
resp.result.setVersion(catalog_.getCatalogVersion());
+ for (TCatalogObject removedFn: removedFuncs) {
+ catalog_.getDeleteLog().addRemovedObject(removedFn);
+ }
}
}
} else if (req.isSetTable_name()) {
@@ -3059,26 +3056,32 @@ public class CatalogOpExecutor {
req.getTable_name().getTable_name());
}
- if (!dbWasAdded.getRef()) {
- // Return the TCatalogObject in the result to indicate this request can be
- // processed as a direct DDL operation.
- if (tblWasRemoved.getRef()) {
- resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
- } else {
- resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
- }
+ // Return the TCatalogObject in the result to indicate this request can be
+ // processed as a direct DDL operation.
+ if (tblWasRemoved.getRef()) {
+ resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
} else {
- // Since multiple catalog objects were modified (db and table), don't treat this
- // as a direct DDL operation. Set the overall catalog version and the impalad
- // will wait for a statestore heartbeat that contains the update.
- Preconditions.checkState(!req.isIs_refresh());
+ resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
+ }
+
+ if (dbWasAdded.getRef()) {
+ Db addedDb = catalog_.getDb(updatedThriftTable.getTable().getDb_name());
+ if (addedDb == null) {
+ throw new CatalogException("Database " +
+ updatedThriftTable.getTable().getDb_name() + " was removed by a " +
+ "concurrent operation. Try invalidating the table again.");
+ }
+ resp.getResult().addToUpdated_catalog_objects(addedDb.toTCatalogObject());
}
resp.getResult().setVersion(updatedThriftTable.getCatalog_version());
} else {
// Invalidate the entire catalog if no table name is provided.
Preconditions.checkArgument(!req.isIs_refresh());
- catalog_.reset();
- resp.result.setVersion(catalog_.getCatalogVersion());
+ resp.getResult().setVersion(catalog_.reset());
+ resp.getResult().setIs_invalidate(true);
+ }
+ if (req.isSync_ddl()) {
+ resp.getResult().setVersion(catalog_.waitForSyncDdlVersion(resp.getResult()));
}
resp.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>()));
return resp;
@@ -3261,11 +3264,16 @@ public class CatalogOpExecutor {
loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
addTableToCatalogUpdate(table, response.result);
- return response;
} finally {
Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
table.getLock().unlock();
}
+
+ if (update.isSync_ddl()) {
+ response.getResult().setVersion(
+ catalog_.waitForSyncDdlVersion(response.getResult()));
+ }
+ return response;
}
private List<String> getPartValsFromName(org.apache.hadoop.hive.metastore.api.Table
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index c62fc31..d0936d5 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -520,13 +520,17 @@ public class Frontend {
} else {
throw new IllegalStateException("Unexpected CatalogOp statement type.");
}
-
result.setResult_set_metadata(metadata);
+ ddl.setSync_ddl(result.getQuery_options().isSync_ddl());
result.setCatalog_op_request(ddl);
if (ddl.getOp_type() == TCatalogOpType.DDL) {
TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader();
header.setRequesting_user(analysis.getAnalyzer().getUser().getName());
ddl.getDdl_params().setHeader(header);
+ ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl());
+ }
+ if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
+ ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index b56527b..e945a3b 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -38,7 +38,8 @@ import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDdlExecRequest;
import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaRequest;
import org.apache.impala.thrift.TGetDbsParams;
import org.apache.impala.thrift.TGetDbsResult;
import org.apache.impala.thrift.TGetFunctionsRequest;
@@ -119,9 +120,11 @@ public class JniCatalog {
/**
* Gets all catalog objects
*/
- public byte[] getCatalogObjects(long from_version) throws ImpalaException, TException {
- TGetAllCatalogObjectsResponse resp =
- catalog_.getCatalogObjects(from_version);
+ public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq)
+ throws ImpalaException, TException {
+ TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
+ JniUtil.deserializeThrift(protocolFactory_, params, thriftGetCatalogDeltaReq);
+ TGetCatalogDeltaResponse resp = catalog_.getCatalogDelta(params.getFrom_version());
TSerializer serializer = new TSerializer(protocolFactory_);
return serializer.serialize(resp);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/util/SentryProxy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/SentryProxy.java b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
index 23534d2..f2df66b 100644
--- a/fe/src/main/java/org/apache/impala/util/SentryProxy.java
+++ b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
@@ -87,7 +87,7 @@ public class SentryProxy {
}
sentryPolicyService_ = new SentryPolicyService(sentryConfig);
- policyReader_.scheduleAtFixedRate(new PolicyReader(), 0,
+ policyReader_.scheduleAtFixedRate(new PolicyReader(false), 0,
BackendConfig.INSTANCE.getSentryCatalogPollingFrequency(),
TimeUnit.SECONDS);
}
@@ -107,6 +107,12 @@ public class SentryProxy {
* atomically.
*/
private class PolicyReader implements Runnable {
+ private boolean resetVersions_;
+
+ public PolicyReader(boolean resetVersions) {
+ resetVersions_ = resetVersions;
+ }
+
public void run() {
synchronized (SentryProxy.this) {
// Assume all roles should be removed. Then query the Policy Service and remove
@@ -131,6 +137,9 @@ public class SentryProxy {
if (existingRole != null &&
existingRole.getGrantGroups().equals(grantGroups)) {
role = existingRole;
+ if (resetVersions_) {
+ role.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
+ }
} else {
role = catalog_.addRole(sentryRole.getRoleName(), grantGroups);
}
@@ -160,6 +169,10 @@ public class SentryProxy {
// We already know about this privilege (privileges cannot be modified).
if (existingPriv != null &&
existingPriv.getCreateTimeMs() == sentryPriv.getCreateTime()) {
+ if (resetVersions_) {
+ existingPriv.setCatalogVersion(
+ catalog_.incrementAndGetCatalogVersion());
+ }
continue;
}
catalog_.addRolePrivilege(role.getName(), thriftPriv);
@@ -302,10 +315,7 @@ public class SentryProxy {
// Update the catalog
for (TPrivilege privilege: privileges) {
RolePrivilege rolePriv = catalog_.removeRolePrivilege(roleName, privilege);
- if (rolePriv == null) {
- rolePriv = RolePrivilege.fromThrift(privilege);
- rolePriv.setCatalogVersion(catalog_.getCatalogVersion());
- }
+ if (rolePriv == null) continue;
rolePrivileges.add(rolePriv);
}
} else {
@@ -317,12 +327,7 @@ public class SentryProxy {
List<TPrivilege> updatedPrivileges = Lists.newArrayList();
for (TPrivilege privilege: privileges) {
RolePrivilege existingPriv = catalog_.getRolePrivilege(roleName, privilege);
- if (existingPriv == null) {
- RolePrivilege rolePriv = RolePrivilege.fromThrift(privilege);
- rolePriv.setCatalogVersion(catalog_.getCatalogVersion());
- rolePrivileges.add(rolePriv);
- continue;
- }
+ if (existingPriv == null) continue;
TPrivilege updatedPriv = existingPriv.toThrift();
updatedPriv.setHas_grant_opt(false);
updatedPrivileges.add(updatedPriv);
@@ -342,9 +347,9 @@ public class SentryProxy {
* the Catalog with any changes. Throws an ImpalaRuntimeException if there are any
* errors executing the refresh job.
*/
- public void refresh() throws ImpalaRuntimeException {
+ public void refresh(boolean resetVersions) throws ImpalaRuntimeException {
try {
- policyReader_.submit(new PolicyReader()).get();
+ policyReader_.submit(new PolicyReader(resetVersions)).get();
} catch (Exception e) {
// We shouldn't make it here. It means an exception leaked from the
// AuthorizationPolicyReader.
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 93e4af0..df2ba0d 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -38,7 +38,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
// Cache pools are typically loaded asynchronously, but as there is no fixed execution
// order for tests, the cache pools are loaded synchronously before the tests are
// executed.
- CachePoolReader rd = new CachePoolReader();
+ CachePoolReader rd = new CachePoolReader(false);
rd.run();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index fdc64e6..7e8ff46 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -65,9 +65,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
/**
* Reloads all metadata from the source catalog.
*/
- public void reset() throws CatalogException {
- srcCatalog_.reset();
- }
+ public void reset() throws CatalogException { srcCatalog_.reset(); }
/**
* Overrides ImpaladCatalog.getTable to load the table metadata if it is missing.
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index e2b1715..1003dc7 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -311,14 +311,12 @@ class StatestoreSubscriber(object):
class TestStatestore():
def make_topic_update(self, topic_name, key_template="foo", value_template="bar",
- num_updates=1, deletions=None):
+ num_updates=1):
topic_entries = [
Subscriber.TTopicItem(key=key_template + str(x), value=value_template + str(x))
for x in xrange(num_updates)]
- if deletions is None: deletions = []
return Subscriber.TTopicDelta(topic_name=topic_name,
topic_entries=topic_entries,
- topic_deletions=deletions,
is_delta=False)
def test_registration_ids_different(self):
@@ -349,11 +347,9 @@ class TestStatestore():
assert len(args.topic_deltas) == 1
assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
assert args.topic_deltas[topic_name].topic_name == delta.topic_name
- assert args.topic_deltas[topic_name].topic_deletions == delta.topic_deletions
elif sub.update_count == 3:
# After the content-bearing update was processed, the next delta should be empty
assert len(args.topic_deltas[topic_name].topic_entries) == 0
- assert len(args.topic_deltas[topic_name].topic_deletions) == 0
return DEFAULT_UPDATE_STATE_RESPONSE
@@ -461,7 +457,7 @@ class TestStatestore():
assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
# Statestore should not send deletions when the update is not a delta, see
# IMPALA-1891
- assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0
+ assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False
return DEFAULT_UPDATE_STATE_RESPONSE
reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),