You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/16 23:04:21 UTC
[3/4] impala git commit: IMPALA-5058: Improve the concurrency of
DDL/DML operations
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
index 27839b3..1a5e2ec 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
@@ -17,6 +17,7 @@
package org.apache.impala.catalog;
+import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -24,24 +25,35 @@ import java.util.TreeMap;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TTable;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
/**
- * The impalad catalog cache can be modified by either a state store update or by a
- * direct ("fast") update that applies the result of a catalog operation to the cache
- * out-of-band of a state store update. This thread safe log tracks the divergence
- * (due to direct updates to the cache) of this impalad's cache from the last state
- * store update. This log is needed to ensure work is never undone. For example,
- * consider the following sequence of events:
- * t1: [Direct Update] - Add item A - (Catalog Version 9)
- * t2: [Direct Update] - Drop item A - (Catalog Version 10)
- * t3: [StateStore Update] - (From Catalog Version 9)
- * This log is used to ensure the state store update in t3 does not undo the drop in t2.
+ * Represents a log of deleted catalog objects.
*
- * Currently this only tracks objects that were dropped, since the catalog cache can be
- * queried to check if an object was added. TODO: Also track object additions from async
- * operations. This could be used to to "replay" the log in the case of a catalog reset
- * ("invalidate metadata"). Currently, the catalog may briefly go back in time if
- * "invalidate metadata" is run concurrently with async catalog operations.
+ * There are currently two use cases for this log:
+ *
+ * a) Processing catalog updates in the impalads
+ * The impalad catalog cache can be modified by either a state store update or by a
+ * direct update that applies the result of a catalog operation to the cache
+ * out-of-band of a state store update. This thread safe log tracks the divergence
+ * (due to direct updates to the cache) of this impalad's cache from the last state
+ * store update. This log is needed to ensure work is never undone. For example,
+ * consider the following sequence of events:
+ * t1: [Direct Update] - Add item A - (Catalog Version 9)
+ * t2: [Direct Update] - Drop item A - (Catalog Version 10)
+ * t3: [StateStore Update] - (From Catalog Version 9)
+ * This log is used to ensure the state store update in t3 does not undo the drop in t2.
+ * Currently this only tracks objects that were dropped, since the catalog cache can be
+ * queried to check if an object was added. TODO: Also track object additions from async
+ * operations. This could be used to "replay" the log in the case of a catalog reset
+ * ("invalidate metadata"). Currently, the catalog may briefly go back in time if
+ * "invalidate metadata" is run concurrently with async catalog operations.
+ *
+ * b) Building catalog topic updates in the catalogd
+ * The catalogd uses this log to identify deleted catalog objects that have been deleted
+ * since the last catalog topic update. Once the catalog topic update is constructed,
+ * the old entries in the log are garbage collected to prevent the log from growing
+ * indefinitely.
*/
public class CatalogDeltaLog {
// Map of the catalog version an object was removed from the catalog
@@ -58,6 +70,17 @@ public class CatalogDeltaLog {
}
/**
+ * Retrieve all the removed catalog objects with versions in range
+ * (fromVersion, toVersion].
+ */
+ public synchronized List<TCatalogObject> retrieveObjects(long fromVersion,
+ long toVersion) {
+ SortedMap<Long, TCatalogObject> objects =
+ removedCatalogObjects_.subMap(fromVersion + 1, toVersion + 1);
+ return ImmutableList.<TCatalogObject>copyOf(objects.values());
+ }
+
+ /**
* Given the current catalog version, removes all items with catalogVersion <
* currentCatalogVersion. Such objects do not need to be tracked in the delta
* log anymore because they are consistent with the state store's view of the
@@ -85,36 +108,8 @@ public class CatalogDeltaLog {
SortedMap<Long, TCatalogObject> candidateObjects =
removedCatalogObjects_.tailMap(catalogObject.getCatalog_version());
for (Map.Entry<Long, TCatalogObject> entry: candidateObjects.entrySet()) {
- if (objectNamesMatch(catalogObject, entry.getValue())) return true;
+ if (Catalog.keyEquals(catalogObject, entry.getValue())) return true;
}
return false;
}
-
- /**
- * Returns true if the two objects have the same object type and name.
- * TODO: Use global object IDs everywhere instead of tracking catalog objects by name.
- */
- private boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) {
- if (first.getType() != second.getType()) return false;
- switch (first.getType()) {
- case DATABASE:
- return first.getDb().getDb_name().equalsIgnoreCase(second.getDb().getDb_name());
- case TABLE:
- case VIEW:
- TTable firstTbl = first.getTable();
- return firstTbl.getDb_name().equalsIgnoreCase(second.getTable().getDb_name()) &&
- firstTbl.getTbl_name().equalsIgnoreCase(second.getTable().getTbl_name());
- case FUNCTION:
- return first.getFn().getSignature().equals(second.getFn().getSignature()) &&
- first.getFn().getName().equals(second.getFn().getName());
- case ROLE:
- return first.getRole().getRole_name().equalsIgnoreCase(
- second.getRole().getRole_name());
- case PRIVILEGE:
- return first.getPrivilege().getPrivilege_name().equalsIgnoreCase(
- second.getPrivilege().getPrivilege_name()) &&
- first.getPrivilege().getRole_id() == second.getPrivilege().getRole_id();
- default: return false;
- }
- }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
index a2d8ca9..cc4c495 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
@@ -29,6 +29,9 @@ public interface CatalogObject {
// Returns the unqualified object name.
public String getName();
+ // Returns the unique name of this catalog object.
+ public String getUniqueName();
+
// Returns the version of this catalog object.
public long getCatalogVersion();
@@ -37,4 +40,4 @@ public interface CatalogObject {
// Returns true if this CatalogObject has had its metadata loaded, false otherwise.
public boolean isLoaded();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
index c578e41..d882cdb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
@@ -30,6 +30,8 @@ import com.google.common.collect.Lists;
/**
* Thread safe cache for storing CatalogObjects. Enforces that updates to existing
* entries only get applied if the new/updated object has a larger catalog version.
+ * add() and remove() functions also update the entries of the global instance of
+ * CatalogObjectVersionQueue which keeps track of the catalog objects versions.
*/
public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T> {
private final boolean caseInsensitiveKeys_;
@@ -71,13 +73,19 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
String key = catalogObject.getName();
if (caseInsensitiveKeys_) key = key.toLowerCase();
T existingItem = metadataCache_.putIfAbsent(key, catalogObject);
- if (existingItem == null) return true;
+ if (existingItem == null) {
+ CatalogObjectVersionQueue.INSTANCE.addVersion(
+ catalogObject.getCatalogVersion());
+ return true;
+ }
if (existingItem.getCatalogVersion() < catalogObject.getCatalogVersion()) {
// When existingItem != null it indicates there was already an existing entry
// associated with the key. Add the updated object iff it has a catalog
// version greater than the existing entry.
metadataCache_.put(key, catalogObject);
+ CatalogObjectVersionQueue.INSTANCE.updateVersions(
+ existingItem.getCatalogVersion(), catalogObject.getCatalogVersion());
return true;
}
return false;
@@ -89,7 +97,12 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
*/
public synchronized T remove(String name) {
if (caseInsensitiveKeys_) name = name.toLowerCase();
- return metadataCache_.remove(name);
+ T removedObject = metadataCache_.remove(name);
+ if (removedObject != null) {
+ CatalogObjectVersionQueue.INSTANCE.removeVersion(
+ removedObject.getCatalogVersion());
+ }
+ return removedObject;
}
/**
@@ -144,4 +157,4 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
public Iterator<T> iterator() {
return metadataCache_.values().iterator();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java
new file mode 100644
index 0000000..321355c
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.impala.common.NotImplementedException;
+import org.apache.impala.thrift.TCatalogObjectType;
+
+abstract public class CatalogObjectImpl implements CatalogObject {
+ // Current catalog version of this object. Initialized to
+ // Catalog.INITIAL_CATALOG_VERSION.
+ private AtomicLong catalogVersion_ = new AtomicLong(Catalog.INITIAL_CATALOG_VERSION);
+
+ protected CatalogObjectImpl() {}
+
+ @Override
+ public long getCatalogVersion() { return catalogVersion_.get(); }
+
+ @Override
+ public void setCatalogVersion(long newVersion) { catalogVersion_.set(newVersion); }
+
+ @Override
+ public boolean isLoaded() { return true; }
+
+ @Override
+ public String getName() { return ""; }
+
+ @Override
+ public String getUniqueName() { return ""; }
+}
+
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
new file mode 100644
index 0000000..5fcd398
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.List;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Singleton class used to maintain the versions of all the catalog objects stored in a
+ * local catalog cache. Simple wrapper around a priority queue which stores the catalog
+ * object versions, allowing O(1) retrieval of the minimum object version currently
+ * stored in the cache. Provides a simple API to add, remove and update catalog object
+ * versions. Not thread-safe.
+ *
+ * The primary use case of this class is to allow an Impalad catalog cache to determine when
+ * the result set of an INVALIDATE METADATA operation has been applied locally by keeping
+ * track of the minimum catalog object version.
+ */
+public class CatalogObjectVersionQueue {
+ private final PriorityQueue<Long> objectVersions_ = new PriorityQueue<>();
+
+ public static final CatalogObjectVersionQueue INSTANCE =
+ new CatalogObjectVersionQueue();
+
+ private CatalogObjectVersionQueue() {}
+
+ public void updateVersions(long oldVersion, long newVersion) {
+ removeVersion(oldVersion);
+ addVersion(newVersion);
+ }
+
+ public void removeVersion(long oldVersion) {
+ objectVersions_.remove(oldVersion);
+ }
+
+ public void addVersion(long newVersion) {
+ objectVersions_.add(newVersion);
+ }
+
+ public long getMinimumVersion() {
+ Long minVersion = objectVersions_.peek();
+ return minVersion != null ? minVersion : 0;
+ }
+
+ public void addAll(List<? extends CatalogObject> catalogObjects) {
+ for (CatalogObject catalogObject: catalogObjects) {
+ addVersion(catalogObject.getCatalogVersion());
+ }
+ }
+
+ public void removeAll(List<? extends CatalogObject> catalogObjects) {
+ for (CatalogObject catalogObject: catalogObjects) {
+ removeVersion(catalogObject.getCatalogVersion());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index d2a0a82..f75b0a8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +33,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.Path;
@@ -45,9 +47,9 @@ import org.apache.hadoop.hive.metastore.api.ResourceType;
import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.impala.analysis.Analyzer;
import org.apache.impala.authorization.SentryConfig;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.TopicUpdateLog.Entry;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
@@ -58,8 +60,9 @@ import org.apache.impala.hive.executor.UdfExecutor;
import org.apache.impala.thrift.TCatalog;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TCatalogUpdateResult;
import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaResponse;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TPrivilege;
import org.apache.impala.thrift.TTable;
@@ -73,6 +76,8 @@ import org.apache.thrift.protocol.TCompactProtocol;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -80,14 +85,62 @@ import com.google.common.collect.Sets;
/**
* Specialized Catalog that implements the CatalogService specific Catalog
* APIs. The CatalogServiceCatalog manages loading of all the catalog metadata
- * and processing of DDL requests. For each DDL request, the CatalogServiceCatalog
- * will return the catalog version that the update will show up in. The client
- * can then wait until the statestore sends an update that contains that catalog
- * version.
- * The CatalogServiceCatalog also manages a global "catalog version". The version
- * is incremented and assigned to a CatalogObject whenever it is
- * added/modified/removed from the catalog. This means each CatalogObject will have a
- * unique version and assigned versions are strictly increasing.
+ * and processing of DDL requests. The CatalogServiceCatalog maintains a global
+ * "catalog version". The version is incremented and assigned to a CatalogObject whenever
+ * it is added/modified/removed from the catalog. This means each CatalogObject will have
+ * a unique version and assigned versions are strictly increasing.
+ *
+ * Periodically, the CatalogServiceCatalog collects a delta of catalog updates (based on a
+ * specified catalog version) and constructs a topic update to be sent to the statestore.
+ * Each catalog topic update is defined by a range of catalog versions (from, to] and the
+ * CatalogServiceCatalog guarantees that every catalog object that has a version in the
+ * specified range is included in the catalog topic update. Concurrent DDL requests are
+ * allowed while a topic update is in progress. Hence, there is a non-zero probability
+ * that frequently modified catalog objects may keep skipping topic updates. That can
+ * happen when by the time a topic update thread tries to collect an object update, that
+ * object is being modified by another metadata operation, causing its version to surpass
+ * the 'to' version of the topic update. To ensure that all catalog updates
+ * are eventually included in a catalog topic update, we keep track of the number of times
+ * each catalog object has skipped a topic update and if that number exceeds a specified
+ * threshold, we add the catalog object to the next topic update even if its version is
+ * higher than the 'to' version of the topic update. As a result, the same version of an
+ * object might be sent in two subsequent topic updates.
+ *
+ * The CatalogServiceCatalog maintains two logs:
+ * - Delete log. Since deleted objects are removed from the cache, the cache itself is
+ * not useful for tracking deletions. This log is used for populating the list of
+ * deleted objects during a topic update by recording the catalog objects that
+ * have been removed from the catalog. An entry with a new version is added to this log
+ * every time an object is removed (e.g. dropTable). Incrementing an object's version
+ * and adding it to the delete log should be performed atomically. An entry is removed
+ * from this log by the topic update thread when the associated deletion entry is
+ * added to a topic update.
+ * - Topic update log. This log records information about the catalog objects that have
+ * been included in a catalog topic update. Only the thread that is processing the
+ * topic update is responsible for adding, updating, and removing entries from the log.
+ * All other operations (e.g. addTable) only read topic update log entries but never
+ * modify them. Each entry includes the number of times a catalog object has
+ * skipped a topic update, which version of the object was last sent in a topic update
+ * and what was the version of that topic update. Entries of the topic update log are
+ * garbage-collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates by the topic
+ * update processing thread to prevent the log from growing indefinitely. Metadata
+ * operations using SYNC_DDL inspect this log to identify the catalog topic
+ * version that the issuing impalad must wait for in order to ensure that the effects
+ * of this operation have been broadcast to all the coordinators.
+ *
+ * Known anomalies with SYNC_DDL:
+ * The time-based cleanup process of the topic update log entries may cause metadata
+ * operations that use SYNC_DDL to hang while waiting for specific topic update log
+ * entries. That could happen if the thread processing the metadata operation stalls
+ * for a long period of time (longer than the time to process
+ * TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates) between the time the operation was
+ * applied in the catalog cache and the time the SYNC_DDL version was checked. To reduce
+ * the probability of such an event, we set the value of the
+ * TOPIC_UPDATE_LOG_GC_FREQUENCY to a large value. Also, to prevent metadata operations
+ * from hanging in that path due to unknown issues (e.g. bugs), operations using
+ * SYNC_DDL are not allowed to wait indefinitely for specific topic log entries and an
+ * exception is thrown if the specified max wait time is exceeded. See
+ * waitForSyncDdlVersion() for more details.
*
* Table metadata for IncompleteTables (not fully loaded tables) are loaded in the
* background by the TableLoadingMgr; tables can be prioritized for loading by calling
@@ -100,7 +153,7 @@ import com.google.common.collect.Sets;
* out-of-band of the table loading thread pool.
*
* See the class comments in CatalogOpExecutor for a description of the locking protocol
- * that should be employed if both the catalog lock and table locks need to be held at
+ * that should be employed if both the version lock and table locks need to be held at
* the same time.
*
* TODO: Consider removing on-demand loading and have everything go through the table
@@ -110,6 +163,7 @@ public class CatalogServiceCatalog extends Catalog {
private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
+ private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
private final TUniqueId catalogServiceId_;
// Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
@@ -123,11 +177,11 @@ public class CatalogServiceCatalog extends Catalog {
// from the metastore.
// * During renameTable(), because a table must be removed and added to the catalog
// atomically (potentially in a different database).
- private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true);
+ private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true);
// Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
// with each update to the Catalog. Continued across the lifetime of the object.
- // Protected by catalogLock_.
+ // Protected by versionLock_.
// TODO: Handle overflow of catalogVersion_ and nextTableId_.
// TODO: The name of this variable is misleading and can be interpreted as a property
// of the catalog server. Rename into something that indicates its role as a global
@@ -150,6 +204,19 @@ public class CatalogServiceCatalog extends Catalog {
// Local temporary directory to copy UDF Jars.
private static String localLibraryPath_;
+ // Log of deleted catalog objects.
+ private final CatalogDeltaLog deleteLog_;
+
+ // Version of the last topic update returned to the statestore.
+ // The version of a topic update is the catalog version of the CATALOG object
+ // that is added to it.
+ private final AtomicLong lastSentTopicUpdate_ = new AtomicLong(-1);
+
+ // Wait time for a topic update.
+ private static final long TOPIC_UPDATE_WAIT_TIMEOUT_MS = 10000;
+
+ private final TopicUpdateLog topicUpdateLog_ = new TopicUpdateLog();
+
/**
* Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
* will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
@@ -169,7 +236,7 @@ public class CatalogServiceCatalog extends Catalog {
// local, etc.)
if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
cachePoolReader_.scheduleAtFixedRate(
- new CachePoolReader(), 0, 1, TimeUnit.MINUTES);
+ new CachePoolReader(false), 0, 1, TimeUnit.MINUTES);
}
} catch (IOException e) {
LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled.");
@@ -180,6 +247,7 @@ public class CatalogServiceCatalog extends Catalog {
sentryProxy_ = null;
}
localLibraryPath_ = new String("file://" + localLibraryPath);
+ deleteLog_ = new CatalogDeltaLog();
}
// Timeout for acquiring a table lock
@@ -189,7 +257,7 @@ public class CatalogServiceCatalog extends Catalog {
private static final int TBL_LOCK_RETRY_MS = 10;
/**
- * Tries to acquire catalogLock_ and the lock of 'tbl' in that order. Returns true if it
+ * Tries to acquire versionLock_ and the lock of 'tbl' in that order. Returns true if it
* successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
* when the function returns. Returns false otherwise and no lock is held in this case.
*/
@@ -197,7 +265,7 @@ public class CatalogServiceCatalog extends Catalog {
long begin = System.currentTimeMillis();
long end;
do {
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
if (tbl.getLock().tryLock()) {
if (LOG.isTraceEnabled()) {
end = System.currentTimeMillis();
@@ -206,7 +274,7 @@ public class CatalogServiceCatalog extends Catalog {
}
return true;
}
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
try {
// Sleep to avoid spinning and allow other operations to make progress.
Thread.sleep(TBL_LOCK_RETRY_MS);
@@ -223,12 +291,17 @@ public class CatalogServiceCatalog extends Catalog {
* Called periodically by the cachePoolReader_.
*/
protected class CachePoolReader implements Runnable {
-
+ // If true, existing cache pools will get a new catalog version and, consequently,
+ // they will be added to the next topic update, triggering an update in each
+ // coordinator's local catalog cache. This is needed for the case of INVALIDATE
+ // METADATA where a new catalog version needs to be assigned to every catalog object.
+ private final boolean incrementVersions_;
/**
* This constructor is needed to create a non-threaded execution of the class.
*/
- public CachePoolReader() {
+ public CachePoolReader(boolean incrementVersions) {
super();
+ incrementVersions_ = incrementVersions;
}
public void run() {
@@ -249,28 +322,45 @@ public class CatalogServiceCatalog extends Catalog {
return;
}
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
try {
// Determine what has changed relative to what we have cached.
Set<String> droppedCachePoolNames = Sets.difference(
hdfsCachePools_.keySet(), currentCachePools.keySet());
Set<String> createdCachePoolNames = Sets.difference(
currentCachePools.keySet(), hdfsCachePools_.keySet());
+ Set<String> survivingCachePoolNames = Sets.difference(
+ hdfsCachePools_.keySet(), droppedCachePoolNames);
// Add all new cache pools.
for (String createdCachePool: createdCachePoolNames) {
HdfsCachePool cachePool = new HdfsCachePool(
currentCachePools.get(createdCachePool));
- cachePool.setCatalogVersion(
- CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
+ cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
hdfsCachePools_.add(cachePool);
}
// Remove dropped cache pools.
for (String cachePoolName: droppedCachePoolNames) {
- hdfsCachePools_.remove(cachePoolName);
- CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
+ HdfsCachePool cachePool = hdfsCachePools_.remove(cachePoolName);
+ if (cachePool != null) {
+ cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
+ TCatalogObject removedObject =
+ new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
+ cachePool.getCatalogVersion());
+ removedObject.setCache_pool(cachePool.toThrift());
+ deleteLog_.addRemovedObject(removedObject);
+ }
+ }
+ if (incrementVersions_) {
+ // Increment the version of existing pools in order to be added to the next
+ // topic update.
+ for (String survivingCachePoolName: survivingCachePoolNames) {
+ HdfsCachePool cachePool = hdfsCachePools_.get(survivingCachePoolName);
+ Preconditions.checkNotNull(cachePool);
+ cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
+ }
}
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
}
}
@@ -297,120 +387,347 @@ public class CatalogServiceCatalog extends Catalog {
}
/**
- * Returns all known objects in the Catalog (Tables, Views, Databases, and
- * Functions). Some metadata may be skipped for objects that have a catalog
- * version < the specified "fromVersion". Takes a lock on the catalog to ensure this
- * update contains a consistent snapshot of all items in the catalog. While holding the
- * catalog lock, it locks each accessed table to protect against concurrent
- * modifications.
+ * Identifies and returns the catalog objects that were added/modified/deleted in the
+ * catalog with versions > 'fromVersion'. It operates on a snaphsot of the catalog
+ * without holding the catalog lock which means that other concurrent metadata
+ * operations can still make progress while the catalog delta is computed. An entry in
+ * the topic update log is added for every catalog object that is included in the
+ * catalog delta. The log is examined by operations using SYNC_DDL to determine which
+ * topic update covers the result set of the metadata operation. Once the catalog delta is
+ * computed, the entries in the delete log with versions less than 'fromVersion' are
+ * garbage collected.
+ */
+ public TGetCatalogDeltaResponse getCatalogDelta(long fromVersion) {
+ // Maximum catalog version (inclusive) to be included in the catalog delta.
+ long toVersion = getCatalogVersion();
+ TGetCatalogDeltaResponse resp = new TGetCatalogDeltaResponse();
+ resp.setUpdated_objects(new ArrayList<TCatalogObject>());
+ resp.setDeleted_objects(new ArrayList<TCatalogObject>());
+ resp.setMax_catalog_version(toVersion);
+
+ for (Db db: getAllDbs()) {
+ addDatabaseToCatalogDelta(db, fromVersion, toVersion, resp);
+ }
+ for (DataSource dataSource: getAllDataSources()) {
+ addDataSourceToCatalogDelta(dataSource, fromVersion, toVersion, resp);
+ }
+ for (HdfsCachePool cachePool: getAllHdfsCachePools()) {
+ addHdfsCachePoolToCatalogDelta(cachePool, fromVersion, toVersion, resp);
+ }
+ for (Role role: getAllRoles()) {
+ addRoleToCatalogDelta(role, fromVersion, toVersion, resp);
+ }
+ Set<String> updatedCatalogObjects = Sets.newHashSet();
+ for (TCatalogObject catalogObj: resp.updated_objects) {
+ topicUpdateLog_.add(Catalog.toCatalogObjectKey(catalogObj),
+ new TopicUpdateLog.Entry(0, catalogObj.getCatalog_version(),
+ toVersion));
+ updatedCatalogObjects.add(Catalog.toCatalogObjectKey(catalogObj));
+ }
+
+ // Identify the catalog objects that were removed from the catalog for which their
+ // versions are in range ('fromVersion', 'toVersion']. We need to make sure
+ // that we don't include "deleted" objects that were re-added to the catalog.
+ for (TCatalogObject removedObject: getDeletedObjects(fromVersion, toVersion)) {
+ if (!updatedCatalogObjects.contains(
+ Catalog.toCatalogObjectKey(removedObject))) {
+ topicUpdateLog_.add(Catalog.toCatalogObjectKey(removedObject),
+ new TopicUpdateLog.Entry(0, removedObject.getCatalog_version(),
+ toVersion));
+ resp.addToDeleted_objects(removedObject);
+ }
+ }
+ // Each topic update should contain a single "TCatalog" object which is used to
+ // pass overall state on the catalog, such as the current version and the
+ // catalog service id. By setting the catalog version to the latest catalog
+ // version at this point, it ensures impalads will always bump their versions,
+ // even in the case where an object has been dropped.
+ TCatalogObject catalog =
+ new TCatalogObject(TCatalogObjectType.CATALOG, toVersion);
+ catalog.setCatalog(new TCatalog(catalogServiceId_));
+ resp.addToUpdated_objects(catalog);
+ // Garbage collect the delete and topic update log.
+ deleteLog_.garbageCollect(toVersion);
+ topicUpdateLog_.garbageCollectUpdateLogEntries(toVersion);
+ lastSentTopicUpdate_.set(toVersion);
+ // Notify any operation that is waiting on the next topic update.
+ synchronized (topicUpdateLog_) {
+ topicUpdateLog_.notifyAll();
+ }
+ return resp;
+ }
+
+ /**
+ * Get a snapshot view of all the catalog objects that were deleted between versions
+ * ('fromVersion', 'toVersion'].
*/
- public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
- TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
- resp.setObjects(new ArrayList<TCatalogObject>());
- resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
- catalogLock_.readLock().lock();
+ private List<TCatalogObject> getDeletedObjects(long fromVersion, long toVersion) {
+ versionLock_.readLock().lock();
try {
- for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
- TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
- db.getCatalogVersion());
- catalogDb.setDb(db.toThrift());
- resp.addToObjects(catalogDb);
-
- for (String tblName: db.getAllTableNames()) {
- TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
- Catalog.INITIAL_CATALOG_VERSION);
-
- Table tbl = db.getTable(tblName);
- if (tbl == null) {
- LOG.error("Table: " + tblName + " was expected to be in the catalog " +
- "cache. Skipping table for this update.");
- continue;
- }
+ return deleteLog_.retrieveObjects(fromVersion, toVersion);
+ } finally {
+ versionLock_.readLock().unlock();
+ }
+ }
- // Protect the table from concurrent modifications.
- tbl.getLock().lock();
- try {
- // Only add the extended metadata if this table's version is >=
- // the fromVersion.
- if (tbl.getCatalogVersion() >= fromVersion) {
- try {
- catalogTbl.setTable(tbl.toThrift());
- } catch (Exception e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Error calling toThrift() on table %s.%s: %s",
- db.getName(), tblName, e.getMessage()), e);
- }
- continue;
- }
- catalogTbl.setCatalog_version(tbl.getCatalogVersion());
- } else {
- catalogTbl.setTable(new TTable(db.getName(), tblName));
- }
- } finally {
- tbl.getLock().unlock();
- }
- resp.addToObjects(catalogTbl);
- }
+ /**
+ * Get a snapshot view of all the databases in the catalog.
+ */
+ private List<Db> getAllDbs() {
+ versionLock_.readLock().lock();
+ try {
+ return ImmutableList.copyOf(dbCache_.get().values());
+ } finally {
+ versionLock_.readLock().unlock();
+ }
+ }
- for (Function fn: db.getFunctions(null, new PatternMatcher())) {
- TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
- fn.getCatalogVersion());
- function.setFn(fn.toThrift());
- resp.addToObjects(function);
- }
+ /**
+ * Get a snapshot view of all the data sources in the catalog.
+ */
+ private List<DataSource> getAllDataSources() {
+ versionLock_.readLock().lock();
+ try {
+ return ImmutableList.copyOf(getDataSources());
+ } finally {
+ versionLock_.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get a snapshot view of all the Hdfs cache pools in the catalog.
+ */
+ private List<HdfsCachePool> getAllHdfsCachePools() {
+ versionLock_.readLock().lock();
+ try {
+ return ImmutableList.copyOf(hdfsCachePools_);
+ } finally {
+ versionLock_.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get a snapshot view of all the roles in the catalog.
+ */
+ private List<Role> getAllRoles() {
+ versionLock_.readLock().lock();
+ try {
+ return ImmutableList.copyOf(authPolicy_.getAllRoles());
+ } finally {
+ versionLock_.readLock().unlock();
+ }
+ }
+
+ /**
+ * Adds a database to the topic update if its version is in the range
+ * ('fromVersion', 'toVersion']. It iterates through all the tables and functions of
+ * this database to determine if they can be included in the topic update.
+ */
+ private void addDatabaseToCatalogDelta(Db db, long fromVersion, long toVersion,
+ TGetCatalogDeltaResponse resp) {
+ long dbVersion = db.getCatalogVersion();
+ if (dbVersion > fromVersion && dbVersion <= toVersion) {
+ TCatalogObject catalogDb =
+ new TCatalogObject(TCatalogObjectType.DATABASE, dbVersion);
+ catalogDb.setDb(db.toThrift());
+ resp.addToUpdated_objects(catalogDb);
+ }
+ for (Table tbl: getAllTables(db)) {
+ addTableToCatalogDelta(tbl, fromVersion, toVersion, resp);
+ }
+ for (Function fn: getAllFunctions(db)) {
+ addFunctionToCatalogDelta(fn, fromVersion, toVersion, resp);
+ }
+ }
+
+ /**
+ * Get a snapshot view of all the tables in a database.
+ */
+ private List<Table> getAllTables(Db db) {
+ Preconditions.checkNotNull(db);
+ versionLock_.readLock().lock();
+ try {
+ return ImmutableList.copyOf(db.getTables());
+ } finally {
+ versionLock_.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get a snapshot view of all the functions in a database.
+ */
+ private List<Function> getAllFunctions(Db db) {
+ Preconditions.checkNotNull(db);
+ versionLock_.readLock().lock();
+ try {
+ return ImmutableList.copyOf(db.getFunctions(null, new PatternMatcher()));
+ } finally {
+ versionLock_.readLock().unlock();
+ }
+ }
+
+ /**
+ * Adds a table to the topic update if its version is in the range
+ * ('fromVersion', 'toVersion']. If the table's version is larger than 'toVersion' and
+ * the table has skipped a topic update 'MAX_NUM_SKIPPED_TOPIC_UPDATES' times, it is
+ * included in the topic update. This prevents tables that are updated frequently from
+ * skipping topic updates indefinitely, which would also violate the semantics of
+ * SYNC_DDL.
+ */
+ private void addTableToCatalogDelta(Table tbl, long fromVersion, long toVersion,
+ TGetCatalogDeltaResponse resp) {
+ if (tbl.getCatalogVersion() <= toVersion) {
+ addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp);
+ } else {
+ TopicUpdateLog.Entry topicUpdateEntry =
+ topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
+ Preconditions.checkNotNull(topicUpdateEntry);
+ if (topicUpdateEntry.getNumSkippedTopicUpdates() >= MAX_NUM_SKIPPED_TOPIC_UPDATES) {
+ addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp);
+ } else {
+ LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
+ toVersion);
+ topicUpdateLog_.add(tbl.getUniqueName(),
+ new TopicUpdateLog.Entry(
+ topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
+ topicUpdateEntry.getLastSentVersion(),
+ topicUpdateEntry.getLastSentCatalogUpdate()));
}
+ }
+ }
- for (DataSource dataSource: getDataSources()) {
- TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
- dataSource.getCatalogVersion());
- catalogObj.setData_source(dataSource.toThrift());
- resp.addToObjects(catalogObj);
+ /**
+ * Helper function that tries to add a table to a topic update. It acquires the table's
+ * lock and checks if its version is in the ('fromVersion', 'toVersion'] range and how
+ * many consecutive times (if any) the table has skipped a topic update.
+ */
+ private void addTableToCatalogDeltaHelper(Table tbl, long fromVersion, long toVersion,
+ TGetCatalogDeltaResponse resp) {
+ TCatalogObject catalogTbl =
+ new TCatalogObject(TCatalogObjectType.TABLE, Catalog.INITIAL_CATALOG_VERSION);
+ tbl.getLock().lock();
+ try {
+ long tblVersion = tbl.getCatalogVersion();
+ if (tblVersion <= fromVersion) return;
+ TopicUpdateLog.Entry topicUpdateEntry =
+ topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
+ if (tblVersion > toVersion &&
+ topicUpdateEntry.getNumSkippedTopicUpdates() < MAX_NUM_SKIPPED_TOPIC_UPDATES) {
+ LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
+ toVersion);
+ topicUpdateLog_.add(tbl.getUniqueName(),
+ new TopicUpdateLog.Entry(
+ topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
+ topicUpdateEntry.getLastSentVersion(),
+ topicUpdateEntry.getLastSentCatalogUpdate()));
+ return;
}
- for (HdfsCachePool cachePool: hdfsCachePools_) {
- TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
- cachePool.getCatalogVersion());
- pool.setCache_pool(cachePool.toThrift());
- resp.addToObjects(pool);
+ try {
+ catalogTbl.setTable(tbl.toThrift());
+ } catch (Exception e) {
+ LOG.error(String.format("Error calling toThrift() on table %s: %s",
+ tbl.getFullName(), e.getMessage()), e);
+ return;
}
+ catalogTbl.setCatalog_version(tbl.getCatalogVersion());
+ resp.addToUpdated_objects(catalogTbl);
+ } finally {
+ tbl.getLock().unlock();
+ }
+ }
- // Get all roles
- for (Role role: authPolicy_.getAllRoles()) {
- TCatalogObject thriftRole = new TCatalogObject();
- thriftRole.setRole(role.toThrift());
- thriftRole.setCatalog_version(role.getCatalogVersion());
- thriftRole.setType(role.getCatalogObjectType());
- resp.addToObjects(thriftRole);
-
- for (RolePrivilege p: role.getPrivileges()) {
- TCatalogObject privilege = new TCatalogObject();
- privilege.setPrivilege(p.toThrift());
- privilege.setCatalog_version(p.getCatalogVersion());
- privilege.setType(p.getCatalogObjectType());
- resp.addToObjects(privilege);
- }
- }
+ /**
+ * Adds a function to the topic update if its version is in the range
+ * ('fromVersion', 'toVersion'].
+ */
+ private void addFunctionToCatalogDelta(Function fn, long fromVersion, long toVersion,
+ TGetCatalogDeltaResponse resp) {
+ long fnVersion = fn.getCatalogVersion();
+ if (fnVersion <= fromVersion || fnVersion > toVersion) return;
+ TCatalogObject function =
+ new TCatalogObject(TCatalogObjectType.FUNCTION, fnVersion);
+ function.setFn(fn.toThrift());
+ resp.addToUpdated_objects(function);
+ }
- // Each update should contain a single "TCatalog" object which is used to
- // pass overall state on the catalog, such as the current version and the
- // catalog service id.
- TCatalogObject catalog = new TCatalogObject();
- catalog.setType(TCatalogObjectType.CATALOG);
- // By setting the catalog version to the latest catalog version at this point,
- // it ensure impalads will always bump their versions, even in the case where
- // an object has been dropped.
- catalog.setCatalog_version(getCatalogVersion());
- catalog.setCatalog(new TCatalog(catalogServiceId_));
- resp.addToObjects(catalog);
-
- // The max version is the max catalog version of all items in the update.
- resp.setMax_catalog_version(getCatalogVersion());
- return resp;
+ /**
+ * Adds a data source to the topic update if its version is in the range
+ * ('fromVersion', 'toVersion'].
+ */
+ private void addDataSourceToCatalogDelta(DataSource dataSource, long fromVersion,
+ long toVersion, TGetCatalogDeltaResponse resp) {
+ long dsVersion = dataSource.getCatalogVersion();
+ if (dsVersion <= fromVersion || dsVersion > toVersion) return;
+ TCatalogObject catalogObj =
+ new TCatalogObject(TCatalogObjectType.DATA_SOURCE, dsVersion);
+ catalogObj.setData_source(dataSource.toThrift());
+ resp.addToUpdated_objects(catalogObj);
+ }
+
+ /**
+ * Adds a HDFS cache pool to the topic update if its version is in the range
+ * ('fromVersion', 'toVersion'].
+ */
+ private void addHdfsCachePoolToCatalogDelta(HdfsCachePool cachePool, long fromVersion,
+ long toVersion, TGetCatalogDeltaResponse resp) {
+ long cpVersion = cachePool.getCatalogVersion();
+ if (cpVersion <= fromVersion || cpVersion > toVersion) {
+ return;
+ }
+ TCatalogObject pool =
+ new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL, cpVersion);
+ pool.setCache_pool(cachePool.toThrift());
+ resp.addToUpdated_objects(pool);
+ }
+
+
+ /**
+ * Adds a role to the topic update if its version is in the range
+ * ('fromVersion', 'toVersion']. It iterates through all the privileges of this role to
+ * determine if they can be inserted in the topic update.
+ */
+ private void addRoleToCatalogDelta(Role role, long fromVersion, long toVersion,
+ TGetCatalogDeltaResponse resp) {
+ long roleVersion = role.getCatalogVersion();
+ if (roleVersion > fromVersion && roleVersion <= toVersion) {
+ TCatalogObject thriftRole =
+ new TCatalogObject(TCatalogObjectType.ROLE, roleVersion);
+ thriftRole.setRole(role.toThrift());
+ resp.addToUpdated_objects(thriftRole);
+ }
+ for (RolePrivilege p: getAllPrivileges(role)) {
+ addRolePrivilegeToCatalogDelta(p, fromVersion, toVersion, resp);
+ }
+ }
+
+ /**
+ * Get a snapshot view of all the privileges in a role.
+ */
+ private List<RolePrivilege> getAllPrivileges(Role role) {
+ Preconditions.checkNotNull(role);
+ versionLock_.readLock().lock();
+ try {
+ return ImmutableList.copyOf(role.getPrivileges());
} finally {
- catalogLock_.readLock().unlock();
+ versionLock_.readLock().unlock();
}
}
/**
+ * Adds a role privilege to the topic update if its version is in the range
+ * ('fromVersion', 'toVersion'].
+ */
+ private void addRolePrivilegeToCatalogDelta(RolePrivilege priv, long fromVersion,
+ long toVersion, TGetCatalogDeltaResponse resp) {
+ long privVersion = priv.getCatalogVersion();
+ if (privVersion <= fromVersion || privVersion > toVersion) return;
+ TCatalogObject privilege =
+ new TCatalogObject(TCatalogObjectType.PRIVILEGE, privVersion);
+ privilege.setPrivilege(priv.toThrift());
+ resp.addToUpdated_objects(privilege);
+ }
+
+ /**
* Returns all user defined functions (aggregate and scalar) in the specified database.
* Functions are not returned in a defined order.
*/
@@ -710,6 +1027,31 @@ public class CatalogServiceCatalog extends Catalog {
tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
}
}
+
+ if (existingDb != null) {
+ // Identify any removed functions and add them to the delta log.
+ for (Map.Entry<String, List<Function>> e:
+ existingDb.getAllFunctions().entrySet()) {
+ for (Function fn: e.getValue()) {
+ if (newDb.getFunction(fn,
+ Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
+ fn.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(fn.toTCatalogObject());
+ }
+ }
+ }
+
+ // Identify any deleted tables and add them to the delta log
+ Set<String> oldTableNames = Sets.newHashSet(existingDb.getAllTableNames());
+ Set<String> newTableNames = Sets.newHashSet(newDb.getAllTableNames());
+ oldTableNames.removeAll(newTableNames);
+ for (String removedTableName: oldTableNames) {
+ Table removedTable = IncompleteTable.createUninitializedTable(existingDb,
+ removedTableName);
+ removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(removedTable.toTCatalogObject());
+ }
+ }
return Pair.create(newDb, tblsToBackgroundLoad);
} catch (Exception e) {
LOG.warn("Encountered an exception while invalidating database: " + dbName +
@@ -720,22 +1062,35 @@ public class CatalogServiceCatalog extends Catalog {
/**
* Resets this catalog instance by clearing all cached table and database metadata.
+ * Returns the current catalog version before reset has taken any effect. The
+ * requesting impalad will use that version to determine when the
+ * effects of reset have been applied to its local catalog cache.
*/
- public void reset() throws CatalogException {
- LOG.info("Invalidating all metadata.");
-
+ public long reset() throws CatalogException {
+ long currentCatalogVersion = getCatalogVersion();
+ LOG.info("Invalidating all metadata. Version: " + currentCatalogVersion);
// First update the policy metadata.
if (sentryProxy_ != null) {
// Sentry Service is enabled.
try {
// Update the authorization policy, waiting for the result to complete.
- sentryProxy_.refresh();
+ sentryProxy_.refresh(true);
} catch (Exception e) {
throw new CatalogException("Error updating authorization policy: ", e);
}
}
- catalogLock_.writeLock().lock();
+ // Update the HDFS cache pools
+ CachePoolReader reader = new CachePoolReader(true);
+ reader.run();
+
+ versionLock_.writeLock().lock();
+ // Assign new versions to all the loaded data sources.
+ for (DataSource dataSource: getDataSources()) {
+ dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+ }
+
+ // Update db and table metadata
try {
// Not all Java UDFs are persisted to the metastore. The ones which aren't
// should be restored once the catalog has been invalidated.
@@ -757,6 +1112,16 @@ public class CatalogServiceCatalog extends Catalog {
}
}
dbCache_.set(newDbCache);
+
+ // Identify any deleted databases and add them to the delta log.
+ Set<String> oldDbNames = oldDbCache.keySet();
+ Set<String> newDbNames = newDbCache.keySet();
+ oldDbNames.removeAll(newDbNames);
+ for (String dbName: oldDbNames) {
+ Db removedDb = oldDbCache.get(dbName);
+ updateDeleteLog(removedDb);
+ }
+
// Submit tables for background loading.
for (TTableName tblName: tblsToBackgroundLoad) {
tableLoadingMgr_.backgroundLoad(tblName);
@@ -765,21 +1130,26 @@ public class CatalogServiceCatalog extends Catalog {
LOG.error(e);
throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
LOG.info("Invalidated all metadata.");
+ return currentCatalogVersion;
}
/**
* Adds a database name to the metadata cache and returns the database's
* new Db object. Used by CREATE DATABASE statements.
*/
- public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb)
- throws ImpalaException {
+ public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
Db newDb = new Db(dbName, this, msDb);
- newDb.setCatalogVersion(incrementAndGetCatalogVersion());
- addDb(newDb);
- return newDb;
+ versionLock_.writeLock().lock();
+ try {
+ newDb.setCatalogVersion(incrementAndGetCatalogVersion());
+ addDb(newDb);
+ return newDb;
+ } finally {
+ versionLock_.writeLock().unlock();
+ }
}
/**
@@ -789,11 +1159,36 @@ public class CatalogServiceCatalog extends Catalog {
*/
@Override
public Db removeDb(String dbName) {
- Db removedDb = super.removeDb(dbName);
- if (removedDb != null) {
- removedDb.setCatalogVersion(incrementAndGetCatalogVersion());
+ versionLock_.writeLock().lock();
+ try {
+ Db removedDb = super.removeDb(dbName);
+ if (removedDb != null) updateDeleteLog(removedDb);
+ return removedDb;
+ } finally {
+ versionLock_.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Helper function to clean up the state associated with a removed database. It creates
+ * the entries in the delete log for 'db' as well as for its tables and functions
+ * (if any).
+ */
+ private void updateDeleteLog(Db db) {
+ Preconditions.checkNotNull(db);
+ Preconditions.checkState(versionLock_.isWriteLockedByCurrentThread());
+ if (!db.isSystemDb()) {
+ for (Table tbl: db.getTables()) {
+ tbl.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(tbl.toMinimalTCatalogObject());
+ }
+ for (Function fn: db.getFunctions(null, new PatternMatcher())) {
+ fn.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(fn.toTCatalogObject());
+ }
}
- return removedDb;
+ db.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(db.toTCatalogObject());
}
/**
@@ -804,8 +1199,13 @@ public class CatalogServiceCatalog extends Catalog {
Db db = getDb(dbName);
if (db == null) return null;
Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
- incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
- db.addTable(incompleteTable);
+ versionLock_.writeLock().lock();
+ try {
+ incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
+ db.addTable(incompleteTable);
+ } finally {
+ versionLock_.writeLock().unlock();
+ }
return db.getTable(tblName);
}
@@ -825,14 +1225,14 @@ public class CatalogServiceCatalog extends Catalog {
long previousCatalogVersion;
// Return the table if it is already loaded or submit a new load request.
- catalogLock_.readLock().lock();
+ versionLock_.readLock().lock();
try {
Table tbl = getTable(dbName, tblName);
if (tbl == null || tbl.isLoaded()) return tbl;
previousCatalogVersion = tbl.getCatalogVersion();
loadReq = tableLoadingMgr_.loadAsync(tableName);
} finally {
- catalogLock_.readLock().unlock();
+ versionLock_.readLock().unlock();
}
Preconditions.checkNotNull(loadReq);
try {
@@ -850,7 +1250,7 @@ public class CatalogServiceCatalog extends Catalog {
*/
private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
throws DatabaseNotFoundException {
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
try {
Db db = getDb(updatedTbl.getDb().getName());
if (db == null) {
@@ -868,7 +1268,7 @@ public class CatalogServiceCatalog extends Catalog {
db.addTable(updatedTbl);
return updatedTbl;
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
}
@@ -879,12 +1279,17 @@ public class CatalogServiceCatalog extends Catalog {
public Table removeTable(String dbName, String tblName) {
Db parentDb = getDb(dbName);
if (parentDb == null) return null;
-
- Table removedTable = parentDb.removeTable(tblName);
- if (removedTable != null) {
- removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+ versionLock_.writeLock().lock();
+ try {
+ Table removedTable = parentDb.removeTable(tblName);
+ if (removedTable != null) {
+ removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(removedTable.toMinimalTCatalogObject());
+ }
+ return removedTable;
+ } finally {
+ versionLock_.writeLock().unlock();
}
- return removedTable;
}
/**
@@ -894,11 +1299,17 @@ public class CatalogServiceCatalog extends Catalog {
*/
@Override
public Function removeFunction(Function desc) {
- Function removedFn = super.removeFunction(desc);
- if (removedFn != null) {
- removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
+ versionLock_.writeLock().lock();
+ try {
+ Function removedFn = super.removeFunction(desc);
+ if (removedFn != null) {
+ removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(removedFn.toTCatalogObject());
+ }
+ return removedFn;
+ } finally {
+ versionLock_.writeLock().unlock();
}
- return removedFn;
}
/**
@@ -909,9 +1320,14 @@ public class CatalogServiceCatalog extends Catalog {
public boolean addFunction(Function fn) {
Db db = getDb(fn.getFunctionName().getDb());
if (db == null) return false;
- if (db.addFunction(fn)) {
- fn.setCatalogVersion(incrementAndGetCatalogVersion());
- return true;
+ versionLock_.writeLock().lock();
+ try {
+ if (db.addFunction(fn)) {
+ fn.setCatalogVersion(incrementAndGetCatalogVersion());
+ return true;
+ }
+ } finally {
+ versionLock_.writeLock().unlock();
}
return false;
}
@@ -922,20 +1338,31 @@ public class CatalogServiceCatalog extends Catalog {
*/
@Override
public boolean addDataSource(DataSource dataSource) {
- if (dataSources_.add(dataSource)) {
- dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
- return true;
+ versionLock_.writeLock().lock();
+ try {
+ if (dataSources_.add(dataSource)) {
+ dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+ return true;
+ }
+ } finally {
+ versionLock_.writeLock().unlock();
}
return false;
}
@Override
public DataSource removeDataSource(String dataSourceName) {
- DataSource dataSource = dataSources_.remove(dataSourceName);
- if (dataSource != null) {
- dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+ versionLock_.writeLock().lock();
+ try {
+ DataSource dataSource = dataSources_.remove(dataSourceName);
+ if (dataSource != null) {
+ dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(dataSource.toTCatalogObject());
+ }
+ return dataSource;
+ } finally {
+ versionLock_.writeLock().unlock();
}
- return dataSource;
}
/**
@@ -969,20 +1396,30 @@ public class CatalogServiceCatalog extends Catalog {
/**
* Renames a table. Equivalent to an atomic drop + add of the table. Returns
- * the new Table object with an incremented catalog version or null if the
- * drop or add were unsuccessful. If null is returned, then the catalog cache
- * is in one of the following two states:
- * 1. Old table was not removed, and new table was not added
- * 2. Old table was removed, but new table was not added
+ * a pair of tables containing the removed table (or null if the table drop was not
+ * successful) and the new table (or null if either the drop of the old one or the
+ * add of the new table was not successful). Depending on the return value, the catalog
+ * cache is in one of the following states:
+ * 1. null, null: Old table was not removed and new table was not added.
+ * 2. null, T_new: Invalid configuration
+ * 3. T_old, null: Old table was removed but new table was not added.
+ * 4. T_old, T_new: Old table was removed and new table was added.
*/
- public Table renameTable(TTableName oldTableName, TTableName newTableName)
+ public Pair<Table, Table> renameTable(TTableName oldTableName, TTableName newTableName)
throws CatalogException {
// Remove the old table name from the cache and add the new table.
Db db = getDb(oldTableName.getDb_name());
if (db == null) return null;
- Table oldTable = db.removeTable(oldTableName.getTable_name());
- if (oldTable == null) return null;
- return addTable(newTableName.getDb_name(), newTableName.getTable_name());
+ versionLock_.writeLock().lock();
+ try {
+ Table oldTable =
+ removeTable(oldTableName.getDb_name(), oldTableName.getTable_name());
+ if (oldTable == null) return Pair.create(null, null);
+ return Pair.create(oldTable,
+ addTable(newTableName.getDb_name(), newTableName.getTable_name()));
+ } finally {
+ versionLock_.writeLock().unlock();
+ }
}
/**
@@ -1004,7 +1441,7 @@ public class CatalogServiceCatalog extends Catalog {
}
try {
long newCatalogVersion = incrementAndGetCatalogVersion();
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Table msTbl = null;
try {
@@ -1019,7 +1456,7 @@ public class CatalogServiceCatalog extends Catalog {
LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
return tbl.toTCatalogObject();
} finally {
- Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+ Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
tbl.getLock().unlock();
}
}
@@ -1123,9 +1560,7 @@ public class CatalogServiceCatalog extends Catalog {
try {
msDb = msClient.getHiveClient().getDatabase(dbName);
Preconditions.checkNotNull(msDb);
- db = new Db(dbName, this, msDb);
- db.setCatalogVersion(incrementAndGetCatalogVersion());
- addDb(db);
+ addDb(dbName, msDb);
dbWasAdded.setRef(true);
} catch (TException e) {
// The Metastore database could not be fetched. Log the error and return.
@@ -1138,9 +1573,8 @@ public class CatalogServiceCatalog extends Catalog {
// Add a new uninitialized table to the table cache, effectively invalidating
// any existing entry. The metadata for the table will be loaded lazily, on the
// next access to the table.
- Table newTable = IncompleteTable.createUninitializedTable(db, tblName);
- newTable.setCatalogVersion(incrementAndGetCatalogVersion());
- db.addTable(newTable);
+ Table newTable = addTable(dbName, tblName);
+ Preconditions.checkNotNull(newTable);
if (loadInBackground_) {
tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
tblName.toLowerCase()));
@@ -1148,7 +1582,10 @@ public class CatalogServiceCatalog extends Catalog {
if (dbWasAdded.getRef()) {
// The database should always have a lower catalog version than the table because
// it needs to be created before the table can be added.
- Preconditions.checkState(db.getCatalogVersion() < newTable.getCatalogVersion());
+ Db addedDb = newTable.getDb();
+ Preconditions.checkNotNull(addedDb);
+ Preconditions.checkState(
+ addedDb.getCatalogVersion() < newTable.getCatalogVersion());
}
return newTable.toTCatalogObject();
}
@@ -1158,14 +1595,14 @@ public class CatalogServiceCatalog extends Catalog {
* If a role with the same name already exists it will be overwritten.
*/
public Role addRole(String roleName, Set<String> grantGroups) {
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
try {
Role role = new Role(roleName, grantGroups);
role.setCatalogVersion(incrementAndGetCatalogVersion());
authPolicy_.addRole(role);
return role;
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
}
@@ -1175,14 +1612,19 @@ public class CatalogServiceCatalog extends Catalog {
* exists.
*/
public Role removeRole(String roleName) {
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
try {
Role role = authPolicy_.removeRole(roleName);
if (role == null) return null;
+ for (RolePrivilege priv: role.getPrivileges()) {
+ priv.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(priv.toTCatalogObject());
+ }
role.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(role.toTCatalogObject());
return role;
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
}
@@ -1192,14 +1634,14 @@ public class CatalogServiceCatalog extends Catalog {
*/
public Role addRoleGrantGroup(String roleName, String groupName)
throws CatalogException {
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
try {
Role role = authPolicy_.addGrantGroup(roleName, groupName);
Preconditions.checkNotNull(role);
role.setCatalogVersion(incrementAndGetCatalogVersion());
return role;
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
}
@@ -1209,14 +1651,14 @@ public class CatalogServiceCatalog extends Catalog {
*/
public Role removeRoleGrantGroup(String roleName, String groupName)
throws CatalogException {
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
try {
Role role = authPolicy_.removeGrantGroup(roleName, groupName);
Preconditions.checkNotNull(role);
role.setCatalogVersion(incrementAndGetCatalogVersion());
return role;
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
}
@@ -1227,7 +1669,7 @@ public class CatalogServiceCatalog extends Catalog {
*/
public RolePrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
throws CatalogException {
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
try {
Role role = authPolicy_.getRole(roleName);
if (role == null) throw new CatalogException("Role does not exist: " + roleName);
@@ -1236,7 +1678,7 @@ public class CatalogServiceCatalog extends Catalog {
authPolicy_.addPrivilege(priv);
return priv;
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
}
@@ -1247,7 +1689,7 @@ public class CatalogServiceCatalog extends Catalog {
*/
public RolePrivilege removeRolePrivilege(String roleName, TPrivilege thriftPriv)
throws CatalogException {
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
try {
Role role = authPolicy_.getRole(roleName);
if (role == null) throw new CatalogException("Role does not exist: " + roleName);
@@ -1255,9 +1697,10 @@ public class CatalogServiceCatalog extends Catalog {
role.removePrivilege(thriftPriv.getPrivilege_name());
if (rolePrivilege == null) return null;
rolePrivilege.setCatalogVersion(incrementAndGetCatalogVersion());
+ deleteLog_.addRemovedObject(rolePrivilege.toTCatalogObject());
return rolePrivilege;
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
}
@@ -1268,13 +1711,13 @@ public class CatalogServiceCatalog extends Catalog {
*/
public RolePrivilege getRolePrivilege(String roleName, TPrivilege privSpec)
throws CatalogException {
- catalogLock_.readLock().lock();
+ versionLock_.readLock().lock();
try {
Role role = authPolicy_.getRole(roleName);
if (role == null) throw new CatalogException("Role does not exist: " + roleName);
return role.getPrivilege(privSpec.getPrivilege_name());
} finally {
- catalogLock_.readLock().unlock();
+ versionLock_.readLock().unlock();
}
}
@@ -1282,11 +1725,11 @@ public class CatalogServiceCatalog extends Catalog {
* Increments the current Catalog version and returns the new value.
*/
public long incrementAndGetCatalogVersion() {
- catalogLock_.writeLock().lock();
+ versionLock_.writeLock().lock();
try {
return ++catalogVersion_;
} finally {
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
}
}
@@ -1294,16 +1737,15 @@ public class CatalogServiceCatalog extends Catalog {
* Returns the current Catalog version.
*/
public long getCatalogVersion() {
- catalogLock_.readLock().lock();
+ versionLock_.readLock().lock();
try {
return catalogVersion_;
} finally {
- catalogLock_.readLock().unlock();
+ versionLock_.readLock().unlock();
}
}
- public ReentrantReadWriteLock getLock() { return catalogLock_; }
-
+ public ReentrantReadWriteLock getLock() { return versionLock_; }
public SentryProxy getSentryProxy() { return sentryProxy_; }
public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
@@ -1320,7 +1762,7 @@ public class CatalogServiceCatalog extends Catalog {
}
try {
long newCatalogVersion = incrementAndGetCatalogVersion();
- catalogLock_.writeLock().unlock();
+ versionLock_.writeLock().unlock();
HdfsTable hdfsTable = (HdfsTable) tbl;
HdfsPartition hdfsPartition = hdfsTable
.getPartitionFromThriftPartitionSpec(partitionSpec);
@@ -1355,8 +1797,111 @@ public class CatalogServiceCatalog extends Catalog {
hdfsTable.getFullName(), partitionName));
return hdfsTable.toTCatalogObject();
} finally {
- Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+ Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
tbl.getLock().unlock();
}
}
+
+ public CatalogDeltaLog getDeleteLog() { return deleteLog_; }
+
+ /**
+ * Returns the version of the topic update that an operation using SYNC_DDL must wait
+ * for in order to ensure that its result set ('result') has been broadcast to all the
+ * coordinators. For operations that don't produce a result set, e.g. INVALIDATE
+ * METADATA, return the version specified in 'result.version'.
+ *
+ * Throws a CatalogException if a covering topic version cannot be determined after a
+ * bounded number of observed topic updates; the operation itself has already succeeded
+ * at that point, only the broadcast confirmation is given up on.
+ */
+ public long waitForSyncDdlVersion(TCatalogUpdateResult result) throws CatalogException {
+ // Nothing was added/modified/removed; there is no topic update to wait for.
+ if (!result.isSetUpdated_catalog_objects() &&
+ !result.isSetRemoved_catalog_objects()) {
+ return result.getVersion();
+ }
+ long lastSentTopicUpdate = lastSentTopicUpdate_.get();
+ // Maximum number of attempts (topic updates) to find the catalog topic version that
+ // an operation using SYNC_DDL must wait for.
+ long maxNumAttempts = 5;
+ if (result.isSetUpdated_catalog_objects()) {
+ maxNumAttempts =
+ result.getUpdated_catalog_objects().size() * (MAX_NUM_SKIPPED_TOPIC_UPDATES + 1);
+ }
+ long numAttempts = 0;
+ long begin = System.currentTimeMillis();
+ long versionToWaitFor = -1;
+ while (versionToWaitFor == -1) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("waitForSyncDdlVersion() attempt: " + numAttempts);
+ }
+ // Examine the topic update log to determine the latest topic update that
+ // covers the added/modified/deleted objects in 'result'.
+ long topicVersionForUpdates =
+ getCoveringTopicUpdateVersion(result.getUpdated_catalog_objects());
+ long topicVersionForDeletes =
+ getCoveringTopicUpdateVersion(result.getRemoved_catalog_objects());
+ if (topicVersionForUpdates == -1 || topicVersionForDeletes == -1) {
+ // Wait for the next topic update.
+ synchronized(topicUpdateLog_) {
+ try {
+ topicUpdateLog_.wait(TOPIC_UPDATE_WAIT_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ // Deliberately ignored: re-interrupting here would make subsequent timed
+ // waits in this loop return immediately. The loop re-checks coverage below.
+ }
+ }
+ long currentTopicUpdate = lastSentTopicUpdate_.get();
+ // Don't count time-based exits from the wait() toward the maxNumAttempts
+ // threshold.
+ if (lastSentTopicUpdate != currentTopicUpdate) {
+ ++numAttempts;
+ if (numAttempts > maxNumAttempts) {
+ throw new CatalogException("Couldn't retrieve the catalog topic version " +
+ "for the SYNC_DDL operation after " + maxNumAttempts + " attempts. " +
+ "The operation has been successfully executed but its effects may not " +
+ "have been broadcast to all the coordinators.");
+ }
+ lastSentTopicUpdate = currentTopicUpdate;
+ }
+ } else {
+ versionToWaitFor = Math.max(topicVersionForDeletes, topicVersionForUpdates);
+ }
+ }
+ Preconditions.checkState(versionToWaitFor >= 0);
+ LOG.info("Operation using SYNC_DDL is waiting for catalog topic version: " +
+ versionToWaitFor + ". Time to identify topic version (msec): " +
+ (System.currentTimeMillis() - begin));
+ return versionToWaitFor;
+ }
+
+ /**
+ * Returns the version of the topic update that covers a set of TCatalogObjects.
+ * A topic update U covers a TCatalogObject T, corresponding to a catalog object O,
+ * if last_sent_version(O) >= catalog_version(T) && catalog_version(U) >=
+ * last_topic_update(O). The first condition indicates that a version of O that is
+ * larger or equal to the version in T has been added to a topic update. The second
+ * condition indicates that U is either the update to include O or an update following
+ * the one to include O. Returns -1 if some catalog object in 'tCatalogObjects'
+ * doesn't satisfy the above conditions.
+ */
+ private long getCoveringTopicUpdateVersion(List<TCatalogObject> tCatalogObjects) {
+ // With no objects to cover, the last sent topic update trivially covers the set.
+ if (tCatalogObjects == null || tCatalogObjects.isEmpty()) {
+ return lastSentTopicUpdate_.get();
+ }
+ long coveringVersion = -1;
+ for (TCatalogObject obj: tCatalogObjects) {
+ String objectKey = Catalog.toCatalogObjectKey(obj);
+ TopicUpdateLog.Entry entry = topicUpdateLog_.get(objectKey);
+ // A missing entry means either:
+ // a) a new catalog object that hasn't been processed by a catalog update yet, or
+ // b) an object not modified for at least TOPIC_UPDATE_LOG_GC_FREQUENCY updates,
+ //    whose entry was garbage collected.
+ // In both cases, as well as when the sent version is stale, return -1 to signal
+ // that we must keep waiting for this entry to show up in the topic update log.
+ boolean notYetCovered =
+ entry == null || entry.getLastSentVersion() < obj.getCatalog_version();
+ if (notYetCovered) return -1;
+ coveringVersion = Math.max(coveringVersion, entry.getLastSentCatalogUpdate());
+ }
+ return coveringVersion;
+ }
}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/DataSource.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSource.java b/fe/src/main/java/org/apache/impala/catalog/DataSource.java
index e9601d7..f59f3be 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSource.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSource.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog;
import org.apache.hadoop.fs.Path;
+import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TDataSource;
import com.google.common.base.Objects;
@@ -27,13 +28,12 @@ import com.google.common.base.Objects;
* Represents a data source in the catalog. Contains the data source name and all
* information needed to locate and load the data source.
*/
-public class DataSource implements CatalogObject {
+public class DataSource extends CatalogObjectImpl {
private final String dataSrcName_;
private final String className_;
private final String apiVersionString_;
// Qualified path to the data source.
private final String location_;
- private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
public DataSource(String dataSrcName, String location, String className,
String apiVersionString) {
@@ -54,16 +54,9 @@ public class DataSource implements CatalogObject {
}
@Override
- public long getCatalogVersion() { return catalogVersion_; }
-
- @Override
- public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
- @Override
public String getName() { return dataSrcName_; }
-
@Override
- public boolean isLoaded() { return true; }
+ public String getUniqueName() { return "DATA_SOURCE:" + dataSrcName_.toLowerCase(); }
public String getLocation() { return location_; }
public String getClassName() { return className_; }
@@ -85,4 +78,11 @@ public class DataSource implements CatalogObject {
public static String debugString(TDataSource thrift) {
return fromThrift(thrift).debugString();
}
+
+ /**
+ * Wraps this data source in a TCatalogObject stamped with its current
+ * catalog version.
+ */
+ public TCatalogObject toTCatalogObject() {
+ TCatalogObject result = new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+ result.setData_source(toThrift());
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 074ff92..f1c9c8e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -34,6 +34,7 @@ import org.apache.impala.catalog.Function;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.JniUtil;
+import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TFunction;
@@ -59,11 +60,10 @@ import com.google.common.collect.Maps;
* value is the base64 representation of the thrift serialized function object.
*
*/
-public class Db implements CatalogObject {
+public class Db extends CatalogObjectImpl {
private static final Logger LOG = LoggerFactory.getLogger(Db.class);
private final Catalog parentCatalog_;
private final TDatabase thriftDb_;
- private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
public static final String FUNCTION_INDEX_PREFIX = "impala_registered_function_";
@@ -134,16 +134,14 @@ public class Db implements CatalogObject {
@Override
public String getName() { return thriftDb_.getDb_name(); }
@Override
- public TCatalogObjectType getCatalogObjectType() {
- return TCatalogObjectType.DATABASE;
- }
+ public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.DATABASE; }
+ @Override
+ public String getUniqueName() { return "DATABASE:" + getName().toLowerCase(); }
/**
* Adds a table to the table cache.
*/
- public void addTable(Table table) {
- tableCache_.add(table);
- }
+ public void addTable(Table table) { tableCache_.add(table); }
/**
* Gets all table names in the table cache.
@@ -165,9 +163,7 @@ public class Db implements CatalogObject {
* Returns the Table with the given name if present in the table cache or null if the
* table does not exist in the cache.
*/
- public Table getTable(String tblName) {
- return tableCache_.get(tblName);
- }
+ public Table getTable(String tblName) { return tableCache_.get(tblName); }
/**
* Removes the table name and any cached metadata from the Table cache.
@@ -495,11 +491,10 @@ public class Db implements CatalogObject {
return result;
}
- @Override
- public long getCatalogVersion() { return catalogVersion_; }
- @Override
- public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
- @Override
- public boolean isLoaded() { return true; }
+ /**
+ * Wraps this database in a TCatalogObject stamped with its current
+ * catalog version.
+ */
+ public TCatalogObject toTCatalogObject() {
+ TCatalogObject result = new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+ result.setDb(toThrift());
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Function.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Function.java b/fe/src/main/java/org/apache/impala/catalog/Function.java
index 80316a6..03cd867 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Function.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Function.java
@@ -49,7 +49,7 @@ import com.google.common.collect.Lists;
* - Builtin functions, which are recreated after every restart of the
* catalog. (persisted, visible to Impala)
*/
-public class Function implements CatalogObject {
+public class Function extends CatalogObjectImpl {
// Enum for how to compare function signatures.
// For decimal types, the type in the function can be a wildcard, i.e. decimal(*,*).
// The wildcard can *only* exist as function type, the caller will always be a
@@ -106,7 +106,6 @@ public class Function implements CatalogObject {
// Set to true for functions that survive service restarts, including all builtins,
// native and IR functions, but only Java functions created without a signature.
private boolean isPersistent_;
- private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
public Function(FunctionName name, Type[] argTypes,
Type retType, boolean varArgs) {
@@ -298,15 +297,12 @@ public class Function implements CatalogObject {
@Override
public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.FUNCTION; }
-
- @Override
- public long getCatalogVersion() { return catalogVersion_; }
-
- @Override
- public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
@Override
public String getName() { return getFunctionName().toString(); }
+ @Override
+ public String getUniqueName() {
+ return "FUNCTION:" + name_.toString() + "(" + signatureString() + ")";
+ }
// Child classes must override this function.
public String toSql(boolean ifNotExists) { return ""; }
@@ -315,7 +311,7 @@ public class Function implements CatalogObject {
TCatalogObject result = new TCatalogObject();
result.setType(TCatalogObjectType.FUNCTION);
result.setFn(toThrift());
- result.setCatalog_version(catalogVersion_);
+ result.setCatalog_version(getCatalogVersion());
return result;
}
@@ -372,9 +368,6 @@ public class Function implements CatalogObject {
return function;
}
- @Override
- public boolean isLoaded() { return true; }
-
// Returns the resolved symbol in the binary. The BE will do a lookup of 'symbol'
// in the binary and try to resolve unmangled names.
// If this function is expecting a return argument, retArgType is that type. It should
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
index 398bc87..6f752d4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
@@ -28,8 +28,7 @@ import com.google.common.base.Preconditions;
* care about for cache pools is the cache pool name. In the future it may be desirable
* to track additional metadata such as the owner, size, and current usage of the pool.
*/
-public class HdfsCachePool implements CatalogObject {
- private long catalogVersion_;
+public class HdfsCachePool extends CatalogObjectImpl {
private final THdfsCachePool cachePool_;
public HdfsCachePool(CachePoolInfo cachePoolInfo) {
@@ -57,9 +56,5 @@ public class HdfsCachePool implements CatalogObject {
@Override
public String getName() { return cachePool_.getPool_name(); }
@Override
- public long getCatalogVersion() { return catalogVersion_; }
- @Override
- public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
- @Override
- public boolean isLoaded() { return true; }
-}
\ No newline at end of file
+ public String getUniqueName() { return "HDFS_CACHE_POOL:" + getName().toLowerCase(); }
+}