You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2017/11/21 20:07:37 UTC

[3/7] hive git commit: HIVE-17967 Move HiveMetaStore class. This closes #270 (Alan Gates, reviewed by Thejas Nair).

http://git-wip-us.apache.org/repos/asf/hive/blob/8fcc7f32/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
new file mode 100644
index 0000000..7a636aa
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -0,0 +1,8037 @@
+/* * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_COMMENT;
+import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import javax.jdo.JDOException;
+
+import com.codahale.metrics.Counter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
+
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
+import org.apache.hadoop.hive.metastore.cache.CachedStore;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AddNotNullConstraintEvent;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AddPrimaryKeyEvent;
+import org.apache.hadoop.hive.metastore.events.AddUniqueConstraintEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropConstraintEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hadoop.hive.metastore.events.PreAddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.PreAddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.PreAlterDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.PreAlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.PreAlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.PreAuthorizationCallEvent;
+import org.apache.hadoop.hive.metastore.events.PreCreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropTableEvent;
+import org.apache.hadoop.hive.metastore.events.PreEventContext;
+import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent;
+import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+import org.apache.hadoop.hive.metastore.metrics.JvmPauseMonitor;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
+import org.apache.hadoop.hive.metastore.security.TUGIContainingTransport;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.HdfsUtils;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.metastore.utils.LogUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.fb_status;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * TODO:pc remove application logic to a separate interface.
+ */
+public class HiveMetaStore extends ThriftHiveMetastore {
+  public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStore.class);
+  public static final String PARTITION_NUMBER_EXCEED_LIMIT_MSG =
+      "Number of partitions scanned (=%d) on table '%s' exceeds limit (=%d). This is controlled on the metastore server by %s.";
+
+  // boolean that tells if the HiveMetaStore (remote) server is being used.
+  // Can be used to determine if the calls to metastore api (HMSHandler) are being made with
+  // embedded metastore or a remote one
+  private static boolean isMetaStoreRemote = false;
+
+  // Used for testing to simulate method timeout.
+  @VisibleForTesting
+  static boolean TEST_TIMEOUT_ENABLED = false;
+  @VisibleForTesting
+  static long TEST_TIMEOUT_VALUE = -1;
+
+  private static ShutdownHookManager shutdownHookMgr;
+
+  public static final String ADMIN = "admin";
+  public static final String PUBLIC = "public";
+  /** MM write states. */
+  public static final char MM_WRITE_OPEN = 'o', MM_WRITE_COMMITTED = 'c', MM_WRITE_ABORTED = 'a';
+
+  private static HadoopThriftAuthBridge.Server saslServer;
+  private static MetastoreDelegationTokenManager delegationTokenManager;
+  private static boolean useSasl;
+
+  static final String NO_FILTER_STRING = "";
+  static final int UNLIMITED_MAX_PARTITIONS = -1;
+
+  /**
+   * Transport factory that composes two other factories: the parent factory is applied to the
+   * incoming transport first, and the child factory is then applied to the parent's result.
+   */
+  private static final class ChainedTTransportFactory extends TTransportFactory {
+    private final TTransportFactory parentTransFactory;
+    private final TTransportFactory childTransFactory;
+
+    private ChainedTTransportFactory(
+        TTransportFactory parentTransFactory,
+        TTransportFactory childTransFactory) {
+      this.childTransFactory = childTransFactory;
+      this.parentTransFactory = parentTransFactory;
+    }
+
+    /** Wraps {@code trans} with the parent factory, then wraps that result with the child. */
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      TTransport wrappedByParent = parentTransFactory.getTransport(trans);
+      return childTransFactory.getTransport(wrappedByParent);
+    }
+  }
+
+  public static class HMSHandler extends FacebookBase implements IHMSHandler {
+    public static final Logger LOG = HiveMetaStore.LOG;
+    private final Configuration conf; // stores datastore (jpox) properties,
+                                     // right now they come from jpox.properties
+
+    private static String currentUrl;
+    private FileMetadataManager fileMetadataManager;
+    private PartitionExpressionProxy expressionProxy;
+    private StorageSchemaReader storageSchemaReader;
+
+    // Variables for metrics
+    // Package visible so that HMSMetricsListener can see them.
+    static AtomicInteger databaseCount, tableCount, partCount;
+
+    private Warehouse wh; // hdfs warehouse
+    // Per-thread RawStore; a thread sees null here until a store has been set for it.
+    private static final ThreadLocal<RawStore> threadLocalMS = new ThreadLocal<>();
+
+    // Per-thread TxnStore; a thread sees null here until a store has been set for it.
+    private static final ThreadLocal<TxnStore> threadLocalTxn = new ThreadLocal<>();
+
+    // Per-thread map from a String key to a metrics timer context. Each thread starts with its
+    // own empty HashMap.
+    private static final ThreadLocal<Map<String, com.codahale.metrics.Timer.Context>> timerContexts =
+        new ThreadLocal<Map<String, com.codahale.metrics.Timer.Context>>() {
+      @Override
+      protected Map<String, com.codahale.metrics.Timer.Context> initialValue() {
+        return new HashMap<>();
+      }
+    };
+
+    /**
+     * Returns the RawStore held in the calling thread's thread-local slot.
+     *
+     * @return this thread's RawStore, or null if none has been stored for the thread
+     */
+    public static RawStore getRawStore() {
+      return threadLocalMS.get();
+    }
+
+    /**
+     * Clears the calling thread's thread-local RawStore reference. Only the reference is
+     * removed; nothing is invoked on the store itself here.
+     */
+    static void removeRawStore() {
+      threadLocalMS.remove();
+    }
+
+    // Each thread keeps its own Configuration because many threads could make changes
+    // to the conf using the connection hook. The slot is null until a thread sets it.
+    private static final ThreadLocal<Configuration> threadLocalConf = new ThreadLocal<>();
+
+    /**
+     * Thread local HMSHandler used during shutdown to notify meta listeners
+     */
+    private static final ThreadLocal<HMSHandler> threadLocalHMSHandler = new ThreadLocal<>();
+
+    /**
+     * Thread local Map to keep track of modified meta conf keys
+     */
+    private static final ThreadLocal<Map<String, String>> threadLocalModifiedConfig =
+        new ThreadLocal<>();
+
+    private static ExecutorService threadPool;
+
+    static final Logger auditLog = LoggerFactory.getLogger(
+        HiveMetaStore.class.getName() + ".audit");
+    
+    /**
+     * Writes one record to the audit log holding the current user's name, the client address
+     * (or a placeholder when it is unknown) and the given command text.
+     *
+     * @param cmd command text to record; nothing is logged when it is null
+     * @throws RuntimeException wrapping any exception raised while resolving the current user
+     */
+    private static void logAuditEvent(String cmd) {
+      if (cmd != null) {
+        final UserGroupInformation caller;
+        try {
+          caller = SecurityUtils.getUGI();
+        } catch (Exception ex) {
+          throw new RuntimeException(ex);
+        }
+
+        String resolved = getIPAddress();
+        String address = (resolved != null) ? resolved : "unknown-ip-addr";
+
+        // Fields are tab-separated, with a trailing tab after the command.
+        auditLog.info("ugi={}\tip={}\tcmd={}\t", caller.getUserName(), address, cmd);
+      }
+    }
+
+    /**
+     * Resolves the address of the current caller.
+     *
+     * @return the SASL server's remote host address when SASL is in use and one is available,
+     *         the thread-local address when SASL is not in use, otherwise null
+     */
+    private static String getIPAddress() {
+      if (!useSasl) {
+        // if kerberos is not enabled
+        return getThreadLocalIpAddress();
+      }
+      boolean remoteKnown = saslServer != null && saslServer.getRemoteAddress() != null;
+      return remoteKnown ? saslServer.getRemoteAddress().getHostAddress() : null;
+    }
+
+    // Source of per-thread serial numbers. initialValue() below runs on whichever thread first
+    // asks for its id, so several threads may draw a number at the same time; an atomic counter
+    // guarantees no two threads receive the same value (a plain int++ does not).
+    private static final AtomicInteger nextSerialNum = new AtomicInteger(0);
+    // Lazily assigns each thread the next serial number the first time the thread reads it.
+    private static ThreadLocal<Integer> threadLocalId = new ThreadLocal<Integer>() {
+      @Override
+      protected Integer initialValue() {
+        return nextSerialNum.getAndIncrement();
+      }
+    };
+
+    // Only populated when the metastore is reached through a metastore Thrift server (not from
+    // the CLI), and only when the TTransport used to connect is a TSocket. It is also left
+    // unset when kerberos is used. Unset means null.
+    private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<>();
+
+    /**
+     * Tells every registered listener that a meta configuration key changed value. Regular
+     * listeners each get their own event object; transactional listeners share one.
+     *
+     * @param key the configuration key that changed
+     * @param oldValue value before the change
+     * @param newValue value after the change
+     * @throws MetaException if a listener fails
+     */
+    private void notifyMetaListeners(String key, String oldValue, String newValue) throws MetaException {
+      for (MetaStoreEventListener listener : listeners) {
+        ConfigChangeEvent ownEvent = new ConfigChangeEvent(this, key, oldValue, newValue);
+        listener.onConfigChange(ownEvent);
+      }
+
+      if (transactionalListeners.isEmpty()) {
+        return;
+      }
+      // All the fields of this event are final, so one instance can serve every
+      // transactional listener.
+      ConfigChangeEvent sharedEvent = new ConfigChangeEvent(this, key, oldValue, newValue);
+      for (MetaStoreEventListener txnListener : transactionalListeners) {
+        txnListener.onConfigChange(sharedEvent);
+      }
+    }
+
+    /**
+     * Notifies listeners that keys changed through setMetaConf on this thread are going back to
+     * the values they had before the first change. Intended to be called from
+     * HiveMetaStore#cleanupRawStore. A MetaException raised along the way is logged, not thrown.
+     */
+    private void notifyMetaListenersOnShutDown() {
+      Map<String, String> savedValues = threadLocalModifiedConfig.get();
+      if (savedValues != null) {
+        try {
+          Configuration threadConf = threadLocalConf.get();
+          if (threadConf == null) {
+            throw new MetaException("Unexpected: modifiedConf is non-null but conf is null");
+          }
+          for (Entry<String, String> saved : savedValues.entrySet()) {
+            String key = saved.getKey();
+            // The saved value is reported as the "new" value and the value currently in the
+            // conf as the "old" one, because the change is being reverted.
+            String revertTo = saved.getValue();
+            String present = threadConf.get(key);
+            if (Objects.equals(present, revertTo)) {
+              continue;
+            }
+            notifyMetaListeners(key, present, revertTo);
+          }
+          logInfo("Meta listeners shutdown notification completed.");
+        } catch (MetaException e) {
+          LOG.error("Failed to notify meta listeners on shutdown: ", e);
+        }
+      }
+      // A null map means nothing got modified on this thread.
+    }
+
+    /**
+     * Records the client IP address for the calling thread.
+     *
+     * @param ipAddress address to store in the thread-local slot
+     */
+    static void setThreadLocalIpAddress(String ipAddress) {
+      threadLocalIpAddress.set(ipAddress);
+    }
+
+    // This will return null if the metastore is not being accessed from a metastore Thrift server,
+    // or if the TTransport being used to connect is not an instance of TSocket, or if kerberos
+    // is used
+    static String getThreadLocalIpAddress() {
+      return threadLocalIpAddress.get();
+    }
+
+    /**
+     * Makes it possible for tests to check that the right type of PartitionExpressionProxy was
+     * instantiated.
+     *
+     * @return the expression proxy held by this handler
+     */
+    @VisibleForTesting
+    PartitionExpressionProxy getExpressionProxy() {
+      return expressionProxy;
+    }
+
+    /**
+     * Returns the serial id assigned to the calling thread.
+     *
+     * @return thread id
+     * @deprecated Use {@link #getThreadId()} instead.
+     */
+    @Deprecated
+    public static Integer get() {
+      return threadLocalId.get();
+    }
+
+    /**
+     * Returns the serial id assigned to the calling thread; the id is allocated the first time
+     * a thread asks for it.
+     */
+    @Override
+    public int getThreadId() {
+      return threadLocalId.get();
+    }
+
+    /**
+     * Creates and initializes a handler using a freshly created metastore configuration.
+     *
+     * @param name name passed to the base class
+     * @throws MetaException if construction or initialization fails
+     */
+    public HMSHandler(String name) throws MetaException {
+      this(name, MetastoreConf.newMetastoreConf(), true);
+    }
+
+    /**
+     * Creates and initializes a handler using the supplied configuration.
+     *
+     * @param name name passed to the base class
+     * @param conf configuration the handler is built from
+     * @throws MetaException if construction or initialization fails
+     */
+    public HMSHandler(String name, Configuration conf) throws MetaException {
+      this(name, conf, true);
+    }
+
+    /**
+     * Creates a handler backed by {@code conf}, creating the shared daemon thread pool if it
+     * does not exist yet, and optionally running {@link #init()}.
+     *
+     * @param name name passed to the base class
+     * @param conf configuration the handler is built from
+     * @param init whether to call {@link #init()} before returning
+     * @throws MetaException if initialization fails
+     */
+    public HMSHandler(String name, Configuration conf, boolean init) throws MetaException {
+      super(name);
+      this.conf = conf;
+      isInTest = MetastoreConf.getBoolVar(this.conf, ConfVars.HIVE_IN_TEST);
+      // The null check has to be made while holding the lock. Checking only before acquiring
+      // it lets two concurrently constructed handlers both see null and each create a pool,
+      // leaving one pool (and its threads) unreferenced.
+      synchronized (HMSHandler.class) {
+        if (threadPool == null) {
+          int numThreads = MetastoreConf.getIntVar(conf, ConfVars.FS_HANDLER_THREADS_COUNT);
+          threadPool = Executors.newFixedThreadPool(numThreads,
+              new ThreadFactoryBuilder().setDaemon(true)
+                  .setNameFormat("HMSHandler #%d").build());
+        }
+      }
+      if (init) {
+        init();
+      }
+    }
+
+    /**
+     * Returns the configuration this handler was constructed with.
+     *
+     * @return Configuration object
+     * @deprecated Use {@link #getConf()} instead.
+     */
+    @Deprecated
+    public Configuration getHiveConf() {
+      return conf;
+    }
+
+    private ClassLoader classLoader;
+    private AlterHandler alterHandler;
+    private List<MetaStorePreEventListener> preListeners;
+    private List<MetaStoreEventListener> listeners;
+    private List<TransactionalMetaStoreEventListener> transactionalListeners;
+    private List<MetaStoreEndFunctionListener> endFunctionListeners;
+    private List<MetaStoreInitListener> initListeners;
+    private Pattern partitionValidationPattern;
+    private final boolean isInTest;
+
+    // Instance initializer: prefer the thread's context class loader and fall back to the
+    // loader that loaded Configuration when the thread has none.
+    {
+      ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+      classLoader = (contextLoader != null)
+          ? contextLoader
+          : Configuration.class.getClassLoader();
+    }
+
+    /**
+     * Returns the transactional listener list held by this handler. The list itself is
+     * returned, not a copy, and the field is only assigned in {@link #init()}.
+     */
+    @Override
+    public List<TransactionalMetaStoreEventListener> getTransactionalListeners() {
+      return transactionalListeners;
+    }
+
+    /**
+     * Initializes this handler. In order: runs the configured init listeners, instantiates the
+     * alter handler and the warehouse, makes sure the default database, default roles and admin
+     * users exist for the current connection URL, optionally starts metrics, loads the pre-event,
+     * event, transactional and end-function listener lists, compiles the partition name
+     * validation pattern, schedules the configured always-on tasks, and finally creates the
+     * expression proxy and the file metadata manager.
+     *
+     * @throws MetaException propagated from any of the initialization steps
+     */
+    @Override
+    public void init() throws MetaException {
+      initListeners = MetaStoreUtils.getMetaStoreListeners(
+          MetaStoreInitListener.class, conf, MetastoreConf.getVar(conf, ConfVars.INIT_HOOKS));
+      for (MetaStoreInitListener singleInitListener: initListeners) {
+          MetaStoreInitContext context = new MetaStoreInitContext();
+          singleInitListener.onInit(context);
+      }
+
+      String alterHandlerName = MetastoreConf.getVar(conf, ConfVars.ALTER_HANDLER);
+      alterHandler = ReflectionUtils.newInstance(JavaUtils.getClass(
+          alterHandlerName, AlterHandler.class), conf);
+      wh = new Warehouse(conf);
+
+      // currentUrl is static, so it is read and updated under the class lock. The default
+      // objects are only (re)created when the connection URL differs from the last one seen.
+      synchronized (HMSHandler.class) {
+        if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(conf))) {
+          createDefaultDB();
+          createDefaultRoles();
+          addAdminUsers();
+          currentUrl = MetaStoreInit.getConnectionURL(conf);
+        }
+      }
+
+      //Start Metrics
+      if (MetastoreConf.getBoolVar(conf, ConfVars.METRICS_ENABLED)) {
+        LOG.info("Begin calculating metadata count metrics.");
+        Metrics.initialize(conf);
+        databaseCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_DATABASES);
+        tableCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_TABLES);
+        partCount = Metrics.getOrCreateGauge(MetricsConstants.TOTAL_PARTITIONS);
+        updateMetrics();
+
+      }
+
+      preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
+          conf, MetastoreConf.getVar(conf, ConfVars.PRE_EVENT_LISTENERS));
+      // The transactional validation listener is inserted at index 0, ahead of the configured
+      // pre-event listeners.
+      preListeners.add(0, new TransactionalValidationListener(conf));
+      listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, conf,
+          MetastoreConf.getVar(conf, ConfVars.EVENT_LISTENERS));
+      // Built-in listeners are appended after the configured ones.
+      listeners.add(new SessionPropertiesListener(conf));
+      listeners.add(new AcidEventListener(conf));
+      transactionalListeners = MetaStoreUtils.getMetaStoreListeners(TransactionalMetaStoreEventListener.class,
+          conf, MetastoreConf.getVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS));
+      // The metrics listener is only added when a metrics registry exists.
+      if (Metrics.getRegistry() != null) {
+        listeners.add(new HMSMetricsListener(conf));
+      }
+
+      endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
+          MetaStoreEndFunctionListener.class, conf, MetastoreConf.getVar(conf, ConfVars.END_FUNCTION_LISTENERS));
+
+      // A null or empty whitelist pattern leaves partitionValidationPattern null.
+      String partitionValidationRegex =
+          MetastoreConf.getVar(conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN);
+      if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
+        partitionValidationPattern = Pattern.compile(partitionValidationRegex);
+      } else {
+        partitionValidationPattern = null;
+      }
+
+      ThreadPool.initialize(conf);
+      Collection<String> taskNames =
+          MetastoreConf.getStringCollection(conf, ConfVars.TASK_THREADS_ALWAYS);
+      for (String taskName : taskNames) {
+        MetastoreTaskThread task =
+            JavaUtils.newInstance(JavaUtils.getClass(taskName, MetastoreTaskThread.class));
+        task.setConf(conf);
+        long freq = task.runFrequency(TimeUnit.MILLISECONDS);
+        // For backwards compatibility, since some threads used to be hard coded but only run if
+        // frequency was > 0
+        if (freq > 0) {
+          // The frequency is used both as the initial delay and as the period.
+          ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
+
+        }
+      }
+      expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
+      fileMetadataManager = new FileMetadataManager(this.getMS(), conf);
+    }
+
+    /**
+     * Prefixes a message with the calling thread's serial id followed by ": ".
+     *
+     * @param s message text
+     * @return the prefixed message
+     */
+    private static String addPrefix(String s) {
+      return threadLocalId.get() + ": " + s;
+    }
+
+    /**
+     * Stores the invoking HMSHandler on the calling thread's thread-local slot. Only the first
+     * handler set on a thread is kept; later calls leave an existing value in place.
+     *
+     * @param handler handler to remember for this thread
+     */
+    private static void setHMSHandler(HMSHandler handler) {
+      if (threadLocalHMSHandler.get() == null) {
+        threadLocalHMSHandler.set(handler);
+      }
+    }
+    /**
+     * Replaces the calling thread's configuration and, if the thread already has a RawStore,
+     * passes the new configuration on to it.
+     *
+     * @param conf configuration to install for this thread
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      threadLocalConf.set(conf);
+      RawStore ms = threadLocalMS.get();
+      if (ms != null) {
+        ms.setConf(conf); // reload if DS related configuration is changed
+      }
+    }
+
+    /**
+     * Returns the calling thread's configuration. The first call on a thread creates a copy of
+     * the handler's configuration and stores it in the thread-local slot.
+     */
+    @Override
+    public Configuration getConf() {
+      Configuration existing = threadLocalConf.get();
+      if (existing != null) {
+        return existing;
+      }
+      Configuration threadCopy = new Configuration(this.conf);
+      threadLocalConf.set(threadCopy);
+      return threadCopy;
+    }
+
+    /**
+     * Returns the calling thread's map of modified meta conf keys, creating and storing an
+     * empty map on first use.
+     */
+    private Map<String, String> getModifiedConf() {
+      Map<String, String> existing = threadLocalModifiedConfig.get();
+      if (existing != null) {
+        return existing;
+      }
+      Map<String, String> created = new HashMap<>();
+      threadLocalModifiedConfig.set(created);
+      return created;
+    }
+
+    /** Returns the Warehouse held by this handler; the field is assigned in {@link #init()}. */
+    @Override
+    public Warehouse getWh() {
+      return wh;
+    }
+
+    /**
+     * Sets a meta configuration key on the calling thread's configuration, remembers the value
+     * the key had before its first change on this thread, and notifies the listeners.
+     *
+     * @param key meta configuration key
+     * @param value new value
+     * @throws MetaException if the key is not a meta conf key, the value fails validation, or a
+     *         listener fails
+     */
+    @Override
+    public void setMetaConf(String key, String value) throws MetaException {
+      ConfVars metaVar = MetastoreConf.getMetaConf(key);
+      if (metaVar == null) {
+        throw new MetaException("Invalid configuration key " + key);
+      }
+      try {
+        metaVar.validate(value);
+      } catch (IllegalArgumentException e) {
+        String reason = e.getMessage();
+        throw new MetaException("Invalid configuration value " + value + " for key " + key +
+            " by " + reason);
+      }
+      Configuration threadConf = getConf();
+      String previous = MetastoreConf.get(threadConf, key);
+      // Only the value seen before the first change is kept on the thread local map.
+      Map<String, String> firstValues = getModifiedConf();
+      boolean alreadyTracked = firstValues.containsKey(key);
+      if (!alreadyTracked) {
+        firstValues.put(key, previous);
+      }
+      // Remember the invoking handler on the thread so HiveMetaStore#cleanupRawStore can use it
+      // later to notify the meta listeners.
+      setHMSHandler(this);
+      threadConf.set(key, value);
+      notifyMetaListeners(key, previous, value);
+    }
+
+    /**
+     * Reads a meta configuration key from the calling thread's configuration, falling back to
+     * the key's default value when it is not set.
+     *
+     * @param key meta configuration key
+     * @return the configured value or the default
+     * @throws MetaException if the key is not a meta conf key
+     */
+    @Override
+    public String getMetaConf(String key) throws MetaException {
+      ConfVars metaVar = MetastoreConf.getMetaConf(key);
+      if (metaVar == null) {
+        throw new MetaException("Invalid configuration key " + key);
+      }
+      Configuration threadConf = getConf();
+      String fallback = metaVar.getDefaultVal().toString();
+      return threadConf.get(key, fallback);
+    }
+
+    /**
+     * Returns the calling thread's cached RawStore, built from the thread's configuration when
+     * the thread does not have one yet.
+     *
+     * @return the cached RawStore
+     * @throws MetaException if the store cannot be created
+     */
+    @Override
+    public RawStore getMS() throws MetaException {
+      return getMSForConf(getConf());
+    }
+
+    /**
+     * Returns the calling thread's RawStore. When the thread has none, a new one is created
+     * from {@code conf}, its schema is verified, and it is stored for the thread.
+     *
+     * @param conf configuration used only when a new store has to be created
+     * @return the thread's RawStore
+     * @throws MetaException if creation or schema verification fails
+     */
+    public static RawStore getMSForConf(Configuration conf) throws MetaException {
+      RawStore cached = threadLocalMS.get();
+      if (cached != null) {
+        return cached;
+      }
+      RawStore fresh = newRawStoreForConf(conf);
+      fresh.verifySchema();
+      threadLocalMS.set(fresh);
+      return threadLocalMS.get();
+    }
+
+    /**
+     * Returns the calling thread's TxnStore, creating it from the handler's configuration and
+     * storing it for the thread on first use.
+     */
+    private TxnStore getTxnHandler() {
+      TxnStore cached = threadLocalTxn.get();
+      if (cached != null) {
+        return cached;
+      }
+      TxnStore created = TxnUtils.getTxnStore(conf);
+      threadLocalTxn.set(created);
+      return created;
+    }
+
+    /**
+     * Creates a RawStore proxy for the implementation class named in the configuration.
+     *
+     * @param conf source configuration; a copy of it is made and both the copy and the original
+     *        are handed to the proxy
+     * @return the new RawStore proxy
+     * @throws MetaException if the proxy cannot be created
+     */
+    private static RawStore newRawStoreForConf(Configuration conf) throws MetaException {
+      Configuration newConf = new Configuration(conf);
+      String rawStoreClassName = MetastoreConf.getVar(newConf, ConfVars.RAW_STORE_IMPL);
+      LOG.info(addPrefix("Opening raw store with implementation class:" + rawStoreClassName));
+      return RawStoreProxy.getProxy(newConf, conf, rawStoreClassName, threadLocalId.get());
+    }
+
+    /**
+     * Looks up the default database and creates it, owned by the PUBLIC role, when the lookup
+     * reports that it does not exist.
+     *
+     * @param ms store to query and, if needed, create the database in
+     * @throws MetaException propagated from the store or the warehouse
+     * @throws InvalidObjectException propagated from database creation
+     */
+    private void createDefaultDB_core(RawStore ms) throws MetaException, InvalidObjectException {
+      try {
+        ms.getDatabase(DEFAULT_DATABASE_NAME);
+      } catch (NoSuchObjectException e) {
+        String location = wh.getDefaultDatabasePath(DEFAULT_DATABASE_NAME).toString();
+        Database defaultDb =
+            new Database(DEFAULT_DATABASE_NAME, DEFAULT_DATABASE_COMMENT, location, null);
+        defaultDb.setOwnerName(PUBLIC);
+        defaultDb.setOwnerType(PrincipalType.ROLE);
+        ms.createDatabase(defaultDb);
+      }
+    }
+
+    /**
+     * Creates the default database if it doesn't exist.
+     *
+     * There is potential contention when a HiveServer2 using an embedded metastore and a
+     * Metastore Server invoke createDefaultDB concurrently. If the first attempt fails with a
+     * JDOException, a warning is logged and the creation is attempted exactly once more; a
+     * failure of that second attempt is propagated to the caller.
+     *
+     * @throws MetaException if the store reports an error, or if creation fails with an
+     *         InvalidObjectException (which is attached as the cause)
+     */
+    private void createDefaultDB() throws MetaException {
+      try {
+        try {
+          createDefaultDB_core(getMS());
+        } catch (JDOException e) {
+          LOG.warn("Retrying creating default database after error: " + e.getMessage(), e);
+          createDefaultDB_core(getMS());
+        }
+      } catch (InvalidObjectException e) {
+        // One conversion point for both attempts; keep the original exception as the cause so
+        // its stack trace is not lost.
+        MetaException wrapped = new MetaException(e.getMessage());
+        wrapped.initCause(e);
+        throw wrapped;
+      }
+    }
+
+    /**
+     * Creates the default roles if they don't exist.
+     *
+     * There is potential contention when a HiveServer2 using an embedded metastore and a
+     * Metastore Server invoke createDefaultRoles concurrently. If the first attempt fails with a
+     * JDOException, a warning is logged and the creation is attempted exactly once more; a
+     * failure of that second attempt is propagated to the caller.
+     *
+     * @throws MetaException propagated from either attempt
+     */
+    private void createDefaultRoles() throws MetaException {
+      try {
+        createDefaultRoles_core();
+      } catch (JDOException e) {
+        LOG.warn("Retrying creating default roles after error: " + e.getMessage(), e);
+        createDefaultRoles_core();
+      }
+    }
+
+    /**
+     * Adds the ADMIN and PUBLIC roles and grants the global "All" privilege to the ADMIN role.
+     * Roles or grants that already exist are tolerated and only logged at debug level.
+     *
+     * @throws MetaException propagated from the store
+     */
+    private void createDefaultRoles_core() throws MetaException {
+
+      RawStore ms = getMS();
+      try {
+        ms.addRole(ADMIN, ADMIN);
+        // Logged only when the role was really added, so the info log is not misleading when
+        // the role already existed or the add failed.
+        LOG.info("Added "+ ADMIN+ " role in metastore");
+      } catch (InvalidObjectException e) {
+        LOG.debug(ADMIN +" role already exists",e);
+      } catch (NoSuchObjectException e) {
+        // This should never be thrown.
+        LOG.warn("Unexpected exception while adding " +ADMIN+" roles" , e);
+      }
+      try {
+        ms.addRole(PUBLIC, PUBLIC);
+        LOG.info("Added "+PUBLIC+ " role in metastore");
+      } catch (InvalidObjectException e) {
+        LOG.debug(PUBLIC + " role already exists",e);
+      } catch (NoSuchObjectException e) {
+        // This should never be thrown.
+        LOG.warn("Unexpected exception while adding "+PUBLIC +" roles" , e);
+      }
+      // now grant all privs to admin
+      PrivilegeBag privs = new PrivilegeBag();
+      privs.addToPrivileges(new HiveObjectPrivilege( new HiveObjectRef(HiveObjectType.GLOBAL, null,
+        null, null, null), ADMIN, PrincipalType.ROLE, new PrivilegeGrantInfo("All", 0, ADMIN,
+        PrincipalType.ROLE, true)));
+      try {
+        ms.grantPrivileges(privs);
+      } catch (InvalidObjectException e) {
+        // Surprisingly these privs are already granted.
+        LOG.debug("Failed while granting global privs to admin", e);
+      } catch (NoSuchObjectException e) {
+        // Unlikely to be thrown.
+        LOG.warn("Failed while granting global privs to admin", e);
+      }
+    }
+
+    /**
+     * Adds the configured admin users if they don't exist.
+     *
+     * There is potential contention when a HiveServer2 using an embedded metastore and a
+     * Metastore Server invoke addAdminUsers concurrently. If the first attempt fails with a
+     * JDOException, a warning is logged and the operation is attempted exactly once more; a
+     * failure of that second attempt is propagated to the caller.
+     *
+     * @throws MetaException propagated from either attempt
+     */
+    private void addAdminUsers() throws MetaException {
+      try {
+        addAdminUsers_core();
+      } catch (JDOException e) {
+        LOG.warn("Retrying adding admin users after error: " + e.getMessage(), e);
+        addAdminUsers_core();
+      }
+    }
+
+    /**
+     * Grants the ADMIN role to every user named in the comma separated USERS_IN_ADMIN_ROLE
+     * setting. Returns without doing anything when the setting is empty, contains no user
+     * names, or the ADMIN role cannot be retrieved.
+     *
+     * @throws MetaException propagated from the store
+     */
+    private void addAdminUsers_core() throws MetaException {
+
+      // now add pre-configured users to admin role
+      String configured = MetastoreConf.getVar(conf,ConfVars.USERS_IN_ADMIN_ROLE,"").trim();
+      if (configured.isEmpty()) {
+        LOG.info("No user is added in admin role, since config is empty");
+        return;
+      }
+      // User names must be valid unix user names, which per IEEE Std 1003.1-2001 cannot
+      // contain a comma, so splitting the setting on comma is safe.
+      Iterator<String> pending =
+          Splitter.on(",").trimResults().omitEmptyStrings().split(configured).iterator();
+      if (!pending.hasNext()) {
+        LOG.info("No user is added in admin role, since config value "+ configured +
+          " is in incorrect format. We accept comma separated list of users.");
+        return;
+      }
+      RawStore ms = getMS();
+      final Role adminRole;
+      try {
+        adminRole = ms.getRole(ADMIN);
+      } catch (NoSuchObjectException e) {
+        LOG.error("Failed to retrieve just added admin role",e);
+        return;
+      }
+      do {
+        String userName = pending.next();
+        try {
+          ms.grantRole(adminRole, userName, PrincipalType.USER, ADMIN, PrincipalType.ROLE, true);
+          LOG.info("Added " + userName + " to admin role");
+        } catch (NoSuchObjectException e) {
+          LOG.error("Failed to add "+ userName + " in admin role",e);
+        } catch (InvalidObjectException e) {
+          LOG.debug(userName + " already in admin role", e);
+        }
+      } while (pending.hasNext());
+    }
+
+    /**
+     * Writes the message to the regular log, prefixed with the thread-local id, and forwards the
+     * unprefixed message to the audit log.
+     */
+    private static void logInfo(String m) {
+      String idPrefix = threadLocalId.get().toString();
+      LOG.info(idPrefix + ": " + m);
+      logAuditEvent(m);
+    }
+
+    /**
+     * Records the start of an API call: bumps the call counter, logs the call, starts a timer for
+     * it and increments the active-calls counter.
+     *
+     * @param function name of the API function being started
+     * @param extraLogInfo text appended to the function name in the log line
+     * @return the function name that was passed in
+     */
+    private String startFunction(String function, String extraLogInfo) {
+      incrementCounter(function);
+
+      String sourcePrefix = "";
+      if (getThreadLocalIpAddress() != null) {
+        sourcePrefix = "source:" + getThreadLocalIpAddress() + " ";
+      }
+      logInfo(sourcePrefix + function + extraLogInfo);
+
+      com.codahale.metrics.Timer apiTimer =
+          Metrics.getOrCreateTimer(MetricsConstants.API_PREFIX + function);
+      // A null timer means metrics are not in use.
+      if (apiTimer != null) {
+        timerContexts.get().put(function, apiTimer.time());
+      }
+
+      Counter activeCalls = Metrics.getOrCreateCounter(MetricsConstants.ACTIVE_CALLS + function);
+      if (activeCalls != null) {
+        activeCalls.inc();
+      }
+      return function;
+    }
+
+    private String startFunction(String function) {
+      return startFunction(function, "");
+    }
+
+    /** Starts the given function, logging the database and table it operates on. */
+    private void startTableFunction(String function, String db, String tbl) {
+      String target = " : db=" + db + " tbl=" + tbl;
+      startFunction(function, target);
+    }
+
+    /** Starts the given function, logging the database and the comma-joined table names. */
+    private void startMultiTableFunction(String function, String db, List<String> tbls) {
+      startFunction(function, " : db=" + db + " tbls=" + join(tbls, ","));
+    }
+
+    /**
+     * Starts the given function, logging the database, the table and the comma-joined partition
+     * values in square brackets.
+     */
+    private void startPartitionFunction(String function, String db, String tbl,
+                                        List<String> partVals) {
+      String joinedValues = join(partVals, ",");
+      String target = " : db=" + db + " tbl=" + tbl + "[" + joinedValues + "]";
+      startFunction(function, target);
+    }
+
+    /**
+     * Starts the given function, logging the database, the table and the partition spec.
+     *
+     * @param function name of the API function being started
+     * @param db database name
+     * @param tbl table name
+     * @param partName partition spec; rendered with the map's own toString
+     */
+    private void startPartitionFunction(String function, String db, String tbl,
+                                        Map<String, String> partName) {
+      // Keep a space before "partition=" so it does not run into the table name in the log line.
+      startFunction(function, " : db=" + db + " tbl=" + tbl + " partition=" + partName);
+    }
+
+    private void endFunction(String function, boolean successful, Exception e) {
+      endFunction(function, successful, e, null);
+    }
+    /** Ends the given function, wrapping the outcome into a MetaStoreEndFunctionContext. */
+    private void endFunction(String function, boolean successful, Exception e,
+                            String inputTableName) {
+      MetaStoreEndFunctionContext outcome =
+          new MetaStoreEndFunctionContext(successful, e, inputTableName);
+      endFunction(function, outcome);
+    }
+
+    /**
+     * Records the end of an API call: closes the timer context stored under the function name (if
+     * one exists), decrements the active-calls counter and notifies every end-function listener.
+     */
+    private void endFunction(String function, MetaStoreEndFunctionContext context) {
+      com.codahale.metrics.Timer.Context runningTimer = timerContexts.get().remove(function);
+      if (runningTimer != null) {
+        runningTimer.close();
+      }
+
+      Counter activeCalls = Metrics.getOrCreateCounter(MetricsConstants.ACTIVE_CALLS + function);
+      if (activeCalls != null) {
+        activeCalls.dec();
+      }
+
+      for (MetaStoreEndFunctionListener endListener : endFunctionListeners) {
+        endListener.onEndFunction(function, context);
+      }
+    }
+
+    /** Always reports the service as alive. */
+    @Override
+    public fb_status getStatus() {
+      final fb_status status = fb_status.ALIVE;
+      return status;
+    }
+
+    @Override
+    public void shutdown() {
+      cleanupRawStore();
+      PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics();
+    }
+
+    /**
+     * Returns the counters of the superclass, after giving each end-function listener the chance
+     * to export the counters it has collected into the same map.
+     */
+    @Override
+    public AbstractMap<String, Long> getCounters() {
+      AbstractMap<String, Long> counters = super.getCounters();
+      if (endFunctionListeners == null) {
+        return counters;
+      }
+
+      for (MetaStoreEndFunctionListener endListener : endFunctionListeners) {
+        endListener.exportCounters(counters);
+      }
+      return counters;
+    }
+
+    /**
+     * Validates the database, resolves its location, creates the directory when it does not exist
+     * and persists the database in a metastore transaction. When the transaction does not commit
+     * it is rolled back and a directory created by this call is removed again. Listeners are
+     * notified in every case with the final outcome.
+     *
+     * @param ms raw store to persist into
+     * @param db database to create; its location URI is overwritten with the resolved value
+     */
+    private void create_database_core(RawStore ms, final Database db)
+        throws AlreadyExistsException, InvalidObjectException, MetaException {
+      if (!MetaStoreUtils.validateName(db.getName(), null)) {
+        throw new InvalidObjectException(db.getName() + " is not a valid database name");
+      }
+
+      // Use the warehouse default when no location was given, otherwise the given location run
+      // through the warehouse's getDnsPath.
+      String requestedLocation = db.getLocationUri();
+      if (requestedLocation != null) {
+        db.setLocationUri(wh.getDnsPath(new Path(requestedLocation)).toString());
+      } else {
+        db.setLocationUri(wh.getDefaultDatabasePath(db.getName()).toString());
+      }
+      Path dbPath = new Path(db.getLocationUri());
+
+      boolean committed = false;
+      boolean createdDir = false;
+      Map<String, String> txnListenerResponses = Collections.emptyMap();
+      try {
+        firePreEvent(new PreCreateDatabaseEvent(db, this));
+
+        if (!wh.isDir(dbPath)) {
+          boolean made = wh.mkdirs(dbPath);
+          if (!made) {
+            throw new MetaException("Unable to create database path " + dbPath +
+                ", failed to create database " + db.getName());
+          }
+          createdDir = true;
+        }
+
+        ms.openTransaction();
+        ms.createDatabase(db);
+
+        if (!transactionalListeners.isEmpty()) {
+          txnListenerResponses = MetaStoreListenerNotifier.notifyEvent(
+              transactionalListeners,
+              EventType.CREATE_DATABASE,
+              new CreateDatabaseEvent(db, true, this));
+        }
+
+        committed = ms.commitTransaction();
+      } finally {
+        if (!committed) {
+          ms.rollbackTransaction();
+          // Only remove the directory if this call was the one that created it.
+          if (createdDir) {
+            wh.deleteDir(dbPath, true);
+          }
+        }
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(
+              listeners,
+              EventType.CREATE_DATABASE,
+              new CreateDatabaseEvent(db, committed, this),
+              null,
+              txnListenerResponses, ms);
+        }
+      }
+    }
+
+    /**
+     * Creates a database after checking that no database of that name exists yet. MetaException,
+     * InvalidObjectException and AlreadyExistsException are rethrown unchanged; any other
+     * exception is converted with newMetaException.
+     */
+    @Override
+    public void create_database(final Database db)
+        throws AlreadyExistsException, InvalidObjectException, MetaException {
+      startFunction("create_database", ": " + db.toString());
+      boolean created = false;
+      Exception failure = null;
+      try {
+        try {
+          Database existing = get_database_core(db.getName());
+          if (existing != null) {
+            throw new AlreadyExistsException("Database " + db.getName() + " already exists");
+          }
+        } catch (NoSuchObjectException e) {
+          // expected: the database must not exist yet
+        }
+
+        if (TEST_TIMEOUT_ENABLED) {
+          try {
+            Thread.sleep(TEST_TIMEOUT_VALUE);
+          } catch (InterruptedException e) {
+            // do nothing
+          }
+          Deadline.checkTimeout();
+        }
+        create_database_core(getMS(), db);
+        created = true;
+      } catch (MetaException | InvalidObjectException | AlreadyExistsException e) {
+        failure = e;
+        throw e;
+      } catch (Exception e) {
+        failure = e;
+        throw newMetaException(e);
+      } finally {
+        endFunction("create_database", created, failure);
+      }
+    }
+
+    /**
+     * Looks up a database by name and fires a pre-read event for it. The call is reported as
+     * successful to endFunction when a non-null database was retrieved.
+     */
+    @Override
+    public Database get_database(final String name) throws NoSuchObjectException, MetaException {
+      startFunction("get_database", ": " + name);
+      Database found = null;
+      Exception failure = null;
+      try {
+        found = get_database_core(name);
+        firePreEvent(new PreReadDatabaseEvent(found, this));
+        return found;
+      } catch (MetaException|NoSuchObjectException e) {
+        failure = e;
+        throw e;
+      } finally {
+        endFunction("get_database", found != null, failure);
+      }
+    }
+
+    /**
+     * Fetches a database from the raw store. MetaException and NoSuchObjectException are
+     * rethrown as they are; any other exception is asserted to be a RuntimeException and rethrown
+     * as one.
+     */
+    @Override
+    public Database get_database_core(final String name) throws NoSuchObjectException,
+        MetaException {
+      try {
+        Database found = getMS().getDatabase(name);
+        return found;
+      } catch (MetaException | NoSuchObjectException e) {
+        throw e;
+      } catch (Exception e) {
+        assert (e instanceof RuntimeException);
+        throw (RuntimeException) e;
+      }
+    }
+
+    /**
+     * Replaces the definition of an existing database.
+     *
+     * @param dbName name of the database to alter
+     * @param newDB new definition; a non-null location URI is normalized in place
+     * @throws TException whatever rethrowException produces for a failure, including the
+     *         MetaException raised here when the old definition cannot be retrieved
+     */
+    @Override
+    public void alter_database(final String dbName, final Database newDB) throws TException {
+      // The function name must be exactly the one given to endFunction below: the timer context
+      // and the active-calls counter are keyed by it. The database name goes into the extra log
+      // text instead of being appended to the key.
+      startFunction("alter_database", ": " + dbName);
+      boolean success = false;
+      Exception ex = null;
+
+      try {
+        // Perform the same URI normalization as create_database_core. Done inside the try so that
+        // endFunction still runs if the normalization fails.
+        if (newDB.getLocationUri() != null) {
+          newDB.setLocationUri(wh.getDnsPath(new Path(newDB.getLocationUri())).toString());
+        }
+
+        Database oldDB = get_database_core(dbName);
+        if (oldDB == null) {
+          throw new MetaException("Could not alter database \"" + dbName + "\". Could not retrieve old definition.");
+        }
+        firePreEvent(new PreAlterDatabaseEvent(oldDB, newDB, this));
+        getMS().alterDatabase(dbName, newDB);
+        success = true;
+      } catch (Exception e) {
+        ex = e;
+        rethrowException(e);
+      } finally {
+        endFunction("alter_database", success, ex);
+      }
+    }
+
+    /**
+     * Drops a database together with its functions and tables inside one metastore transaction
+     * and, when the transaction committed and deleteData is set, removes the collected partition,
+     * table and database locations afterwards. Listeners are notified in every case with the
+     * final outcome.
+     *
+     * @param ms raw store to operate on
+     * @param name name of the database to drop
+     * @param deleteData whether the data locations are deleted after a successful commit
+     * @param cascade when false, the drop is refused if the database still contains tables or
+     *        functions
+     */
+    private void drop_database_core(RawStore ms,
+        final String name, final boolean deleteData, final boolean cascade)
+        throws NoSuchObjectException, InvalidOperationException, MetaException,
+        IOException, InvalidObjectException, InvalidInputException {
+      boolean success = false;
+      Database db = null;
+      List<Path> tablePaths = new ArrayList<>();
+      List<Path> partitionPaths = new ArrayList<>();
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+      try {
+        ms.openTransaction();
+        db = ms.getDatabase(name);
+
+        firePreEvent(new PreDropDatabaseEvent(db, this));
+
+        List<String> allTables = get_all_tables(db.getName());
+        List<String> allFunctions = get_functions(db.getName(), "*");
+
+        if (!cascade) {
+          if (!allTables.isEmpty()) {
+            throw new InvalidOperationException(
+                "Database " + db.getName() + " is not empty. One or more tables exist.");
+          }
+          if (!allFunctions.isEmpty()) {
+            throw new InvalidOperationException(
+                "Database " + db.getName() + " is not empty. One or more functions exist.");
+          }
+        }
+        // The parent of the database location must be writable.
+        Path path = new Path(db.getLocationUri()).getParent();
+        if (!wh.isWritable(path)) {
+          throw new MetaException("Database not dropped since " +
+              path + " is not writable by " +
+              SecurityUtils.getUser());
+        }
+
+        Path databasePath = wh.getDnsPath(wh.getDatabasePath(db));
+
+        // drop any functions before dropping db
+        for (String funcName : allFunctions) {
+          drop_function(name, funcName);
+        }
+
+        // drop tables before dropping db
+        int tableBatchSize = MetastoreConf.getIntVar(conf,
+            ConfVars.BATCH_RETRIEVE_MAX);
+
+        int startIndex = 0;
+        // retrieve the tables from the metastore in batches to alleviate memory constraints
+        while (startIndex < allTables.size()) {
+          int endIndex = Math.min(startIndex + tableBatchSize, allTables.size());
+
+          List<Table> tables;
+          try {
+            tables = ms.getTableObjectsByName(name, allTables.subList(startIndex, endIndex));
+          } catch (UnknownDBException e) {
+            throw new MetaException(e.getMessage());
+          }
+
+          if (tables != null && !tables.isEmpty()) {
+            for (Table table : tables) {
+
+              // If the table is not external and it might not be in a subdirectory of the database
+              // add it's locations to the list of paths to delete
+              Path tablePath = null;
+              if (table.getSd().getLocation() != null && !isExternal(table)) {
+                tablePath = wh.getDnsPath(new Path(table.getSd().getLocation()));
+                if (!wh.isWritable(tablePath.getParent())) {
+                  throw new MetaException("Database metadata not deleted since table: " +
+                      table.getTableName() + " has a parent location " + tablePath.getParent() +
+                      " which is not writable by " + SecurityUtils.getUser());
+                }
+
+                if (!isSubdirectory(databasePath, tablePath)) {
+                  tablePaths.add(tablePath);
+                }
+              }
+
+              // For each partition in each table, drop the partitions and get a list of
+              // partitions' locations which might need to be deleted. The locations are
+              // accumulated across tables; assigning the result would keep only those of the
+              // last table processed.
+              List<Path> droppedPartitionPaths = dropPartitionsAndGetLocations(ms, name,
+                  table.getTableName(), tablePath, table.getPartitionKeys(),
+                  deleteData && !isExternal(table));
+              if (droppedPartitionPaths != null) {
+                partitionPaths.addAll(droppedPartitionPaths);
+              }
+
+              // Drop the table but not its data
+              drop_table(name, table.getTableName(), false);
+            }
+          }
+
+          // Advance even when the batch came back null or empty, otherwise the loop would never
+          // terminate.
+          startIndex = endIndex;
+        }
+
+        if (ms.dropDatabase(name)) {
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_DATABASE,
+                                                      new DropDatabaseEvent(db, true, this));
+          }
+
+          success = ms.commitTransaction();
+        }
+      } finally {
+        if (!success) {
+          ms.rollbackTransaction();
+        } else if (deleteData) {
+          // Delete the data in the partitions which have other locations
+          deletePartitionData(partitionPaths);
+          // Delete the data in the tables which have other locations
+          for (Path tablePath : tablePaths) {
+            deleteTableData(tablePath);
+          }
+          // Delete the data in the database
+          try {
+            wh.deleteDir(new Path(db.getLocationUri()), true);
+          } catch (Exception e) {
+            LOG.error("Failed to delete database directory: " + db.getLocationUri() +
+                " " + e.getMessage());
+          }
+          // it is not a terrible thing even if the data is not deleted
+        }
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_DATABASE,
+                                                new DropDatabaseEvent(db, success, this),
+                                                null,
+                                                transactionalListenerResponses, ms);
+        }
+      }
+    }
+
+    /**
+     * Makes a BEST GUESS at whether other lies underneath parent by comparing the string forms of
+     * the two paths. Intricacies of the underlying file system, which is assumed to be HDFS, are
+     * not taken into account. False positives are not expected, false negatives are possible.
+     *
+     * @param parent the candidate ancestor directory
+     * @param other the path to test
+     * @return true if the string form of other starts with that of parent followed by a separator
+     */
+    private boolean isSubdirectory(Path parent, Path other) {
+      String parentPrefix = parent.toString();
+      if (!parentPrefix.endsWith(Path.SEPARATOR)) {
+        parentPrefix = parentPrefix + Path.SEPARATOR;
+      }
+      return other.toString().startsWith(parentPrefix);
+    }
+
+    /**
+     * Drops a database. The default database is never dropped. An IOException is converted into
+     * a MetaException carrying its message; MetaException, InvalidOperationException and
+     * NoSuchObjectException are rethrown unchanged; any other exception is converted with
+     * newMetaException.
+     */
+    @Override
+    public void drop_database(final String dbName, final boolean deleteData, final boolean cascade)
+        throws NoSuchObjectException, InvalidOperationException, MetaException {
+
+      startFunction("drop_database", ": " + dbName);
+      if (DEFAULT_DATABASE_NAME.equalsIgnoreCase(dbName)) {
+        endFunction("drop_database", false, null);
+        throw new MetaException("Can not drop default database");
+      }
+
+      boolean dropped = false;
+      Exception failure = null;
+      try {
+        drop_database_core(getMS(), dbName, deleteData, cascade);
+        dropped = true;
+      } catch (IOException e) {
+        failure = e;
+        throw new MetaException(e.getMessage());
+      } catch (MetaException | InvalidOperationException | NoSuchObjectException e) {
+        failure = e;
+        throw e;
+      } catch (Exception e) {
+        failure = e;
+        throw newMetaException(e);
+      } finally {
+        endFunction("drop_database", dropped, failure);
+      }
+    }
+
+    /**
+     * Returns the names of the databases matching the pattern, as reported by the raw store. Any
+     * exception that is not already a MetaException is converted with newMetaException.
+     */
+    @Override
+    public List<String> get_databases(final String pattern) throws MetaException {
+      startFunction("get_databases", ": " + pattern);
+
+      List<String> names = null;
+      Exception failure = null;
+      try {
+        names = getMS().getDatabases(pattern);
+        return names;
+      } catch (Exception e) {
+        failure = e;
+        throw (e instanceof MetaException) ? (MetaException) e : newMetaException(e);
+      } finally {
+        endFunction("get_databases", names != null, failure);
+      }
+    }
+
+    /**
+     * Returns the names of all databases, as reported by the raw store. Any exception that is not
+     * already a MetaException is converted with newMetaException.
+     */
+    @Override
+    public List<String> get_all_databases() throws MetaException {
+      startFunction("get_all_databases");
+
+      List<String> names = null;
+      Exception failure = null;
+      try {
+        names = getMS().getAllDatabases();
+        return names;
+      } catch (Exception e) {
+        failure = e;
+        throw (e instanceof MetaException) ? (MetaException) e : newMetaException(e);
+      } finally {
+        endFunction("get_all_databases", names != null, failure);
+      }
+    }
+
+    /**
+     * Validates the type name and stores the type in a metastore transaction, refusing to do so
+     * when a type of that name already exists. The transaction is rolled back unless it
+     * committed.
+     */
+    private void create_type_core(final RawStore ms, final Type type)
+        throws AlreadyExistsException, MetaException, InvalidObjectException {
+      boolean nameIsValid = MetaStoreUtils.validateName(type.getName(), null);
+      if (!nameIsValid) {
+        throw new InvalidObjectException("Invalid type name");
+      }
+
+      boolean committed = false;
+      try {
+        ms.openTransaction();
+        if (is_type_exists(ms, type.getName())) {
+          throw new AlreadyExistsException("Type " + type.getName() + " already exists");
+        }
+        ms.createType(type);
+        committed = ms.commitTransaction();
+      } finally {
+        if (!committed) {
+          ms.rollbackTransaction();
+        }
+      }
+    }
+
+    /**
+     * Creates a type. MetaException, InvalidObjectException and AlreadyExistsException are
+     * rethrown unchanged; any other exception is converted with newMetaException.
+     *
+     * @return true when the type was created; failures surface as exceptions
+     */
+    @Override
+    public boolean create_type(final Type type) throws AlreadyExistsException,
+        MetaException, InvalidObjectException {
+      startFunction("create_type", ": " + type.toString());
+      boolean created = false;
+      Exception failure = null;
+      try {
+        create_type_core(getMS(), type);
+        created = true;
+      } catch (MetaException | InvalidObjectException | AlreadyExistsException e) {
+        failure = e;
+        throw e;
+      } catch (Exception e) {
+        failure = e;
+        throw newMetaException(e);
+      } finally {
+        endFunction("create_type", created, failure);
+      }
+
+      return created;
+    }
+
+    /**
+     * Fetches a type by name from the raw store and raises NoSuchObjectException when the store
+     * returns null. Every exception is recorded and handed to throwMetaException.
+     */
+    @Override
+    public Type get_type(final String name) throws MetaException, NoSuchObjectException {
+      startFunction("get_type", ": " + name);
+
+      Type found = null;
+      Exception failure = null;
+      try {
+        found = getMS().getType(name);
+        if (found == null) {
+          throw new NoSuchObjectException("Type \"" + name + "\" not found.");
+        }
+      } catch (Exception e) {
+        failure = e;
+        throwMetaException(e);
+      } finally {
+        endFunction("get_type", found != null, failure);
+      }
+      return found;
+    }
+
+    /** Returns true when the raw store returns a non-null type for the given name. */
+    private boolean is_type_exists(RawStore ms, String typeName)
+        throws MetaException {
+      Type stored = ms.getType(typeName);
+      return stored != null;
+    }
+
+    /**
+     * Drops a type by name and returns what the raw store reported. Every exception is recorded
+     * and handed to throwMetaException.
+     */
+    @Override
+    public boolean drop_type(final String name) throws MetaException, NoSuchObjectException {
+      startFunction("drop_type", ": " + name);
+
+      boolean dropped = false;
+      Exception failure = null;
+      try {
+        // TODO:pc validate that there are no types that refer to this
+        dropped = getMS().dropType(name);
+      } catch (Exception e) {
+        failure = e;
+        throwMetaException(e);
+      } finally {
+        endFunction("drop_type", dropped, failure);
+      }
+      return dropped;
+    }
+
+    /**
+     * Not implemented. The call is recorded as started and as ended unsuccessfully, and a
+     * MetaException is always thrown.
+     */
+    @Override
+    public Map<String, Type> get_type_all(String name) throws MetaException {
+      final String function = "get_type_all";
+      startFunction(function, ": " + name);
+      endFunction(function, false, null);
+      throw new MetaException("Not yet implemented");
+    }
+
+    private void create_table_core(final RawStore ms, final Table tbl,
+        final EnvironmentContext envContext)
+            throws AlreadyExistsException, MetaException,
+            InvalidObjectException, NoSuchObjectException {
+      create_table_core(ms, tbl, envContext, null, null, null, null);
+    }
+
+    /**
+     * Validates a table definition, creates its directory when needed and persists the table,
+     * optionally together with constraints, in a metastore transaction. When the transaction does
+     * not commit it is rolled back and a directory created by this call is removed again.
+     * Listeners are notified in every case with the final outcome.
+     *
+     * @param ms raw store to persist into
+     * @param tbl table to create; its location, create time and DDL_TIME parameter may be set here
+     * @param envContext passed through to the stats update and to the listeners
+     * @param primaryKeys primary keys to create, may be null
+     * @param foreignKeys foreign keys to create, may be null
+     * @param uniqueConstraints unique constraints to create, may be null
+     * @param notNullConstraints not-null constraints to create, may be null
+     */
+    private void create_table_core(final RawStore ms, final Table tbl,
+        final EnvironmentContext envContext, List<SQLPrimaryKey> primaryKeys,
+        List<SQLForeignKey> foreignKeys, List<SQLUniqueConstraint> uniqueConstraints,
+        List<SQLNotNullConstraint> notNullConstraints)
+        throws AlreadyExistsException, MetaException,
+        InvalidObjectException, NoSuchObjectException {
+      // Up-front validation of the table name, the columns, the partition columns and the skew
+      // information; nothing has been touched yet when one of these checks fails.
+      if (!MetaStoreUtils.validateName(tbl.getTableName(), conf)) {
+        throw new InvalidObjectException(tbl.getTableName()
+            + " is not a valid object name");
+      }
+      String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols());
+      if (validate != null) {
+        throw new InvalidObjectException("Invalid column " + validate);
+      }
+      if (tbl.getPartitionKeys() != null) {
+        validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys());
+        if (validate != null) {
+          throw new InvalidObjectException("Invalid partition column " + validate);
+        }
+      }
+      SkewedInfo skew = tbl.getSd().getSkewedInfo();
+      if (skew != null) {
+        validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames());
+        if (validate != null) {
+          throw new InvalidObjectException("Invalid skew column " + validate);
+        }
+        validate = MetaStoreUtils.validateSkewedColNamesSubsetCol(
+            skew.getSkewedColNames(), tbl.getSd().getCols());
+        if (validate != null) {
+          throw new InvalidObjectException("Invalid skew column " + validate);
+        }
+      }
+
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+      Path tblPath = null;
+      // madeDir records whether this call created the table directory, so that only a directory
+      // created here is removed again on failure.
+      boolean success = false, madeDir = false;
+      try {
+        firePreEvent(new PreCreateTableEvent(tbl, this));
+
+        ms.openTransaction();
+
+        Database db = ms.getDatabase(tbl.getDbName());
+        if (db == null) {
+          throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist");
+        }
+
+        // get_table checks whether database exists, it should be moved here
+        if (is_table_exists(ms, tbl.getDbName(), tbl.getTableName())) {
+          throw new AlreadyExistsException("Table " + tbl.getTableName()
+              + " already exists");
+        }
+
+        // Everything except a virtual view gets a location: the default table path when none was
+        // given, otherwise the given one run through the warehouse's getDnsPath. For a virtual
+        // view tblPath stays null.
+        if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
+          if (tbl.getSd().getLocation() == null
+              || tbl.getSd().getLocation().isEmpty()) {
+            tblPath = wh.getDefaultTablePath(
+                ms.getDatabase(tbl.getDbName()), tbl.getTableName());
+          } else {
+            if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
+              LOG.warn("Location: " + tbl.getSd().getLocation()
+                  + " specified for non-external table:" + tbl.getTableName());
+            }
+            tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
+          }
+          tbl.getSd().setLocation(tblPath.toString());
+        }
+
+        // Create the directory when it is not there yet.
+        if (tblPath != null) {
+          if (!wh.isDir(tblPath)) {
+            if (!wh.mkdirs(tblPath)) {
+              throw new MetaException(tblPath
+                  + " is not a directory or unable to create one");
+            }
+            madeDir = true;
+          }
+        }
+        if (MetastoreConf.getBoolVar(conf, ConfVars.STATS_AUTO_GATHER) &&
+            !MetaStoreUtils.isView(tbl)) {
+          MetaStoreUtils.updateTableStatsFast(db, tbl, wh, madeDir, envContext);
+        }
+
+        // set create time (in seconds); DDL_TIME is only filled in when the caller did not
+        // supply one
+        long time = System.currentTimeMillis() / 1000;
+        tbl.setCreateTime((int) time);
+        if (tbl.getParameters() == null ||
+            tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
+          tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
+        }
+        if (primaryKeys == null && foreignKeys == null
+                && uniqueConstraints == null && notNullConstraints == null) {
+          ms.createTable(tbl);
+        } else {
+          // Set constraint name if null before sending to listener.
+          // constraintNames is read below as one list laid out in the order primary keys,
+          // foreign keys, unique constraints, not-null constraints, hence the running offsets.
+          List<String> constraintNames = ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys,
+              uniqueConstraints, notNullConstraints);
+          int primaryKeySize = 0;
+          if (primaryKeys != null) {
+            primaryKeySize = primaryKeys.size();
+            for (int i = 0; i < primaryKeys.size(); i++) {
+              if (primaryKeys.get(i).getPk_name() == null) {
+                primaryKeys.get(i).setPk_name(constraintNames.get(i));
+              }
+            }
+          }
+          int foreignKeySize = 0;
+          if (foreignKeys != null) {
+            foreignKeySize = foreignKeys.size();
+            for (int i = 0; i < foreignKeySize; i++) {
+              if (foreignKeys.get(i).getFk_name() == null) {
+                foreignKeys.get(i).setFk_name(constraintNames.get(primaryKeySize + i));
+              }
+            }
+          }
+          int uniqueConstraintSize = 0;
+          if (uniqueConstraints != null) {
+            uniqueConstraintSize = uniqueConstraints.size();
+            for (int i = 0; i < uniqueConstraintSize; i++) {
+              if (uniqueConstraints.get(i).getUk_name() == null) {
+                uniqueConstraints.get(i).setUk_name(constraintNames.get(primaryKeySize + foreignKeySize + i));
+              }
+            }
+          }
+          if (notNullConstraints != null) {
+            for (int i = 0; i < notNullConstraints.size(); i++) {
+              if (notNullConstraints.get(i).getNn_name() == null) {
+                notNullConstraints.get(i).setNn_name(constraintNames.get(primaryKeySize + foreignKeySize + uniqueConstraintSize + i));
+              }
+            }
+          }
+        }
+
+        // Transactional listeners are notified before the commit. Only the response of the
+        // CREATE_TABLE notification is kept.
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+              EventType.CREATE_TABLE, new CreateTableEvent(tbl, true, this), envContext);
+          if (primaryKeys != null && !primaryKeys.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_PRIMARYKEY,
+                new AddPrimaryKeyEvent(primaryKeys, true, this), envContext);
+          }
+          if (foreignKeys != null && !foreignKeys.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_FOREIGNKEY,
+                new AddForeignKeyEvent(foreignKeys, true, this), envContext);
+          }
+          if (uniqueConstraints != null && !uniqueConstraints.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_UNIQUECONSTRAINT,
+                new AddUniqueConstraintEvent(uniqueConstraints, true, this), envContext);
+          }
+          if (notNullConstraints != null && !notNullConstraints.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ADD_NOTNULLCONSTRAINT,
+                new AddNotNullConstraintEvent(notNullConstraints, true, this), envContext);
+          }
+        }
+
+        success = ms.commitTransaction();
+      } finally {
+        if (!success) {
+          ms.rollbackTransaction();
+          if (madeDir) {
+            wh.deleteDir(tblPath, true);
+          }
+        }
+
+        // The remaining listeners are notified whatever the outcome and receive the success flag.
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners, EventType.CREATE_TABLE,
+              new CreateTableEvent(tbl, success, this), envContext, transactionalListenerResponses, ms);
+          if (primaryKeys != null && !primaryKeys.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_PRIMARYKEY,
+                new AddPrimaryKeyEvent(primaryKeys, success, this), envContext);
+          }
+          if (foreignKeys != null && !foreignKeys.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_FOREIGNKEY,
+                new AddForeignKeyEvent(foreignKeys, success, this), envContext);
+          }
+          if (uniqueConstraints != null && !uniqueConstraints.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_UNIQUECONSTRAINT,
+                new AddUniqueConstraintEvent(uniqueConstraints, success, this), envContext);
+          }
+          if (notNullConstraints != null && !notNullConstraints.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_NOTNULLCONSTRAINT,
+                new AddNotNullConstraintEvent(notNullConstraints, success, this), envContext);
+          }
+        }
+      }
+    }
+
+    /** Creates a table without an environment context. */
+    @Override
+    public void create_table(final Table tbl) throws AlreadyExistsException,
+        MetaException, InvalidObjectException {
+      final EnvironmentContext noContext = null;
+      create_table_with_environment_context(tbl, noContext);
+    }
+
+    /**
+     * Creates a table with the given environment context. A NoSuchObjectException is reported to
+     * the caller as an InvalidObjectException carrying its message; MetaException,
+     * InvalidObjectException and AlreadyExistsException are rethrown unchanged; any other
+     * exception is converted with newMetaException.
+     */
+    @Override
+    public void create_table_with_environment_context(final Table tbl,
+        final EnvironmentContext envContext)
+        throws AlreadyExistsException, MetaException, InvalidObjectException {
+      startFunction("create_table", ": " + tbl.toString());
+      boolean created = false;
+      Exception failure = null;
+      try {
+        create_table_core(getMS(), tbl, envContext);
+        created = true;
+      } catch (NoSuchObjectException e) {
+        failure = e;
+        throw new InvalidObjectException(e.getMessage());
+      } catch (MetaException | InvalidObjectException | AlreadyExistsException e) {
+        failure = e;
+        throw e;
+      } catch (Exception e) {
+        failure = e;
+        throw newMetaException(e);
+      } finally {
+        endFunction("create_table", created, failure, tbl.getTableName());
+      }
+    }
+
+    /**
+     * Creates a table together with its constraints by calling the constraint-aware
+     * overload of {@code create_table_core} with a null environment context.
+     *
+     * A {@code NoSuchObjectException} raised by the core call is reported to the caller
+     * as an {@code InvalidObjectException} carrying the same message. Exceptions of the
+     * declared types are rethrown as-is; anything else is wrapped via
+     * {@code newMetaException}. {@code endFunction} is always invoked with the outcome.
+     *
+     * @param tbl the table to create
+     * @param primaryKeys primary key definitions forwarded to the core call
+     * @param foreignKeys foreign key definitions forwarded to the core call
+     * @param uniqueConstraints unique constraint definitions forwarded to the core call
+     * @param notNullConstraints not-null constraint definitions forwarded to the core call
+     */
+    @Override
+    public void create_table_with_constraints(final Table tbl,
+        final List<SQLPrimaryKey> primaryKeys, final List<SQLForeignKey> foreignKeys,
+        List<SQLUniqueConstraint> uniqueConstraints,
+        List<SQLNotNullConstraint> notNullConstraints)
+        throws AlreadyExistsException, MetaException, InvalidObjectException {
+      startFunction("create_table", ": " + tbl.toString());
+      Exception failure = null;
+      boolean created = false;
+      try {
+        create_table_core(getMS(), tbl, null, primaryKeys, foreignKeys,
+            uniqueConstraints, notNullConstraints);
+        created = true;
+      } catch (NoSuchObjectException e) {
+        failure = e;
+        throw new InvalidObjectException(e.getMessage());
+      } catch (Exception e) {
+        failure = e;
+        // Declared exception types pass through untouched; everything else is wrapped.
+        if (e instanceof MetaException) {
+          throw (MetaException) e;
+        }
+        if (e instanceof InvalidObjectException) {
+          throw (InvalidObjectException) e;
+        }
+        if (e instanceof AlreadyExistsException) {
+          throw (AlreadyExistsException) e;
+        }
+        throw newMetaException(e);
+      } finally {
+        endFunction("create_table", created, failure, tbl.getTableName());
+      }
+    }
+
+    /**
+     * Drops the constraint named in the request from the given table.
+     *
+     * The drop and the notification of transactional listeners happen inside one store
+     * transaction. After a successful commit the regular listeners are notified; if the
+     * commit did not succeed the transaction is rolled back instead.
+     *
+     * @param req request carrying the database name, table name and constraint name
+     * @throws InvalidObjectException if the store reports a {@code NoSuchObjectException}
+     * @throws MetaException for any other failure (non-MetaException errors are wrapped)
+     */
+    @Override
+    public void drop_constraint(DropConstraintRequest req)
+        throws MetaException, InvalidObjectException {
+      final String dbName = req.getDbname();
+      final String tableName = req.getTablename();
+      final String constraintName = req.getConstraintname();
+      startFunction("drop_constraint", ": " + constraintName);
+      boolean committed = false;
+      Exception failure = null;
+      final RawStore ms = getMS();
+      try {
+        ms.openTransaction();
+        ms.dropConstraint(dbName, tableName, constraintName);
+        if (!transactionalListeners.isEmpty()) {
+          // All transactional listeners receive the same event instance.
+          final DropConstraintEvent txnEvent = new DropConstraintEvent(dbName,
+              tableName, constraintName, true, this);
+          for (MetaStoreEventListener txnListener : transactionalListeners) {
+            txnListener.onDropConstraint(txnEvent);
+          }
+        }
+        committed = ms.commitTransaction();
+      } catch (NoSuchObjectException e) {
+        failure = e;
+        throw new InvalidObjectException(e.getMessage());
+      } catch (Exception e) {
+        failure = e;
+        if (e instanceof MetaException) {
+          throw (MetaException) e;
+        }
+        throw newMetaException(e);
+      } finally {
+        if (committed) {
+          // Regular listeners each get a freshly built event.
+          for (MetaStoreEventListener listener : listeners) {
+            listener.onDropConstraint(new DropConstraintEvent(dbName,
+                tableName, constraintName, true, this));
+          }
+        } else {
+          ms.rollbackTransaction();
+        }
+        endFunction("drop_constraint", committed, failure, constraintName);
+      }
+    }
+
+    /**
+     * Adds the primary key columns carried by the request.
+     *
+     * The keys are stored and transactional listeners are notified inside one store
+     * transaction. Columns whose key name is null are given the name returned by the
+     * store at the same index before any listener sees them. After a successful commit
+     * the regular listeners are notified; otherwise the transaction is rolled back.
+     *
+     * @param req request carrying the primary key columns
+     * @throws MetaException on failure (errors of undeclared types are wrapped)
+     * @throws InvalidObjectException if raised while adding the keys
+     */
+    @Override
+    public void add_primary_key(AddPrimaryKeyRequest req)
+      throws MetaException, InvalidObjectException {
+      final List<SQLPrimaryKey> primaryKeyCols = req.getPrimaryKeyCols();
+      // Name used for logging only; the literal "null" stands in when there are no columns.
+      final String constraintName;
+      if (primaryKeyCols == null || primaryKeyCols.isEmpty()) {
+        constraintName = "null";
+      } else {
+        constraintName = primaryKeyCols.get(0).getPk_name();
+      }
+      startFunction("add_primary_key", ": " + constraintName);
+      boolean committed = false;
+      Exception failure = null;
+      final RawStore ms = getMS();
+      try {
+        ms.openTransaction();
+        final List<String> generatedNames = ms.addPrimaryKeys(primaryKeyCols);
+        // Set primary key name if null before sending to listener
+        if (primaryKeyCols != null) {
+          for (int idx = 0; idx < primaryKeyCols.size(); idx++) {
+            final SQLPrimaryKey col = primaryKeyCols.get(idx);
+            if (col.getPk_name() == null) {
+              col.setPk_name(generatedNames.get(idx));
+            }
+          }
+        }
+        if (!transactionalListeners.isEmpty()
+            && primaryKeyCols != null && !primaryKeyCols.isEmpty()) {
+          final AddPrimaryKeyEvent txnEvent =
+              new AddPrimaryKeyEvent(primaryKeyCols, true, this);
+          for (MetaStoreEventListener txnListener : transactionalListeners) {
+            txnListener.onAddPrimaryKey(txnEvent);
+          }
+        }
+        committed = ms.commitTransaction();
+      } catch (Exception e) {
+        failure = e;
+        if (e instanceof MetaException) {
+          throw (MetaException) e;
+        }
+        if (e instanceof InvalidObjectException) {
+          throw (InvalidObjectException) e;
+        }
+        throw newMetaException(e);
+      } finally {
+        if (committed) {
+          if (primaryKeyCols != null && !primaryKeyCols.isEmpty()) {
+            for (MetaStoreEventListener listener : listeners) {
+              listener.onAddPrimaryKey(new AddPrimaryKeyEvent(primaryKeyCols, true, this));
+            }
+          }
+        } else {
+          ms.rollbackTransaction();
+        }
+        endFunction("add_primary_key", committed, failure, constraintName);
+      }
+    }
+
+    /**
+     * Adds the foreign key columns carried by the request.
+     *
+     * The keys are stored and transactional listeners are notified inside one store
+     * transaction. Columns whose key name is null are given the name returned by the
+     * store at the same index before any listener sees them. After a successful commit
+     * the regular listeners are notified; otherwise the transaction is rolled back.
+     *
+     * @param req request carrying the foreign key columns
+     * @throws MetaException on failure (errors of undeclared types are wrapped)
+     * @throws InvalidObjectException if raised while adding the keys
+     */
+    @Override
+    public void add_foreign_key(AddForeignKeyRequest req)
+      throws MetaException, InvalidObjectException {
+      final List<SQLForeignKey> foreignKeyCols = req.getForeignKeyCols();
+      // Name used for logging only; the literal "null" stands in when there are no columns.
+      final String constraintName;
+      if (foreignKeyCols == null || foreignKeyCols.isEmpty()) {
+        constraintName = "null";
+      } else {
+        constraintName = foreignKeyCols.get(0).getFk_name();
+      }
+      startFunction("add_foreign_key", ": " + constraintName);
+      boolean committed = false;
+      Exception failure = null;
+      final RawStore ms = getMS();
+      try {
+        ms.openTransaction();
+        final List<String> generatedNames = ms.addForeignKeys(foreignKeyCols);
+        // Set foreign key name if null before sending to listener
+        if (foreignKeyCols != null) {
+          for (int idx = 0; idx < foreignKeyCols.size(); idx++) {
+            final SQLForeignKey col = foreignKeyCols.get(idx);
+            if (col.getFk_name() == null) {
+              col.setFk_name(generatedNames.get(idx));
+            }
+          }
+        }
+        if (!transactionalListeners.isEmpty()
+            && foreignKeyCols != null && !foreignKeyCols.isEmpty()) {
+          final AddForeignKeyEvent txnEvent =
+              new AddForeignKeyEvent(foreignKeyCols, true, this);
+          for (MetaStoreEventListener txnListener : transactionalListeners) {
+            txnListener.onAddForeignKey(txnEvent);
+          }
+        }
+        committed = ms.commitTransaction();
+      } catch (Exception e) {
+        failure = e;
+        if (e instanceof MetaException) {
+          throw (MetaException) e;
+        }
+        if (e instanceof InvalidObjectException) {
+          throw (InvalidObjectException) e;
+        }
+        throw newMetaException(e);
+      } finally {
+        if (committed) {
+          if (foreignKeyCols != null && !foreignKeyCols.isEmpty()) {
+            for (MetaStoreEventListener listener : listeners) {
+              listener.onAddForeignKey(new AddForeignKeyEvent(foreignKeyCols, true, this));
+            }
+          }
+        } else {
+          ms.rollbackTransaction();
+        }
+        endFunction("add_foreign_key", committed, failure, constraintName);
+      }
+    }
+
+    @Override
+    public void add_unique_constraint(AddUniqueConstraintRequest req)
+      throws MetaException, InvalidObjectException {
+      List<SQLUniqueConstraint> uniqueConstraintCols = req.getUniqueConstraintCols();
+      String constraintName = (uniqueConstraintCols != null && uniqueConstraintCols.size() > 0) ?
+              uniqueConstraintCols.get(0).getUk_name() : "null";
+      startFunction("add_unique_constraint", ": " + constraintName);
+      boolean success = false;
+      Exception ex = null;
+      RawStore ms = getMS();
+      try {
+        ms.openTransaction();
+        List<String> constraintNames = ms.addUniqueConstraints(uniqueConstraintCols);
+        // Set unique constraint name if null before sending to listener
+        if (uniqueConstraintCols != null) {
+          for (int i = 0; i < uniqueConstraintCols.size(); i++) {
+            if (uniqueConstraintCols.get(i).getUk_name() == null) {
+              uniqueConstraintCols.get(i).setUk_name(constraintNames.get(i));
+            }
+          }
+        }
+        if (transactionalListeners.size() > 0) {
+          if (uniqueConstraintCols != null && uniqueConstraintCols.size() > 0) {
+            AddUniqueConstraintEvent addUniqueConstraintEvent = new AddUniqueConstraintEvent(uniqueConstraintCols, true, this);
+            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+              transactionalListener.onAddUniqueConstraint(addUniqueConstraintEvent);
+            }
+          }
+        }
+        success = ms.commitTransaction();
+      } catch (Exception e) {
+        ex = e;
+        if (e instanceof MetaException) {
+          throw (MetaException) e;
+        } else if (e instanceof InvalidObjectException) {
+          throw (InvalidObjectException) e;
+        } else {
+          throw newMetaException(e);
+        }
+      } finally {
+        if (!success) {
+          ms.rollbackTransaction();
+        } else if (uniqueConstraintCols != null && uniqueConstraintCols.size() > 0) {
+          for (MetaStoreEventListener listener : listeners) {
+            AddUniqueConstraintEvent addUniqueConstraintEvent = new AddUniqueConstraintEvent(uniqueConstraintCols, true, this);
+            listener.onAddUniqueConstraint(addUniqueConstraintEvent);
+          }
+        }
+        endFunction("add_unique_constraint", success, ex, constraintName);
+      }
+    }
+
+    /**
+     * Adds the not-null constraint columns carried by the request.
+     *
+     * The constraints are stored and transactional listeners are notified inside one
+     * store transaction. Columns whose constraint name is null are given the name
+     * returned by the store at the same index before any listener sees them. After a
+     * successful commit the regular listeners are notified; otherwise the transaction
+     * is rolled back.
+     *
+     * @param req request carrying the not-null constraint columns
+     * @throws MetaException on failure (errors of undeclared types are wrapped)
+     * @throws InvalidObjectException if raised while adding the constraints
+     */
+    @Override
+    public void add_not_null_constraint(AddNotNullConstraintRequest req)
+      throws MetaException, InvalidObjectException {
+      final List<SQLNotNullConstraint> notNullConstraintCols = req.getNotNullConstraintCols();
+      // Name used for logging only; the literal "null" stands in when there are no columns.
+      final String constraintName;
+      if (notNullConstraintCols == null || notNullConstraintCols.isEmpty()) {
+        constraintName = "null";
+      } else {
+        constraintName = notNullConstraintCols.get(0).getNn_name();
+      }
+      startFunction("add_not_null_constraint", ": " + constraintName);
+      boolean committed = false;
+      Exception failure = null;
+      final RawStore ms = getMS();
+      try {
+        ms.openTransaction();
+        final List<String> generatedNames = ms.addNotNullConstraints(notNullConstraintCols);
+        // Set not null constraint name if null before sending to listener
+        if (notNullConstraintCols != null) {
+          for (int idx = 0; idx < notNullConstraintCols.size(); idx++) {
+            final SQLNotNullConstraint col = notNullConstraintCols.get(idx);
+            if (col.getNn_name() == null) {
+              col.setNn_name(generatedNames.get(idx));
+            }
+          }
+        }
+        if (!transactionalListeners.isEmpty()
+            && notNullConstraintCols != null && !notNullConstraintCols.isEmpty()) {
+          final AddNotNullConstraintEvent txnEvent =
+              new AddNotNullConstraintEvent(notNullConstraintCols, true, this);
+          for (MetaStoreEventListener txnListener : transactionalListeners) {
+            txnListener.onAddNotNullConstraint(txnEvent);
+          }
+        }
+        committed = ms.commitTransaction();
+      } catch (Exception e) {
+        failure = e;
+        if (e instanceof MetaException) {
+          throw (MetaException) e;
+        }
+        if (e instanceof InvalidObjectException) {
+          throw (InvalidObjectException) e;
+        }
+        throw newMetaException(e);
+      } finally {
+        if (committed) {
+          if (notNullConstraintCols != null && !notNullConstraintCols.isEmpty()) {
+            for (MetaStoreEventListener listener : listeners) {
+              listener.onAddNotNullConstraint(
+                  new AddNotNullConstraintEvent(notNullConstraintCols, true, this));
+            }
+          }
+        } else {
+          ms.rollbackTransaction();
+        }
+        endFunction("add_not_null_constraint", committed, failure, constraintName);
+      }
+    }
+
+    private boolean is_table_exists(RawStore ms, String dbname, String name)
+        throws MetaException {
+      return (ms.getTable(dbname, name) != null);
+    }
+
+    /**
+     * Drops a table's metadata inside one store transaction and, once that transaction
+     * has committed, removes the table's data if requested.
+     *
+     * Inside the transaction this method looks up the table, fires a
+     * {@code PreDropTableEvent}, drops the table's indexes (for non-index tables), checks
+     * that the parent of the table location is writable, drops all partitions, drops the
+     * table itself and notifies the transactional listeners. In the finally block it
+     * rolls back on failure, deletes partition and table data on success when
+     * {@code deleteData} is set and the table is not external, and notifies the regular
+     * listeners with the final success flag.
+     *
+     * @param ms store used for the transaction and all metadata operations
+     * @param dbname database name
+     * @param name table name
+     * @param deleteData whether data should be deleted after a successful commit; data of
+     *                   external tables is never deleted here
+     * @param envContext environment context passed to {@code isMustPurge} and to listeners
+     * @param indexName null for a plain table drop; when null, dropping an index table is
+     *                  rejected. A non-null value is only used in error messages here
+     *                  (presumably supplied by the drop-index path; confirm against callers)
+     * @return true if the transaction committed
+     * @throws NoSuchObjectException if the table lookup returns null
+     * @throws MetaException if the table has no storage descriptor, the parent location is
+     *                       not writable, index dropping fails, or the store refuses the drop
+     */
+    private boolean drop_table_core(final RawStore ms, final String dbname, final String name,
+        final boolean deleteData, final EnvironmentContext envContext,
+        final String indexName) throws NoSuchObjectException,
+        MetaException, IOException, InvalidObjectException, InvalidInputException {
+      boolean success = false;
+      boolean isExternal = false;
+      Path tblPath = null;
+      List<Path> partPaths = null;
+      Table tbl = null;
+      boolean ifPurge = false;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+      try {
+        ms.openTransaction();
+        // look up the table that is about to be dropped
+        tbl = get_table_core(dbname, name);
+        if (tbl == null) {
+          throw new NoSuchObjectException(name + " doesn't exist");
+        }
+        if (tbl.getSd() == null) {
+          throw new MetaException("Table metadata is corrupted");
+        }
+        ifPurge = isMustPurge(envContext, tbl);
+
+        firePreEvent(new PreDropTableEvent(tbl, deleteData, this));
+
+        // An index table may only be dropped when an index name was supplied.
+        boolean isIndexTable = isIndexTable(tbl);
+        if (indexName == null && isIndexTable) {
+          throw new RuntimeException(
+              "The table " + name + " is an index table. Please do drop index instead.");
+        }
+
+        if (!isIndexTable) {
+          try {
+            // Indexes are fetched at most Short.MAX_VALUE at a time, so keep fetching
+            // and dropping until the store returns none.
+            List<Index> indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
+            while (indexes != null && indexes.size() > 0) {
+              for (Index idx : indexes) {
+                this.drop_index_by_name(dbname, name, idx.getIndexName(), true);
+              }
+              indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
+            }
+          } catch (TException e) {
+            throw new MetaException(e.getMessage());
+          }
+        }
+        isExternal = isExternal(tbl);
+        if (tbl.getSd().getLocation() != null) {
+          tblPath = new Path(tbl.getSd().getLocation());
+          // Refuse to touch metadata if the parent directory is not writable.
+          if (!wh.isWritable(tblPath.getParent())) {
+            String target = indexName == null ? "Table" : "Index table";
+            throw new MetaException(target + " metadata not deleted since " +
+                tblPath.getParent() + " is not writable by " +
+                SecurityUtils.getUser());
+          }
+        }
+
+        // Drop the partitions and get a list of locations which need to be deleted
+        partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
+            tbl.getPartitionKeys(), deleteData && !isExternal);
+        if (!ms.dropTable(dbname, name)) {
+          String tableName = dbname + "." + name;
+          throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
+              "Unable to drop index table " + tableName + " for index " + indexName);
+        } else {
+          // Transactional listeners are notified before the commit.
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_TABLE,
+                                                      new DropTableEvent(tbl, true, deleteData, this),
+                                                      envContext);
+          }
+          success = ms.commitTransaction();
+        }
+      } finally {
+        if (!success) {
+          ms.rollbackTransaction();
+        } else if (deleteData && !isExternal) {
+          // Data needs deletion. Check if trash may be skipped.
+          // Delete the data in the partitions which have other locations
+          deletePartitionData(partPaths, ifPurge);
+          // Delete the data in the table
+          deleteTableData(tblPath, ifPurge);
+          // ok even if the data is not deleted
+        }
+
+        // Regular listeners are notified on both success and failure.
+        // NOTE(review): tbl is still null here when the lookup above threw or returned
+        // null, so the event is built with a null table in that case -- confirm that
+        // listeners tolerate it.
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_TABLE,
+                                                new DropTableEvent(tbl, success, deleteData, this),
+                                                envContext,
+                                                transactionalListenerResponses, ms);
+        }
+      }
+      return success;
+    }
+
+    /**
+     * Deletes the data in a table's location without purging; failures are logged
+     * rather than thrown.
+     *
+     * Equivalent to calling {@code deleteTableData(tablePath, false)}.
+     *
+     * @param tablePath location of the table data; may be null, in which case nothing
+     *                  is deleted
+     */
+    private void deleteTableData(Path tablePath) {
+      deleteTableData(tablePath, false);
+    }
+
+    /**
+     * Deletes the data in a table's location. This is best effort: if the deletion
+     * fails the error is logged, including the stack trace, and not rethrown.
+     *
+     * @param tablePath location of the table data; when null nothing is deleted
+     * @param ifPurge completely purge the table (skipping trash) while removing
+     *                data from warehouse
+     */
+    private void deleteTableData(Path tablePath, boolean ifPurge) {
+      if (tablePath == null) {
+        return;
+      }
+      try {
+        wh.deleteDir(tablePath, true, ifPurge);
+      } catch (Exception e) {
+        // Pass the exception itself so the cause and stack trace reach the log;
+        // the message alone can be null or too terse to diagnose the failure.
+        LOG.error("Failed to delete table directory: " + tablePath +
+            " " + e.getMessage(), e);
+      }
+    }
+
+    /**
+     * Given a list of partitions' locations, tries to delete each one without purging,
+     * and logs an error for each that fails.
+     *
+     * Equivalent to calling {@code deletePartitionData(partPaths, false)}.
+     *
+     * @param partPaths partition locations to delete; may be null or empty
+     */
+    private void deletePartitionData(List<Path> partPaths) {
+      deletePartitionData(partPaths, false);
+    }
+
+    /**
+     * Given a list of partitions' locations, tries to delete each one. This is best
+     * effort: a failure on one location is logged, including the stack trace, and the
+     * remaining locations are still attempted.
+     *
+     * @param partPaths partition locations to delete; when null or empty nothing is done
+     * @param ifPurge completely purge the partition (skipping trash) while
+     *                removing data from warehouse
+     */
+    private void deletePartitionData(List<Path> partPaths, boolean ifPurge) {
+      if (partPaths == null || partPaths.isEmpty()) {
+        return;
+      }
+      for (Path partPath : partPaths) {
+        try {
+          wh.deleteDir(partPath, true, ifPurge);
+        } catch (Exception e) {
+          // Pass the exception itself so the cause and stack trace reach the log;
+          // the message alone can be null or too terse to diagnose the failure.
+          LOG.error("Failed to delete partition directory: " + partPath +
+              " " + e.getMessage(), e);
+        }
+      }
+    }
+
+    /**
+     * Drops the metadata of every partition of a table, fetching the partitions from the
+     * store in batches of {@code ConfVars.BATCH_RETRIEVE_MAX}.
+     *
+     * If {@code checkLocation} is set, each partition location that is not a subdirectory
+     * of {@code tablePath} (or every partition location when {@code tablePath} is null) is
+     * checked to make sure its parent is writable, and is collected into the returned list.
+     *
+     * @param ms store used to read the table and partitions and to drop the partitions
+     * @param dbName database name
+     * @param tableName table name
+     * @param tablePath location of the table, or null if it has none
+     * @param partitionKeys partition keys of the table; used here only to build the
+     *                      partition name in the "not writable" error message
+     * @param checkLocation whether partition locations should be checked and collected
+     * @return locations of partitions that are not subdirectories of {@code tablePath};
+     *         empty when {@code checkLocation} is false
+     * @throws MetaException if the parent of a collected partition location is not writable
+     * @throws IOException
+     * @throws InvalidInputException
+     * @throws InvalidObjectException
+     * @throws NoSuchObjectException
+     */
+    private List<Path> dropPartitionsAndGetLocations(RawStore ms, String dbName,
+      String tableName, Path tablePath, List<FieldSchema> partitionKeys, boolean checkLocation)
+      throws MetaException, IOException, NoSuchObjectException, InvalidObjectException,
+      InvalidInputException {
+      int partitionBatchSize = MetastoreConf.getIntVar(conf,
+          ConfVars.BATCH_RETRIEVE_MAX);
+      Path tableDnsPath = null;
+      if (tablePath != null) {
+        tableDnsPath = wh.getDnsPath(tablePath);
+      }
+      List<Path> partPaths = new ArrayList<>();
+      // The table is re-read from the store; its partition keys are used below to build
+      // the names of the partitions to drop.
+      Table tbl = ms.getTable(dbName, tableName);
+
+      // call dropPartition on each of the table's partitions to follow the
+      // procedure for cleanly dropping partitions.
+      // Each pass fetches the next batch; the loop ends when the store has none left,
+      // which relies on dropPartitions below actually removing the fetched batch.
+      while (true) {
+        List<Partition> partsToDelete = ms.getPartitions(dbName, tableName, partitionBatchSize);
+        if (partsToDelete == null || partsToDelete.isEmpty()) {
+          break;
+        }
+        List<String> partNames = new ArrayList<>();
+        for (Partition part : partsToDelete) {
+          if (checkLocation && part.getSd() != null &&
+              part.getSd().getLocation() != null) {
+
+            Path partPath = wh.getDnsPath(new Path(part.getSd().getLocation()));
+            // NOTE(review): when tableDnsPath is null this branch is entered without the
+            // partPath null check, and partPath.getParent() is dereferenced below --
+            // confirm getDnsPath never returns null.
+            if (tableDnsPath == null ||
+                (partPath != null && !isSubdirectory(tableDnsPath, partPath))) {
+              if (!wh.isWritable(partPath.getParent())) {
+                throw new MetaException("Table metadata not deleted since the partition " +
+                    Warehouse.makePartName(partitionKeys, part.getValues()) +
+                    " has parent location " + partPath.getParent() + " which is not writable " +
+                    "by " + SecurityUtils.getUser());
+              }
+              partPaths.add(partPath);
+            }
+          }
+          partNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues()));
+        }
+        for (MetaStoreEventListener listener : listeners) {
+          //No drop part listener events fired for public listeners historically, for drop table case.
+          //Limiting to internal listeners for now, to avoid unexpected calls for public listeners.
+          if (listener instanceof HMSMetricsListener) {
+            // One call per partition in the batch, each with a null event.
+            for (@SuppressWarnings("unused") Partition part : partsToDelete) {
+              listener.onDropPartition(null);
+            }
+          }
+        }
+        ms.dropPartitions(dbName, tableName, partNames);
+      }
+
+      return partPaths;
+    }
+
+    /**
+     * Drops a table without an environment context.
+     *
+     * Delegates to {@code drop_table_with_environment_context}, passing {@code null}
+     * as the context.
+     *
+     * @param dbname database name
+     * @param name table name
+     * @param deleteData whether the table's data should be deleted as well
+     */
+    @Override
+    public void drop_table(final String dbname, final String name, final boolean deleteData)
+        throws NoSuchObjectException, MetaException {
+      drop_table_with_environment_context(dbname, name, deleteData, null);
+    }
+
+    /**
+     * Drops a table by calling {@code drop_table_core} with a null index name.
+     *
+     * An {@code IOException} from the core call is reported as a {@code MetaException}
+     * carrying the same message; any other exception is handed to
+     * {@code throwMetaException}. {@code endFunction} is always invoked with the outcome.
+     *
+     * @param dbname database name
+     * @param name table name
+     * @param deleteData whether the table's data should be deleted as well
+     * @param envContext environment context forwarded to the core call; may be null
+     */
+    @Override
+    public void drop_table_with_environment_context(final String dbname, final String name,
+        final boolean deleteData, final EnvironmentContext envContext)
+        throws NoSuchObjectException, MetaException {
+      startTableFunction("drop_table", dbname, name);
+      Exception failure = null;
+      boolean dropped = false;
+      try {
+        dropped = drop_table_core(getMS(), dbname, name, deleteData, envContext, null);
+      } catch (IOException ioe) {
+        failure = ioe;
+        throw new MetaException(ioe.getMessage());
+      } catch (Exception other) {
+        failure = other;
+        throwMetaException(other);
+      } finally {
+        endFunction("drop_table", dropped, failure, name);
+      }
+    }
+
+    /**
+     * Adjusts statistics-related parameters for a truncate operation.
+     *
+     * Every supported stat that is present in {@code props} with a non-null value is
+     * reset to "0", the basic stats state is set to true, the environment context is
+     * tagged with {@code STATS_GENERATED = TASK}, and the column stats state is cleared.
+     * Nothing happens when {@code props} is null.
+     *
+     * @param props parameter map to update in place; may be null
+     * @param environmentContext context that receives the stats-generated marker
+     */
+    private void updateStatsForTruncate(Map<String,String> props, EnvironmentContext environmentContext) {
+      if (props == null) {
+        return;
+      }
+      for (String statName : StatsSetupConst.supportedStats) {
+        // In the case of truncate table, stats that were present are set to 0.
+        if (props.get(statName) != null) {
+          props.put(statName, "0");
+        }
+      }
+      // Basic stats are marked accurate first ...
+      StatsSetupConst.setBasicStatsState(props, StatsSetupConst.TRUE);
+      environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
+      // ... and column stats are invalidated afterwards.
+      StatsSetupConst.clearColumnStatsState(props);
+    }
+
+    private void alterPartitionForTruncate(final RawStore ms,
+                                           final String dbName,
+                                           final String tableName,
+                                           final Table table,
+                                           final Partition partition) throws Exception {
+      EnvironmentContext environmentContext = new EnvironmentContext();
+      updateStatsForTruncate(partition.getParameters(), environmentContext);
+
+      if (!transactionalListeners.isEmpty()) {
+        MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                EventType.ALTER_PARTITION,
+                new AlterPartitionEvent(partition, partition, table, true, true, this));
+      }
+
+      if (!listeners.isEmpty()) {
+        MetaStoreListenerNotifier.notifyEvent(listeners,
+                EventType.ALTER_PARTITION,
+                new AlterPartitionEvent(partition, partition, table, true, true, this));
+      }
+
+      alterHandler.alterPartition(ms, wh, dbName, tableName, null, partition, environmentContext, this);
+    }
+
+    private void alterTableStatsForTruncate(final RawStore ms,
+                                            final String dbName,
+                                            final String tableName,
+                                            final Table table,
+                                            final List<String> partNames) throws Exception {
+      if (partNames == null) {
+        if (0 != table.getPartitionKeysSize()) {
+          for (Partition partition : ms.getPartitions(dbName, tableName, Integer.MAX_VALUE)) {
+            alterPartitionForTruncate(ms, dbName, tableName, table, partition);
+        

<TRUNCATED>