You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:35 UTC

[26/50] [abbrv] hive git commit: HIVE-19416 : merge master into branch (Sergey Shelukhin) 0719

http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 0000000,92e2805..70edb96
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@@ -1,0 -1,3422 +1,3597 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.metastore;
+ 
+ import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependCatalogToDbName;
+ 
+ import java.io.IOException;
+ import java.lang.reflect.Constructor;
+ import java.lang.reflect.InvocationHandler;
+ import java.lang.reflect.InvocationTargetException;
+ import java.lang.reflect.Method;
+ import java.lang.reflect.Proxy;
+ import java.net.InetAddress;
+ import java.net.URI;
+ import java.net.UnknownHostException;
+ import java.nio.ByteBuffer;
+ import java.security.PrivilegedExceptionAction;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.NoSuchElementException;
+ import java.util.Random;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+ import javax.security.auth.login.LoginException;
+ 
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.classification.InterfaceStability;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.common.StatsSetupConst;
+ import org.apache.hadoop.hive.common.ValidTxnList;
+ import org.apache.hadoop.hive.common.ValidWriteIdList;
+ import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.hooks.URIResolverHook;
+ import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+ import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+ import org.apache.hadoop.hive.metastore.utils.ObjectPair;
+ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.util.ReflectionUtils;
+ import org.apache.hadoop.util.StringUtils;
+ import org.apache.thrift.TApplicationException;
+ import org.apache.thrift.TException;
+ import org.apache.thrift.protocol.TBinaryProtocol;
+ import org.apache.thrift.protocol.TCompactProtocol;
+ import org.apache.thrift.protocol.TProtocol;
+ import org.apache.thrift.transport.TFramedTransport;
+ import org.apache.thrift.transport.TSocket;
+ import org.apache.thrift.transport.TTransport;
+ import org.apache.thrift.transport.TTransportException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.collect.Lists;
+ 
+ /**
+  * Hive Metastore Client.
+  * The public implementation of IMetaStoreClient. Methods not inherited from IMetaStoreClient
+  * are not public and can change. Hence this is marked as unstable.
+  * For users who require retry mechanism when the connection between metastore and client is
+  * broken, RetryingMetaStoreClient class should be used.
+  */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
+   /**
+    * Capabilities of the current client. If this client talks to a MetaStore server in a manner
+    * implying the usage of some expanded features that require client-side support that this client
+    * doesn't have (e.g. a getting a table of a new type), it will get back failures when the
+    * capability checking is enabled (the default).
+    */
+   public final static ClientCapabilities VERSION = new ClientCapabilities(
+       Lists.newArrayList(ClientCapability.INSERT_ONLY_TABLES));
+   // Test capability for tests.
+   public final static ClientCapabilities TEST_VERSION = new ClientCapabilities(
+       Lists.newArrayList(ClientCapability.INSERT_ONLY_TABLES, ClientCapability.TEST_CAPABILITY));
+ 
+   ThriftHiveMetastore.Iface client = null;
+   private TTransport transport = null;
+   private boolean isConnected = false;
+   private URI metastoreUris[];
+   private final HiveMetaHookLoader hookLoader;
+   protected final Configuration conf;  // Keep a copy of HiveConf so if Session conf changes, we may need to get a new HMS client.
+   private String tokenStrForm;
+   private final boolean localMetaStore;
+   private final MetaStoreFilterHook filterHook;
+   private final URIResolverHook uriResolverHook;
+   private final int fileMetadataBatchSize;
+ 
+   private Map<String, String> currentMetaVars;
+ 
+   private static final AtomicInteger connCount = new AtomicInteger(0);
+ 
+   // for thrift connects
+   private int retries = 5;
+   private long retryDelaySeconds = 0;
+   private final ClientCapabilities version;
+ 
+   //copied from ErrorMsg.java
+   private static final String REPL_EVENTS_MISSING_IN_METASTORE = "Notification events are missing in the meta store.";
 -  
