You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/13 01:41:15 UTC

[73/91] [abbrv] hive git commit: HIVE-19416 : merge master into branch (Sergey Shelukhin) 0712

http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 0000000,8d88749..a46d2f9
mode 000000,100644..100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@@ -1,0 -1,9354 +1,9422 @@@
+ /* * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.metastore;
+ 
+ import static org.apache.commons.lang.StringUtils.join;
+ import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_COMMENT;
+ import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
+ import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+ import static org.apache.hadoop.hive.metastore.Warehouse.getCatalogQualifiedTableName;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.parseDbName;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.CAT_NAME;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.DB_NAME;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependCatalogToDbName;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependNotNullCatToDbName;
+ 
+ import java.io.IOException;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.nio.ByteBuffer;
+ import java.security.PrivilegedExceptionAction;
+ import java.util.AbstractMap;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.LinkedHashMap;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Objects;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.locks.Condition;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Pattern;
+ 
+ import javax.jdo.JDOException;
+ 
+ import com.codahale.metrics.Counter;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.ImmutableListMultimap;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Multimaps;
+ 
+ import org.apache.commons.cli.OptionBuilder;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.common.StatsSetupConst;
+ import org.apache.hadoop.hive.common.TableName;
+ import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
+ import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode;
+ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
+ import org.apache.hadoop.hive.metastore.events.AddNotNullConstraintEvent;
+ import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+ import org.apache.hadoop.hive.metastore.events.AddPrimaryKeyEvent;
+ import org.apache.hadoop.hive.metastore.events.AddUniqueConstraintEvent;
+ import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
+ import org.apache.hadoop.hive.metastore.events.AlterCatalogEvent;
+ import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
+ import org.apache.hadoop.hive.metastore.events.AlterISchemaEvent;
+ import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+ import org.apache.hadoop.hive.metastore.events.AlterSchemaVersionEvent;
+ import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
+ import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
+ import org.apache.hadoop.hive.metastore.events.CreateCatalogEvent;
+ import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+ import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+ import org.apache.hadoop.hive.metastore.events.CreateISchemaEvent;
+ import org.apache.hadoop.hive.metastore.events.AddSchemaVersionEvent;
+ import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+ import org.apache.hadoop.hive.metastore.events.DropCatalogEvent;
+ import org.apache.hadoop.hive.metastore.events.DropConstraintEvent;
+ import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+ import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+ import org.apache.hadoop.hive.metastore.events.DropISchemaEvent;
+ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+ import org.apache.hadoop.hive.metastore.events.DropSchemaVersionEvent;
+ import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+ import org.apache.hadoop.hive.metastore.events.InsertEvent;
+ import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+ import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+ import org.apache.hadoop.hive.metastore.events.PreAddPartitionEvent;
+ import org.apache.hadoop.hive.metastore.events.PreAlterCatalogEvent;
+ import org.apache.hadoop.hive.metastore.events.PreAlterDatabaseEvent;
+ import org.apache.hadoop.hive.metastore.events.PreAlterISchemaEvent;
+ import org.apache.hadoop.hive.metastore.events.PreAlterPartitionEvent;
+ import org.apache.hadoop.hive.metastore.events.PreAlterSchemaVersionEvent;
+ import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent;
+ import org.apache.hadoop.hive.metastore.events.PreAuthorizationCallEvent;
+ import org.apache.hadoop.hive.metastore.events.PreCreateCatalogEvent;
+ import org.apache.hadoop.hive.metastore.events.PreCreateDatabaseEvent;
+ import org.apache.hadoop.hive.metastore.events.PreCreateISchemaEvent;
+ import org.apache.hadoop.hive.metastore.events.PreAddSchemaVersionEvent;
+ import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
+ import org.apache.hadoop.hive.metastore.events.PreDropCatalogEvent;
+ import org.apache.hadoop.hive.metastore.events.PreDropDatabaseEvent;
+ import org.apache.hadoop.hive.metastore.events.PreDropISchemaEvent;
+ import org.apache.hadoop.hive.metastore.events.PreDropPartitionEvent;
+ import org.apache.hadoop.hive.metastore.events.PreDropSchemaVersionEvent;
+ import org.apache.hadoop.hive.metastore.events.PreDropTableEvent;
+ import org.apache.hadoop.hive.metastore.events.PreEventContext;
+ import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent;
+ import org.apache.hadoop.hive.metastore.events.PreReadCatalogEvent;
+ import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
+ import org.apache.hadoop.hive.metastore.events.PreReadISchemaEvent;
+ import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
+ import org.apache.hadoop.hive.metastore.events.PreReadhSchemaVersionEvent;
+ import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+ import org.apache.hadoop.hive.metastore.metrics.JvmPauseMonitor;
+ import org.apache.hadoop.hive.metastore.metrics.Metrics;
+ import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+ import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+ import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+ import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+ import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
+ import org.apache.hadoop.hive.metastore.security.TUGIContainingTransport;
+ import org.apache.hadoop.hive.metastore.txn.TxnStore;
+ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+ import org.apache.hadoop.security.SecurityUtil;
+ import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
+ import org.apache.hadoop.hive.metastore.utils.FileUtils;
+ import org.apache.hadoop.hive.metastore.utils.HdfsUtils;
+ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+ import org.apache.hadoop.hive.metastore.utils.LogUtils;
+ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+ import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo;
+ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.util.ReflectionUtils;
+ import org.apache.hadoop.util.ShutdownHookManager;
+ import org.apache.hadoop.util.StringUtils;
+ import org.apache.logging.log4j.LogManager;
+ import org.apache.logging.log4j.core.LoggerContext;
+ import org.apache.thrift.TException;
+ import org.apache.thrift.TProcessor;
+ import org.apache.thrift.protocol.TBinaryProtocol;
+ import org.apache.thrift.protocol.TCompactProtocol;
+ import org.apache.thrift.protocol.TProtocol;
+ import org.apache.thrift.protocol.TProtocolFactory;
+ import org.apache.thrift.server.ServerContext;
+ import org.apache.thrift.server.TServer;
+ import org.apache.thrift.server.TServerEventHandler;
+ import org.apache.thrift.server.TThreadPoolServer;
+ import org.apache.thrift.transport.TFramedTransport;
+ import org.apache.thrift.transport.TServerSocket;
+ import org.apache.thrift.transport.TTransport;
+ import org.apache.thrift.transport.TTransportFactory;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.facebook.fb303.FacebookBase;
+ import com.facebook.fb303.fb_status;
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import com.google.common.base.Splitter;
+ import com.google.common.util.concurrent.ThreadFactoryBuilder;
+ 
+ /**
+  * TODO:pc remove application logic to a separate interface.
+  */
+ public class HiveMetaStore extends ThriftHiveMetastore {
+   public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStore.class);
+   public static final String PARTITION_NUMBER_EXCEED_LIMIT_MSG =
+       "Number of partitions scanned (=%d) on table '%s' exceeds limit (=%d). This is controlled on the metastore server by %s.";
+ 
+   // boolean that tells if the HiveMetaStore (remote) server is being used.
+   // Can be used to determine if the calls to metastore api (HMSHandler) are being made with
+   // embedded metastore or a remote one
+   private static boolean isMetaStoreRemote = false;
+ 
+   // Used for testing to simulate method timeout.
+   @VisibleForTesting
+   static boolean TEST_TIMEOUT_ENABLED = false;
+   @VisibleForTesting
+   static long TEST_TIMEOUT_VALUE = -1;
+ 
+   private static ShutdownHookManager shutdownHookMgr;
+ 
+   public static final String ADMIN = "admin";
+   public static final String PUBLIC = "public";
+   /** MM write states. */
+   public static final char MM_WRITE_OPEN = 'o', MM_WRITE_COMMITTED = 'c', MM_WRITE_ABORTED = 'a';
+ 
+   private static HadoopThriftAuthBridge.Server saslServer;
+   private static MetastoreDelegationTokenManager delegationTokenManager;
+   private static boolean useSasl;
+ 
+   static final String NO_FILTER_STRING = "";
+   static final int UNLIMITED_MAX_PARTITIONS = -1;
+ 
+   private static final class ChainedTTransportFactory extends TTransportFactory {
+     private final TTransportFactory parentTransFactory;
+     private final TTransportFactory childTransFactory;
+ 
+     private ChainedTTransportFactory(
+         TTransportFactory parentTransFactory,
+         TTransportFactory childTransFactory) {
+       this.parentTransFactory = parentTransFactory;
+       this.childTransFactory = childTransFactory;
+     }
+ 
+     @Override
+     public TTransport getTransport(TTransport trans) {
+       return childTransFactory.getTransport(parentTransFactory.getTransport(trans));
+     }
+   }
+ 
+   public static boolean isRenameAllowed(Database srcDB, Database destDB) {
+     if (!srcDB.getName().equalsIgnoreCase(destDB.getName())) {
+       if (ReplChangeManager.isSourceOfReplication(srcDB) || ReplChangeManager.isSourceOfReplication(destDB)) {
+         return false;
+       }
+     }
+     return true;
+   }
+ 
+   public static class HMSHandler extends FacebookBase implements IHMSHandler {
+     public static final Logger LOG = HiveMetaStore.LOG;
+     private final Configuration conf; // stores datastore (jpox) properties,
+                                      // right now they come from jpox.properties
+ 
+     // Flag to control that always threads are initialized only once
+     // instead of multiple times
+     private final static AtomicBoolean alwaysThreadsInitialized =
+         new AtomicBoolean(false);
+ 
+     private static String currentUrl;
+     private FileMetadataManager fileMetadataManager;
+     private PartitionExpressionProxy expressionProxy;
+     private StorageSchemaReader storageSchemaReader;
+ 
+     // Variables for metrics
+     // Package visible so that HMSMetricsListener can see them.
+     static AtomicInteger databaseCount, tableCount, partCount;
+ 
+     private Warehouse wh; // hdfs warehouse
+     private static final ThreadLocal<RawStore> threadLocalMS =
+         new ThreadLocal<RawStore>() {
+           @Override
+           protected RawStore initialValue() {
+             return null;
+           }
+         };
+ 
+     private static final ThreadLocal<TxnStore> threadLocalTxn = new ThreadLocal<TxnStore>() {
+       @Override
+       protected TxnStore initialValue() {
+         return null;
+       }
+     };
+ 
+     private static final ThreadLocal<Map<String, com.codahale.metrics.Timer.Context>> timerContexts =
+         new ThreadLocal<Map<String, com.codahale.metrics.Timer.Context>>() {
+       @Override
+       protected Map<String, com.codahale.metrics.Timer.Context> initialValue() {
+         return new HashMap<>();
+       }
+     };
+ 
+     public static RawStore getRawStore() {
+       return threadLocalMS.get();
+     }
+ 
+     static void removeRawStore() {
+       threadLocalMS.remove();
+     }
+ 
+     // Thread local configuration is needed as many threads could make changes
+     // to the conf using the connection hook
+     private static final ThreadLocal<Configuration> threadLocalConf =
+         new ThreadLocal<Configuration>() {
+           @Override
+           protected Configuration initialValue() {
+             return null;
+           }
+         };
+ 
+     /**
+      * Thread local HMSHandler used during shutdown to notify meta listeners
+      */
+     private static final ThreadLocal<HMSHandler> threadLocalHMSHandler = new ThreadLocal<>();
+ 
+     /**
+      * Thread local Map to keep track of modified meta conf keys
+      */
+     private static final ThreadLocal<Map<String, String>> threadLocalModifiedConfig =
+         new ThreadLocal<>();
+ 
+     private static ExecutorService threadPool;
+ 
+     static final Logger auditLog = LoggerFactory.getLogger(
+         HiveMetaStore.class.getName() + ".audit");
+ 
+     private static void logAuditEvent(String cmd) {
+       if (cmd == null) {
+         return;
+       }
+ 
+       UserGroupInformation ugi;
+       try {
+         ugi = SecurityUtils.getUGI();
+       } catch (Exception ex) {
+         throw new RuntimeException(ex);
+       }
+ 
+       String address = getIPAddress();
+       if (address == null) {
+         address = "unknown-ip-addr";
+       }
+ 
+       auditLog.info("ugi={}	ip={}	cmd={}	", ugi.getUserName(), address, cmd);
+     }
+ 
+     private static String getIPAddress() {
+       if (useSasl) {
+         if (saslServer != null && saslServer.getRemoteAddress() != null) {
+           return saslServer.getRemoteAddress().getHostAddress();
+         }
+       } else {
+         // if kerberos is not enabled
+         return getThreadLocalIpAddress();
+       }
+       return null;
+     }
+ 
+     private static AtomicInteger nextSerialNum = new AtomicInteger();
+     private static ThreadLocal<Integer> threadLocalId = new ThreadLocal<Integer>() {
+       @Override
+       protected Integer initialValue() {
+         return nextSerialNum.getAndIncrement();
+       }
+     };
+ 
+     // This will only be set if the metastore is being accessed from a metastore Thrift server,
+     // not if it is from the CLI. Also, only if the TTransport being used to connect is an
+     // instance of TSocket. This is also not set when kerberos is used.
+     private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
+       @Override
+       protected String initialValue() {
+         return null;
+       }
+     };
+ 
+     /**
+      * Internal function to notify listeners for meta config change events
+      */
+     private void notifyMetaListeners(String key, String oldValue, String newValue) throws MetaException {
+       for (MetaStoreEventListener listener : listeners) {
+         listener.onConfigChange(new ConfigChangeEvent(this, key, oldValue, newValue));
+       }
+ 
+       if (transactionalListeners.size() > 0) {
+         // All the fields of this event are final, so no reason to create a new one for each
+         // listener
+         ConfigChangeEvent cce = new ConfigChangeEvent(this, key, oldValue, newValue);
+         for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+           transactionalListener.onConfigChange(cce);
+         }
+       }
+     }
+ 
+     /**
+      * Internal function to notify listeners to revert back to old values of keys
+      * that were modified during setMetaConf. This would get called from HiveMetaStore#cleanupRawStore
+      */
+     private void notifyMetaListenersOnShutDown() {
+       Map<String, String> modifiedConf = threadLocalModifiedConfig.get();
+       if (modifiedConf == null) {
+         // Nothing got modified
+         return;
+       }
+       try {
+         Configuration conf = threadLocalConf.get();
+         if (conf == null) {
+           throw new MetaException("Unexpected: modifiedConf is non-null but conf is null");
+         }
+         // Notify listeners of the changed value
+         for (Entry<String, String> entry : modifiedConf.entrySet()) {
+           String key = entry.getKey();
+           // curr value becomes old and vice-versa
+           String currVal = entry.getValue();
+           String oldVal = conf.get(key);
+           if (!Objects.equals(oldVal, currVal)) {
+             notifyMetaListeners(key, oldVal, currVal);
+           }
+         }
+         logInfo("Meta listeners shutdown notification completed.");
+       } catch (MetaException e) {
+         LOG.error("Failed to notify meta listeners on shutdown: ", e);
+       }
+     }
+ 
+     static void setThreadLocalIpAddress(String ipAddress) {
+       threadLocalIpAddress.set(ipAddress);
+     }
+ 
+     // This will return null if the metastore is not being accessed from a metastore Thrift server,
+     // or if the TTransport being used to connect is not an instance of TSocket, or if kereberos
+     // is used
+     static String getThreadLocalIpAddress() {
+       return threadLocalIpAddress.get();
+     }
+ 
+     // Make it possible for tests to check that the right type of PartitionExpressionProxy was
+     // instantiated.
+     @VisibleForTesting
+     PartitionExpressionProxy getExpressionProxy() {
+       return expressionProxy;
+     }
+ 
+     /**
+      * Use {@link #getThreadId()} instead.
+      * @return thread id
+      */
+     @Deprecated
+     public static Integer get() {
+       return threadLocalId.get();
+     }
+ 
+     @Override
+     public int getThreadId() {
+       return threadLocalId.get();
+     }
+ 
+     public HMSHandler(String name) throws MetaException {
+       this(name, MetastoreConf.newMetastoreConf(), true);
+     }
+ 
+     public HMSHandler(String name, Configuration conf) throws MetaException {
+       this(name, conf, true);
+     }
+ 
+     public HMSHandler(String name, Configuration conf, boolean init) throws MetaException {
+       super(name);
+       this.conf = conf;
+       isInTest = MetastoreConf.getBoolVar(this.conf, ConfVars.HIVE_IN_TEST);
+       if (threadPool == null) {
+         synchronized (HMSHandler.class) {
+           int numThreads = MetastoreConf.getIntVar(conf, ConfVars.FS_HANDLER_THREADS_COUNT);
+           threadPool = Executors.newFixedThreadPool(numThreads,
+               new ThreadFactoryBuilder().setDaemon(true)
+                   .setNameFormat("HMSHandler #%d").build());
+         }
+       }
+       if (init) {
+         init();
+       }
+     }
+ 
+     /**
+      * Use {@link #getConf()} instead.
+      * @return Configuration object
+      */
+     @Deprecated
+     public Configuration getHiveConf() {
+       return conf;
+     }
+ 
+     private ClassLoader classLoader;
+     private AlterHandler alterHandler;
+     private List<MetaStorePreEventListener> preListeners;
+     private List<MetaStoreEventListener> listeners;
+     private List<TransactionalMetaStoreEventListener> transactionalListeners;
+     private List<MetaStoreEndFunctionListener> endFunctionListeners;
+     private List<MetaStoreInitListener> initListeners;
+     private Pattern partitionValidationPattern;
+     private final boolean isInTest;
+ 
+     {
+       classLoader = Thread.currentThread().getContextClassLoader();
+       if (classLoader == null) {
+         classLoader = Configuration.class.getClassLoader();
+       }
+     }
+ 
+     @Override
+     public List<TransactionalMetaStoreEventListener> getTransactionalListeners() {
+       return transactionalListeners;
+     }
+ 
+     @Override
+     public List<MetaStoreEventListener> getListeners() {
+       return listeners;
+     }
+ 
+     @Override
+     public void init() throws MetaException {
+       initListeners = MetaStoreUtils.getMetaStoreListeners(
+           MetaStoreInitListener.class, conf, MetastoreConf.getVar(conf, ConfVars.INIT_HOOKS));
+       for (MetaStoreInitListener singleInitListener: initListeners) {
+           MetaStoreInitContext context = new MetaStoreInitContext();
+           singleInitListener.onInit(context);
+       }
+ 
+       String alterHandlerName = MetastoreConf.getVar(conf, ConfVars.ALTER_HANDLER);
+       alterHandler = ReflectionUtils.newInstance(JavaUtils.getClass(
+           alterHandlerName, AlterHandler.class), conf);
+       wh = new Warehouse(conf);
+ 
+       synchronized (HMSHandler.class) {
+         if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(conf))) {
+           createDefaultDB();
+           createDefaultRoles();
+           addAdminUsers();
+           currentUrl = MetaStoreInit.getConnectionURL(conf);
+         }
+       }
+ 
+       //Start Metrics
+       if (MetastoreConf.getBoolVar(conf, ConfVars.METRICS_ENABLED)) {
+         LOG.info("Begin calculating metadata count metrics.");
+         Metrics.initialize(conf);
+         databaseCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_DATABASES);
+         tableCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_TABLES);
+         partCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_PARTITIONS);
+         updateMetrics();
+ 
+       }
+ 
+       preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
+           conf, MetastoreConf.getVar(conf, ConfVars.PRE_EVENT_LISTENERS));
+       preListeners.add(0, new TransactionalValidationListener(conf));
+       listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, conf,
+           MetastoreConf.getVar(conf, ConfVars.EVENT_LISTENERS));
+       listeners.add(new SessionPropertiesListener(conf));
+       listeners.add(new AcidEventListener(conf));
+       transactionalListeners = MetaStoreUtils.getMetaStoreListeners(TransactionalMetaStoreEventListener.class,
+           conf, MetastoreConf.getVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS));
+       if (Metrics.getRegistry() != null) {
+         listeners.add(new HMSMetricsListener(conf));
+       }
+ 
+       endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
+           MetaStoreEndFunctionListener.class, conf, MetastoreConf.getVar(conf, ConfVars.END_FUNCTION_LISTENERS));
+ 
+       String partitionValidationRegex =
+           MetastoreConf.getVar(conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN);
+       if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
+         partitionValidationPattern = Pattern.compile(partitionValidationRegex);
+       } else {
+         partitionValidationPattern = null;
+       }
+ 
+       // We only initialize once the tasks that need to be run periodically
+       if (alwaysThreadsInitialized.compareAndSet(false, true)) {
+         ThreadPool.initialize(conf);
+         Collection<String> taskNames =
+             MetastoreConf.getStringCollection(conf, ConfVars.TASK_THREADS_ALWAYS);
+         for (String taskName : taskNames) {
+           MetastoreTaskThread task =
+               JavaUtils.newInstance(JavaUtils.getClass(taskName, MetastoreTaskThread.class));
+           task.setConf(conf);
+           long freq = task.runFrequency(TimeUnit.MILLISECONDS);
+           // For backwards compatibility, since some threads used to be hard coded but only run if
+           // frequency was > 0
+           if (freq > 0) {
+             ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
+           }
+         }
+       }
+       expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
+       fileMetadataManager = new FileMetadataManager(this.getMS(), conf);
+     }
+ 
+     private static String addPrefix(String s) {
+       return threadLocalId.get() + ": " + s;
+     }
+ 
+     /**
+      * Set copy of invoking HMSHandler on thread local
+      */
+     private static void setHMSHandler(HMSHandler handler) {
+       if (threadLocalHMSHandler.get() == null) {
+         threadLocalHMSHandler.set(handler);
+       }
+     }
+     @Override
+     public void setConf(Configuration conf) {
+       threadLocalConf.set(conf);
+       RawStore ms = threadLocalMS.get();
+       if (ms != null) {
+         ms.setConf(conf); // reload if DS related configuration is changed
+       }
+     }
+ 
+     @Override
+     public Configuration getConf() {
+       Configuration conf = threadLocalConf.get();
+       if (conf == null) {
+         conf = new Configuration(this.conf);
+         threadLocalConf.set(conf);
+       }
+       return conf;
+     }
+ 
+     private Map<String, String> getModifiedConf() {
+       Map<String, String> modifiedConf = threadLocalModifiedConfig.get();
+       if (modifiedConf == null) {
+         modifiedConf = new HashMap<>();
+         threadLocalModifiedConfig.set(modifiedConf);
+       }
+       return modifiedConf;
+     }
+ 
+     @Override
+     public Warehouse getWh() {
+       return wh;
+     }
+ 
+     @Override
+     public void setMetaConf(String key, String value) throws MetaException {
+       ConfVars confVar = MetastoreConf.getMetaConf(key);
+       if (confVar == null) {
+         throw new MetaException("Invalid configuration key " + key);
+       }
+       try {
+         confVar.validate(value);
+       } catch (IllegalArgumentException e) {
+         throw new MetaException("Invalid configuration value " + value + " for key " + key +
+             " by " + e.getMessage());
+       }
+       Configuration configuration = getConf();
+       String oldValue = MetastoreConf.get(configuration, key);
+       // Save prev val of the key on threadLocal
+       Map<String, String> modifiedConf = getModifiedConf();
+       if (!modifiedConf.containsKey(key)) {
+         modifiedConf.put(key, oldValue);
+       }
+       // Set invoking HMSHandler on threadLocal, this will be used later to notify
+       // metaListeners in HiveMetaStore#cleanupRawStore
+       setHMSHandler(this);
+       configuration.set(key, value);
+       notifyMetaListeners(key, oldValue, value);
+ 
+       if (ConfVars.TRY_DIRECT_SQL == confVar) {
+         HMSHandler.LOG.info("Direct SQL optimization = {}",  value);
+       }
+     }
+ 
+     @Override
+     public String getMetaConf(String key) throws MetaException {
+       ConfVars confVar = MetastoreConf.getMetaConf(key);
+       if (confVar == null) {
+         throw new MetaException("Invalid configuration key " + key);
+       }
+       return getConf().get(key, confVar.getDefaultVal().toString());
+     }
+ 
+     /**
+      * Get a cached RawStore.
+      *
+      * @return the cached RawStore
+      * @throws MetaException
+      */
+     @Override
+     public RawStore getMS() throws MetaException {
+       Configuration conf = getConf();
+       return getMSForConf(conf);
+     }
+ 
+     public static RawStore getMSForConf(Configuration conf) throws MetaException {
+       RawStore ms = threadLocalMS.get();
+       if (ms == null) {
+         ms = newRawStoreForConf(conf);
+         ms.verifySchema();
+         threadLocalMS.set(ms);
+         ms = threadLocalMS.get();
+       }
+       return ms;
+     }
+ 
+     @Override
+     public TxnStore getTxnHandler() {
++      return getMsThreadTxnHandler(conf);
++    }
++
++    public static TxnStore getMsThreadTxnHandler(Configuration conf) {
+       TxnStore txn = threadLocalTxn.get();
+       if (txn == null) {
+         txn = TxnUtils.getTxnStore(conf);
+         threadLocalTxn.set(txn);
+       }
+       return txn;
+     }
+ 
+     static RawStore newRawStoreForConf(Configuration conf) throws MetaException {
+       Configuration newConf = new Configuration(conf);
+       String rawStoreClassName = MetastoreConf.getVar(newConf, ConfVars.RAW_STORE_IMPL);
+       LOG.info(addPrefix("Opening raw store with implementation class:" + rawStoreClassName));
+       return RawStoreProxy.getProxy(newConf, conf, rawStoreClassName, threadLocalId.get());
+     }
+ 
+     @VisibleForTesting
+     public static void createDefaultCatalog(RawStore ms, Warehouse wh) throws MetaException,
+         InvalidOperationException {
+       try {
+         Catalog defaultCat = ms.getCatalog(DEFAULT_CATALOG_NAME);
+         // Null check because in some test cases we get a null from ms.getCatalog.
+         if (defaultCat !=null && defaultCat.getLocationUri().equals("TBD")) {
+           // One time update issue.  When the new 'hive' catalog is created in an upgrade the
+           // script does not know the location of the warehouse.  So we need to update it.
+           LOG.info("Setting location of default catalog, as it hasn't been done after upgrade");
+           defaultCat.setLocationUri(wh.getWhRoot().toString());
+           ms.alterCatalog(defaultCat.getName(), defaultCat);
+         }
+ 
+       } catch (NoSuchObjectException e) {
+         Catalog cat = new Catalog(DEFAULT_CATALOG_NAME, wh.getWhRoot().toString());
+         cat.setDescription(Warehouse.DEFAULT_CATALOG_COMMENT);
+         ms.createCatalog(cat);
+       }
+     }
+ 
+     private void createDefaultDB_core(RawStore ms) throws MetaException, InvalidObjectException {
+       try {
+         ms.getDatabase(DEFAULT_CATALOG_NAME, DEFAULT_DATABASE_NAME);
+       } catch (NoSuchObjectException e) {
+         Database db = new Database(DEFAULT_DATABASE_NAME, DEFAULT_DATABASE_COMMENT,
+           wh.getDefaultDatabasePath(DEFAULT_DATABASE_NAME).toString(), null);
+         db.setOwnerName(PUBLIC);
+         db.setOwnerType(PrincipalType.ROLE);
+         db.setCatalogName(DEFAULT_CATALOG_NAME);
+         ms.createDatabase(db);
+       }
+     }
+ 
+     /**
+      * create default database if it doesn't exist.
+      *
+      * This is a potential contention when HiveServer2 using embedded metastore and Metastore
+      * Server try to concurrently invoke createDefaultDB. If one failed, JDOException was caught
+      * for one more time try, if failed again, simply ignored by warning, which meant another
+      * succeeds.
+      *
+      * @throws MetaException
+      */
+     private void createDefaultDB() throws MetaException {
+       try {
+         RawStore ms = getMS();
+         createDefaultCatalog(ms, wh);
+         createDefaultDB_core(ms);
+       } catch (JDOException e) {
+         LOG.warn("Retrying creating default database after error: " + e.getMessage(), e);
+         try {
+           createDefaultDB_core(getMS());
+         } catch (InvalidObjectException e1) {
+           throw new MetaException(e1.getMessage());
+         }
+       } catch (InvalidObjectException|InvalidOperationException e) {
+         throw new MetaException(e.getMessage());
+       }
+     }
+ 
+     /**
+      * create default roles if they don't exist.
+      *
+      * This is a potential contention when HiveServer2 using embedded metastore and Metastore
+      * Server try to concurrently invoke createDefaultRoles. If one failed, JDOException was caught
+      * for one more time try, if failed again, simply ignored by warning, which meant another
+      * succeeds.
+      *
+      * @throws MetaException
+      */
+     private void createDefaultRoles() throws MetaException {
+       try {
+         createDefaultRoles_core();
+       } catch (JDOException e) {
+         LOG.warn("Retrying creating default roles after error: " + e.getMessage(), e);
+         createDefaultRoles_core();
+       }
+     }
+ 
+     private void createDefaultRoles_core() throws MetaException {
+ 
+       RawStore ms = getMS();
+       try {
+         ms.addRole(ADMIN, ADMIN);
+       } catch (InvalidObjectException e) {
+         LOG.debug(ADMIN +" role already exists",e);
+       } catch (NoSuchObjectException e) {
+         // This should never be thrown.
+         LOG.warn("Unexpected exception while adding " +ADMIN+" roles" , e);
+       }
+       LOG.info("Added "+ ADMIN+ " role in metastore");
+       try {
+         ms.addRole(PUBLIC, PUBLIC);
+       } catch (InvalidObjectException e) {
+         LOG.debug(PUBLIC + " role already exists",e);
+       } catch (NoSuchObjectException e) {
+         // This should never be thrown.
+         LOG.warn("Unexpected exception while adding "+PUBLIC +" roles" , e);
+       }
+       LOG.info("Added "+PUBLIC+ " role in metastore");
+       // now grant all privs to admin
+       PrivilegeBag privs = new PrivilegeBag();
+       privs.addToPrivileges(new HiveObjectPrivilege( new HiveObjectRef(HiveObjectType.GLOBAL, null,
+         null, null, null), ADMIN, PrincipalType.ROLE, new PrivilegeGrantInfo("All", 0, ADMIN,
+           PrincipalType.ROLE, true), "SQL"));
+       try {
+         ms.grantPrivileges(privs);
+       } catch (InvalidObjectException e) {
+         // Surprisingly these privs are already granted.
+         LOG.debug("Failed while granting global privs to admin", e);
+       } catch (NoSuchObjectException e) {
+         // Unlikely to be thrown.
+         LOG.warn("Failed while granting global privs to admin", e);
+       }
+     }
+ 
+     /**
+      * add admin users if they don't exist.
+      *
+      * This is a potential contention when HiveServer2 using embedded metastore and Metastore
+      * Server try to concurrently invoke addAdminUsers. If one failed, JDOException was caught for
+      * one more time try, if failed again, simply ignored by warning, which meant another succeeds.
+      *
+      * @throws MetaException
+      */
+     private void addAdminUsers() throws MetaException {
+       try {
+         addAdminUsers_core();
+       } catch (JDOException e) {
+         LOG.warn("Retrying adding admin users after error: " + e.getMessage(), e);
+         addAdminUsers_core();
+       }
+     }
+ 
+     private void addAdminUsers_core() throws MetaException {
+ 
+       // now add pre-configured users to admin role
+       String userStr = MetastoreConf.getVar(conf,ConfVars.USERS_IN_ADMIN_ROLE,"").trim();
+       if (userStr.isEmpty()) {
+         LOG.info("No user is added in admin role, since config is empty");
+         return;
+       }
+       // Since user names need to be valid unix user names, per IEEE Std 1003.1-2001 they cannot
+       // contain comma, so we can safely split above string on comma.
+ 
+      Iterator<String> users = Splitter.on(",").trimResults().omitEmptyStrings().split(userStr).iterator();
+       if (!users.hasNext()) {
+         LOG.info("No user is added in admin role, since config value "+ userStr +
+           " is in incorrect format. We accept comma separated list of users.");
+         return;
+       }
+       Role adminRole;
+       RawStore ms = getMS();
+       try {
+         adminRole = ms.getRole(ADMIN);
+       } catch (NoSuchObjectException e) {
+         LOG.error("Failed to retrieve just added admin role",e);
+         return;
+       }
+       while (users.hasNext()) {
+         String userName = users.next();
+         try {
+           ms.grantRole(adminRole, userName, PrincipalType.USER, ADMIN, PrincipalType.ROLE, true);
+           LOG.info("Added " + userName + " to admin role");
+         } catch (NoSuchObjectException e) {
+           LOG.error("Failed to add "+ userName + " in admin role",e);
+         } catch (InvalidObjectException e) {
+           LOG.debug(userName + " already in admin role", e);
+         }
+       }
+     }
+ 
+     private static void logInfo(String m) {
+       LOG.info(threadLocalId.get().toString() + ": " + m);
+       logAuditEvent(m);
+     }
+ 
+     private String startFunction(String function, String extraLogInfo) {
+       incrementCounter(function);
+       logInfo((getThreadLocalIpAddress() == null ? "" : "source:" + getThreadLocalIpAddress() + " ") +
+           function + extraLogInfo);
+       com.codahale.metrics.Timer timer =
+           Metrics.getOrCreateTimer(MetricsConstants.API_PREFIX + function);
+       if (timer != null) {
+         // Timer will be null we aren't using the metrics
+         timerContexts.get().put(function, timer.time());
+       }
+       Counter counter = Metrics.getOrCreateCounter(MetricsConstants.ACTIVE_CALLS + function);
+       if (counter != null) {
+         counter.inc();
+       }
+       return function;
+     }
+ 
+     private String startFunction(String function) {
+       return startFunction(function, "");
+     }
+ 
+     private void startTableFunction(String function, String catName, String db, String tbl) {
+       startFunction(function, " : tbl=" +
+           TableName.getQualified(catName, db, tbl));
+     }
+ 
+     private void startMultiTableFunction(String function, String db, List<String> tbls) {
+       String tableNames = join(tbls, ",");
+       startFunction(function, " : db=" + db + " tbls=" + tableNames);
+     }
+ 
+     private void startPartitionFunction(String function, String cat, String db, String tbl,
+                                         List<String> partVals) {
+       startFunction(function, " : tbl=" +
+           TableName.getQualified(cat, db, tbl) + "[" + join(partVals, ",") + "]");
+     }
+ 
+     private void startPartitionFunction(String function, String catName, String db, String tbl,
+                                         Map<String, String> partName) {
+       startFunction(function, " : tbl=" +
+           TableName.getQualified(catName, db, tbl) + "partition=" + partName);
+     }
+ 
+     private void endFunction(String function, boolean successful, Exception e) {
+       endFunction(function, successful, e, null);
+     }
+     private void endFunction(String function, boolean successful, Exception e,
+                             String inputTableName) {
+       endFunction(function, new MetaStoreEndFunctionContext(successful, e, inputTableName));
+     }
+ 
+     private void endFunction(String function, MetaStoreEndFunctionContext context) {
+       com.codahale.metrics.Timer.Context timerContext = timerContexts.get().remove(function);
+       if (timerContext != null) {
+         timerContext.close();
+       }
+       Counter counter = Metrics.getOrCreateCounter(MetricsConstants.ACTIVE_CALLS + function);
+       if (counter != null) {
+         counter.dec();
+       }
+ 
+       for (MetaStoreEndFunctionListener listener : endFunctionListeners) {
+         listener.onEndFunction(function, context);
+       }
+     }
+ 
+     @Override
+     public fb_status getStatus() {
+       return fb_status.ALIVE;
+     }
+ 
+     @Override
+     public void shutdown() {
+       cleanupRawStore();
+       PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics();
+     }
+ 
+     @Override
+     public AbstractMap<String, Long> getCounters() {
+       AbstractMap<String, Long> counters = super.getCounters();
+ 
+       // Allow endFunctionListeners to add any counters they have collected
+       if (endFunctionListeners != null) {
+         for (MetaStoreEndFunctionListener listener : endFunctionListeners) {
+           listener.exportCounters(counters);
+         }
+       }
+ 
+       return counters;
+     }
+ 
+     @Override
+     public void create_catalog(CreateCatalogRequest rqst)
+         throws AlreadyExistsException, InvalidObjectException, MetaException {
+       Catalog catalog = rqst.getCatalog();
+       startFunction("create_catalog", ": " + catalog.toString());
+       boolean success = false;
+       Exception ex = null;
+       try {
+         try {
+           getMS().getCatalog(catalog.getName());
+           throw new AlreadyExistsException("Catalog " + catalog.getName() + " already exists");
+         } catch (NoSuchObjectException e) {
+           // expected
+         }
+ 
+         if (!MetaStoreUtils.validateName(catalog.getName(), null)) {
+           throw new InvalidObjectException(catalog.getName() + " is not a valid catalog name");
+         }
+ 
+         if (catalog.getLocationUri() == null) {
+           throw new InvalidObjectException("You must specify a path for the catalog");
+         }
+ 
+         RawStore ms = getMS();
+         Path catPath = new Path(catalog.getLocationUri());
+         boolean madeDir = false;
+         Map<String, String> transactionalListenersResponses = Collections.emptyMap();
+         try {
+           firePreEvent(new PreCreateCatalogEvent(this, catalog));
+           if (!wh.isDir(catPath)) {
+             if (!wh.mkdirs(catPath)) {
+               throw new MetaException("Unable to create catalog path " + catPath +
+                   ", failed to create catalog " + catalog.getName());
+             }
+             madeDir = true;
+           }
+ 
+           ms.openTransaction();
+           ms.createCatalog(catalog);
+ 
+           // Create a default database inside the catalog
+           Database db = new Database(DEFAULT_DATABASE_NAME, "Default database for catalog " +
+                            catalog.getName(), catalog.getLocationUri(), Collections.emptyMap());
+           db.setCatalogName(catalog.getName());
+           create_database_core(ms, db);
+ 
+           if (!transactionalListeners.isEmpty()) {
+             transactionalListenersResponses =
+                 MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                     EventType.CREATE_CATALOG,
+                     new CreateCatalogEvent(true, this, catalog));
+           }
+ 
+           success = ms.commitTransaction();
+         } finally {
+           if (!success) {
+             ms.rollbackTransaction();
+             if (madeDir) {
+               wh.deleteDir(catPath, true, false, false);
+             }
+           }
+ 
+           if (!listeners.isEmpty()) {
+             MetaStoreListenerNotifier.notifyEvent(listeners,
+                 EventType.CREATE_CATALOG,
+                 new CreateCatalogEvent(success, this, catalog),
+                 null,
+                 transactionalListenersResponses, ms);
+           }
+         }
+         success = true;
+       } catch (AlreadyExistsException|InvalidObjectException|MetaException e) {
+         ex = e;
+         throw e;
+       } finally {
+         endFunction("create_catalog", success, ex);
+       }
+     }
+ 
+     @Override
+     public void alter_catalog(AlterCatalogRequest rqst) throws TException {
+       startFunction("alter_catalog " + rqst.getName());
+       boolean success = false;
+       Exception ex = null;
+       RawStore ms = getMS();
+       Map<String, String> transactionalListenersResponses = Collections.emptyMap();
+       GetCatalogResponse oldCat = null;
+ 
+       try {
+         oldCat = get_catalog(new GetCatalogRequest(rqst.getName()));
+         // Above should have thrown NoSuchObjectException if there is no such catalog
+         assert oldCat != null && oldCat.getCatalog() != null;
+         firePreEvent(new PreAlterCatalogEvent(oldCat.getCatalog(), rqst.getNewCat(), this));
+ 
+         ms.openTransaction();
+         ms.alterCatalog(rqst.getName(), rqst.getNewCat());
+ 
+         if (!transactionalListeners.isEmpty()) {
+           transactionalListenersResponses =
+               MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                   EventType.ALTER_CATALOG,
+                   new AlterCatalogEvent(oldCat.getCatalog(), rqst.getNewCat(), true, this));
+         }
+ 
+         success = ms.commitTransaction();
+       } catch (MetaException|NoSuchObjectException e) {
+         ex = e;
+         throw e;
+       } finally {
+         if (!success) {
+           ms.rollbackTransaction();
+         }
+ 
+         if ((null != oldCat) && (!listeners.isEmpty())) {
+           MetaStoreListenerNotifier.notifyEvent(listeners,
+               EventType.ALTER_CATALOG,
+               new AlterCatalogEvent(oldCat.getCatalog(), rqst.getNewCat(), success, this),
+               null, transactionalListenersResponses, ms);
+         }
+         endFunction("alter_catalog", success, ex);
+       }
+ 
+     }
+ 
+     @Override
+     public GetCatalogResponse get_catalog(GetCatalogRequest rqst)
+         throws NoSuchObjectException, TException {
+       String catName = rqst.getName();
+       startFunction("get_catalog", ": " + catName);
+       Catalog cat = null;
+       Exception ex = null;
+       try {
+         cat = getMS().getCatalog(catName);
+         firePreEvent(new PreReadCatalogEvent(this, cat));
+         return new GetCatalogResponse(cat);
+       } catch (MetaException|NoSuchObjectException e) {
+         ex = e;
+         throw e;
+       } finally {
+         endFunction("get_database", cat != null, ex);
+       }
+     }
+ 
+     @Override
+     public GetCatalogsResponse get_catalogs() throws MetaException {
+       startFunction("get_catalogs");
+ 
+       List<String> ret = null;
+       Exception ex = null;
+       try {
+         ret = getMS().getCatalogs();
+       } catch (MetaException e) {
+         ex = e;
+         throw e;
+       } finally {
+         endFunction("get_catalog", ret != null, ex);
+       }
+       return new GetCatalogsResponse(ret == null ? Collections.emptyList() : ret);
+ 
+     }
+ 
+     @Override
+     public void drop_catalog(DropCatalogRequest rqst)
+         throws NoSuchObjectException, InvalidOperationException, MetaException {
+       String catName = rqst.getName();
+       startFunction("drop_catalog", ": " + catName);
+       if (DEFAULT_CATALOG_NAME.equalsIgnoreCase(catName)) {
+         endFunction("drop_catalog", false, null);
+         throw new MetaException("Can not drop " + DEFAULT_CATALOG_NAME + " catalog");
+       }
+ 
+       boolean success = false;
+       Exception ex = null;
+       try {
+         dropCatalogCore(catName);
+         success = true;
+       } catch (NoSuchObjectException|InvalidOperationException|MetaException e) {
+         ex = e;
+         throw e;
+       } catch (Exception e) {
+         ex = e;
+         throw newMetaException(e);
+       } finally {
+         endFunction("drop_catalog", success, ex);
+       }
+ 
+     }
+ 
+     private void dropCatalogCore(String catName)
+         throws MetaException, NoSuchObjectException, InvalidOperationException {
+       boolean success = false;
+       Catalog cat = null;
+       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+       RawStore ms = getMS();
+       try {
+         ms.openTransaction();
+         cat = ms.getCatalog(catName);
+ 
+         firePreEvent(new PreDropCatalogEvent(this, cat));
+ 
+         List<String> allDbs = get_databases(prependNotNullCatToDbName(catName, null));
+         if (allDbs != null && !allDbs.isEmpty()) {
+           // It might just be the default, in which case we can drop that one if it's empty
+           if (allDbs.size() == 1 && allDbs.get(0).equals(DEFAULT_DATABASE_NAME)) {
+             try {
+               drop_database_core(ms, catName, DEFAULT_DATABASE_NAME, true, false);
+             } catch (InvalidOperationException e) {
+               // This means there are tables of something in the database
+               throw new InvalidOperationException("There are still objects in the default " +
+                   "database for catalog " + catName);
+             } catch (InvalidObjectException|IOException|InvalidInputException e) {
+               MetaException me = new MetaException("Error attempt to drop default database for " +
+                   "catalog " + catName);
+               me.initCause(e);
+               throw me;
+             }
+           } else {
+             throw new InvalidOperationException("There are non-default databases in the catalog " +
+                 catName + " so it cannot be dropped.");
+           }
+         }
+ 
+         ms.dropCatalog(catName) ;
+         if (!transactionalListeners.isEmpty()) {
+           transactionalListenerResponses =
+               MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                   EventType.DROP_CATALOG,
+                   new DropCatalogEvent(true, this, cat));
+         }
+ 
+         success = ms.commitTransaction();
+       } finally {
+         if (success) {
+           wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false, false, false);
+         } else {
+           ms.rollbackTransaction();
+         }
+ 
+         if (!listeners.isEmpty()) {
+           MetaStoreListenerNotifier.notifyEvent(listeners,
+               EventType.DROP_CATALOG,
+               new DropCatalogEvent(success, this, cat),
+               null,
+               transactionalListenerResponses, ms);
+         }
+       }
+     }
+ 
+ 
+     // Assumes that the catalog has already been set.
+     private void create_database_core(RawStore ms, final Database db)
+         throws AlreadyExistsException, InvalidObjectException, MetaException {
+       if (!MetaStoreUtils.validateName(db.getName(), null)) {
+         throw new InvalidObjectException(db.getName() + " is not a valid database name");
+       }
+ 
+       Catalog cat = null;
+       try {
+         cat = getMS().getCatalog(db.getCatalogName());
+       } catch (NoSuchObjectException e) {
+         LOG.error("No such catalog " + db.getCatalogName());
+         throw new InvalidObjectException("No such catalog " + db.getCatalogName());
+       }
+       Path dbPath = wh.determineDatabasePath(cat, db);
+       db.setLocationUri(dbPath.toString());
+ 
+       boolean success = false;
+       boolean madeDir = false;
+       Map<String, String> transactionalListenersResponses = Collections.emptyMap();
+       try {
+         firePreEvent(new PreCreateDatabaseEvent(db, this));
+         if (!wh.isDir(dbPath)) {
+           LOG.debug("Creating database path " + dbPath);
+           if (!wh.mkdirs(dbPath)) {
+             throw new MetaException("Unable to create database path " + dbPath +
+                 ", failed to create database " + db.getName());
+           }
+           madeDir = true;
+         }
+ 
+         ms.openTransaction();
+         ms.createDatabase(db);
+ 
+         if (!transactionalListeners.isEmpty()) {
+           transactionalListenersResponses =
+               MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                     EventType.CREATE_DATABASE,
+                                                     new CreateDatabaseEvent(db, true, this));
+         }
+ 
+         success = ms.commitTransaction();
+       } finally {
+         if (!success) {
+           ms.rollbackTransaction();
+           if (madeDir) {
+             wh.deleteDir(dbPath, true, db);
+           }
+         }
+ 
+         if (!listeners.isEmpty()) {
+           MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                 EventType.CREATE_DATABASE,
+                                                 new CreateDatabaseEvent(db, success, this),
+                                                 null,
+                                                 transactionalListenersResponses, ms);
+         }
+       }
+     }
+ 
+     @Override
+     public void create_database(final Database db)
+         throws AlreadyExistsException, InvalidObjectException, MetaException {
+       startFunction("create_database", ": " + db.toString());
+       boolean success = false;
+       Exception ex = null;
+       if (!db.isSetCatalogName()) {
+         db.setCatalogName(getDefaultCatalog(conf));
+       }
+       try {
+         try {
+           if (null != get_database_core(db.getCatalogName(), db.getName())) {
+             throw new AlreadyExistsException("Database " + db.getName() + " already exists");
+           }
+         } catch (NoSuchObjectException e) {
+           // expected
+         }
+ 
+         if (TEST_TIMEOUT_ENABLED) {
+           try {
+             Thread.sleep(TEST_TIMEOUT_VALUE);
+           } catch (InterruptedException e) {
+             // do nothing
+           }
+           Deadline.checkTimeout();
+         }
+         create_database_core(getMS(), db);
+         success = true;
+       } catch (MetaException | InvalidObjectException | AlreadyExistsException e) {
+         ex = e;
+         throw e;
+       } catch (Exception e) {
+         ex = e;
+         throw newMetaException(e);
+       } finally {
+         endFunction("create_database", success, ex);
+       }
+     }
+ 
+     @Override
+     public Database get_database(final String name) throws NoSuchObjectException, MetaException {
+       startFunction("get_database", ": " + name);
+       Database db = null;
+       Exception ex = null;
+       try {
+         String[] parsedDbName = parseDbName(name, conf);
+         db = get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
+         firePreEvent(new PreReadDatabaseEvent(db, this));
+       } catch (MetaException|NoSuchObjectException e) {
+         ex = e;
+         throw e;
+       } finally {
+         endFunction("get_database", db != null, ex);
+       }
+       return db;
+     }
+ 
+     @Override
+     public Database get_database_core(String catName, final String name) throws NoSuchObjectException, MetaException {
+       Database db = null;
+       if (name == null) {
+         throw new MetaException("Database name cannot be null.");
+       }
+       try {
+         db = getMS().getDatabase(catName, name);
+       } catch (MetaException | NoSuchObjectException e) {
+         throw e;
+       } catch (Exception e) {
+         assert (e instanceof RuntimeException);
+         throw (RuntimeException) e;
+       }
+       return db;
+     }
+ 
+     @Override
+     public void alter_database(final String dbName, final Database newDB) throws TException {
+       startFunction("alter_database " + dbName);
+       boolean success = false;
+       Exception ex = null;
+       RawStore ms = getMS();
+       Database oldDB = null;
+       Map<String, String> transactionalListenersResponses = Collections.emptyMap();
+ 
+       // Perform the same URI normalization as create_database_core.
+       if (newDB.getLocationUri() != null) {
+         newDB.setLocationUri(wh.getDnsPath(new Path(newDB.getLocationUri())).toString());
+       }
+ 
+       String[] parsedDbName = parseDbName(dbName, conf);
+ 
+       try {
+         oldDB = get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
+         if (oldDB == null) {
+           throw new MetaException("Could not alter database \"" + parsedDbName[DB_NAME] +
+               "\". Could not retrieve old definition.");
+         }
+         firePreEvent(new PreAlterDatabaseEvent(oldDB, newDB, this));
+ 
+         ms.openTransaction();
+         ms.alterDatabase(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], newDB);
+ 
+         if (!transactionalListeners.isEmpty()) {
+           transactionalListenersResponses =
+               MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                   EventType.ALTER_DATABASE,
+                   new AlterDatabaseEvent(oldDB, newDB, true, this));
+         }
+ 
+         success = ms.commitTransaction();
+       } catch (MetaException|NoSuchObjectException e) {
+         ex = e;
+         throw e;
+       } finally {
+         if (!success) {
+           ms.rollbackTransaction();
+         }
+ 
+         if ((null != oldDB) && (!listeners.isEmpty())) {
+           MetaStoreListenerNotifier.notifyEvent(listeners,
+               EventType.ALTER_DATABASE,
+               new AlterDatabaseEvent(oldDB, newDB, success, this),
+               null,
+               transactionalListenersResponses, ms);
+         }
+         endFunction("alter_database", success, ex);
+       }
+     }
+ 
+     private void drop_database_core(RawStore ms, String catName,
+         final String name, final boolean deleteData, final boolean cascade)
+         throws NoSuchObjectException, InvalidOperationException, MetaException,
+         IOException, InvalidObjectException, InvalidInputException {
+       boolean success = false;
+       Database db = null;
+       List<Path> tablePaths = new ArrayList<>();
+       List<Path> partitionPaths = new ArrayList<>();
+       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+       if (name == null) {
+         throw new MetaException("Database name cannot be null.");
+       }
+       try {
+         ms.openTransaction();
+         db = ms.getDatabase(catName, name);
+ 
+         if (!isInTest && ReplChangeManager.isSourceOfReplication(db)) {
+           throw new InvalidOperationException("can not drop a database which is a source of replication");
+         }
+ 
+         firePreEvent(new PreDropDatabaseEvent(db, this));
+         String catPrependedName = MetaStoreUtils.prependCatalogToDbName(catName, name, conf);
+ 
+         Set<String> uniqueTableNames = new HashSet<>(get_all_tables(catPrependedName));
+         List<String> allFunctions = get_functions(catPrependedName, "*");
+ 
+         if (!cascade) {
+           if (!uniqueTableNames.isEmpty()) {
+             throw new InvalidOperationException(
+                 "Database " + db.getName() + " is not empty. One or more tables exist.");
+           }
+           if (!allFunctions.isEmpty()) {
+             throw new InvalidOperationException(
+                 "Database " + db.getName() + " is not empty. One or more functions exist.");
+           }
+         }
+         Path path = new Path(db.getLocationUri()).getParent();
+         if (!wh.isWritable(path)) {
+           throw new MetaException("Database not dropped since " +
+               path + " is not writable by " +
+               SecurityUtils.getUser());
+         }
+ 
+         Path databasePath = wh.getDnsPath(wh.getDatabasePath(db));
+ 
+         // drop any functions before dropping db
+         for (String funcName : allFunctions) {
+           drop_function(catPrependedName, funcName);
+         }
+ 
+         final int tableBatchSize = MetastoreConf.getIntVar(conf,
+             ConfVars.BATCH_RETRIEVE_MAX);
+ 
+         // First pass will drop the materialized views
+         List<String> materializedViewNames = get_tables_by_type(name, ".*", TableType.MATERIALIZED_VIEW.toString());
+         int startIndex = 0;
+         // retrieve the tables from the metastore in batches to alleviate memory constraints
+         while (startIndex < materializedViewNames.size()) {
+           int endIndex = Math.min(startIndex + tableBatchSize, materializedViewNames.size());
+ 
+           List<Table> materializedViews;
+           try {
+             materializedViews = ms.getTableObjectsByName(catName, name, materializedViewNames.subList(startIndex, endIndex));
+           } catch (UnknownDBException e) {
+             throw new MetaException(e.getMessage());
+           }
+ 
+           if (materializedViews != null && !materializedViews.isEmpty()) {
+             for (Table materializedView : materializedViews) {
+               if (materializedView.getSd().getLocation() != null) {
+                 Path materializedViewPath = wh.getDnsPath(new Path(materializedView.getSd().getLocation()));
+                 if (!wh.isWritable(materializedViewPath.getParent())) {
+                   throw new MetaException("Database metadata not deleted since table: " +
+                       materializedView.getTableName() + " has a parent location " + materializedViewPath.getParent() +
+                       " which is not writable by " + SecurityUtils.getUser());
+                 }
+ 
+                 if (!FileUtils.isSubdirectory(databasePath.toString(),
+                     materializedViewPath.toString())) {
+                   tablePaths.add(materializedViewPath);
+                 }
+               }
+               // Drop the materialized view but not its data
+               drop_table(name, materializedView.getTableName(), false);
+               // Remove from all tables
+               uniqueTableNames.remove(materializedView.getTableName());
+             }
+           }
+           startIndex = endIndex;
+         }
+ 
+         // drop tables before dropping db
+         List<String> allTables = new ArrayList<>(uniqueTableNames);
+         startIndex = 0;
+         // retrieve the tables from the metastore in batches to alleviate memory constraints
+         while (startIndex < allTables.size()) {
+           int endIndex = Math.min(startIndex + tableBatchSize, allTables.size());
+ 
+           List<Table> tables;
+           try {
+             tables = ms.getTableObjectsByName(catName, name, allTables.subList(startIndex, endIndex));
+           } catch (UnknownDBException e) {
+             throw new MetaException(e.getMessage());
+           }
+ 
+           if (tables != null && !tables.isEmpty()) {
+             for (Table table : tables) {
+               // If the table is not external and it might not be in a subdirectory of the database
+               // add it's locations to the list of paths to delete
+               Path tablePath = null;
+               boolean tableDataShouldBeDeleted = checkTableDataShouldBeDeleted(table, deleteData);
+               if (table.getSd().getLocation() != null && tableDataShouldBeDeleted) {
+                 tablePath = wh.getDnsPath(new Path(table.getSd().getLocation()));
+                 if (!wh.isWritable(tablePath.getParent())) {
+                   throw new MetaException("Database metadata not deleted since table: " +
+                       table.getTableName() + " has a parent location " + tablePath.getParent() +
+                       " which is not writable by " + SecurityUtils.getUser());
+                 }
+ 
+                 if (!FileUtils.isSubdirectory(databasePath.toString(), tablePath.toString())) {
+                   tablePaths.add(tablePath);
+                 }
+               }
+ 
+               // For each partition in each table, drop the partitions and get a list of
+               // partitions' locations which might need to be deleted
+               partitionPaths = dropPartitionsAndGetLocations(ms, catName, name, table.getTableName(),
+                   tablePath, tableDataShouldBeDeleted);
+ 
+               // Drop the table but not its data
+               drop_table(MetaStoreUtils.prependCatalogToDbName(table.getCatName(), table.getDbName(), conf),
+                   table.getTableName(), false);
+             }
+ 
+             startIndex = endIndex;
+           }
+         }
+ 
+         if (ms.dropDatabase(catName, name)) {
+           if (!transactionalListeners.isEmpty()) {
+             transactionalListenerResponses =
+                 MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                       EventType.DROP_DATABASE,
+                                                       new DropDatabaseEvent(db, true, this));
+           }
+ 
+           success = ms.commitTransaction();
+         }
+       } finally {
+         if (!success) {
+           ms.rollbackTransaction();
+         } else if (deleteData) {
+           // Delete the data in the partitions which have other locations
+           deletePartitionData(partitionPaths, false, db);
+           // Delete the data in the tables which have other locations
+           for (Path tablePath : tablePaths) {
+             deleteTableData(tablePath, false, db);
+           }
+           // Delete the data in the database
+           try {
+             wh.deleteDir(new Path(db.getLocationUri()), true, db);
+           } catch (Exception e) {
+             LOG.error("Failed to delete database directory: " + db.getLocationUri() +
+                 " " + e.getMessage());
+           }
+           // it is not a terrible thing even if the data is not deleted
+         }
+ 
+         if (!listeners.isEmpty()) {
+           MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                 EventType.DROP_DATABASE,
+                                                 new DropDatabaseEvent(db, success, this),
+                                                 null,
+                                                 transactionalListenerResponses, ms);
+         }
+       }
+     }
+ 
+     @Override
+     public void drop_database(final String dbName, final boolean deleteData, final boolean cascade)
+         throws NoSuchObjectException, InvalidOperationException, MetaException {
+       startFunction("drop_database", ": " + dbName);
+       String[] parsedDbName = parseDbName(dbName, conf);
+       if (DEFAULT_CATALOG_NAME.equalsIgnoreCase(parsedDbName[CAT_NAME]) &&
+           DEFAULT_DATABASE_NAME.equalsIgnoreCase(parsedDbName[DB_NAME])) {
+         endFunction("drop_database", false, null);
+         throw new MetaException("Can not drop " + DEFAULT_DATABASE_NAME + " database in catalog "
+             + DEFAULT_CATALOG_NAME);
+       }
+ 
+       boolean success = false;
+       Exception ex = null;
+       try {
+         drop_database_core(getMS(), parsedDbName[CAT_NAME], parsedDbName[DB_NAME], deleteData,
+             cascade);
+         success = true;
+       } catch (NoSuchObjectException|InvalidOperationException|MetaException e) {
+         ex = e;
+         throw e;
+       } catch (Exception e) {
+         ex = e;
+         throw newMetaException(e);
+       } finally {
+         endFunction("drop_database", success, ex);
+       }
+     }
+ 
+ 
+     @Override
+     public List<String> get_databases(final String pattern) throws MetaException {
+       startFunction("get_databases", ": " + pattern);
+ 
+       String[] parsedDbNamed = parseDbName(pattern, conf);
+       List<String> ret = null;
+       Exception ex = null;
+       try {
+         if (parsedDbNamed[DB_NAME] == null) {
+           ret = getMS().getAllDatabases(parsedDbNamed[CAT_NAME]);
+         } else {
+           ret = getMS().getDatabases(parsedDbNamed[CAT_NAME], parsedDbNamed[DB_NAME]);
+         }
+       } catch (MetaException e) {
+         ex = e;
+         throw e;
+       } catch (Exception e) {
+         ex = e;
+         throw newMetaException(e);
+       } finally {
+         endFunction("get_databases", ret != null, ex);
+       }
+       return ret;
+     }
+ 
+     @Override
+     public List<String> get_all_databases() throws MetaException {
+       return get_databases(MetaStoreUtils.prependCatalogToDbName(null, null, conf));
+     }
+ 
+     private void create_type_core(final RawStore ms, final Type type)
+         throws AlreadyExistsException, MetaException, InvalidObjectException {
+       if (!MetaStoreUtils.validateName(type.getName(), null)) {
+         throw new InvalidObjectException("Invalid type name");
+       }
+ 
+       boolean success = false;
+       try {
+         ms.openTransaction();
+         if (is_type_exists(ms, type.getName())) {
+           throw new AlreadyExistsException("Type " + type.getName() + " already exists");
+         }
+         ms.createType(type);
+         success = ms.commitTransaction();
+       } finally {
+         if (!success) {
+           ms.rollbackTransaction();
+         }
+       }
+     }
+ 
+     @Override
+     public boolean create_type(final Type type) throws AlreadyExistsException,
+         MetaException, InvalidObjectException {
+       startFunction("create_type", ": " + type.toString());
+       boolean success = false;
+       Exception ex = null;
+       try {
+         create_type_core(getMS(), type);
+         success = true;
+       } catch (MetaException | InvalidObjectException | AlreadyExistsException e) {
+         ex = e;
+         throw e;
+       } catch (Exception e) {
+         ex = e;
+         throw newMetaException(e);
+       } finally {
+         endFunction("create_type", success, ex);
+       }
+ 
+       return success;
+     }
+ 
+     @Override
+     public Type get_type(final String name) throws MetaException, NoSuchObjectException {
+       startFunction("get_type", ": " + name);
+ 
+       Type ret = null;
+       Exception ex = null;
+       try {
+         ret = getMS().getType(name);
+         if (null == ret) {
+           throw new NoSuchObjectException("Type \"" + name + "\" not found.");
+         }
+       } catch (Exception e) {
+         ex = e;
+         throwMetaException(e);
+       } finally {
+         endFunction("get_type", ret != null, ex);
+       }
+       return ret;
+     }
+ 
+     private boolean is_type_exists(RawStore ms, String typeName)
+         throws MetaException {
+       return (ms.getType(typeName) != null);
+     }
+ 
+     @Override
+     public boolean drop_type(final String name) throws MetaException, NoSuchObjectException {
+       startFunction("drop_type", ": " + name);
+ 
+       boolean success = false;
+       Exception ex = null;
+       try {
+         // TODO:pc validate that there are no types that refer to this
+         success = getMS().dropType(name);
+       } catch (Exception e) {
+         ex = e;
+         throwMetaException(e);
+       } finally {
+         endFunction("drop_type", success, ex);
+       }
+       return success;
+     }
+ 
+     @Override
+     public Map<String, Type> get_type_all(String name) throws MetaException {
+       // TODO Auto-generated method stub
+       startFunction("get_type_all", ": " + name);
+       endFunction("get_type_all", false, null);
+       throw new MetaException("Not yet implemented");
+     }
+ 
+     private void create_table_core(final RawStore ms, final Table tbl,
+         final EnvironmentContext envContext)
+             throws AlreadyExistsException, MetaException,
+             InvalidObjectException, NoSuchObjectException {
+       create_table_core(ms, tbl, envContext, null, null, null, null, null, null);
+     }
+ 
+     private void create_table_core(final RawStore ms, final Table tbl,
+         final EnvironmentContext envContext, List<SQLPrimaryKey> primaryKeys,
+         List<SQLForeignKey> foreignKeys, List<SQLUniqueConstraint> uniqueConstraints,
+         List<SQLNotNullConstraint> notNullConstraints, List<SQLDefaultConstraint> defaultConstraints,
+                                    List<SQLCheckConstraint> checkConstraints)
+         throws AlreadyExistsException, MetaException,
+         InvalidObjectException, NoSuchObjectException {
+       // To preserve backward compatibility throw MetaException in case of null database
+       if (tbl.getDbName() == null) {
+         throw new MetaException("Null database name is not allowed");
+       }
+ 
+       if (!MetaStoreUtils.validateName(tbl.getTableName(), conf)) {
+         throw new InvalidObjectException(tbl.getTableName()
+             + " is not a valid object name");
+       }
+       String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols());
+       if (validate != null) {
+         throw new InvalidObjectException("Invalid column " + validate);
+       }
+       if (tbl.getPartitionKeys() != null) {
+         validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys());
+         if (validate != null) {
+           throw new InvalidObjectException("Invalid partition column " + validate);
+         }
+       }
+       SkewedInfo skew = tbl.getSd().getSkewedInfo();
+       if (skew != null) {
+         validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames());
+         if (validate != null) {
+           throw new InvalidObjectException("Invalid skew column " + validate);
+         }
+         validate = MetaStoreUtils.validateSkewedColNamesSubsetCol(
+             skew.getSkewedColNames(), tbl.getSd().getCols());
+         if (validate != null) {
+           throw new InvalidObjectException("Invalid skew column " + validate);
+         }
+       }
+ 
+       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+       Path tblPath = null;
+       boolean success = false, madeDir = false;
+       Database db = null;
+       try {
+         if (!tbl.isSetCatName()) {
+           tbl.setCatName(getDefaultCatalog(conf));
+         }
+         firePreEvent(new PreCreateTableEvent(tbl, this));
+ 
+         ms.openTransaction();
+ 
+         db = ms.getDatabase(tbl.getCatName(), tbl.getDbName());
+ 
+         // get_table checks whether database exists, it should be moved here
+         if (is_table_exists(ms, tbl.getCatName(), tbl.getDbName(), tbl.getTableName())) {
+           throw new AlreadyExistsException("Table " + getCatalogQualifiedTableName(tbl)
+               + " already exists");
+         }
+ 
+         if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
+           if (tbl.getSd().getLocation() == null
+               || tbl.getSd().getLocation().isEmpty()) {
+             tblPath = wh.getDefaultTablePath(db, tbl);
+           } else {
+             if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
+               LOG.warn("Location: " + tbl.getSd().getLocation()
+                   + " specified for non-external table:" + tbl.getTableName());
+             }
+             tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
+           }
+           tbl.getSd().setLocation(tblPath.toString());
+         }
+ 
+         if (tblPath != null) {
+           if (!wh.isDir(tblPath)) {
+             if (!wh.mkdirs(tblPath)) {
+               throw new MetaException(tblPath
+                   + " is not a directory or unable to create one");
+             }
+             madeDir = true;
+           }
+         }
+         if (MetastoreConf.getBoolVar(conf, ConfVars.STATS_AUTO_GATHER) &&
+             !MetaStoreUtils.isView(tbl)) {
+           MetaStoreUtils.updateTableStatsSlow(db, tbl, wh, madeDir, false, envContext);
+         }
+ 
+         // set create time
+         long time = System.currentTimeMillis() / 1000;
+         tbl.setCreateTime((int) time);
+         if (tbl.getParameters() == null ||
+             tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
+           tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
+         }
+ 
+         if (primaryKeys == null && foreignKeys == null
+                 && uniqueConstraints == null && notNullConstraints == null && defaultConstraints == null
+             && checkConstraints == null) {
+           ms.createTable(tbl);
+         } else {
+           // Check that constraints have catalog name properly set first
+           if (primaryKeys != null && !primaryKeys.isEmpty() && !primaryKeys.get(0).isSetCatName()) {
+             for (SQLPrimaryKey pkcol : primaryKeys) pkcol.setCatName(tbl.getCatName());
+           }
+           if (foreignKeys != null && !foreignKeys.isEmpty() && !foreignKeys.get(0).isSetCatName()) {
+             for (SQLForeignKey fkcol : foreignKeys) fkcol.setCatName(tbl.getCatName());
+           }
+           if (uniqueConstraints != null && !uniqueConstraints.isEmpty() && !uniqueConstraints.get(0).isSetCatName()) {
+             for (SQLUniqueConstraint uccol : uniqueConstraints) uccol.setCatName(tbl.getCatName());
+           }
+           if (notNullConstraints != null && !notNullConstraints.isEmpty() && !notNullConstraints.get(0).isSetCatName()) {
+             for (SQLNotNullConstraint nncol : notNullConstraints) nncol.setCatName(tbl.getCatName());
+           }
+           if (defaultConstraints != null && !defaultConstraints.isEmpty() && !defaultConstraints.get(0).isSetCatName()) {
+             for (SQLDefaultConstraint dccol : defaultConstraints) dccol.setCatName(tbl.getCatName());
+           }
+           if (checkConstraints != null && !checkConstraints.isEmpty() && !checkConstraints.get(0).isSetCatName()) {
+             for (SQLCheckConstraint cccol : checkConstraints) cccol.setCatName(tbl.getCatName());
+           }
+           // Set constraint name if null before sending to listener
+           List<String> constraintNames = ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys,
+               uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+           int primaryKeySize = 0;
+           if (primaryKeys != null) {
+             primaryKeySize = primaryKeys.size();
+             for (int i = 0; i < primaryKeys.size(); i++) {
+               if (primaryKeys.get(i).getPk_name() == null) {
+                 primaryKeys.get(i).setPk_name(constraintNames.get(i));
+               }
+               if (!primaryKeys.get(i).isSetCatName()) primaryKeys.get(i).setCatName(tbl.getCatName());
+             }
+           }
+           int foreignKeySize = 0;
+           if (foreignKeys != null) {
+             foreignKeySize = foreignKeys.size();
+             for (int i = 0; i < foreignKeySize; i++) {
+               if (foreignKeys.get(i).getFk_name() == null) {
+                 foreignKeys.get(i).setFk_name(constraintNames.get(primaryKeySize + i));
+               }
+               if (!foreignKeys.get(i).isSetCatName()) foreignKeys.get(i).setCatName(tbl.getCatName());
+             }
+           }
+           int uniqueConstraintSize = 0;
+           if (uniqueConstraints != null) {
+             uniqueConstraintSize = uniqueConstraints.size();
+             for (int i = 0; i < uniqueConstraintSize; i++) {
+               if (uniqueConstraints.get(i).getUk_name() == null) {
+                 uniqueConstraints.get(i).setUk_name(constraintNames.get(primaryKeySize + foreignKeySize + i));
+               }
+               if (!uniqueConstraints.get(i).isSetCatName()) uniqueConstraints.get(i).setCatName(tbl.getCatName());
+             }
+           }
+           int notNullConstraintSize =  0;
+           if (notNullConstraints != null) {
+             for (int i = 0; i < notNullConstraints.size(); i++) {
+               if (notNullConstraints.get(i).getNn_name() == null) {
+                 notNullConstraints.get(i).setNn_name(constraintNames.get(primaryKeySize + foreignKeySize + uniqueConstraintSize + i));
+               }
+               if (!notNullConstraints.get(i).isSetCatName()) notNullConstraints.get(i).setCatName(tbl.getCatName());
+             }
+           }
+           int defaultConstraintSize =  0;
+           if (defaultConstraints!= null) {
+             for (int i = 0; i < defaultConstraints.size(); i++) {
+               if (defaultConstraints.get(i).getDc_name() == null) {
+                 defaultConstraints.get(i).setDc_name(constraintNames.get(primaryKeySize + foreignKeySize
+                     + uniqueConstraintSize + notNullConstraintSize + i));
+               }
+               if (!defaultConstraints.get(i).isSetCatName()) defaultConstraints.get(i).setCatName(tbl.getCatName());
+             }
+           }
+           if (checkConstraints!= null) {
+             for (int i = 0; i < checkConstraints.size(); i++) {
+               if (checkConstraints.get(i).getDc_name() == null) {
+                 checkConstraints.get(i).setDc_name(constraintNames.get(primaryKeySize + foreignKeySize
+                                                                              + uniqueConstraintSize
+                                                                              + defaultConstraintSize
+                                                                            + notNullConstraintSize + i));
+               }
+               if (!checkConstraints.get(i).isSetCatName()) checkConstraints.get(i).setCatName(tbl.getCatName());
+             }
+           }
+         }
+ 
+         if (!transactionalListeners.isEmpty()) {
+           transactionalListenerResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+               EventType.CREATE_TABLE, new CreateTableEvent(tbl, true, this), envContext);
+           if (primaryKeys != null && !primaryKeys.isEmpty()) {
+             MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_PRIMARYKEY,
+                 new AddPrimaryKeyEvent(primaryKeys, true, this), envContext);
+           }
+           if (foreignKeys != null && !foreignKeys.isEmpty()) {
+             MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_FOREIGNKEY,
+                 new AddForeignKeyEvent(foreignKeys, true, this), envContext);
+           }
+           if (uniqueConstraints != null && !uniqueConstraints.isEmpty()) {
+             MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_UNIQUECONSTRAINT,
+                 new AddUniqueConstraintEvent(uniqueConstraints, true, this), envContext);
+           }
+           if (notNullConstraints != null && !notNullConstraints.isEmpty()) {
+             MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_NOTNULLCONSTRAINT,
+                 new AddNotNullConstraintEvent(notNullConstraints, true, this), envContext);
+           }
+         }
+ 
+         success = ms.commitTransaction();
+       } finally {
+         if (!success) {
+           ms.rollbackTransaction();
+           if (madeDir) {
+             wh.deleteDir(tblPath, true, db);
+           }
+         }
+ 
+         if (!listeners.isEmpty()) {
+           MetaStoreListenerNotifier.notifyEvent(listeners, EventType.CREATE_TABLE,
+               new CreateTableEvent(tbl, success, this), envContext, transactionalListenerResponses, ms);
+           if (primaryKeys != null && !primaryKeys.isEmpty()) {
+             MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_PRIMARYKEY,
+                 new AddPrimaryKeyEvent(primaryKeys, success, this), envContext);
+           }
+           if (foreignKeys != null && !foreignKeys.isEmpty()) {
+             MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_FOREIGNKEY,
+                 new AddForeignKeyEvent(foreignKeys, success, this), envContext);
+           }
+           if (uniqueConstraints != null && !uniqueConstraints.isEmpty()) {
+             MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_UNIQUECONSTRAINT,
+                 new AddUniqueConstraintEvent(uniqueConstraints, success, this), envContext);
+           }
+           if (notNullConstraints != null && !notNullConstraints.isEmpty()) {
+             MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_NOTNULLCONSTRAINT,
+                 new AddNotNullConstraintEvent(notNullConstraints, success, this), envContext);
+           }
+         }
+       }
+     }
+ 
+     @Override
+     public void create_table(final Table tbl) throws AlreadyExistsException,
+         MetaException, InvalidObjectException {
+       create_table_with_environment_context(tbl, null);
+     }
+ 
+     @Override
+     public void create_table_with_environment_context(final Table tbl,
+         final EnvironmentContext envContext)
+         throws AlreadyExistsException, MetaException, InvalidObjectException {
+       startFunction("create_table", ": " + tbl.toString());
+       boolean success = false;
+       Exception ex = null;
+       try {
+         create_table_core(getMS(), tbl, envContext);
+         success = true;
+       } catch (NoSuchObjectException e) {
+         LOG.warn("create_table_with_environment_context got ", e);
+         ex = e;
+         throw new InvalidObjectException(e.getMessage());
+       } catch (MetaException | InvalidObjectException | AlreadyExistsException e) {
+         ex = e;
+         throw e;
+       } catch (Exception e) {
+         ex = e;
+         throw newMetaException(e);
+       } finally {
+         endFunction("create_table", success, ex, tbl.getTableName());
+       }
+     }
+ 
+     @Override
+     public void create_table_with_constraints(final Table tbl,
+         final List<SQLPrimaryKey> primaryKeys, final List<SQLForeignKey> foreignKeys,
+         List<SQLUniqueConstraint> uniqueConstraints,
+         List<SQLNotNullConstraint> notNullConstraints,
+         List<SQLDefaultConstraint> defaultConstraints,
+         List<SQLCheckConstraint> checkConstraints)
+         throws AlreadyExistsException, MetaException, InvalidObjectException {
+       startFunction("create_table", ": " + tbl.toString());
+       boolean success = false;
+       Exception ex = null;
+       try {
+         create_table_core(getMS(), tbl, null, primaryKeys, foreignKeys,
+             uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+         success = true;
+       } catch (NoSuchObjectException e) {
+         ex = e;
+         throw new InvalidObjectException(e.getMessage());
+       } catch (MetaException | InvalidObjectException | AlreadyExistsException e) {
+         ex = e;
+         throw e;
+       } catch (Exception e) {
+         ex = e;
+         throw newMetaException(e);
+       } finally {
+         endFunction("create_table", success, ex, tbl.getTableName());
+       }
+     }
+ 
+     @Override
+     public void drop_constraint(DropConstraintRequest req)
+         throws MetaException, InvalidObjectException {
+       String catName = req.isSetCatName() ? req.getCatName() : getDefaultCatalog(conf);
+       String dbName = req.getDbname();
+       String tableName = req.getTablename();
+       String constraintName = req.getConstraintname();
+       startFunction("drop_constraint", ": " + constraintName);
+       boolean success = false;
+       Exception ex = null;
+       RawStore ms = getMS();
+       try {
+         ms.openTransaction();
+         ms.dropConstraint(catName, dbName, tableName, constraintName);
+         if (transactionalListeners.size() > 0) {
+           DropConstraintEvent dropConstraintEvent = new DropConstraintEvent(catName, dbName,
+               tableName, constraintName, true, this);
+           for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+             transactionalListener.onDropConstraint(dropConstraintEvent);
+           }
+         }
+         success = ms.commitTransaction();
+       } catch (NoSuchObjectException e) {
+         ex = e;
+         throw new InvalidObjectException(e.getMessage());
+       } catch (MetaException e) {


<TRUNCATED>