++
+   static final protected Logger LOG = LoggerFactory.getLogger(HiveMetaStoreClient.class);
+ 
+   public HiveMetaStoreClient(Configuration conf) throws MetaException {
+     this(conf, null, true);
+   }
+ 
+   public HiveMetaStoreClient(Configuration conf, HiveMetaHookLoader hookLoader) throws MetaException {
+     this(conf, hookLoader, true);
+   }
+ 
+   public HiveMetaStoreClient(Configuration conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
+     throws MetaException {
+ 
+     this.hookLoader = hookLoader;
+     if (conf == null) {
+       conf = MetastoreConf.newMetastoreConf();
+       this.conf = conf;
+     } else {
+       this.conf = new Configuration(conf);
+     }
+     version = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ? TEST_VERSION : VERSION;
+     filterHook = loadFilterHooks();
+     uriResolverHook = loadUriResolverHook();
+     fileMetadataBatchSize = MetastoreConf.getIntVar(
+         conf, ConfVars.BATCH_RETRIEVE_OBJECTS_MAX);
+ 
+     String msUri = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS);
+     localMetaStore = MetastoreConf.isEmbeddedMetaStore(msUri);
+     if (localMetaStore) {
+       if (!allowEmbedded) {
+         throw new MetaException("Embedded metastore is not allowed here. Please configure "
+             + ConfVars.THRIFT_URIS.toString() + "; it is currently set to [" + msUri + "]");
+       }
+       // instantiate the metastore server handler directly instead of connecting
+       // through the network
+       client = HiveMetaStore.newRetryingHMSHandler("hive client", this.conf, true);
+       isConnected = true;
+       snapshotActiveConf();
+       return;
+     }
+ 
+     // get the number retries
+     retries = MetastoreConf.getIntVar(conf, ConfVars.THRIFT_CONNECTION_RETRIES);
+     retryDelaySeconds = MetastoreConf.getTimeVar(conf,
+         ConfVars.CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
+ 
+     // user wants file store based configuration
+     if (MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS) != null) {
+       resolveUris();
+     } else {
+       LOG.error("NOT getting uris from conf");
+       throw new MetaException("MetaStoreURIs not found in conf file");
+     }
+ 
+     //If HADOOP_PROXY_USER is set in env or property,
+     //then need to create metastore client that proxies as that user.
+     String HADOOP_PROXY_USER = "HADOOP_PROXY_USER";
+     String proxyUser = System.getenv(HADOOP_PROXY_USER);
+     if (proxyUser == null) {
+       proxyUser = System.getProperty(HADOOP_PROXY_USER);
+     }
+     //if HADOOP_PROXY_USER is set, create DelegationToken using real user
+     if(proxyUser != null) {
+       LOG.info(HADOOP_PROXY_USER + " is set. Using delegation "
+           + "token for HiveMetaStore connection.");
+       try {
+         UserGroupInformation.getLoginUser().getRealUser().doAs(
+             new PrivilegedExceptionAction<Void>() {
+               @Override
+               public Void run() throws Exception {
+                 open();
+                 return null;
+               }
+             });
+         String delegationTokenPropString = "DelegationTokenForHiveMetaStoreServer";
+         String delegationTokenStr = getDelegationToken(proxyUser, proxyUser);
+         SecurityUtils.setTokenStr(UserGroupInformation.getCurrentUser(), delegationTokenStr,
+             delegationTokenPropString);
+         MetastoreConf.setVar(this.conf, ConfVars.TOKEN_SIGNATURE, delegationTokenPropString);
+         close();
+       } catch (Exception e) {
+         LOG.error("Error while setting delegation token for " + proxyUser, e);
+         if(e instanceof MetaException) {
+           throw (MetaException)e;
+         } else {
+           throw new MetaException(e.getMessage());
+         }
+       }
+     }
+     // finally open the store
+     open();
+   }
+ 
+   private void resolveUris() throws MetaException {
+     String metastoreUrisString[] =  MetastoreConf.getVar(conf,
+             ConfVars.THRIFT_URIS).split(",");
+ 
+     List<URI> metastoreURIArray = new ArrayList<URI>();
+     try {
+       int i = 0;
+       for (String s : metastoreUrisString) {
+         URI tmpUri = new URI(s);
+         if (tmpUri.getScheme() == null) {
+           throw new IllegalArgumentException("URI: " + s
+                   + " does not have a scheme");
+         }
+         if (uriResolverHook != null) {
+           metastoreURIArray.addAll(uriResolverHook.resolveURI(tmpUri));
+         } else {
+           metastoreURIArray.add(new URI(
+                   tmpUri.getScheme(),
+                   tmpUri.getUserInfo(),
+                   HadoopThriftAuthBridge.getBridge().getCanonicalHostName(tmpUri.getHost()),
+                   tmpUri.getPort(),
+                   tmpUri.getPath(),
+                   tmpUri.getQuery(),
+                   tmpUri.getFragment()
+           ));
+         }
+       }
+       metastoreUris = new URI[metastoreURIArray.size()];
+       for (int j = 0; j < metastoreURIArray.size(); j++) {
+         metastoreUris[j] = metastoreURIArray.get(j);
+       }
+ 
+       if (MetastoreConf.getVar(conf, ConfVars.THRIFT_URI_SELECTION).equalsIgnoreCase("RANDOM")) {
+         List uriList = Arrays.asList(metastoreUris);
+         Collections.shuffle(uriList);
+         metastoreUris = (URI[]) uriList.toArray();
+       }
+     } catch (IllegalArgumentException e) {
+       throw (e);
+     } catch (Exception e) {
+       MetaStoreUtils.logAndThrowMetaException(e);
+     }
+   }
+ 
+ 
+   private MetaStoreFilterHook loadFilterHooks() throws IllegalStateException {
+     Class<? extends MetaStoreFilterHook> authProviderClass = MetastoreConf.
+         getClass(conf, ConfVars.FILTER_HOOK, DefaultMetaStoreFilterHookImpl.class,
+             MetaStoreFilterHook.class);
+     String msg = "Unable to create instance of " + authProviderClass.getName() + ": ";
+     try {
+       Constructor<? extends MetaStoreFilterHook> constructor =
+           authProviderClass.getConstructor(Configuration.class);
+       return constructor.newInstance(conf);
+     } catch (NoSuchMethodException | SecurityException | IllegalAccessException | InstantiationException | IllegalArgumentException | InvocationTargetException e) {
+       throw new IllegalStateException(msg + e.getMessage(), e);
+     }
+   }
+ 
+   //multiple clients may initialize the hook at the same time
+   synchronized private URIResolverHook loadUriResolverHook() throws IllegalStateException {
+ 
+     String uriResolverClassName =
+             MetastoreConf.getAsString(conf, ConfVars.URI_RESOLVER);
+     if (uriResolverClassName.equals("")) {
+       return null;
+     } else {
+       LOG.info("Loading uri resolver" + uriResolverClassName);
+       try {
+         Class<?> uriResolverClass = Class.forName(uriResolverClassName, true,
+                 JavaUtils.getClassLoader());
+         return (URIResolverHook) ReflectionUtils.newInstance(uriResolverClass, null);
+       } catch (Exception e) {
+         LOG.error("Exception loading uri resolver hook" + e);
+         return null;
+       }
+     }
+   }
+ 
+   /**
+    * Swaps the first element of the metastoreUris array with a random element from the
+    * remainder of the array.
+    */
+   private void promoteRandomMetaStoreURI() {
+     if (metastoreUris.length <= 1) {
+       return;
+     }
+     Random rng = new Random();
+     int index = rng.nextInt(metastoreUris.length - 1) + 1;
+     URI tmp = metastoreUris[0];
+     metastoreUris[0] = metastoreUris[index];
+     metastoreUris[index] = tmp;
+   }
+ 
+   @VisibleForTesting
+   public TTransport getTTransport() {
+     return transport;
+   }
+ 
+   @Override
+   public boolean isLocalMetaStore() {
+     return localMetaStore;
+   }
+ 
+   @Override
+   public boolean isCompatibleWith(Configuration conf) {
+     // Make a copy of currentMetaVars, there is a race condition that
+ 	// currentMetaVars might be changed during the execution of the method
+     Map<String, String> currentMetaVarsCopy = currentMetaVars;
+     if (currentMetaVarsCopy == null) {
+       return false; // recreate
+     }
+     boolean compatible = true;
+     for (ConfVars oneVar : MetastoreConf.metaVars) {
+       // Since metaVars are all of different types, use string for comparison
+       String oldVar = currentMetaVarsCopy.get(oneVar.getVarname());
+       String newVar = MetastoreConf.getAsString(conf, oneVar);
+       if (oldVar == null ||
+           (oneVar.isCaseSensitive() ? !oldVar.equals(newVar) : !oldVar.equalsIgnoreCase(newVar))) {
+         LOG.info("Mestastore configuration " + oneVar.toString() +
+             " changed from " + oldVar + " to " + newVar);
+         compatible = false;
+       }
+     }
+     return compatible;
+   }
+ 
+   @Override
+   public void setHiveAddedJars(String addedJars) {
+     MetastoreConf.setVar(conf, ConfVars.ADDED_JARS, addedJars);
+   }
+ 
+   @Override
+   public void reconnect() throws MetaException {
+     if (localMetaStore) {
+       // For direct DB connections we don't yet support reestablishing connections.
+       throw new MetaException("For direct MetaStore DB connections, we don't support retries" +
+           " at the client level.");
+     } else {
+       close();
+ 
+       if (uriResolverHook != null) {
+         //for dynamic uris, re-lookup if there are new metastore locations
+         resolveUris();
+       }
+ 
+       if (MetastoreConf.getVar(conf, ConfVars.THRIFT_URI_SELECTION).equalsIgnoreCase("RANDOM")) {
+         // Swap the first element of the metastoreUris[] with a random element from the rest
+         // of the array. Rationale being that this method will generally be called when the default
+         // connection has died and the default connection is likely to be the first array element.
+         promoteRandomMetaStoreURI();
+       }
+       open();
+     }
+   }
+ 
+   @Override
+   public void alter_table(String dbname, String tbl_name, Table new_tbl) throws TException {
+     alter_table_with_environmentContext(dbname, tbl_name, new_tbl, null);
+   }
+ 
+   @Override
+   public void alter_table(String defaultDatabaseName, String tblName, Table table,
+                           boolean cascade) throws TException {
+     EnvironmentContext environmentContext = new EnvironmentContext();
+     if (cascade) {
+       environmentContext.putToProperties(StatsSetupConst.CASCADE, StatsSetupConst.TRUE);
+     }
+     alter_table_with_environmentContext(defaultDatabaseName, tblName, table, environmentContext);
+   }
+ 
+   @Override
+   public void alter_table_with_environmentContext(String dbname, String tbl_name, Table new_tbl,
+       EnvironmentContext envContext) throws InvalidOperationException, MetaException, TException {
+     HiveMetaHook hook = getHook(new_tbl);
+     if (hook != null) {
+       hook.preAlterTable(new_tbl, envContext);
+     }
 -    client.alter_table_with_environment_context(prependCatalogToDbName(dbname, conf),
 -        tbl_name, new_tbl, envContext);
++    AlterTableRequest req = new AlterTableRequest(dbname, tbl_name, new_tbl);
++    req.setCatName(MetaStoreUtils.getDefaultCatalog(conf));
++    req.setEnvironmentContext(envContext);
++    client.alter_table_req(req);
+   }
+ 
+   @Override
+   public void alter_table(String catName, String dbName, String tblName, Table newTable,
+                          EnvironmentContext envContext) throws TException {
 -    client.alter_table_with_environment_context(prependCatalogToDbName(catName,
 -        dbName, conf), tblName, newTable, envContext);
++    // This never used to call the hook. Why? There's overload madness in metastore...
++    AlterTableRequest req = new AlterTableRequest(dbName, tblName, newTable);
++    req.setCatName(catName);
++    req.setEnvironmentContext(envContext);
++    client.alter_table_req(req);
++  }
++
++  @Override
++  public void alter_table(String catName, String dbName, String tbl_name, Table new_tbl,
++      EnvironmentContext envContext, long txnId, String validWriteIds)
++          throws InvalidOperationException, MetaException, TException {
++    HiveMetaHook hook = getHook(new_tbl);
++    if (hook != null) {
++      hook.preAlterTable(new_tbl, envContext);
++    }
++    AlterTableRequest req = new AlterTableRequest(dbName, tbl_name, new_tbl);
++    req.setCatName(catName);
++    req.setTxnId(txnId);
++    req.setValidWriteIdList(validWriteIds);
++    req.setEnvironmentContext(envContext);
++    client.alter_table_req(req);
+   }
+ 
++  @Deprecated
+   @Override
+   public void renamePartition(final String dbname, final String tableName, final List<String> part_vals,
+                               final Partition newPart) throws TException {
 -    renamePartition(getDefaultCatalog(conf), dbname, tableName, part_vals, newPart);
++    renamePartition(getDefaultCatalog(conf), dbname, tableName, part_vals, newPart, -1, null);
+   }
+ 
+   @Override
+   public void renamePartition(String catName, String dbname, String tableName, List<String> part_vals,
 -                              Partition newPart) throws TException {
 -    client.rename_partition(prependCatalogToDbName(catName, dbname, conf), tableName, part_vals, newPart);
 -
++                              Partition newPart, long txnId, String validWriteIds) throws TException {
++    RenamePartitionRequest req = new RenamePartitionRequest(dbname, tableName, part_vals, newPart);
++    req.setCatName(catName);
++    req.setTxnId(txnId);
++    req.setValidWriteIdList(validWriteIds);
++    client.rename_partition_req(req);
+   }
+ 
+   private void open() throws MetaException {
+     isConnected = false;
+     TTransportException tte = null;
+     boolean useSSL = MetastoreConf.getBoolVar(conf, ConfVars.USE_SSL);
+     boolean useSasl = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_SASL);
+     boolean useFramedTransport = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_FRAMED_TRANSPORT);
+     boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_COMPACT_PROTOCOL);
+     int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf,
+         ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
+ 
+     for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
+       for (URI store : metastoreUris) {
+         LOG.info("Trying to connect to metastore with URI " + store);
+ 
+         try {
+           if (useSSL) {
+             try {
+               String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim();
+               if (trustStorePath.isEmpty()) {
+                 throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH.toString()
+                     + " Not configured for SSL connection");
+               }
+               String trustStorePassword =
+                   MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD);
+ 
+               // Create an SSL socket and connect
+               transport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout,
+                   trustStorePath, trustStorePassword );
+               LOG.info("Opened an SSL connection to metastore, current connections: " + connCount.incrementAndGet());
+             } catch(IOException e) {
+               throw new IllegalArgumentException(e);
+             } catch(TTransportException e) {
+               tte = e;
+               throw new MetaException(e.toString());
+             }
+           } else {
+             transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
+           }
+ 
+           if (useSasl) {
+             // Wrap thrift connection with SASL for secure connection.
+             try {
+               HadoopThriftAuthBridge.Client authBridge =
+                 HadoopThriftAuthBridge.getBridge().createClient();
+ 
+               // check if we should use delegation tokens to authenticate
+               // the call below gets hold of the tokens if they are set up by hadoop
+               // this should happen on the map/reduce tasks if the client added the
+               // tokens into hadoop's credential store in the front end during job
+               // submission.
+               String tokenSig = MetastoreConf.getVar(conf, ConfVars.TOKEN_SIGNATURE);
+               // tokenSig could be null
+               tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig);
+ 
+               if(tokenStrForm != null) {
+                 LOG.info("HMSC::open(): Found delegation token. Creating DIGEST-based thrift connection.");
+                 // authenticate using delegation tokens via the "DIGEST" mechanism
+                 transport = authBridge.createClientTransport(null, store.getHost(),
+                     "DIGEST", tokenStrForm, transport,
+                         MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
+               } else {
+                 LOG.info("HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.");
+                 String principalConfig =
+                     MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL);
+                 transport = authBridge.createClientTransport(
+                     principalConfig, store.getHost(), "KERBEROS", null,
+                     transport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
+               }
+             } catch (IOException ioe) {
+               LOG.error("Couldn't create client transport", ioe);
+               throw new MetaException(ioe.toString());
+             }
+           } else {
+             if (useFramedTransport) {
+               transport = new TFramedTransport(transport);
+             }
+           }
+ 
+           final TProtocol protocol;
+           if (useCompactProtocol) {
+             protocol = new TCompactProtocol(transport);
+           } else {
+             protocol = new TBinaryProtocol(transport);
+           }
+           client = new ThriftHiveMetastore.Client(protocol);
+           try {
+             if (!transport.isOpen()) {
+               transport.open();
+               LOG.info("Opened a connection to metastore, current connections: " + connCount.incrementAndGet());
+             }
+             isConnected = true;
+           } catch (TTransportException e) {
+             tte = e;
+             if (LOG.isDebugEnabled()) {
+               LOG.warn("Failed to connect to the MetaStore Server...", e);
+             } else {
+               // Don't print full exception trace if DEBUG is not on.
+               LOG.warn("Failed to connect to the MetaStore Server...");
+             }
+           }
+ 
+           if (isConnected && !useSasl && MetastoreConf.getBoolVar(conf, ConfVars.EXECUTE_SET_UGI)){
+             // Call set_ugi, only in unsecure mode.
+             try {
+               UserGroupInformation ugi = SecurityUtils.getUGI();
+               client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
+             } catch (LoginException e) {
+               LOG.warn("Failed to do login. set_ugi() is not successful, " +
+                        "Continuing without it.", e);
+             } catch (IOException e) {
+               LOG.warn("Failed to find ugi of client set_ugi() is not successful, " +
+                   "Continuing without it.", e);
+             } catch (TException e) {
+               LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
+                   + "Continuing without it.", e);
+             }
+           }
+         } catch (MetaException e) {
+           LOG.error("Unable to connect to metastore with URI " + store
+                     + " in attempt " + attempt, e);
+         }
+         if (isConnected) {
+           break;
+         }
+       }
+       // Wait before launching the next round of connection retries.
+       if (!isConnected && retryDelaySeconds > 0) {
+         try {
+           LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
+           Thread.sleep(retryDelaySeconds * 1000);
+         } catch (InterruptedException ignore) {}
+       }
+     }
+ 
+     if (!isConnected) {
+       throw new MetaException("Could not connect to meta store using any of the URIs provided." +
+         " Most recent failure: " + StringUtils.stringifyException(tte));
+     }
+ 
+     snapshotActiveConf();
+ 
+     LOG.info("Connected to metastore.");
+   }
+ 
+   private void snapshotActiveConf() {
+     currentMetaVars = new HashMap<>(MetastoreConf.metaVars.length);
+     for (ConfVars oneVar : MetastoreConf.metaVars) {
+       currentMetaVars.put(oneVar.getVarname(), MetastoreConf.getAsString(conf, oneVar));
+     }
+   }
+ 
+   @Override
+   public String getTokenStrForm() throws IOException {
+     return tokenStrForm;
+    }
+ 
+   @Override
+   public void close() {
+     isConnected = false;
+     currentMetaVars = null;
+     try {
+       if (null != client) {
+         client.shutdown();
+       }
+     } catch (TException e) {
+       LOG.debug("Unable to shutdown metastore client. Will try closing transport directly.", e);
+     }
+     // Transport would have got closed via client.shutdown(), so we dont need this, but
+     // just in case, we make this call.
+     if ((transport != null) && transport.isOpen()) {
+       transport.close();
+       LOG.info("Closed a connection to metastore, current connections: " + connCount.decrementAndGet());
+     }
+   }
+ 
+   @Override
+   public void setMetaConf(String key, String value) throws TException {
+     client.setMetaConf(key, value);
+   }
+ 
+   @Override
+   public String getMetaConf(String key) throws TException {
+     return client.getMetaConf(key);
+   }
+ 
+   @Override
+   public void createCatalog(Catalog catalog) throws TException {
+     client.create_catalog(new CreateCatalogRequest(catalog));
+   }
+ 
+   @Override
+   public void alterCatalog(String catalogName, Catalog newCatalog) throws TException {
+     client.alter_catalog(new AlterCatalogRequest(catalogName, newCatalog));
+   }
+ 
+   @Override
+   public Catalog getCatalog(String catName) throws TException {
+     GetCatalogResponse rsp = client.get_catalog(new GetCatalogRequest(catName));
+     return rsp == null ? null : filterHook.filterCatalog(rsp.getCatalog());
+   }
+ 
+   @Override
+   public List<String> getCatalogs() throws TException {
+     GetCatalogsResponse rsp = client.get_catalogs();
+     return rsp == null ? null : filterHook.filterCatalogs(rsp.getNames());
+   }
+ 
+   @Override
+   public void dropCatalog(String catName) throws TException {
+     client.drop_catalog(new DropCatalogRequest(catName));
+   }
+ 
+   /**
+    * @param new_part
+    * @return the added partition
+    * @throws InvalidObjectException
+    * @throws AlreadyExistsException
+    * @throws MetaException
+    * @throws TException
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#add_partition(org.apache.hadoop.hive.metastore.api.Partition)
+    */
+   @Override
+   public Partition add_partition(Partition new_part) throws TException {
+     return add_partition(new_part, null);
+   }
+ 
+   public Partition add_partition(Partition new_part, EnvironmentContext envContext)
+       throws TException {
+     if (new_part != null && !new_part.isSetCatName()) {
+       new_part.setCatName(getDefaultCatalog(conf));
+     }
+     Partition p = client.add_partition_with_environment_context(new_part, envContext);
+     return deepCopy(p);
+   }
+ 
+   /**
+    * @param new_parts
+    * @throws InvalidObjectException
+    * @throws AlreadyExistsException
+    * @throws MetaException
+    * @throws TException
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#add_partitions(List)
+    */
+   @Override
+   public int add_partitions(List<Partition> new_parts) throws TException {
+     if (new_parts == null || new_parts.contains(null)) {
+       throw new MetaException("Partitions cannot be null.");
+     }
+     if (new_parts != null && !new_parts.isEmpty() && !new_parts.get(0).isSetCatName()) {
+       final String defaultCat = getDefaultCatalog(conf);
+       new_parts.forEach(p -> p.setCatName(defaultCat));
+     }
+     return client.add_partitions(new_parts);
+   }
+ 
+   @Override
+   public List<Partition> add_partitions(
+       List<Partition> parts, boolean ifNotExists, boolean needResults) throws TException {
+     if (parts == null || parts.contains(null)) {
+       throw new MetaException("Partitions cannot be null.");
+     }
+     if (parts.isEmpty()) {
+       return needResults ? new ArrayList<>() : null;
+     }
+     Partition part = parts.get(0);
+     // Have to set it for each partition too
+     if (!part.isSetCatName()) {
+       final String defaultCat = getDefaultCatalog(conf);
+       parts.forEach(p -> p.setCatName(defaultCat));
+     }
+     AddPartitionsRequest req = new AddPartitionsRequest(
+         part.getDbName(), part.getTableName(), parts, ifNotExists);
+     req.setCatName(part.isSetCatName() ? part.getCatName() : getDefaultCatalog(conf));
+     req.setNeedResult(needResults);
+     AddPartitionsResult result = client.add_partitions_req(req);
+     return needResults ? filterHook.filterPartitions(result.getPartitions()) : null;
+   }
+ 
+   @Override
+   public int add_partitions_pspec(PartitionSpecProxy partitionSpec) throws TException {
+     if (partitionSpec == null) {
+       throw new MetaException("PartitionSpec cannot be null.");
+     }
+     if (partitionSpec.getCatName() == null) {
+       partitionSpec.setCatName(getDefaultCatalog(conf));
+     }
+     return client.add_partitions_pspec(partitionSpec.toPartitionSpec());
+   }
+ 
+   @Override
+   public Partition appendPartition(String db_name, String table_name,
+       List<String> part_vals) throws TException {
+     return appendPartition(getDefaultCatalog(conf), db_name, table_name, part_vals);
+   }
+ 
+   @Override
+   public Partition appendPartition(String dbName, String tableName, String partName)
+       throws TException {
+     return appendPartition(getDefaultCatalog(conf), dbName, tableName, partName);
+   }
+ 
+   @Override
+   public Partition appendPartition(String catName, String dbName, String tableName,
+                                    String name) throws TException {
+     Partition p = client.append_partition_by_name(prependCatalogToDbName(
+         catName, dbName, conf), tableName, name);
+     return deepCopy(p);
+   }
+ 
+   @Override
+   public Partition appendPartition(String catName, String dbName, String tableName,
+                                    List<String> partVals) throws TException {
+     Partition p = client.append_partition(prependCatalogToDbName(
+         catName, dbName, conf), tableName, partVals);
+     return deepCopy(p);
+   }
+ 
+   @Deprecated
+   public Partition appendPartition(String dbName, String tableName, List<String> partVals,
+                                    EnvironmentContext ec) throws TException {
+     return client.append_partition_with_environment_context(prependCatalogToDbName(dbName, conf),
+         tableName, partVals, ec).deepCopy();
+   }
+ 
+   /**
+    * Exchange the partition between two tables
+    * @param partitionSpecs partitions specs of the parent partition to be exchanged
+    * @param destDb the db of the destination table
+    * @param destinationTableName the destination table name
+    * @return new partition after exchanging
+    */
+   @Override
+   public Partition exchange_partition(Map<String, String> partitionSpecs,
+       String sourceDb, String sourceTable, String destDb,
+       String destinationTableName) throws TException {
+     return exchange_partition(partitionSpecs, getDefaultCatalog(conf), sourceDb, sourceTable,
+         getDefaultCatalog(conf), destDb, destinationTableName);
+   }
+ 
+   @Override
+   public Partition exchange_partition(Map<String, String> partitionSpecs, String sourceCat,
+                                       String sourceDb, String sourceTable, String destCat,
+                                       String destDb, String destTableName) throws TException {
+     return client.exchange_partition(partitionSpecs, prependCatalogToDbName(sourceCat, sourceDb, conf),
+         sourceTable, prependCatalogToDbName(destCat, destDb, conf), destTableName);
+   }
+ 
+   /**
+    * Exchange the partitions between two tables
+    * @param partitionSpecs partitions specs of the parent partition to be exchanged
+    * @param destDb the db of the destination table
+    * @param destinationTableName the destination table name
+    * @return new partitions after exchanging
+    */
+   @Override
+   public List<Partition> exchange_partitions(Map<String, String> partitionSpecs,
+       String sourceDb, String sourceTable, String destDb,
+       String destinationTableName) throws TException {
+     return exchange_partitions(partitionSpecs, getDefaultCatalog(conf), sourceDb, sourceTable,
+         getDefaultCatalog(conf), destDb, destinationTableName);
+   }
+ 
+   @Override
++  public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(
++      String dbName, String tableName, List<String> partNames, List<String> colNames,
++      long txnId, String validWriteIdList)
++      throws NoSuchObjectException, MetaException, TException {
++    return getPartitionColumnStatistics(getDefaultCatalog(conf), dbName, tableName,
++        partNames, colNames, txnId, validWriteIdList);
++  }
++
++  @Override
++  public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(
++      String catName, String dbName, String tableName, List<String> partNames,
++      List<String> colNames, long txnId, String validWriteIdList)
++      throws NoSuchObjectException, MetaException, TException {
++    PartitionsStatsRequest rqst = new PartitionsStatsRequest(dbName, tableName, colNames,
++        partNames);
++    rqst.setCatName(catName);
++    rqst.setTxnId(txnId);
++    rqst.setValidWriteIdList(validWriteIdList);
++    return client.get_partitions_statistics_req(rqst).getPartStats();
++  }
++
++  @Override
++  public AggrStats getAggrColStatsFor(String dbName, String tblName, List<String> colNames,
++      List<String> partNames, long txnId, String writeIdList)
++      throws NoSuchObjectException, MetaException, TException {
++    return getAggrColStatsFor(getDefaultCatalog(conf), dbName, tblName, colNames,
++        partNames, txnId, writeIdList);  }
++
++  @Override
++  public AggrStats getAggrColStatsFor(String catName, String dbName, String tblName, List<String> colNames,
++      List<String> partNames, long txnId, String writeIdList)
++      throws NoSuchObjectException, MetaException, TException {
++    if (colNames.isEmpty() || partNames.isEmpty()) {
++      LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval on client side.");
++      return new AggrStats(new ArrayList<>(),0); // Nothing to aggregate
++    }
++    PartitionsStatsRequest req = new PartitionsStatsRequest(dbName, tblName, colNames, partNames);
++    req.setCatName(catName);
++    req.setTxnId(txnId);
++    req.setValidWriteIdList(writeIdList);
++    return client.get_aggr_stats_for(req);
++  }
++
++  @Override
+   public List<Partition> exchange_partitions(Map<String, String> partitionSpecs, String sourceCat,
+                                              String sourceDb, String sourceTable, String destCat,
+                                              String destDb, String destTableName) throws TException {
+     return client.exchange_partitions(partitionSpecs, prependCatalogToDbName(sourceCat, sourceDb, conf),
+         sourceTable, prependCatalogToDbName(destCat, destDb, conf), destTableName);
+   }
+ 
+   @Override
+   public void validatePartitionNameCharacters(List<String> partVals)
+       throws TException, MetaException {
+     client.partition_name_has_valid_characters(partVals, true);
+   }
+ 
+   /**
+    * Create a new Database
+    * @param db
+    * @throws AlreadyExistsException
+    * @throws InvalidObjectException
+    * @throws MetaException
+    * @throws TException
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#create_database(Database)
+    */
+   @Override
+   public void createDatabase(Database db)
+       throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+     if (!db.isSetCatalogName()) {
+       db.setCatalogName(getDefaultCatalog(conf));
+     }
+     client.create_database(db);
+   }
+ 
+   /**
+    * @param tbl
+    * @throws MetaException
+    * @throws NoSuchObjectException
+    * @throws TException
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#create_table(org.apache.hadoop.hive.metastore.api.Table)
+    */
+   @Override
+   public void createTable(Table tbl) throws AlreadyExistsException,
+       InvalidObjectException, MetaException, NoSuchObjectException, TException {
+     createTable(tbl, null);
+   }
+ 
+   public void createTable(Table tbl, EnvironmentContext envContext) throws AlreadyExistsException,
+       InvalidObjectException, MetaException, NoSuchObjectException, TException {
+     if (!tbl.isSetCatName()) {
+       tbl.setCatName(getDefaultCatalog(conf));
+     }
+     HiveMetaHook hook = getHook(tbl);
+     if (hook != null) {
+       hook.preCreateTable(tbl);
+     }
+     boolean success = false;
+     try {
+       // Subclasses can override this step (for example, for temporary tables)
+       create_table_with_environment_context(tbl, envContext);
+       if (hook != null) {
+         hook.commitCreateTable(tbl);
+       }
+       success = true;
+     }
+     finally {
+       if (!success && (hook != null)) {
+         try {
+           hook.rollbackCreateTable(tbl);
+         } catch (Exception e){
+           LOG.error("Create rollback failed with", e);
+         }
+       }
+     }
+   }
+ 
+   @Override
+   public void createTableWithConstraints(Table tbl,
+     List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
+     List<SQLUniqueConstraint> uniqueConstraints,
+     List<SQLNotNullConstraint> notNullConstraints,
+     List<SQLDefaultConstraint> defaultConstraints,
+     List<SQLCheckConstraint> checkConstraints)
+         throws AlreadyExistsException, InvalidObjectException,
+         MetaException, NoSuchObjectException, TException {
+ 
+     if (!tbl.isSetCatName()) {
+       String defaultCat = getDefaultCatalog(conf);
+       tbl.setCatName(defaultCat);
+       if (primaryKeys != null) {
+         primaryKeys.forEach(pk -> pk.setCatName(defaultCat));
+       }
+       if (foreignKeys != null) {
+         foreignKeys.forEach(fk -> fk.setCatName(defaultCat));
+       }
+       if (uniqueConstraints != null) {
+         uniqueConstraints.forEach(uc -> uc.setCatName(defaultCat));
+       }
+       if (notNullConstraints != null) {
+         notNullConstraints.forEach(nn -> nn.setCatName(defaultCat));
+       }
+       if (defaultConstraints != null) {
+         defaultConstraints.forEach(def -> def.setCatName(defaultCat));
+       }
+       if (checkConstraints != null) {
+         checkConstraints.forEach(cc -> cc.setCatName(defaultCat));
+       }
+     }
+     HiveMetaHook hook = getHook(tbl);
+     if (hook != null) {
+       hook.preCreateTable(tbl);
+     }
+     boolean success = false;
+     try {
+       // Subclasses can override this step (for example, for temporary tables)
+       client.create_table_with_constraints(tbl, primaryKeys, foreignKeys,
+           uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+       if (hook != null) {
+         hook.commitCreateTable(tbl);
+       }
+       success = true;
+     } finally {
+       if (!success && (hook != null)) {
+         hook.rollbackCreateTable(tbl);
+       }
+     }
+   }
+ 
+   @Override
+   public void dropConstraint(String dbName, String tableName, String constraintName)
+       throws TException {
+     dropConstraint(getDefaultCatalog(conf), dbName, tableName, constraintName);
+   }
+ 
+   @Override
+   public void dropConstraint(String catName, String dbName, String tableName, String constraintName)
+       throws TException {
+     DropConstraintRequest rqst = new DropConstraintRequest(dbName, tableName, constraintName);
+     rqst.setCatName(catName);
+     client.drop_constraint(rqst);
+   }
+ 
+   @Override
+   public void addPrimaryKey(List<SQLPrimaryKey> primaryKeyCols) throws TException {
+     if (!primaryKeyCols.isEmpty() && !primaryKeyCols.get(0).isSetCatName()) {
+       String defaultCat = getDefaultCatalog(conf);
+       primaryKeyCols.forEach(pk -> pk.setCatName(defaultCat));
+     }
+     client.add_primary_key(new AddPrimaryKeyRequest(primaryKeyCols));
+   }
+ 
+   @Override
+   public void addForeignKey(List<SQLForeignKey> foreignKeyCols) throws TException {
+     if (!foreignKeyCols.isEmpty() && !foreignKeyCols.get(0).isSetCatName()) {
+       String defaultCat = getDefaultCatalog(conf);
+       foreignKeyCols.forEach(fk -> fk.setCatName(defaultCat));
+     }
+     client.add_foreign_key(new AddForeignKeyRequest(foreignKeyCols));
+   }
+ 
+   @Override
+   public void addUniqueConstraint(List<SQLUniqueConstraint> uniqueConstraintCols) throws
+     NoSuchObjectException, MetaException, TException {
+     if (!uniqueConstraintCols.isEmpty() && !uniqueConstraintCols.get(0).isSetCatName()) {
+       String defaultCat = getDefaultCatalog(conf);
+       uniqueConstraintCols.forEach(uc -> uc.setCatName(defaultCat));
+     }
+     client.add_unique_constraint(new AddUniqueConstraintRequest(uniqueConstraintCols));
+   }
+ 
+   @Override
+   public void addNotNullConstraint(List<SQLNotNullConstraint> notNullConstraintCols) throws
+     NoSuchObjectException, MetaException, TException {
+     if (!notNullConstraintCols.isEmpty() && !notNullConstraintCols.get(0).isSetCatName()) {
+       String defaultCat = getDefaultCatalog(conf);
+       notNullConstraintCols.forEach(nn -> nn.setCatName(defaultCat));
+     }
+     client.add_not_null_constraint(new AddNotNullConstraintRequest(notNullConstraintCols));
+   }
+ 
+   @Override
+   public void addDefaultConstraint(List<SQLDefaultConstraint> defaultConstraints) throws
+       NoSuchObjectException, MetaException, TException {
+     if (!defaultConstraints.isEmpty() && !defaultConstraints.get(0).isSetCatName()) {
+       String defaultCat = getDefaultCatalog(conf);
+       defaultConstraints.forEach(def -> def.setCatName(defaultCat));
+     }
+     client.add_default_constraint(new AddDefaultConstraintRequest(defaultConstraints));
+   }
+ 
+   @Override
+   public void addCheckConstraint(List<SQLCheckConstraint> checkConstraints) throws
+       NoSuchObjectException, MetaException, TException {
+     if (!checkConstraints.isEmpty() && !checkConstraints.get(0).isSetCatName()) {
+       String defaultCat = getDefaultCatalog(conf);
+       checkConstraints.forEach(cc -> cc.setCatName(defaultCat));
+     }
+     client.add_check_constraint(new AddCheckConstraintRequest(checkConstraints));
+   }
+ 
+   /**
+    * @param type
+    * @return true or false
+    * @throws AlreadyExistsException
+    * @throws InvalidObjectException
+    * @throws MetaException
+    * @throws TException
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#create_type(org.apache.hadoop.hive.metastore.api.Type)
+    */
+   public boolean createType(Type type) throws AlreadyExistsException,
+       InvalidObjectException, MetaException, TException {
+     return client.create_type(type);
+   }
+ 
+   /**
+    * @param name
+    * @throws NoSuchObjectException
+    * @throws InvalidOperationException
+    * @throws MetaException
+    * @throws TException
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_database(java.lang.String, boolean, boolean)
+    */
+   @Override
+   public void dropDatabase(String name)
+       throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+     dropDatabase(getDefaultCatalog(conf), name, true, false, false);
+   }
+ 
+   @Override
+   public void dropDatabase(String name, boolean deleteData, boolean ignoreUnknownDb)
+       throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+     dropDatabase(getDefaultCatalog(conf), name, deleteData, ignoreUnknownDb, false);
+   }
+ 
+   @Override
+   public void dropDatabase(String name, boolean deleteData, boolean ignoreUnknownDb, boolean cascade)
+       throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+     dropDatabase(getDefaultCatalog(conf), name, deleteData, ignoreUnknownDb, cascade);
+   }
+ 
+   @Override
+   public void dropDatabase(String catalogName, String dbName, boolean deleteData,
+                            boolean ignoreUnknownDb, boolean cascade)
+       throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+     try {
+       getDatabase(catalogName, dbName);
+     } catch (NoSuchObjectException e) {
+       if (!ignoreUnknownDb) {
+         throw e;
+       }
+       return;
+     }
+ 
+     String dbNameWithCatalog = prependCatalogToDbName(catalogName, dbName, conf);
+ 
+     if (cascade) {
+       // Note that this logic may drop some of the tables of the database
+       // even if the drop database fail for any reason
+       // TODO: Fix this
+       List<String> materializedViews = getTables(dbName, ".*", TableType.MATERIALIZED_VIEW);
+       for (String table : materializedViews) {
+         // First we delete the materialized views
+         dropTable(dbName, table, deleteData, true);
+       }
+ 
+       /**
+        * When dropping db cascade, client side hooks have to be called at each table removal.
+        * If {@link org.apache.hadoop.hive.metastore.conf.MetastoreConf#ConfVars.BATCH_RETRIEVE_MAX
+        * BATCH_RETRIEVE_MAX} is less than the number of tables in the DB, we'll have to call the
+        * hooks one by one each alongside with a
+        * {@link #dropTable(String, String, boolean, boolean, EnvironmentContext) dropTable} call to
+        * ensure transactionality.
+        */
+       List<String> tableNameList = getAllTables(dbName);
+       int tableCount = tableNameList.size();
+       int maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.BATCH_RETRIEVE_MAX);
+       LOG.debug("Selecting dropDatabase method for " + dbName + " (" + tableCount + " tables), " +
+              ConfVars.BATCH_RETRIEVE_MAX.getVarname() + "=" + maxBatchSize);
+ 
+       if (tableCount > maxBatchSize) {
+         LOG.debug("Dropping database in a per table batch manner.");
+         dropDatabaseCascadePerTable(catalogName, dbName, tableNameList, deleteData, maxBatchSize);
+       } else {
+         LOG.debug("Dropping database in a per DB manner.");
+         dropDatabaseCascadePerDb(catalogName, dbName, tableNameList, deleteData);
+       }
+ 
+     } else {
+       client.drop_database(dbNameWithCatalog, deleteData, cascade);
+     }
+   }
+ 
+   /**
+    * Handles dropDatabase by invoking drop_table in HMS for each table.
+    * Useful when table list in DB is too large to fit in memory. It will retrieve tables in
+    * chunks and for each table with a drop_table hook it will invoke drop_table on both HMS and
+    * the hook. This is a timely operation so hookless tables are skipped and will be dropped on
+    * server side when the client invokes drop_database.
+    * Note that this is 'less transactional' than dropDatabaseCascadePerDb since we're dropping
+    * table level objects, so the overall outcome of this method might result in a halfly dropped DB.
+    * @param catName
+    * @param dbName
+    * @param tableList
+    * @param deleteData
+    * @param maxBatchSize
+    * @throws TException
+    */
+   private void dropDatabaseCascadePerTable(String catName, String dbName, List<String> tableList,
+                                            boolean deleteData, int maxBatchSize) throws TException {
+     String dbNameWithCatalog = prependCatalogToDbName(catName, dbName, conf);
+     for (Table table : new TableIterable(this, catName, dbName, tableList, maxBatchSize)) {
+       boolean success = false;
+       HiveMetaHook hook = getHook(table);
+       if (hook == null) {
+         continue;
+       }
+       try {
+         hook.preDropTable(table);
+         client.drop_table_with_environment_context(dbNameWithCatalog, table.getTableName(), deleteData, null);
+         hook.commitDropTable(table, deleteData);
+         success = true;
+       } finally {
+         if (!success) {
+           hook.rollbackDropTable(table);
+         }
+       }
+     }
+     client.drop_database(dbNameWithCatalog, deleteData, true);
+   }
+ 
+   /**
+    * Handles dropDatabase by invoking drop_database in HMS.
+    * Useful when table list in DB can fit in memory, it will retrieve all tables at once and
+    * call drop_database once. Also handles drop_table hooks.
+    * @param catName
+    * @param dbName
+    * @param tableList
+    * @param deleteData
+    * @throws TException
+    */
+   private void dropDatabaseCascadePerDb(String catName, String dbName, List<String> tableList,
+                                         boolean deleteData) throws TException {
+     String dbNameWithCatalog = prependCatalogToDbName(catName, dbName, conf);
+     List<Table> tables = getTableObjectsByName(catName, dbName, tableList);
+     boolean success = false;
+     try {
+       for (Table table : tables) {
+         HiveMetaHook hook = getHook(table);
+         if (hook == null) {
+           continue;
+         }
+         hook.preDropTable(table);
+       }
+       client.drop_database(dbNameWithCatalog, deleteData, true);
+       for (Table table : tables) {
+         HiveMetaHook hook = getHook(table);
+         if (hook == null) {
+           continue;
+         }
+         hook.commitDropTable(table, deleteData);
+       }
+       success = true;
+     } finally {
+       if (!success) {
+         for (Table table : tables) {
+           HiveMetaHook hook = getHook(table);
+           if (hook == null) {
+             continue;
+           }
+           hook.rollbackDropTable(table);
+         }
+       }
+     }
+   }
+ 
+   @Override
+   public boolean dropPartition(String dbName, String tableName, String partName, boolean deleteData)
+       throws TException {
+     return dropPartition(getDefaultCatalog(conf), dbName, tableName, partName, deleteData);
+   }
+ 
+   @Override
+   public boolean dropPartition(String catName, String db_name, String tbl_name, String name,
+                                boolean deleteData) throws TException {
+     return client.drop_partition_by_name_with_environment_context(prependCatalogToDbName(
+         catName, db_name, conf), tbl_name, name, deleteData, null);
+   }
+ 
+   private static EnvironmentContext getEnvironmentContextWithIfPurgeSet() {
+     Map<String, String> warehouseOptions = new HashMap<>();
+     warehouseOptions.put("ifPurge", "TRUE");
+     return new EnvironmentContext(warehouseOptions);
+   }
+ 
+   // A bunch of these are in HiveMetaStoreClient but not IMetaStoreClient.  I have marked these
+   // as deprecated and not updated them for the catalogs.  If we really want to support them we
+   // should add them to IMetaStoreClient.
+ 
+   @Deprecated
+   public boolean dropPartition(String db_name, String tbl_name, List<String> part_vals,
+       EnvironmentContext env_context) throws TException {
+     return client.drop_partition_with_environment_context(prependCatalogToDbName(db_name, conf),
+         tbl_name, part_vals, true, env_context);
+   }
+ 
+   @Deprecated
+   public boolean dropPartition(String dbName, String tableName, String partName, boolean dropData,
+                                EnvironmentContext ec) throws TException {
+     return client.drop_partition_by_name_with_environment_context(prependCatalogToDbName(dbName, conf),
+         tableName, partName, dropData, ec);
+   }
+ 
+   @Deprecated
+   public boolean dropPartition(String dbName, String tableName, List<String> partVals)
+       throws TException {
+     return client.drop_partition(prependCatalogToDbName(dbName, conf), tableName, partVals, true);
+   }
+ 
+   @Override
+   public boolean dropPartition(String db_name, String tbl_name,
+       List<String> part_vals, boolean deleteData) throws TException {
+     return dropPartition(getDefaultCatalog(conf), db_name, tbl_name, part_vals,
+         PartitionDropOptions.instance().deleteData(deleteData));
+   }
+ 
+   @Override
+   public boolean dropPartition(String catName, String db_name, String tbl_name,
+                                List<String> part_vals, boolean deleteData) throws TException {
+     return dropPartition(catName, db_name, tbl_name, part_vals, PartitionDropOptions.instance()
+             .deleteData(deleteData));
+   }
+ 
+   @Override
+   public boolean dropPartition(String db_name, String tbl_name,
+                                List<String> part_vals, PartitionDropOptions options) throws TException {
+     return dropPartition(getDefaultCatalog(conf), db_name, tbl_name, part_vals, options);
+   }
+ 
+   @Override
+   public boolean dropPartition(String catName, String db_name, String tbl_name,
+                                List<String> part_vals, PartitionDropOptions options)
+       throws TException {
+     if (options == null) {
+       options = PartitionDropOptions.instance();
+     }
+     if (part_vals != null) {
+       for (String partVal : part_vals) {
+         if (partVal == null) {
+           throw new MetaException("The partition value must not be null.");
+         }
+       }
+     }
+     return client.drop_partition_with_environment_context(prependCatalogToDbName(
+         catName, db_name, conf), tbl_name, part_vals, options.deleteData,
+         options.purgeData ? getEnvironmentContextWithIfPurgeSet() : null);
+   }
+ 
+   @Override
+   public List<Partition> dropPartitions(String dbName, String tblName,
+                                         List<ObjectPair<Integer, byte[]>> partExprs,
+                                         PartitionDropOptions options)
+       throws TException {
+     return dropPartitions(getDefaultCatalog(conf), dbName, tblName, partExprs, options);
+   }
+ 
+   @Override
+   public List<Partition> dropPartitions(String dbName, String tblName,
+       List<ObjectPair<Integer, byte[]>> partExprs, boolean deleteData,
+       boolean ifExists, boolean needResult) throws NoSuchObjectException, MetaException, TException {
+ 
+     return dropPartitions(getDefaultCatalog(conf), dbName, tblName, partExprs,
+                           PartitionDropOptions.instance()
+                                               .deleteData(deleteData)
+                                               .ifExists(ifExists)
+                                               .returnResults(needResult));
+ 
+   }
+ 
+   @Override
+   public List<Partition> dropPartitions(String dbName, String tblName,
+       List<ObjectPair<Integer, byte[]>> partExprs, boolean deleteData,
+       boolean ifExists) throws NoSuchObjectException, MetaException, TException {
+     // By default, we need the results from dropPartitions();
+     return dropPartitions(getDefaultCatalog(conf), dbName, tblName, partExprs,
+                           PartitionDropOptions.instance()
+                                               .deleteData(deleteData)
+                                               .ifExists(ifExists));
+   }
+ 
+   @Override
+   public List<Partition> dropPartitions(String catName, String dbName, String tblName,
+                                         List<ObjectPair<Integer, byte[]>> partExprs,
+                                         PartitionDropOptions options) throws TException {
+     RequestPartsSpec rps = new RequestPartsSpec();
+     List<DropPartitionsExpr> exprs = new ArrayList<>(partExprs.size());
+     for (ObjectPair<Integer, byte[]> partExpr : partExprs) {
+       DropPartitionsExpr dpe = new DropPartitionsExpr();
+       dpe.setExpr(partExpr.getSecond());
+       dpe.setPartArchiveLevel(partExpr.getFirst());
+       exprs.add(dpe);
+     }
+     rps.setExprs(exprs);
+     DropPartitionsRequest req = new DropPartitionsRequest(dbName, tblName, rps);
+     req.setCatName(catName);
+     req.setDeleteData(options.deleteData);
+     req.setNeedResult(options.returnResults);
+     req.setIfExists(options.ifExists);
+     if (options.purgeData) {
+       LOG.info("Dropped partitions will be purged!");
+       req.setEnvironmentContext(getEnvironmentContextWithIfPurgeSet());
+     }
+     return client.drop_partitions_req(req).getPartitions();
+   }
+ 
+   @Override
+   public void dropTable(String dbname, String name, boolean deleteData,
+       boolean ignoreUnknownTab) throws MetaException, TException,
+       NoSuchObjectException, UnsupportedOperationException {
+     dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, null);
+   }
+ 
+   @Override
+   public void dropTable(String dbname, String name, boolean deleteData,
+       boolean ignoreUnknownTab, boolean ifPurge) throws TException {
+     dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, ifPurge);
+   }
+ 
+   @Override
+   public void dropTable(String dbname, String name) throws TException {
+     dropTable(getDefaultCatalog(conf), dbname, name, true, true, null);
+   }
+ 
+   @Override
+   public void dropTable(String catName, String dbName, String tableName, boolean deleteData,
+                         boolean ignoreUnknownTable, boolean ifPurge) throws TException {
+     //build new environmentContext with ifPurge;
+     EnvironmentContext envContext = null;
+     if(ifPurge){
+       Map<String, String> warehouseOptions;
+       warehouseOptions = new HashMap<>();
+       warehouseOptions.put("ifPurge", "TRUE");
+       envContext = new EnvironmentContext(warehouseOptions);
+     }
+     dropTable(catName, dbName, tableName, deleteData, ignoreUnknownTable, envContext);
+ 
+   }
+ 
+   /**
+    * Drop the table and choose whether to: delete the underlying table data;
+    * throw if the table doesn't exist; save the data in the trash.
+    *
+    * @param catName catalog name
+    * @param dbname database name
+    * @param name table name
+    * @param deleteData
+    *          delete the underlying data or just delete the table in metadata
+    * @param ignoreUnknownTab
+    *          don't throw if the requested table doesn't exist
+    * @param envContext
+    *          for communicating with thrift
+    * @throws MetaException
+    *           could not drop table properly
+    * @throws NoSuchObjectException
+    *           the table wasn't found
+    * @throws TException
+    *           a thrift communication error occurred
+    * @throws UnsupportedOperationException
+    *           dropping an index table is not allowed
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_table(java.lang.String,
+    *      java.lang.String, boolean)
+    */
+   public void dropTable(String catName, String dbname, String name, boolean deleteData,
+       boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
+       NoSuchObjectException, UnsupportedOperationException {
+     Table tbl;
+     try {
+       tbl = getTable(catName, dbname, name);
+     } catch (NoSuchObjectException e) {
+       if (!ignoreUnknownTab) {
+         throw e;
+       }
+       return;
+     }
+     HiveMetaHook hook = getHook(tbl);
+     if (hook != null) {
+       hook.preDropTable(tbl);
+     }
+     boolean success = false;
+     try {
+       drop_table_with_environment_context(catName, dbname, name, deleteData, envContext);
+       if (hook != null) {
+         hook.commitDropTable(tbl, deleteData || (envContext != null && "TRUE".equals(envContext.getProperties().get("ifPurge"))));
+       }
+       success=true;
+     } catch (NoSuchObjectException e) {
+       if (!ignoreUnknownTab) {
+         throw e;
+       }
+     } finally {
+       if (!success && (hook != null)) {
+         hook.rollbackDropTable(tbl);
+       }
+     }
+   }
+ 
+   @Override
++  public void truncateTable(String dbName, String tableName, List<String> partNames,
++      long txnId, String validWriteIds, long writeId) throws TException {
++    truncateTableInternal(getDefaultCatalog(conf),
++        dbName, tableName, partNames, txnId, validWriteIds, writeId);
++  }
++
++  @Override
+   public void truncateTable(String dbName, String tableName, List<String> partNames) throws TException {
 -    truncateTable(getDefaultCatalog(conf), dbName, tableName, partNames);
++    truncateTableInternal(getDefaultCatalog(conf), dbName, tableName, partNames, -1, null, -1);
+   }
+ 
+   @Override
+   public void truncateTable(String catName, String dbName, String tableName, List<String> partNames)
+       throws TException {
 -    client.truncate_table(prependCatalogToDbName(catName, dbName, conf), tableName, partNames);
++    truncateTableInternal(catName, dbName, tableName, partNames, -1, null, -1);
++  }
++
++  private void truncateTableInternal(String catName, String dbName, String tableName,
++      List<String> partNames, long txnId, String validWriteIds, long writeId)
++          throws MetaException, TException {
++    TruncateTableRequest req = new TruncateTableRequest(
++        prependCatalogToDbName(catName, dbName, conf), tableName);
++    req.setPartNames(partNames);
++    req.setTxnId(txnId);
++    req.setValidWriteIdList(validWriteIds);
++    req.setWriteId(writeId);
++    client.truncate_table_req(req);
+   }
+ 
+   /**
+    * Recycles the files recursively from the input path to the cmroot directory either by copying or moving it.
+    *
+    * @param request Inputs for path of the data files to be recycled to cmroot and
+    *                isPurge flag when set to true files which needs to be recycled are not moved to Trash
+    * @return Response which is currently void
+    */
+   @Override
+   public CmRecycleResponse recycleDirToCmPath(CmRecycleRequest request) throws MetaException, TException {
+     return client.cm_recycle(request);
+   }
+ 
+   /**
+    * @param type
+    * @return true if the type is dropped
+    * @throws MetaException
+    * @throws TException
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_type(java.lang.String)
+    */
+   public boolean dropType(String type) throws NoSuchObjectException, MetaException, TException {
+     return client.drop_type(type);
+   }
+ 
+   /**
+    * @param name
+    * @return map of types
+    * @throws MetaException
+    * @throws TException
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#get_type_all(java.lang.String)
+    */
+   public Map<String, Type> getTypeAll(String name) throws MetaException,
+       TException {
+     Map<String, Type> result = null;
+     Map<String, Type> fromClient = client.get_type_all(name);
+     if (fromClient != null) {
+       result = new LinkedHashMap<>();
+       for (String key : fromClient.keySet()) {
+         result.put(key, deepCopy(fromClient.get(key)));
+       }
+     }
+     return result;
+   }
+ 
+   @Override
+   public List<String> getDatabases(String databasePattern) throws TException {
+     return getDatabases(getDefaultCatalog(conf), databasePattern);
+   }
+ 
+   @Override
+   public List<String> getDatabases(String catName, String databasePattern) throws TException {
+     return filterHook.filterDatabases(client.get_databases(prependCatalogToDbName(
+         catName, databasePattern, conf)));
+   }
+ 
+   @Override
+   public List<String> getAllDatabases() throws TException {
+     return getAllDatabases(getDefaultCatalog(conf));
+   }
+ 
+   @Override
+   public List<String> getAllDatabases(String catName) throws TException {
+     return filterHook.filterDatabases(client.get_databases(prependCatalogToDbName(catName, null, conf)));
+   }
+ 
+   @Override
+   public List<Partition> listPartitions(String db_name, String tbl_name, short max_parts)
+       throws TException {
+     return listPartitions(getDefaultCatalog(conf), db_name, tbl_name, max_parts);
+   }
+ 
+   @Override
+   public List<Partition> listPartitions(String catName, String db_name, String tbl_name,
+                                         int max_parts) throws TException {
+     List<Partition> parts = client.get_partitions(prependCatalogToDbName(catName, db_name, conf),
+         tbl_name, shrinkMaxtoShort(max_parts));
+     return deepCopyPartitions(filterHook.filterPartitions(parts));
+   }
+ 
+   @Override
+   public PartitionSpecProxy listPartitionSpecs(String dbName, String tableName, int maxParts) throws TException {
+     return listPartitionSpecs(getDefaultCatalog(conf), dbName, tableName, maxParts);
+   }
+ 
+   @Override
+   public PartitionSpecProxy listPartitionSpecs(String catName, String dbName, String tableName,
+                                                int maxParts) throws TException {
+     return PartitionSpecProxy.Factory.get(filterHook.filterPartitionSpecs(
+         client.get_partitions_pspec(prependCatalogToDbName(catName, dbName, conf), tableName, maxParts)));
+   }
+ 
+   @Override
+   public List<Partition> listPartitions(String db_name, String tbl_name,
+                                         List<String> part_vals, short max_parts) throws TException {
+     return listPartitions(getDefaultCatalog(conf), db_name, tbl_name, part_vals, max_parts);
+   }
+ 
+   @Override
+   public List<Partition> listPartitions(String catName, String db_name, String tbl_name,
+                                         List<String> part_vals, int max_parts) throws TException {
+     List<Partition> parts = client.get_partitions_ps(prependCatalogToDbName(catName, db_name, conf),
+         tbl_name, part_vals, shrinkMaxtoShort(max_parts));
+     return deepCopyPartitions(filterHook.filterPartitions(parts));
+   }
+ 
+   @Override
+   public List<Partition> listPartitionsWithAuthInfo(String db_name, String tbl_name,
+                                                     short max_parts, String user_name,
+                                                     List<String> group_names) throws TException {
+     return listPartitionsWithAuthInfo(getDefaultCatalog(conf), db_name, tbl_name, max_parts, user_name,
+         group_names);
+   }
+ 
+   @Override
+   public List<Partition> listPartitionsWithAuthInfo(String catName, String dbName, String tableName,
+                                                     int maxParts, String userName,
+                                                     List<String> groupNames) throws TException {
+     List<Partition> parts = client.get_partitions_with_auth(prependCatalogToDbName(catName,
+         dbName, conf), tableName, shrinkMaxtoShort(maxParts), userName, groupNames);
+     return deepCopyPartitions(filterHook.filterPartitions(parts));
+   }
+ 
+   @Override
+   public List<Partition> listPartitionsWithAuthInfo(String db_name, String tbl_name,
+                                                     List<String> part_vals, short max_parts,
+                                                     String user_name, List<String> group_names)
+       throws TException {
+     return listPartitionsWithAuthInfo(getDefaultCatalog(conf), db_name, tbl_name, part_vals, max_parts,
+         user_name, group_names);
+   }
+ 
+   @Override
+   public List<Partition> listPartitionsWithAuthInfo(String catName, String dbName, String tableName,
+                                                     List<String> partialPvals, int maxParts,
+                                                     String userName, List<String> groupNames)
+       throws TException {
+     List<Partition> parts = client.get_partitions_ps_with_auth(prependCatalogToDbName(catName,
+         dbName, conf), tableName, partialPvals, shrinkMaxtoShort(maxParts), userName, groupNames);
+     return deepCopyPartitions(filterHook.filterPartitions(parts));
+   }
+ 
+   @Override
+   public List<Partition> listPartitionsByFilter(String db_name, String tbl_name,
+       String filter, short max_parts) throws TException {
+     return listPartitionsByFilter(getDefaultCatalog(conf), db_name, tbl_name, filter, max_parts);
+   }
+ 
+   @Override
+   public List<Partition> listPartitionsByFilter(String catName, String db_name, String tbl_name,
+                                                 String filter, int max_parts) throws TException {
+     List<Partition> parts =client.get_partitions_by_filter(prependCatalogToDbName(
+         catName, db_name, conf), tbl_name, filter, shrinkMaxtoShort(max_parts));
+     return deepCopyPartitions(filterHook.filterPartitions(parts));
+   }
+ 
+   @Override
+   public PartitionSpecProxy listPartitionSpecsByFilter(String db_name, String tbl_name,
+                                                        String filter, int max_parts)
+       throws TException {
+     return listPartitionSpecsByFilter(getDefaultCatalog(conf), db_name, tbl_name, filter, max_parts);
+   }
+ 
+   @Override
+   public PartitionSpecProxy listPartitionSpecsByFilter(String catName, String db_name,
+                                                        String tbl_name, String filter,
+                                                        int max_parts) throws TException {
+     return PartitionSpecProxy.Factory.get(filterHook.filterPartitionSpecs(
+         client.get_part_specs_by_filter(prependCatalogToDbName(catName, db_name, conf), tbl_name, filter,
+             max_parts)));
+   }
+ 
+   @Override
+   public boolean listPartitionsByExpr(String db_name, String tbl_name, byte[] expr,
+                                       String default_partition_name, short max_parts,
+                                       List<Partition> result) throws TException {
+     return listPartitionsByExpr(getDefaultCatalog(conf), db_name, tbl_name, expr,
+         default_partition_name, max_parts, result);
+   }
+ 
+   @Override
+   public boolean listPartitionsByExpr(String catName, String db_name, String tbl_name, byte[] expr,
+       String default_partition_name, int max_parts, List<Partition> result)
+           throws TException {
+     assert result != null;
+     PartitionsByExprRequest req = new PartitionsByExprRequest(
+         db_name, tbl_name, ByteBuffer.wrap(expr));
+     if (default_partition_name != null) {
+       req.setDefaultPartitionName(default_partition_name);
+     }
+     if (max_parts >= 0) {
+       req.setMaxParts(shrinkMaxtoShort(max_parts));
+     }
+     PartitionsByExprResult r;
+     try {
+       r = client.get_partitions_by_expr(req);
+     } catch (TApplicationException te) {
+       // TODO: backward compat for Hive <= 0.12. Can be removed later.
+       if (te.getType() != TApplicationException.UNKNOWN_METHOD
+           && te.getType() != TApplicationException.WRONG_METHOD_NAME) {
+         throw te;
+       }
+       throw new IncompatibleMetastoreException(
+           "Metastore doesn't support listPartitionsByExpr: " + te.getMessage());
+     }
+     r.setPartitions(filterHook.filterPartitions(r.getPartitions()));
+     // TODO: in these methods, do we really need to deepcopy?
+     deepCopyPartitions(r.getPartitions(), result);
+     return !r.isSetHasUnknownPartitions() || r.isHasUnknownPartitions(); // Assume the worst.
+   }
+ 
+   @Override
+   public Database getDatabase(String name) throws TException {
+     return getDatabase(getDefaultCatalog(conf), name);
+   }
+ 
+   @Override
+   public Database getDatabase(String catalogName, String databaseName) throws TException {
+     Database d = client.get_database(prependCatalogToDbName(catalogName, databaseName, conf));
+     return deepCopy(filterHook.filterDatabase(d));
+   }
+ 
+   @Override
+   public Partition getPartition(String db_name, String tbl_name, List<String> part_vals)
+       throws TException {
+     return getPartition(getDefaultCatalog(conf), db_name, tbl_name, part_vals);
+   }
+ 
+   @Override
+   public Partition getPartition(String catName, String dbName, String tblName,
+                                 List<String> partVals) throws TException {
+     Partition p = client.get_partition(prependCatalogToDbName(catName, dbName, conf), tblName, partVals);
+     return deepCopy(filterHook.filterPartition(p));
+   }
+ 
+   @Override
+   public List<Partition> getPartitionsByNames(String db_name, String tbl_name,
+       List<String> part_names) throws TException {
+     return getPartitionsByNames(getDefaultCatalog(conf), db_name, tbl_name, part_names);
+   }
+ 
+   @Override
+   public List<Partition> getPartitionsByNames(String catName, String db_name, String tbl_name,
+                                               List<String> part_names) throws TException {
+     List<Partition> parts =
+         client.get_partitions_by_names(prependCatalogToDbName(catName, db_name, conf), tbl_name, part_names);
+     return deepCopyPartitions(filterHook.filterPartitions(parts));
+   }
+ 
+   @Override
+   public PartitionValuesResponse listPartitionValues(PartitionValuesRequest request)
+       throws MetaException, TException, NoSuchObjectException {
+     if (!request.isSetCatName()) {
+       request.setCatName(getDefaultCatalog(conf));
+     }
+     return client.get_partition_values(request);
+   }
+ 
+   @Override
+   public Partition getPartitionWithAuthInfo(String db_name, String tbl_name,
+       List<String> part_vals, String user_name, List<String> group_names)
+       throws TException {
+     return getPartitionWithAuthInfo(getDefaultCatalog(conf), db_name, tbl_name, part_vals,
+         user_name, group_names);
+   }
+ 
+   @Override
+   public Partition getPartitionWithAuthInfo(String catName, String dbName, String tableName,
+                                             List<String> pvals, String userName,
+                                             List<String> groupNames) throws TException {
+     Partition p = client.get_partition_with_auth(prependCatalogToDbName(catName, dbName, conf), tableName,
+         pvals, userName, groupNames);
+     return deepCopy(filterHook.filterPartition(p));
+   }
+ 
+   @Override
+   public Table getTable(String dbname, String name) throws TException {
+     return getTable(getDefaultCatalog(conf), dbname, name);
+   }
+ 
+   @Override
++  public Table getTable(String dbname, String name,
++                 long txnId, String validWriteIdList)
++      throws MetaException, TException, NoSuchObjectException{
++    return getTable(getDefaultCatalog(conf), dbname, name,
++        txnId, validWriteIdList);
++  };
++
++  @Override
+   public Table getTable(String catName, String dbName, String tableName) throws TException {
+     GetTableRequest req = new GetTableRequest(dbName, tableName);
+     req.setCatName(catName);
+     req.setCapabilities(version);
+     Table t = client.get_table_req(req).getTable();
+     return deepCopy(filterHook.filterTable(t));
+   }
+ 
+   @Override
++  public Table getTable(String catName, String dbName, String tableName,
++    long txnId, String validWriteIdList) throws TException {
++    GetTableRequest req = new GetTableRequest(dbName, tableName);
++    req.setCatName(catName);
++    req.setCapabilities(version);
++    req.setTxnId(txnId);
++    req.setValidWriteIdList(validWriteIdList);
++    Table t = client.get_table_req(req).getTable();
++    return deepCopy(filterHook.filterTable(t));
++  }
++
++  @Override
+   public List<Table> getTableObjectsByName(String dbName, List<String> tableNames)
+       throws TException {
+     return getTableObjectsByName(getDefaultCatalog(conf), dbName, tableNames);
+   }
+ 
+   @Override
+   public List<Table> getTableObjectsByName(String catName, String dbName,
+                                            List<String> tableNames) throws TException {
+     GetTablesRequest req = new GetTablesRequest(dbName);
+     req.setCatName(catName);
+     req.setTblNames(tableNames);
+     req.setCapabilities(version);
+     List<Table> tabs = client.get_table_objects_by_name_req(req).getTables();
+     return deepCopyTables(filterHook.filterTables(tabs));
+   }
+ 
+   @Override
+   public Materialization getMaterializationInvalidationInfo(CreationMetadata cm, String validTxnList)
+       throws MetaException, InvalidOperationException, UnknownDBException, TException {
+     return client.get_materialization_invalidation_info(cm, validTxnList);
+   }
+ 
+   @Override
+   public void updateCreationMetadata(String dbName, String tableName, CreationMetadata cm)
+       throws MetaException, InvalidOperationException, UnknownDBException, TException {
+     client.update_creation_metadata(getDefaultCatalog(conf), dbName, tableName, cm);
+   }
+ 
+   @Override
+   public void updateCreationMetadata(String catName, String dbName, String tableName,
+                                      CreationMetadata cm) throws MetaException, TException {
+     client.update_creation_metadata(catName, dbName, tableName, cm);
+ 
+   }
+ 
+   /** {@inheritDoc} */
+   @Override
+   public List<String> listTableNamesByFilter(String dbName, String filter, short maxTables)
+       throws TException {
+     return listTableNamesByFilter(getDefaultCatalog(conf), dbName, filter, maxTables);
+   }
+ 
+   @Override
+   public List<String> listTableNamesByFilter(String catName, String dbName, String filter,
+                                              int maxTables) throws TException {
+     return filterHook.filterTableNames(catName, dbName,
+         client.get_table_names_by_filter(prependCatalogToDbName(catName, dbName, conf), filter,
+             shrinkMaxtoShort(maxTables)));
+   }
+ 
+   /**
+    * @param name
+    * @return the type
+    * @throws MetaException
+    * @throws TException
+    * @throws NoSuchObjectException
+    * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#get_type(java.lang.String)
+    */
+   public Type getType(String name) throws NoSuchObjectException, MetaException, TException {
+     return deepCopy(client.get_type(name));
+   }
+ 
+   @Override
+   public List<String> getTables(String dbname, String tablePattern) throws MetaException {
+     try {
+       return getTables(getDefaultCatalog(conf), dbname, tablePattern);
+     } catch (Exception e) {
+       MetaStoreUtils.logAndThrowMetaException(e);
+     }
+     return null;
+   }
+ 
+   @Override
+   public List<String> getTables(String catName, String dbName, String tablePattern)
+       throws TException {
+     return filterHook.filterTableNames(catName, dbName,
+         client.get_tables(prependCatalogToDbName(catName, dbName, conf), tablePattern));
+   }
+ 
+   @Override
+   public List<String> getTables(String dbname, String tablePattern, TableType tableType) throws MetaException {
+     try {
+       return getTables(getDefaultCatalog(conf), dbname, tablePattern, tableType);
+     } catch (Exception e) {
+       MetaStoreUtils.logAndThrowMetaException(e);
+     }
+     return null;
+   }
+ 
+   @Override
+   public List<String> getTables(String catName, String dbName, String tablePattern,
+                                 TableType tableType) throws TException {
+     return filterHook.filterTableNames(catName, dbName,
+         client.get_tables_by_type(prependCatalogToDbName(catName, dbName, conf), tablePattern,
+             tableType.toString()));
+   }
+ 
+   @Override
+   public List<String> getMaterializedViewsForRewriting(String dbName) throws TException {
+     return getMaterializedViewsForRewriting(getDefaultCatalog(conf), dbName);
+   }
+ 
+   @Override
+   public List<String> getMaterializedViewsForRewriting(String catName, String dbname)
+       throws MetaException {
+     try {
+       return filterHook.filterTableNames(catName, dbname,
+           client.get_materialized_views_for_rewriting(prependCatalogToDbName(catName, dbname, conf)));
+     } catch (Exception e) {
+       MetaStoreUtils.logAndThrowMetaException(e);
+     }
+     return null;
+   }
+ 
+   @Override
+   public List<TableMeta> getTableMeta(String dbPatterns, String tablePatterns, List<String> tableTypes)
+       throws MetaException {
+     try {
+       return getTableMeta(getDefaultCatalog(conf), dbPatterns, tablePatterns, tableTypes);
+     } catch (Exception e) {
+       MetaStoreUtils.logAndThrowMetaException(e);
+     }
+     return null;
+   }
+ 
+   @Override
+   public List<TableMeta> getTableMeta(String catName, String dbPatterns, String tablePatterns,
+                                       List<String> tableTypes) throws TException {
+     return filterHook.filterTableMetas(client.get_table_meta(prependCatalogToDbName(
+         catName, dbPatterns, conf), tablePatterns, tableTypes));
+   }
+ 
+   @Override
+   public List<String> getAllTables(String dbname) throws MetaException {
+     try {
+       return getAllTables(getDefaultCatalog(conf), dbname);
+     } catch (Exception e) {
+       MetaStoreUtils.logAndThrowMetaException(e);
+     }
+     return null;
+   }
+ 
+   @Override
+   public List<String> getAllTables(String catName, String dbName) throws TException {
+     return filterHook.filterTableNames(catName, dbName, client.get_all_tables(
+         prependCatalogToDbName(catName, dbName, conf)));
+   }
+ 
+   @Override
+   public boolean tableExists(String databaseName, String tableName) throws TException {
+     return tableExists(getDefaultCatalog(conf), databaseName, tableName);
+   }
+ 
+   @Override
+   public boolean tableExists(String catName, String dbName, String tableName) throws TException {
+     try {
+       GetTableRequest req = new GetTableRequest(dbName, tableName);
+       req.setCatName(catName);
+       req.setCapabilities(version);
+       return filterHook.filterTable(client.get_table_req(req).getTable()) != null;
+     } catch (NoSuchObjectException e) {
+       return false;
+     }
+   }
+ 
+   @Override
+   public List<String> listPartitionNames(String dbName, String tblName,
+       short max) throws NoSuchObjectException, MetaException, TException {
+     return listPartitionNames(getDefaultCatalog(conf), dbName, tblName, max);
+   }
+ 
+   @Override
+   public List<String> listPartitionNames(String catName, String dbName, String tableName,
+                                          int maxParts) throws TException {
+     return filterHook.filterPartitionNames(catName, dbName, tableName,
+         client.get_partition_names(prependCatalogToDbName(catName, dbName, conf), tableName, shrinkMaxtoShort(maxParts)));
+   }
+ 
+   @Override
+   public List<String> listPartitionNames(String db_name, String tbl_name,
+       List<String> part_vals, short max_parts) throws TException {
+     return listPartitionNames(getDefaultCatalog(conf), db_name, tbl_name, part_vals, max_parts);
+   }
+ 
+   @Override
+   public List<String> listPartitionNames(String catName, String db_name, String tbl_name,
+                                          List<String> part_vals, int max_parts) throws TException {
+     return filterHook.filterPartitionNames(catName, db_name, tbl_name,
+         client.get_partition_names_ps(prependCatalogToDbName(catName, db_name, conf), tbl_name,
+             part_vals, shrinkMaxtoShort(max_parts)));
+   }
+ 
+   @Override
+   public int getNumPartitionsByFilter(String db_name, String tbl_name,
+                                       String filter) throws TException {
+     return getNumPartitionsByFilter(getDefaultCatalog(conf), db_name, tbl_name, filter);
+   }
+ 
+   @Override
+   public int getNumPartitionsByFilter(String catName, String dbName, String tableName,
+                                       String filter) throws TException {
+     return client.get_num_partitions_by_filter(prependCatalogToDbName(catName, dbName, conf), tableName,
+         filter);
+   }
+ 
+   @Override
+   public void alter_partition(String dbName, String tblName, Partition newPart)
+       throws InvalidOperationException, MetaException, TException {
+     alter_partition(getDefaultCatalog(conf), dbName, tblName, newPart, null);
+   }
+ 
+   @Override
 -  public void alter_partition(String dbName, String tblName, Partition newPart, EnvironmentContext environmentContext)
++  public void alter_partition(String dbName, String tblName, Partition newPart,
++      EnvironmentContext environmentContext)
+       throws InvalidOperationException, MetaException, TException {
+     alter_partition(getDefaultCatalog(conf), dbName, tblName, newPart, environmentContext);
+   }
+ 
+   @Override
+   public void alter_partition(String catName, String dbName, String tblName, Partition newPart,
+                               EnvironmentContext environmentContext) throws TException {
 -    client.alter_partition_with_environment_context(prependCatalogToDbName(catName, dbName, conf), tblName,
 -        newPart, environmentContext);
++    AlterPartitionsRequest req = new AlterPartitionsRequest(dbName, tblName, Lists.newArrayList(newPart));
++    req.setCatName(catName);
++    req.setEnvironmentContext(environmentContext);
++    client.alter_partitions_req(req);
+   }
+ 
+   @Override
++  public void alter_partition(String dbName, String tblName, Partition newPart,
++      EnvironmentContext environmentContext, long txnId, String writeIdList)
++      throws InvalidOperationException, MetaException, TException {
++    AlterPartitionsRequest req = new AlterPartitionsRequest(
++        dbName, tblName, Lists.newArrayList(newPart));
++    req.setEnvironmentContext(environmentContext);
++    req.setTxnId(txnId);
++    req.setValidWriteIdList(writeIdList);
++    client.alter_partitions_req(req);
++  }
++
++  @Deprecated
++  @Override
+   public void alter_partitions(String dbName, String tblName, List<Partition> newParts)
+       throws TException {
 -    alter_partitions(getDefaultCatalog(conf), dbName, tblName, newParts, null);
++    alter_partitions(
++        getDefaultCatalog(conf), dbName, tblName, newParts, new EnvironmentContext(), -1, null, -1);
+   }
+ 
+   @Override
+   public void alter_partitions(String dbName, String tblName, List<Partition> newParts,
+                                EnvironmentContext environmentContext) throws TException {
 -    alter_partitions(getDefaultCatalog(conf), dbName, tblName, newParts, environmentCo

<TRUNCATED>