You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:31 UTC

[22/50] [abbrv] hive git commit: HIVE-19416 : merge master into branch (Sergey Shelukhin) 0719

http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 0000000,bdcbf41..9eb8424
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@@ -1,0 -1,12207 +1,12509 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.metastore;
+ 
+ import static org.apache.commons.lang.StringUtils.join;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+ import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+ 
+ import java.io.IOException;
+ import java.lang.reflect.Field;
+ import java.net.InetAddress;
+ import java.net.URI;
+ import java.nio.ByteBuffer;
+ import java.sql.Connection;
+ import java.sql.SQLException;
+ import java.sql.SQLIntegrityConstraintViolationException;
+ import java.sql.Statement;
+ import java.time.LocalDateTime;
+ import java.time.format.DateTimeFormatter;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.TreeSet;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Pattern;
+ 
+ import javax.jdo.JDOCanRetryException;
+ import javax.jdo.JDODataStoreException;
+ import javax.jdo.JDOException;
+ import javax.jdo.JDOHelper;
+ import javax.jdo.JDOObjectNotFoundException;
+ import javax.jdo.PersistenceManager;
+ import javax.jdo.PersistenceManagerFactory;
+ import javax.jdo.Query;
+ import javax.jdo.Transaction;
+ import javax.jdo.datastore.DataStoreCache;
+ import javax.jdo.datastore.JDOConnection;
+ import javax.jdo.identity.IntIdentity;
+ import javax.sql.DataSource;
+ 
+ import com.google.common.base.Strings;
+ 
+ import org.apache.commons.collections.CollectionUtils;
+ import org.apache.commons.lang.ArrayUtils;
+ import org.apache.commons.lang.StringUtils;
+ import org.apache.commons.lang.exception.ExceptionUtils;
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.classification.InterfaceStability;
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.Path;
 -import org.apache.hadoop.hive.common.DatabaseName;
 -import org.apache.hadoop.hive.common.StatsSetupConst;
 -import org.apache.hadoop.hive.common.TableName;
++import org.apache.hadoop.hive.common.*;
+ import org.apache.hadoop.hive.metastore.MetaStoreDirectSql.SqlFilterForPushdown;
 -import org.apache.hadoop.hive.metastore.api.AggrStats;
 -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 -import org.apache.hadoop.hive.metastore.api.Catalog;
 -import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 -import org.apache.hadoop.hive.metastore.api.CreationMetadata;
 -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 -import org.apache.hadoop.hive.metastore.api.Database;
 -import org.apache.hadoop.hive.metastore.api.FieldSchema;
 -import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
 -import org.apache.hadoop.hive.metastore.api.Function;
 -import org.apache.hadoop.hive.metastore.api.FunctionType;
 -import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 -import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 -import org.apache.hadoop.hive.metastore.api.HiveObjectType;
 -import org.apache.hadoop.hive.metastore.api.ISchema;
 -import org.apache.hadoop.hive.metastore.api.ISchemaName;
 -import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 -import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 -import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 -import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
 -import org.apache.hadoop.hive.metastore.api.MetaException;
 -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 -import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 -import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 -import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
 -import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
 -import org.apache.hadoop.hive.metastore.api.Order;
 -import org.apache.hadoop.hive.metastore.api.Partition;
 -import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 -import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse;
 -import org.apache.hadoop.hive.metastore.api.PartitionValuesRow;
 -import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
 -import org.apache.hadoop.hive.metastore.api.PrincipalType;
 -import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
 -import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo;
 -import org.apache.hadoop.hive.metastore.api.ResourceType;
 -import org.apache.hadoop.hive.metastore.api.ResourceUri;
 -import org.apache.hadoop.hive.metastore.api.Role;
 -import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
 -import org.apache.hadoop.hive.metastore.api.RuntimeStat;
 -import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
 -import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
 -import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 -import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 -import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 -import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 -import org.apache.hadoop.hive.metastore.api.SchemaCompatibility;
 -import org.apache.hadoop.hive.metastore.api.SchemaType;
 -import org.apache.hadoop.hive.metastore.api.SchemaValidation;
 -import org.apache.hadoop.hive.metastore.api.SchemaVersion;
 -import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
 -import org.apache.hadoop.hive.metastore.api.SchemaVersionState;
 -import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 -import org.apache.hadoop.hive.metastore.api.SerdeType;
 -import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 -import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 -import org.apache.hadoop.hive.metastore.api.Table;
 -import org.apache.hadoop.hive.metastore.api.TableMeta;
 -import org.apache.hadoop.hive.metastore.api.Type;
 -import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 -import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 -import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 -import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 -import org.apache.hadoop.hive.metastore.api.WMMapping;
 -import org.apache.hadoop.hive.metastore.api.WMNullablePool;
 -import org.apache.hadoop.hive.metastore.api.WMNullableResourcePlan;
 -import org.apache.hadoop.hive.metastore.api.WMPool;
 -import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
 -import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
 -import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus;
 -import org.apache.hadoop.hive.metastore.api.WMTrigger;
 -import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
 -import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
++import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+ import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+ import org.apache.hadoop.hive.metastore.metrics.Metrics;
+ import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 -import org.apache.hadoop.hive.metastore.model.MCatalog;
 -import org.apache.hadoop.hive.metastore.model.MColumnDescriptor;
 -import org.apache.hadoop.hive.metastore.model.MConstraint;
 -import org.apache.hadoop.hive.metastore.model.MCreationMetadata;
 -import org.apache.hadoop.hive.metastore.model.MDBPrivilege;
 -import org.apache.hadoop.hive.metastore.model.MDatabase;
 -import org.apache.hadoop.hive.metastore.model.MDelegationToken;
 -import org.apache.hadoop.hive.metastore.model.MFieldSchema;
 -import org.apache.hadoop.hive.metastore.model.MFunction;
 -import org.apache.hadoop.hive.metastore.model.MGlobalPrivilege;
 -import org.apache.hadoop.hive.metastore.model.MISchema;
 -import org.apache.hadoop.hive.metastore.model.MMasterKey;
 -import org.apache.hadoop.hive.metastore.model.MMetastoreDBProperties;
 -import org.apache.hadoop.hive.metastore.model.MNotificationLog;
 -import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
 -import org.apache.hadoop.hive.metastore.model.MOrder;
 -import org.apache.hadoop.hive.metastore.model.MPartition;
 -import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
 -import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
 -import org.apache.hadoop.hive.metastore.model.MPartitionEvent;
 -import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
 -import org.apache.hadoop.hive.metastore.model.MResourceUri;
 -import org.apache.hadoop.hive.metastore.model.MRole;
 -import org.apache.hadoop.hive.metastore.model.MRoleMap;
 -import org.apache.hadoop.hive.metastore.model.MRuntimeStat;
 -import org.apache.hadoop.hive.metastore.model.MSchemaVersion;
 -import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
 -import org.apache.hadoop.hive.metastore.model.MStorageDescriptor;
 -import org.apache.hadoop.hive.metastore.model.MStringList;
 -import org.apache.hadoop.hive.metastore.model.MTable;
 -import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
 -import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
 -import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
 -import org.apache.hadoop.hive.metastore.model.MType;
 -import org.apache.hadoop.hive.metastore.model.MVersionTable;
 -import org.apache.hadoop.hive.metastore.model.MWMMapping;
++import org.apache.hadoop.hive.metastore.model.*;
+ import org.apache.hadoop.hive.metastore.model.MWMMapping.EntityType;
 -import org.apache.hadoop.hive.metastore.model.MWMPool;
 -import org.apache.hadoop.hive.metastore.model.MWMResourcePlan;
+ import org.apache.hadoop.hive.metastore.model.MWMResourcePlan.Status;
 -import org.apache.hadoop.hive.metastore.model.MWMTrigger;
 -import org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog;
+ import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
+ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder;
+ import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+ import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
++import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
++import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+ import org.apache.hadoop.hive.metastore.utils.FileUtils;
+ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+ import org.apache.hadoop.hive.metastore.utils.ObjectPair;
++import org.apache.hive.common.util.TxnIdUtils;
+ import org.apache.thrift.TException;
+ import org.datanucleus.AbstractNucleusContext;
+ import org.datanucleus.ClassLoaderResolver;
+ import org.datanucleus.ClassLoaderResolverImpl;
+ import org.datanucleus.NucleusContext;
+ import org.datanucleus.PropertyNames;
+ import org.datanucleus.api.jdo.JDOPersistenceManager;
+ import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
+ import org.datanucleus.store.rdbms.exceptions.MissingTableException;
+ import org.datanucleus.store.scostore.Store;
+ import org.datanucleus.util.WeakValueMap;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.codahale.metrics.Counter;
+ import com.codahale.metrics.MetricRegistry;
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+ 
+ 
+ /**
+  * This class is the interface between the application logic and the database
+  * store that contains the objects. Refrain from putting any logic in model.M* objects
+  * or in this file, as the former could be auto-generated and this class would need
+  * to be made into an interface that can read both from a database and a
+  * filestore.
+  */
+ public class ObjectStore implements RawStore, Configurable {
+   // Data source properties the current pmf was built from. Shared by all instances;
+   // setConf() guards access to prop and pmf with pmfPropLock.
+   private static Properties prop = null;
+   // Shared factory; setConf() discards it when the data source properties change.
+   private static PersistenceManagerFactory pmf = null;
+   // When true, setConf() does not close the old pmf when the properties change.
+   private static boolean forTwoMetastoreTesting = false;
+   // Overwritten in setConf() from ConfVars.RAWSTORE_PARTITION_BATCH_SIZE.
+   private int batchSize = Batchable.NO_BATCHING;
+ 
+   private static final DateTimeFormatter YMDHMS_FORMAT = DateTimeFormatter.ofPattern(
+       "yyyy_MM_dd_HH_mm_ss");
+ 
+   // Protects the static prop and pmf fields (see setConf()).
+   private static Lock pmfPropLock = new ReentrantLock();
+   /**
+   * Verify the schema only once per JVM since the db connection info is static
+   */
+   private final static AtomicBoolean isSchemaVerified = new AtomicBoolean(false);
+   private static final Logger LOG = LoggerFactory.getLogger(ObjectStore.class);
+ 
+   // State of the transaction tracked by this instance; reset to NO_STATE in setConf()
+   // and set to OPEN when openTransaction() begins the outermost transaction.
+   private enum TXN_STATUS {
+     NO_STATE, OPEN, COMMITED, ROLLBACK
+   }
+ 
+   // Maps the lower-case type names accepted by hive.metastore.cache.pinobjtypes
+   // to the model classes that getPMF() pins in the DataStoreCache.
+   private static final Map<String, Class<?>> PINCLASSMAP;
+   // Address of the local host as returned by InetAddress, or "UNKNOWN" if the lookup failed.
+   private static final String HOSTNAME;
+   // Value of the USER environment variable, or "UNKNOWN" if it is not set.
+   private static final String USER;
+   private static final String JDO_PARAM = ":param";
+   static {
+     Map<String, Class<?>> map = new HashMap<>();
+     map.put("table", MTable.class);
+     map.put("storagedescriptor", MStorageDescriptor.class);
+     map.put("serdeinfo", MSerDeInfo.class);
+     map.put("partition", MPartition.class);
+     map.put("database", MDatabase.class);
+     map.put("type", MType.class);
+     map.put("fieldschema", MFieldSchema.class);
+     map.put("order", MOrder.class);
+     PINCLASSMAP = Collections.unmodifiableMap(map);
+     String hostname = "UNKNOWN";
+     try {
+       InetAddress clientAddr = InetAddress.getLocalHost();
+       hostname = clientAddr.getHostAddress();
+     } catch (IOException e) {
+       // Best effort only: keep the placeholder value, but do not hide the failure.
+       LOG.warn("Unable to determine the local host address, using {}", hostname, e);
+     }
+     HOSTNAME = hostname;
+     String user = System.getenv("USER");
+     USER = org.apache.commons.lang.StringUtils.defaultString(user, "UNKNOWN");
+   }
+ 
+ 
+   // Set by initializeHelper() once a PersistenceManager has been obtained.
+   private boolean isInitialized = false;
+   // The fields below through transactionStatus are reset by setConf() before re-initializing.
+   private PersistenceManager pm = null;
+   private SQLGenerator sqlGenerator = null;
+   // Only created when ConfVars.TRY_DIRECT_SQL is enabled; otherwise stays null.
+   private MetaStoreDirectSql directSql = null;
+   private DatabaseProduct dbType = null;
+   private PartitionExpressionProxy expressionProxy = null;
+   private Configuration conf;
+   // Number of openTransaction() calls seen so far; the first call begins the real transaction.
+   private volatile int openTrasactionCalls = 0;
+   private Transaction currentTransaction = null;
+   private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE;
+   // Compiled from ConfVars.PARTITION_NAME_WHITELIST_PATTERN; null when that setting is empty.
+   private Pattern partitionValidationPattern;
+   // Assigned in setConf() only when a metrics registry exists, so it may be null.
+   private Counter directSqlErrors;
++  // Read from ConfVars.HIVE_TXN_STATS_ENABLED in setConf().
++  private boolean areTxnStatsSupported = false;
+ 
+   /**
+    * AutoCloseable holder for a JDO {@link Query}. It lets a method hand a query to its
+    * caller, who then releases the query's resources by closing the wrapper (for example
+    * with try-with-resources).
+    */
+   public static class QueryWrapper implements AutoCloseable {
+     public Query query;
+ 
+     /**
+      * Closes every result of the wrapped query and drops the reference to it.
+      * Does nothing when no query is held, so it is safe to call more than once.
+      */
+     @Override
+     public void close() {
+       if (query == null) {
+         return;
+       }
+       query.closeAll();
+       query = null;
+     }
+   }
+ 
+   /**
+    * Creates an unconfigured store. The constructor itself does no work; the
+    * PersistenceManager and related state are set up by {@link #setConf(Configuration)}.
+    */
+   public ObjectStore() {
+   }
+ 
+   /**
+    * @return the configuration most recently passed to {@link #setConf(Configuration)},
+    *         or null if setConf has not been called yet
+    */
+   @Override
+   public Configuration getConf() {
+     return conf;
+   }
+ 
+   /**
+    * Called whenever this object is instantiated using ReflectionUtils, and also
+    * on connection retries. In cases of connection retries, conf will usually
+    * contain modified values.
+    *
+    * <p>Resets this instance's per-connection state, discards the shared static
+    * PersistenceManagerFactory when the data source properties differ from the cached
+    * ones, and then re-runs initialization. The whole method runs while holding
+    * {@code pmfPropLock}.
+    *
+    * @param conf configuration to read the metastore and data source settings from
+    * @throws RuntimeException if no PersistenceManager could be created
+    */
+   @Override
+   @SuppressWarnings("nls")
+   public void setConf(Configuration conf) {
+     // Although an instance of ObjectStore is accessed by one thread, there may
+     // be many threads with ObjectStore instances. So the static variables
+     // pmf and prop need to be protected with locks.
+     pmfPropLock.lock();
+     try {
+       isInitialized = false;
+       this.conf = conf;
++      this.areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED);
+       configureSSL(conf);
+       Properties propsFromConf = getDataSourceProps(conf);
+       // Also true on the first call, when the static prop is still null.
+       boolean propsChanged = !propsFromConf.equals(prop);
+ 
+       if (propsChanged) {
+         if (pmf != null){
+           clearOutPmfClassLoaderCache(pmf);
+           if (!forTwoMetastoreTesting) {
+             // close the underlying connection pool to avoid leaks
+             pmf.close();
+           }
+         }
+         // Force getPMF() to build a new factory from the new properties.
+         pmf = null;
+         prop = null;
+       }
+ 
+       assert(!isActiveTransaction());
+       shutdown();
+       // Always want to re-create pm as we don't know if it were created by the
+       // most recent instance of the pmf
+       pm = null;
+       directSql = null;
+       expressionProxy = null;
+       openTrasactionCalls = 0;
+       currentTransaction = null;
+       transactionStatus = TXN_STATUS.NO_STATE;
+ 
+       initialize(propsFromConf);
+ 
+       String partitionValidationRegex =
+           MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN);
+       if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
+         partitionValidationPattern = Pattern.compile(partitionValidationRegex);
+       } else {
+         partitionValidationPattern = null;
+       }
+ 
+       // Note, if metrics have not been initialized this will return null, which means we aren't
+       // using metrics.  Thus we should always check whether this is non-null before using.
+       MetricRegistry registry = Metrics.getRegistry();
+       if (registry != null) {
+         directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS);
+       }
+ 
+       this.batchSize = MetastoreConf.getIntVar(conf, ConfVars.RAWSTORE_PARTITION_BATCH_SIZE);
+ 
+       // isInitialized is set by initializeHelper(); it is still false if initialize()
+       // returned without ever obtaining a PersistenceManager.
+       if (!isInitialized) {
+         throw new RuntimeException(
+         "Unable to create persistence manager. Check dss.log for details");
+       } else {
+         LOG.debug("Initialized ObjectStore");
+       }
+     } finally {
+       pmfPropLock.unlock();
+     }
+   }
+ 
+   // Class loader captured when the instance is constructed: the constructing thread's
+   // context class loader if it has one, otherwise the loader that loaded ObjectStore.
+   private ClassLoader classLoader;
+   {
+     ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+     classLoader = (contextLoader != null) ? contextLoader : ObjectStore.class.getClassLoader();
+   }
+ 
+   /**
+    * Runs {@link #initializeHelper(Properties)} until it succeeds, retrying only failures
+    * that {@link #isRetriableException(Throwable)} accepts. The number of attempts comes from
+    * ConfVars.HMS_HANDLER_ATTEMPTS and the pause between them from
+    * ConfVars.HMS_HANDLER_INTERVAL. The last failure is rethrown unchanged.
+    *
+    * @param dsProps data source properties handed to initializeHelper
+    */
+   @SuppressWarnings("nls")
+   private void initialize(Properties dsProps) {
+     final int maxAttempts = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
+     final long sleepMillis = MetastoreConf.getTimeVar(conf,
+         ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
+ 
+     for (int attemptsLeft = maxAttempts; attemptsLeft > 0; ) {
+       try {
+         initializeHelper(dsProps);
+         return; // Success, nothing more to do.
+       } catch (Exception e) {
+         attemptsLeft--;
+         if (!isRetriableException(e)) {
+           LOG.debug("Non-retriable exception during ObjectStore initialize.", e);
+           throw e;
+         }
+         if (attemptsLeft == 0) {
+           // Retriable, but the attempt budget is used up: surface the last failure.
+           LOG.warn("Exception retry limit reached, not retrying any longer.",
+             e);
+           throw e;
+         }
+         LOG.info("Retriable exception while instantiating ObjectStore, retrying. " +
+             "{} tries left", attemptsLeft, e);
+         try {
+           Thread.sleep(sleepMillis);
+         } catch (InterruptedException ie) {
+           // Keep the interrupt visible to callers instead of consuming it here.
+           LOG.debug("Interrupted while sleeping before retrying.", ie);
+           Thread.currentThread().interrupt();
+         }
+         // Fall through to the next attempt.
+       }
+     }
+   }
+ 
+   // Exception types (including their subclasses) that make initialize() retry.
+   private static final Set<Class<? extends Throwable>> retriableExceptionClasses =
+       new HashSet<>(Arrays.asList(JDOCanRetryException.class));
+   /**
+    * Decides whether initialize() should retry after the given failure.
+    *
+    * @param e the failure to inspect; may be null
+    * @return true if e, or any exception in its getCause() chain, is an instance of one of
+    *         the retriable exception classes; false otherwise, including when e is null
+    */
+   private boolean isRetriableException(Throwable e) {
+     if (e == null) {
+       return false;
+     }
+     // isInstance also covers an exact class match, so no separate lookup is needed.
+     for (Class<? extends Throwable> retriableClass : retriableExceptionClasses) {
+       if (retriableClass.isInstance(e)) {
+         return true;
+       }
+     }
+     Throwable cause = e.getCause();
+     return cause != null && isRetriableException(cause);
+   }
+ 
+   /**
+    * Performs one initialization attempt, so that initialize() can retry it on failure.
+    * Stores dsProps in the static {@code prop}, obtains a PersistenceManager, builds the
+    * SQLGenerator for the detected database product and, once a PersistenceManager is
+    * available, sets up the database type, the expression proxy and (if
+    * ConfVars.TRY_DIRECT_SQL is enabled) the direct SQL helper.
+    *
+    * @param dsProps data source properties to use for the PersistenceManagerFactory
+    * @throws RuntimeException wrapping the SQLException if the database product cannot be determined
+    */
+   private void initializeHelper(Properties dsProps) {
+     LOG.debug("ObjectStore, initialize called");
+     // getPMF() reads the static prop, so it has to be set before the pm is requested.
+     prop = dsProps;
+     pm = getPersistenceManager();
+     try {
+       String productName = MetaStoreDirectSql.getProductName(pm);
+       sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(productName), conf);
+     } catch (SQLException e) {
+       LOG.error("error trying to figure out the database product", e);
+       throw new RuntimeException(e);
+     }
+     isInitialized = pm != null;
+     if (isInitialized) {
+       dbType = determineDatabaseProduct();
+       expressionProxy = createExpressionProxy(conf);
+       if (MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL)) {
+         String schema = prop.getProperty("javax.jdo.mapping.Schema");
+         // A blank schema setting is passed on as null.
+         schema = org.apache.commons.lang.StringUtils.defaultIfBlank(schema, null);
+         directSql = new MetaStoreDirectSql(pm, conf, schema);
+       }
+     }
+     LOG.debug("RawStore: {}, with PersistenceManager: {}" +
+         " created in the thread with id: {}", this, pm, Thread.currentThread().getId());
+   }
+ 
+   /**
+    * Works out which database product backs the current PersistenceManager.
+    *
+    * @return the detected product, or DatabaseProduct.OTHER if detection fails with an SQLException
+    */
+   private DatabaseProduct determineDatabaseProduct() {
+     DatabaseProduct product;
+     try {
+       String productName = getProductName(pm);
+       product = DatabaseProduct.determineDatabaseProduct(productName);
+     } catch (SQLException e) {
+       LOG.warn("Cannot determine database product; assuming OTHER", e);
+       product = DatabaseProduct.OTHER;
+     }
+     return product;
+   }
+ 
+   /**
+    * Reads the database product name from the JDBC metadata of the connection behind pm.
+    *
+    * @param pm PersistenceManager whose data store connection is inspected
+    * @return the product name reported by the driver, or null if it could not be read
+    */
+   private static String getProductName(PersistenceManager pm) {
+     JDOConnection jdoConn = pm.getDataStoreConnection();
+     try {
+       Connection nativeConn = (Connection) jdoConn.getNativeConnection();
+       return nativeConn.getMetaData().getDatabaseProductName();
+     } catch (Throwable t) {
+       LOG.warn("Error retrieving product name", t);
+       return null;
+     } finally {
+       // The connection has to be handed back before any other pm method is called.
+       jdoConn.close();
+     }
+   }
+ 
+   /**
+    * Creates the proxy used to evaluate expressions. This is here to prevent circular
+    * dependency - ql -&gt; metastore client &lt;-&gt; metastore server -&gt; ql. If server and
+    * client are split, this can be removed.
+    * @param conf Configuration; the class to load is named by ConfVars.EXPRESSION_PROXY_CLASS.
+    * @return The partition expression proxy.
+    * @throws RuntimeException if the configured class cannot be loaded or instantiated; the
+    *         underlying MetaException is attached as the cause
+    */
+   private static PartitionExpressionProxy createExpressionProxy(Configuration conf) {
+     String className = MetastoreConf.getVar(conf, ConfVars.EXPRESSION_PROXY_CLASS);
+     try {
+       Class<? extends PartitionExpressionProxy> clazz =
+            JavaUtils.getClass(className, PartitionExpressionProxy.class);
+       return JavaUtils.newInstance(clazz, new Class<?>[0], new Object[0]);
+     } catch (MetaException e) {
+       LOG.error("Error loading PartitionExpressionProxy", e);
+       // Keep the original exception as the cause so its stack trace is not lost.
+       throw new RuntimeException("Error loading PartitionExpressionProxy: " + e.getMessage(), e);
+     }
+   }
+ 
+   /**
+    * Copies the SSL settings from ConfVars.DBACCESS_SSL_PROPS into JVM system properties.
+    * The setting is a comma-separated list of key=value entries; an entry that does not
+    * split into exactly two parts on '=' is reported with a warning and skipped.
+    * @param conf configuration to read the SSL property string from
+    */
+   private static void configureSSL(Configuration conf) {
+     String sslPropString = MetastoreConf.getVar(conf, ConfVars.DBACCESS_SSL_PROPS);
+     if (!org.apache.commons.lang.StringUtils.isNotEmpty(sslPropString)) {
+       // Nothing configured, leave the system properties alone.
+       return;
+     }
+     LOG.info("Metastore setting SSL properties of the connection to backed DB");
+     for (String entry : sslPropString.split(",")) {
+       String[] keyAndValue = entry.trim().split("=");
+       if (keyAndValue.length != 2) {
+         LOG.warn("Invalid metastore property value for {}", ConfVars.DBACCESS_SSL_PROPS);
+         continue;
+       }
+       System.setProperty(keyAndValue[0].trim(), keyAndValue[1].trim());
+     }
+   }
+ 
+   /**
+    * Builds the properties used to create the PersistenceManagerFactory.
+    * Properties specified in hive-default.xml override the properties specified
+    * in jpox.properties.
+    *
+    * <p>The result holds one entry per variable in MetastoreConf.dataNucleusAndJdoConfs plus,
+    * when one is configured, the connection password. As a side effect conf is updated by
+    * correctAutoStartMechanism.
+    *
+    * @param conf configuration to read the values from
+    * @return a new Properties object; the static {@code prop} field is not touched here
+    * @throws RuntimeException if reading the password fails with an IOException
+    */
+   @SuppressWarnings("nls")
+   private static Properties getDataSourceProps(Configuration conf) {
+     // Local variable that shadows the static field of the same name.
+     Properties prop = new Properties();
+     correctAutoStartMechanism(conf);
+ 
+     // First, go through and set all our values for datanucleus and javax.jdo parameters.  This
+     // has to be a separate first step because we don't set the default values in the config object.
+     for (ConfVars var : MetastoreConf.dataNucleusAndJdoConfs) {
+       String confVal = MetastoreConf.getAsString(conf, var);
+       String varName = var.getVarname();
+       Object prevVal = prop.setProperty(varName, confVal);
+       // Only log values that MetastoreConf marks as printable.
+       if (MetastoreConf.isPrintable(varName)) {
+         LOG.debug("Overriding {} value {} from jpox.properties with {}",
+           varName, prevVal, confVal);
+       }
+     }
+ 
+     // Now, we need to look for any values that the user set that MetastoreConf doesn't know about.
+     // TODO Commenting this out for now, as it breaks because the conf values aren't getting properly
+     // interpolated in case of variables.  See HIVE-17788.
+     /*
+     for (Map.Entry<String, String> e : conf) {
+       if (e.getKey().startsWith("datanucleus.") || e.getKey().startsWith("javax.jdo.")) {
+         // We have to handle this differently depending on whether it is a value known to
+         // MetastoreConf or not.  If it is, we need to get the default value if a value isn't
+         // provided.  If not, we just set whatever the user has set.
+         Object prevVal = prop.setProperty(e.getKey(), e.getValue());
+         if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(e.getKey())) {
+           LOG.debug("Overriding " + e.getKey() + " value " + prevVal
+               + " from  jpox.properties with " + e.getValue());
+         }
+       }
+     }
+     */
+ 
+     // Password may no longer be in the conf, use getPassword()
+     try {
+       String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
+       if (org.apache.commons.lang.StringUtils.isNotEmpty(passwd)) {
+         // We can get away with the use of varname here because varname == hiveName for PWD
+         prop.setProperty(ConfVars.PWD.getVarname(), passwd);
+       }
+     } catch (IOException err) {
+       throw new RuntimeException("Error getting metastore password: " + err.getMessage(), err);
+     }
+ 
+     // Dump the resulting printable properties when debug logging is on.
+     if (LOG.isDebugEnabled()) {
+       for (Entry<Object, Object> e : prop.entrySet()) {
+         if (MetastoreConf.isPrintable(e.getKey().toString())) {
+           LOG.debug("{} = {}", e.getKey(), e.getValue());
+         }
+       }
+     }
+ 
+     return prop;
+   }
+ 
+   /**
+    * Forces datanucleus.autoStartMechanismMode to "ignored" in conf, warning first if it
+    * was set to anything else (or not set at all).
+    * This is necessary to be able to use an older version of Hive against
+    * an upgraded but compatible metastore schema in the db from a newer version
+    * of Hive.
+    * @param conf configuration to inspect and update in place
+    */
+   private static void correctAutoStartMechanism(Configuration conf) {
+     final String autoStartKey = "datanucleus.autoStartMechanismMode";
+     final String autoStartIgnore = "ignored";
+     final String configuredMode = conf.get(autoStartKey);
+     boolean alreadyIgnored = autoStartIgnore.equalsIgnoreCase(configuredMode);
+     if (!alreadyIgnored) {
+       LOG.warn("{} is set to unsupported value {} . Setting it to value: {}", autoStartKey,
+         configuredMode, autoStartIgnore);
+     }
+     // Written unconditionally, which also normalises the case of an existing value.
+     conf.set(autoStartKey, autoStartIgnore);
+   }
+ 
+   /**
+    * Returns the shared PersistenceManagerFactory, creating it on first use from the static
+    * {@code prop}. When a data source provider has provider-specific settings, the factory is
+    * built on a DataSource from that provider; if creating that DataSource fails with an
+    * SQLException, the plain properties are used instead. After creation, the object types
+    * named in hive.metastore.cache.pinobjtypes are pinned in the DataStoreCache, if there is one.
+    *
+    * @return the shared factory
+    */
+   private static synchronized PersistenceManagerFactory getPMF() {
+     if (pmf == null) {
+ 
+       Configuration conf = MetastoreConf.newMetastoreConf();
+       DataSourceProvider dsp = DataSourceProviderFactory.hasProviderSpecificConfigurations(conf) ?
+               DataSourceProviderFactory.getDataSourceProvider(conf) : null;
+ 
+       if (dsp == null) {
+         pmf = JDOHelper.getPersistenceManagerFactory(prop);
+       } else {
+         try {
+           DataSource ds = dsp.create(conf);
+           Map<Object, Object> dsProperties = new HashMap<>();
+           //Any preexisting datanucleus property should be passed along
+           dsProperties.putAll(prop);
+           dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds);
+           dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds);
+           dsProperties.put("javax.jdo.PersistenceManagerFactoryClass",
+               "org.datanucleus.api.jdo.JDOPersistenceManagerFactory");
+           pmf = JDOHelper.getPersistenceManagerFactory(dsProperties);
+         } catch (SQLException e) {
+           LOG.warn("Could not create PersistenceManagerFactory using " +
+               "connection pool properties, will fall back", e);
+           pmf = JDOHelper.getPersistenceManagerFactory(prop);
+         }
+       }
+       DataStoreCache dsc = pmf.getDataStoreCache();
+       if (dsc != null) {
+         String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES);
+         LOG.info("Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"", objTypes);
+         if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) {
+           // Lower-case with a fixed locale so the tokens match the PINCLASSMAP keys
+           // whatever the JVM default locale is (e.g. Turkish maps 'I' to a dotless i).
+           String[] typeTokens = objTypes.toLowerCase(java.util.Locale.ROOT).split(",");
+           for (String type : typeTokens) {
+             type = type.trim();
+             if (PINCLASSMAP.containsKey(type)) {
+               dsc.pinAll(true, PINCLASSMAP.get(type));
+             } else {
+               LOG.warn("{} is not one of the pinnable object types: {}", type,
+                 org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " "));
+             }
+           }
+         }
+       } else {
+         LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes");
+       }
+     }
+     return pmf;
+   }
+ 
+   @InterfaceAudience.LimitedPrivate({"HCATALOG"})
+   @InterfaceStability.Evolving
+   public PersistenceManager getPersistenceManager() {
+     return getPMF().getPersistenceManager();
+   }
+ 
+   /**
+    * Closes this store's PersistenceManager, if it still has one, and forgets the reference.
+    */
+   @Override
+   public void shutdown() {
+     LOG.debug("RawStore: {}, with PersistenceManager: {} will be shutdown", this, pm);
+     if (pm == null) {
+       // Nothing to close.
+       return;
+     }
+     pm.close();
+     pm = null;
+   }
+ 
+   /**
+    * Begins a new transaction, or joins the one that is already open. Every call must be
+    * balanced by a commit or rollback call.
+    *
+    * @return whether the current transaction is active after this call
+    * @throws RuntimeException if this is a nested call but no active transaction exists
+    */
+   @Override
+   public boolean openTransaction() {
+     openTrasactionCalls++;
+     boolean outermost = (openTrasactionCalls == 1);
+     if (outermost) {
+       currentTransaction = pm.currentTransaction();
+       currentTransaction.begin();
+       transactionStatus = TXN_STATUS.OPEN;
+     } else if (currentTransaction == null || !currentTransaction.isActive()) {
+       // A nested call expects the outer call to have left an active transaction behind.
+       throw new RuntimeException("openTransaction called in an interior"
+           + " transaction scope, but currentTransaction is not active.");
+     }
+ 
+     boolean active = currentTransaction.isActive();
+     debugLog("Open transaction: count = " + openTrasactionCalls + ", isActive = " + active);
+     return active;
+   }
+ 
+   /**
+    * Commits one level of the (possibly nested) transaction. The underlying transaction is
+    * only committed when this call balances the outermost openTransaction() call.
+    *
+    * @return false if the transaction has already been marked as rolled back (nothing is
+    *         committed in that case), true otherwise
+    * @throws RuntimeException if there is no outstanding openTransaction() call, or the
+    *         current transaction is not active
+    */
+   @Override
+   @SuppressWarnings("nls")
+   public boolean commitTransaction() {
+     // A rollback was already recorded for this transaction; report failure to the caller.
+     if (TXN_STATUS.ROLLBACK == transactionStatus) {
+       debugLog("Commit transaction: rollback");
+       return false;
+     }
+     if (openTrasactionCalls <= 0) {
+       RuntimeException e = new RuntimeException("commitTransaction was called but openTransactionCalls = "
+           + openTrasactionCalls + ". This probably indicates that there are unbalanced " +
+           "calls to openTransaction/commitTransaction");
+       LOG.error("Unbalanced calls to open/commit Transaction", e);
+       throw e;
+     }
+     // NOTE(review): this branch fires when the transaction is inactive, yet it reuses the
+     // "unbalanced calls" message of the counter check above.
+     if (!currentTransaction.isActive()) {
+       RuntimeException e = new RuntimeException("commitTransaction was called but openTransactionCalls = "
+           + openTrasactionCalls + ". This probably indicates that there are unbalanced " +
+           "calls to openTransaction/commitTransaction");
+       LOG.error("Unbalanced calls to open/commit Transaction", e);
+       throw e;
+     }
+     openTrasactionCalls--;
+     debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive());
+ 
+     // Only the outermost commit reaches the underlying transaction.
+     if ((openTrasactionCalls == 0) && currentTransaction.isActive()) {
+       transactionStatus = TXN_STATUS.COMMITED;
+       currentTransaction.commit();
+     }
+     return true;
+   }
+ 
+   /**
+    * Reports whether a transaction is currently in progress.
+    *
+    * @return true only if a transaction object exists and is active; false when there is none
+    *         or it has already been committed or rolled back
+    */
+   @Override
+   public boolean isActiveTransaction() {
+     return currentTransaction != null && currentTransaction.isActive();
+   }
+ 
+   /**
+    * Rolls back the current transaction if it is active and resets the nesting counter.
+    * Does nothing when no openTransaction() call is outstanding.
+    */
+   @Override
+   public void rollbackTransaction() {
+     if (openTrasactionCalls < 1) {
+       debugLog("rolling back transaction: no open transactions: " + openTrasactionCalls);
+       return;
+     }
+     debugLog("Rollback transaction, isActive: " + currentTransaction.isActive());
+     try {
+       // Skip the actual rollback when the status already records one.
+       if (currentTransaction.isActive()
+           && transactionStatus != TXN_STATUS.ROLLBACK) {
+         currentTransaction.rollback();
+       }
+     } finally {
+       // The bookkeeping is reset even if rollback() itself throws.
+       openTrasactionCalls = 0;
+       transactionStatus = TXN_STATUS.ROLLBACK;
+       // remove all detached objects from the cache, since the transaction is
+       // being rolled back they are no longer relevant, and this prevents them
+       // from reattaching in future transactions
+       pm.evictAll();
+     }
+   }
+ 
+   /**
+    * Persists a new catalog.
+    *
+    * @param cat the catalog to store
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public void createCatalog(Catalog cat) throws MetaException {
+     LOG.debug("Creating catalog " + cat.getName());
+     MCatalog newCatalog = catToMCat(cat);
+     boolean done = false;
+     try {
+       openTransaction();
+       pm.makePersistent(newCatalog);
+       done = commitTransaction();
+     } finally {
+       if (!done) {
+         rollbackTransaction();
+       }
+     }
+   }
+ 
+   /**
+    * Updates the location and/or description of an existing catalog. Blank values in
+    * {@code cat} leave the stored value untouched; the name cannot be changed.
+    *
+    * @param catName name of the catalog to alter
+    * @param cat catalog object carrying the new values; its name must equal {@code catName}
+    * @throws MetaException declared by the interface
+    * @throws InvalidOperationException if the name would change, or no such catalog exists
+    */
+   @Override
+   public void alterCatalog(String catName, Catalog cat)
+       throws MetaException, InvalidOperationException {
+     if (!cat.getName().equals(catName)) {
+       throw new InvalidOperationException("You cannot change a catalog's name");
+     }
+     boolean committed = false;
+     try {
+       MCatalog mCat = getMCatalog(catName);
+       if (mCat == null) {
+         // getMCatalog returns null for an unknown catalog; fail with a declared exception
+         // instead of a NullPointerException on the setters below.
+         throw new InvalidOperationException("No catalog " + catName);
+       }
+       if (org.apache.commons.lang.StringUtils.isNotBlank(cat.getLocationUri())) {
+         mCat.setLocationUri(cat.getLocationUri());
+       }
+       if (org.apache.commons.lang.StringUtils.isNotBlank(cat.getDescription())) {
+         mCat.setDescription(cat.getDescription());
+       }
+       openTransaction();
+       pm.makePersistent(mCat);
+       committed = commitTransaction();
+     } finally {
+       if (!committed) {
+         rollbackTransaction();
+       }
+     }
+   }
+ 
+   /**
+    * Fetches a catalog by name.
+    *
+    * @param catalogName name of the catalog
+    * @return the catalog
+    * @throws NoSuchObjectException if no catalog with that name is stored
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException {
+     LOG.debug("Fetching catalog " + catalogName);
+     MCatalog found = getMCatalog(catalogName);
+     if (found != null) {
+       return mCatToCat(found);
+     }
+     throw new NoSuchObjectException("No catalog " + catalogName);
+   }
+ 
+   /**
+    * Lists the names of all catalogs.
+    *
+    * @return the catalog names in natural (sorted) order
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public List<String> getCatalogs() throws MetaException {
+     LOG.debug("Fetching all catalog names");
+     final String jdoql = "select name from org.apache.hadoop.hive.metastore.model.MCatalog";
+     List<String> names = null;
+     Query nameQuery = null;
+     boolean done = false;
+ 
+     openTransaction();
+     try {
+       nameQuery = pm.newQuery(jdoql);
+       nameQuery.setResult("name");
+       Collection<String> fetched = (Collection<String>) nameQuery.execute();
+       // Copy the result so it stays usable after the query is cleaned up.
+       names = new ArrayList<>(fetched);
+       done = commitTransaction();
+     } finally {
+       rollbackAndCleanup(done, nameQuery);
+     }
+     Collections.sort(names);
+     return names;
+   }
+ 
+   /**
+    * Deletes a catalog.
+    *
+    * @param catalogName name of the catalog to drop
+    * @throws NoSuchObjectException if no catalog with that name is stored
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException {
+     LOG.debug("Dropping catalog " + catalogName);
+     boolean committed = false;
+     try {
+       openTransaction();
+       MCatalog mCat = getMCatalog(catalogName);
+       // Check for a missing catalog before handing the object to the persistence manager.
+       if (mCat == null) {
+         throw new NoSuchObjectException("No catalog " + catalogName);
+       }
+       pm.retrieve(mCat);
+       pm.deletePersistent(mCat);
+       committed = commitTransaction();
+     } finally {
+       if (!committed) {
+         rollbackTransaction();
+       }
+     }
+   }
+ 
+   /**
+    * Looks up the persistent catalog object for a name.
+    *
+    * @param catalogName catalog name; it is normalized before the lookup
+    * @return the result of the unique query (the callers in this file treat null as "not found")
+    * @throws MetaException declared for callers
+    */
+   private MCatalog getMCatalog(String catalogName) throws MetaException {
+     Query lookup = null;
+     boolean done = false;
+     MCatalog result;
+     try {
+       openTransaction();
+       String normalizedName = normalizeIdentifier(catalogName);
+       lookup = pm.newQuery(MCatalog.class, "name == catname");
+       lookup.declareParameters("java.lang.String catname");
+       lookup.setUnique(true);
+       result = (MCatalog) lookup.execute(normalizedName);
+       pm.retrieve(result);
+       done = commitTransaction();
+     } finally {
+       rollbackAndCleanup(done, lookup);
+     }
+     return result;
+   }
+ 
+   /**
+    * Builds the persistent model object for a catalog. The name is normalized; the
+    * description is copied only when it is set on the source object.
+    *
+    * @param cat catalog to convert
+    * @return a new, not yet persisted MCatalog
+    */
+   private MCatalog catToMCat(Catalog cat) {
+     MCatalog model = new MCatalog();
+     model.setName(normalizeIdentifier(cat.getName()));
+     model.setLocationUri(cat.getLocationUri());
+     if (cat.isSetDescription()) {
+       model.setDescription(cat.getDescription());
+     }
+     return model;
+   }
+ 
+   /**
+    * Converts a persistent catalog object into its API counterpart. The description is
+    * copied only when the stored value is not null.
+    *
+    * @param mCat persistent catalog
+    * @return a new Catalog with the same name, location and (if present) description
+    */
+   private Catalog mCatToCat(MCatalog mCat) {
+     Catalog result = new Catalog(mCat.getName(), mCat.getLocationUri());
+     String description = mCat.getDescription();
+     if (description != null) {
+       result.setDescription(description);
+     }
+     return result;
+   }
+ 
+   /**
+    * Persists a new database. The catalog name is normalized, the database name is
+    * lower-cased, and the owner type defaults to USER when none is given.
+    *
+    * @param db the database to store; its catalog name is expected to be set
+    * @throws InvalidObjectException declared by the interface
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public void createDatabase(Database db) throws InvalidObjectException, MetaException {
+     MDatabase model = new MDatabase();
+     assert db.getCatalogName() != null;
+     model.setCatalogName(normalizeIdentifier(db.getCatalogName()));
+     assert model.getCatalogName() != null;
+     model.setName(db.getName().toLowerCase());
+     model.setLocationUri(db.getLocationUri());
+     model.setDescription(db.getDescription());
+     model.setParameters(db.getParameters());
+     model.setOwnerName(db.getOwnerName());
+     PrincipalType ownerType = db.getOwnerType();
+     String ownerTypeName = (ownerType != null) ? ownerType.name() : PrincipalType.USER.name();
+     model.setOwnerType(ownerTypeName);
+ 
+     boolean done = false;
+     try {
+       openTransaction();
+       pm.makePersistent(model);
+       done = commitTransaction();
+     } finally {
+       if (!done) {
+         rollbackTransaction();
+       }
+     }
+   }
+ 
+   /**
+    * Looks up the persistent database object for a catalog/database name pair. Both names
+    * are normalized before the lookup.
+    *
+    * @param catName catalog name
+    * @param name database name
+    * @return the persistent database, never null
+    * @throws NoSuchObjectException if the query finds no such database
+    */
+   @SuppressWarnings("nls")
+   private MDatabase getMDatabase(String catName, String name) throws NoSuchObjectException {
+     Query lookup = null;
+     boolean done = false;
+     MDatabase found = null;
+     try {
+       openTransaction();
+       name = normalizeIdentifier(name);
+       catName = normalizeIdentifier(catName);
+       lookup = pm.newQuery(MDatabase.class, "name == dbname && catalogName == catname");
+       lookup.declareParameters("java.lang.String dbname, java.lang.String catname");
+       lookup.setUnique(true);
+       found = (MDatabase) lookup.execute(name, catName);
+       pm.retrieve(found);
+       done = commitTransaction();
+     } finally {
+       rollbackAndCleanup(done, lookup);
+     }
+     if (found != null) {
+       return found;
+     }
+     throw new NoSuchObjectException("There is no database " + catName + "." + name);
+   }
+ 
+   /**
+    * Fetches a database, reporting every failure as NoSuchObjectException.
+    *
+    * @param catalogName catalog the database lives in
+    * @param name database name
+    * @return the database
+    * @throws NoSuchObjectException if the database was not found, or the lookup failed with
+    *         a MetaException (whose message is appended to the exception message)
+    */
+   @Override
+   public Database getDatabase(String catalogName, String name) throws NoSuchObjectException {
+     Database found = null;
+     MetaException failure = null;
+     try {
+       found = getDatabaseInternal(catalogName, name);
+     } catch (MetaException e) {
+       // The signature only allows NSOE, and NSOE is a flat exception that cannot carry a
+       // cause. The MetaException is most likely an NSOE in disguise, so remember it, log it
+       // below and fold its message into the NSOE that is thrown.
+       failure = e;
+     }
+     if (found != null) {
+       return found;
+     }
+     LOG.warn("Failed to get database {}.{}, returning NoSuchObjectException",
+         catalogName, name, failure);
+     String detail = (failure == null) ? "" : (": " + failure.getMessage());
+     throw new NoSuchObjectException(name + detail);
+   }
+ 
+   /**
+    * Fetches a database through a GetDbHelper that is given two implementations: one that
+    * asks directSql and one that goes through JDO (getJDODatabase).
+    * NOTE(review): how the helper picks between the two paths is not visible here; the two
+    * boolean constructor arguments and the run(false) flag presumably control it - confirm
+    * against GetDbHelper/GetHelper.
+    *
+    * @param catalogName catalog the database lives in
+    * @param name database name
+    * @return whatever the helper's run produces
+    * @throws MetaException declared by the helper callbacks
+    * @throws NoSuchObjectException declared by the JDO callback
+    */
+   public Database getDatabaseInternal(String catalogName, String name)
+       throws MetaException, NoSuchObjectException {
+     return new GetDbHelper(catalogName, name, true, true) {
+       @Override
+       protected Database getSqlResult(GetHelper<Database> ctx) throws MetaException {
+         // dbName is not a local here; presumably a field of GetDbHelper - verify.
+         return directSql.getDatabase(catalogName, dbName);
+       }
+ 
+       @Override
+       protected Database getJdoResult(GetHelper<Database> ctx) throws MetaException, NoSuchObjectException {
+         return getJDODatabase(catalogName, dbName);
+       }
+     }.run(false);
+    }
+ 
+   /**
+    * Fetches a database through JDO and converts it to the API object.
+    *
+    * @param catName catalog name; also copied as-is into the returned object
+    * @param name database name
+    * @return the database; its owner type is null when the stored owner type is blank
+    * @throws NoSuchObjectException if the database does not exist
+    */
+   public Database getJDODatabase(String catName, String name) throws NoSuchObjectException {
+     boolean done = false;
+     MDatabase stored = null;
+     try {
+       openTransaction();
+       stored = getMDatabase(catName, name);
+       done = commitTransaction();
+     } finally {
+       if (!done) {
+         rollbackTransaction();
+       }
+     }
+     Database result = new Database();
+     result.setName(stored.getName());
+     result.setDescription(stored.getDescription());
+     result.setLocationUri(stored.getLocationUri());
+     result.setParameters(convertMap(stored.getParameters()));
+     result.setOwnerName(stored.getOwnerName());
+     String ownerTypeName =
+         org.apache.commons.lang.StringUtils.defaultIfBlank(stored.getOwnerType(), null);
+     PrincipalType ownerType = null;
+     if (ownerTypeName != null) {
+       ownerType = PrincipalType.valueOf(ownerTypeName);
+     }
+     result.setOwnerType(ownerType);
+     result.setCatalogName(catName);
+     return result;
+   }
+ 
+   /**
+    * Alter the database object in metastore. The parameters and owner name are always
+    * overwritten; the owner type is changed only when non-null, and the description and
+    * location only when non-blank.
+    *
+    * NOTE(review): the {@code return false} inside the finally block discards any exception
+    * thrown in the try block, so failures (including a missing database reported by
+    * getMDatabase) reach the caller as a false return value rather than as the declared
+    * exceptions.
+    *
+    * @param catName the catalog name
+    * @param dbName the database name
+    * @param db the Hive Database object
+    * @return true if the change was committed, false otherwise
+    * @throws MetaException
+    * @throws NoSuchObjectException
+    */
+   @Override
+   public boolean alterDatabase(String catName, String dbName, Database db)
+     throws MetaException, NoSuchObjectException {
+ 
+     MDatabase mdb = null;
+     boolean committed = false;
+     try {
+       mdb = getMDatabase(catName, dbName);
+       mdb.setParameters(db.getParameters());
+       mdb.setOwnerName(db.getOwnerName());
+       if (db.getOwnerType() != null) {
+         mdb.setOwnerType(db.getOwnerType().name());
+       }
+       if (org.apache.commons.lang.StringUtils.isNotBlank(db.getDescription())) {
+         mdb.setDescription(db.getDescription());
+       }
+       if (org.apache.commons.lang.StringUtils.isNotBlank(db.getLocationUri())) {
+         mdb.setLocationUri(db.getLocationUri());
+       }
+       openTransaction();
+       pm.makePersistent(mdb);
+       committed = commitTransaction();
+     } finally {
+       if (!committed) {
+         rollbackTransaction();
+         // Returning here suppresses whatever exception was in flight.
+         return false;
+       }
+     }
+     return true;
+   }
+ 
+   /**
+    * Deletes a database together with the grants listed for it.
+    *
+    * @param catName catalog name (normalized before use)
+    * @param dbname database name (normalized before use)
+    * @return the result of committing the transaction
+    * @throws NoSuchObjectException if the database does not exist
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public boolean dropDatabase(String catName, String dbname)
+       throws NoSuchObjectException, MetaException {
+     LOG.info("Dropping database {}.{} along with all tables", catName, dbname);
+     dbname = normalizeIdentifier(dbname);
+     catName = normalizeIdentifier(catName);
+     QueryWrapper grantsQuery = new QueryWrapper();
+     boolean done = false;
+     try {
+       openTransaction();
+ 
+       MDatabase target = getMDatabase(catName, dbname);
+       pm.retrieve(target);
+       if (target != null) {
+         // Remove the grants first, then the database itself.
+         List<MDBPrivilege> grants = this.listDatabaseGrants(catName, dbname, null, grantsQuery);
+         if (CollectionUtils.isNotEmpty(grants)) {
+           pm.deletePersistentAll(grants);
+         }
+         pm.deletePersistent(target);
+       }
+       done = commitTransaction();
+     } finally {
+       rollbackAndCleanup(done, grantsQuery);
+     }
+     return done;
+   }
+ 
+   /**
+    * Lists the names of the databases in a catalog that match a pattern.
+    *
+    * @param catName catalog name
+    * @param pattern one or more sub-patterns separated by '|'; null or "*" selects all
+    *        databases
+    * @return matching database names, ordered ascending by the query
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public List<String> getDatabases(String catName, String pattern) throws MetaException {
+     if (pattern == null || "*".equals(pattern)) {
+       return getAllDatabases(catName);
+     }
+     List<String> matches = null;
+     Query nameQuery = null;
+     boolean done = false;
+     try {
+       openTransaction();
+       // Split the pattern on '|' to obtain the individual sub-patterns.
+       String[] alternatives = pattern.trim().split("\\|");
+       StringBuilder filter = new StringBuilder();
+       List<String> bindValues = new ArrayList<>(alternatives.length);
+       appendSimpleCondition(filter, "catalogName", new String[] {catName}, bindValues);
+       appendPatternCondition(filter, "name", alternatives, bindValues);
+       nameQuery = pm.newQuery(MDatabase.class, filter.toString());
+       nameQuery.setResult("name");
+       nameQuery.setOrdering("name ascending");
+       String[] bindArray = bindValues.toArray(new String[0]);
+       Collection<String> fetched = (Collection<String>) nameQuery.executeWithArray(bindArray);
+       matches = new ArrayList<>(fetched);
+       done = commitTransaction();
+     } finally {
+       rollbackAndCleanup(done, nameQuery);
+     }
+     return matches;
+   }
+ 
+   /**
+    * Lists the names of all databases in a catalog.
+    *
+    * @param catName catalog name (normalized before use)
+    * @return the database names in natural (sorted) order
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public List<String> getAllDatabases(String catName) throws MetaException {
+     List<String> names = null;
+     Query nameQuery = null;
+     boolean done = false;
+     String catalogKey = normalizeIdentifier(catName);
+ 
+     openTransaction();
+     try {
+       nameQuery = pm.newQuery("select name from org.apache.hadoop.hive.metastore.model.MDatabase " +
+           "where catalogName == catname");
+       nameQuery.declareParameters("java.lang.String catname");
+       nameQuery.setResult("name");
+       Collection<String> fetched = (Collection<String>) nameQuery.execute(catalogKey);
+       names = new ArrayList<>(fetched);
+       done = commitTransaction();
+     } finally {
+       rollbackAndCleanup(done, nameQuery);
+     }
+     Collections.sort(names);
+     return names;
+   }
+ 
+   /**
+    * Converts an API Type into its persistent counterpart, copying every field schema.
+    *
+    * @param type the type to convert; a null field list yields an empty list
+    * @return a new MType
+    */
+   private MType getMType(Type type) {
+     List<FieldSchema> sourceFields = type.getFields();
+     List<MFieldSchema> converted = new ArrayList<>();
+     if (sourceFields != null) {
+       for (FieldSchema source : sourceFields) {
+         MFieldSchema copy =
+             new MFieldSchema(source.getName(), source.getType(), source.getComment());
+         converted.add(copy);
+       }
+     }
+     return new MType(type.getName(), type.getType1(), type.getType2(), converted);
+   }
+ 
+   /**
+    * Converts a persistent MType into the API Type, copying every field schema.
+    *
+    * @param mtype the persistent type; a null field list yields an empty list
+    * @return a new Type
+    */
+   private Type getType(MType mtype) {
+     List<MFieldSchema> storedFields = mtype.getFields();
+     List<FieldSchema> converted = new ArrayList<>();
+     if (storedFields != null) {
+       for (MFieldSchema stored : storedFields) {
+         FieldSchema copy =
+             new FieldSchema(stored.getName(), stored.getType(), stored.getComment());
+         converted.add(copy);
+       }
+     }
+     Type result = new Type();
+     result.setName(mtype.getName());
+     result.setType1(mtype.getType1());
+     result.setType2(mtype.getType2());
+     result.setFields(converted);
+     return result;
+   }
+ 
+   /**
+    * Persists a new type.
+    *
+    * @param type the type to store
+    * @return true only if the transaction was committed; false when commitTransaction()
+    *         reported that the transaction had been rolled back
+    */
+   @Override
+   public boolean createType(Type type) {
+     MType mtype = getMType(type);
+     boolean commited = false;
+     try {
+       openTransaction();
+       pm.makePersistent(mtype);
+       commited = commitTransaction();
+     } finally {
+       if (!commited) {
+         rollbackTransaction();
+       }
+     }
+     // Report the commit outcome itself: commitTransaction() returns false without throwing
+     // when the transaction was already rolled back, and that must not be reported as success.
+     return commited;
+   }
+ 
+   /**
+    * Looks up a type by name.
+    *
+    * @param typeName name of the type; surrounding whitespace is trimmed before the lookup
+    * @return the matching Type, or null if the query finds no type with that name
+    */
+   @Override
+   public Type getType(String typeName) {
+     Type type = null;
+     boolean commited = false;
+     Query query = null;
+     try {
+       openTransaction();
+       query = pm.newQuery(MType.class, "name == typeName");
+       query.declareParameters("java.lang.String typeName");
+       query.setUnique(true);
+       MType mtype = (MType) query.execute(typeName.trim());
+       if (mtype != null) {
+         // Retrieve the persistent object that was just fetched (not the still-null result
+         // variable) before converting it.
+         pm.retrieve(mtype);
+         type = getType(mtype);
+       }
+       commited = commitTransaction();
+     } finally {
+       rollbackAndCleanup(commited, query);
+     }
+     return type;
+   }
+ 
+   /**
+    * Deletes a type by name, if it exists.
+    *
+    * @param typeName name of the type; surrounding whitespace is trimmed before the lookup
+    * @return the result of committing the transaction
+    */
+   @Override
+   public boolean dropType(String typeName) {
+     Query lookup = null;
+     boolean done = false;
+     try {
+       openTransaction();
+       lookup = pm.newQuery(MType.class, "name == typeName");
+       lookup.declareParameters("java.lang.String typeName");
+       lookup.setUnique(true);
+       MType stored = (MType) lookup.execute(typeName.trim());
+       pm.retrieve(stored);
+       if (stored != null) {
+         pm.deletePersistent(stored);
+       }
+       done = commitTransaction();
+     } catch (JDOObjectNotFoundException e) {
+       // A missing object is not treated as a failure: commit anyway and just log it.
+       done = commitTransaction();
+       LOG.debug("type not found {}", typeName, e);
+     } finally {
+       rollbackAndCleanup(done, lookup);
+     }
+     return done;
+   }
+ 
+   /**
+    * Creates a table and all supplied constraints in a single transaction.
+    *
+    * @param tbl the table to create
+    * @param primaryKeys primary keys to add, may be null
+    * @param foreignKeys foreign keys to add, may be null
+    * @param uniqueConstraints unique constraints to add, may be null
+    * @param notNullConstraints not-null constraints to add, may be null
+    * @param defaultConstraints default constraints to add, may be null
+    * @param checkConstraints check constraints to add, may be null
+    * @return the names returned by the individual add-constraint calls
+    * @throws InvalidObjectException declared by the interface
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public List<String> createTableWithConstraints(Table tbl,
+     List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
+     List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints,
+     List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints)
+     throws InvalidObjectException, MetaException {
+     boolean done = false;
+     try {
+       openTransaction();
+       createTable(tbl);
+       // The create-table transaction is not committed yet, so the constraints are persisted
+       // without a deep retrieval of the table's column descriptor (the "false" arguments).
+       List<String> createdNames = new ArrayList<>();
+       if (foreignKeys != null) {
+         List<String> fkNames = addForeignKeys(foreignKeys, false, primaryKeys, uniqueConstraints);
+         createdNames.addAll(fkNames);
+       }
+       if (primaryKeys != null) {
+         List<String> pkNames = addPrimaryKeys(primaryKeys, false);
+         createdNames.addAll(pkNames);
+       }
+       if (uniqueConstraints != null) {
+         List<String> ukNames = addUniqueConstraints(uniqueConstraints, false);
+         createdNames.addAll(ukNames);
+       }
+       if (notNullConstraints != null) {
+         List<String> nnNames = addNotNullConstraints(notNullConstraints, false);
+         createdNames.addAll(nnNames);
+       }
+       if (defaultConstraints != null) {
+         List<String> dcNames = addDefaultConstraints(defaultConstraints, false);
+         createdNames.addAll(dcNames);
+       }
+       if (checkConstraints != null) {
+         List<String> ccNames = addCheckConstraints(checkConstraints, false);
+         createdNames.addAll(ccNames);
+       }
+       done = commitTransaction();
+       return createdNames;
+     } finally {
+       if (!done) {
+         rollbackTransaction();
+       }
+     }
+   }
+ 
+   /**
+    * Persists a new table, its creation metadata (when present) and the privileges carried
+    * by the table object, all in one transaction.
+    *
+    * @param tbl the table to create
+    * @throws InvalidObjectException declared by the interface
+    * @throws MetaException declared by the interface
+    */
+   @Override
+   public void createTable(Table tbl) throws InvalidObjectException, MetaException {
+     boolean commited = false;
++    MTable mtbl = null;
++
+     try {
+       openTransaction();
+ 
 -      MTable mtbl = convertToMTable(tbl);
++      mtbl = convertToMTable(tbl);
++      // Only transactional tables carry the write id over to the persistent object.
++      if (TxnUtils.isTransactionalTable(tbl)) {
++        mtbl.setWriteId(tbl.getWriteId());
++      }
+       pm.makePersistent(mtbl);
+ 
+       if (tbl.getCreationMetadata() != null) {
+         MCreationMetadata mcm = convertToMCreationMetadata(tbl.getCreationMetadata());
+         pm.makePersistent(mcm);
+       }
+ 
+       PrincipalPrivilegeSet principalPrivs = tbl.getPrivileges();
+       List<Object> toPersistPrivObjs = new ArrayList<>();
+       if (principalPrivs != null) {
+         // Grant time in whole seconds.
+         int now = (int)(System.currentTimeMillis()/1000);
+ 
+         Map<String, List<PrivilegeGrantInfo>> userPrivs = principalPrivs.getUserPrivileges();
+         putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, userPrivs, PrincipalType.USER, "SQL");
+ 
+         Map<String, List<PrivilegeGrantInfo>> groupPrivs = principalPrivs.getGroupPrivileges();
+         putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, PrincipalType.GROUP, "SQL");
+ 
+         Map<String, List<PrivilegeGrantInfo>> rolePrivs = principalPrivs.getRolePrivileges();
+         putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, PrincipalType.ROLE, "SQL");
+       }
+       pm.makePersistentAll(toPersistPrivObjs);
+       commited = commitTransaction();
+     } finally {
+       if (!commited) {
+         rollbackTransaction();
+       }
+     }
+   }
+ 
+   /**
+    * Turns every PrivilegeGrantInfo in privMap into an MTablePrivilege and appends it to
+    * toPersistPrivObjs; the collected objects are persisted as part of createTable. Null
+    * entries inside a principal's grant list are skipped, and a null map is a no-op.
+    *
+    * @param mtbl the table the privileges apply to
+    * @param toPersistPrivObjs output list receiving the new privilege objects
+    * @param now grant time passed to every created privilege
+    * @param privMap grants keyed by principal name, may be null
+    * @param type principal type recorded on every created privilege
+    * @param authorizer authorizer name recorded on every created privilege
+    */
+   private void putPersistentPrivObjects(MTable mtbl, List<Object> toPersistPrivObjs,
+       int now, Map<String, List<PrivilegeGrantInfo>> privMap, PrincipalType type, String authorizer) {
+     if (privMap == null) {
+       return;
+     }
+     for (Map.Entry<String, List<PrivilegeGrantInfo>> grantsOfPrincipal : privMap.entrySet()) {
+       String principalName = grantsOfPrincipal.getKey();
+       for (PrivilegeGrantInfo grant : grantsOfPrincipal.getValue()) {
+         if (grant == null) {
+           continue;
+         }
+         toPersistPrivObjs.add(new MTablePrivilege(
+             principalName, type.toString(), mtbl, grant.getPrivilege(),
+             now, grant.getGrantor(), grant.getGrantorType().toString(),
+             grant.isGrantOption(), authorizer));
+       }
+     }
+   }
+ 
+   /**
+    * Drops a table. When the table exists, its table, column, partition and partition-column
+    * grants, its table-level column statistics, its constraints and (for materialized views)
+    * its creation metadata are removed before the table itself, all in one transaction.
+    *
+    * @param catName catalog name
+    * @param dbName database name
+    * @param tableName table name
+    * @return the result of committing the transaction
+    * @throws MetaException declared by the interface
+    * @throws NoSuchObjectException declared by the interface
+    * @throws InvalidObjectException declared by the interface
+    * @throws InvalidInputException declared by the interface
+    */
+   @Override
+   public boolean dropTable(String catName, String dbName, String tableName)
+       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
+     boolean materializedView = false;
+     boolean success = false;
+     try {
+       openTransaction();
+       MTable tbl = getMTable(catName, dbName, tableName);
+       pm.retrieve(tbl);
+       // A missing table is not an error here: the transaction is simply committed.
+       if (tbl != null) {
+         materializedView = TableType.MATERIALIZED_VIEW.toString().equals(tbl.getTableType());
+         // first remove all the grants
+         List<MTablePrivilege> tabGrants = listAllTableGrants(catName, dbName, tableName);
+         if (CollectionUtils.isNotEmpty(tabGrants)) {
+           pm.deletePersistentAll(tabGrants);
+         }
+         List<MTableColumnPrivilege> tblColGrants = listTableAllColumnGrants(catName, dbName,
+             tableName);
+         if (CollectionUtils.isNotEmpty(tblColGrants)) {
+           pm.deletePersistentAll(tblColGrants);
+         }
+ 
+         List<MPartitionPrivilege> partGrants = this.listTableAllPartitionGrants(catName, dbName, tableName);
+         if (CollectionUtils.isNotEmpty(partGrants)) {
+           pm.deletePersistentAll(partGrants);
+         }
+ 
+         List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(catName, dbName,
+             tableName);
+         if (CollectionUtils.isNotEmpty(partColGrants)) {
+           pm.deletePersistentAll(partColGrants);
+         }
+         // delete column statistics if present
+         try {
+           deleteTableColumnStatistics(catName, dbName, tableName, null);
+         } catch (NoSuchObjectException e) {
+           // Having no statistics to delete is expected; only log it.
+           LOG.info("Found no table level column statistics associated with {} to delete",
+               TableName.getQualified(catName, dbName, tableName));
+         }
+ 
++        // TODO## remove? unused
++        // NOTE(review): the result is never read in this method; whether the conversion is
++        // needed for a side effect cannot be told from here.
++        Table table = convertToTable(tbl);
++
+         List<MConstraint> tabConstraints = listAllTableConstraintsWithOptionalConstraintName(
+                                            catName, dbName, tableName, null);
+         if (CollectionUtils.isNotEmpty(tabConstraints)) {
+           pm.deletePersistentAll(tabConstraints);
+         }
+ 
+         preDropStorageDescriptor(tbl.getSd());
+ 
+         if (materializedView) {
+           dropCreationMetadata(tbl.getDatabase().getCatalogName(),
+               tbl.getDatabase().getName(), tbl.getTableName());
+         }
+ 
+         // then remove the table
+         pm.deletePersistentAll(tbl);
+       }
+       success = commitTransaction();
+     } finally {
+       if (!success) {
+         rollbackTransaction();
+       }
+     }
+     return success;
+   }
+ 
+   /**
+    * Deletes the creation metadata stored for a table, if there is any.
+    *
+    * @param catName catalog name (used as given)
+    * @param dbName database name (normalized before use)
+    * @param tableName table name (normalized before use)
+    * @return the result of committing the transaction
+    * @throws MetaException declared for callers
+    * @throws NoSuchObjectException declared for callers
+    * @throws InvalidObjectException declared for callers
+    * @throws InvalidInputException declared for callers
+    */
+   private boolean dropCreationMetadata(String catName, String dbName, String tableName) throws MetaException,
+       NoSuchObjectException, InvalidObjectException, InvalidInputException {
+     String dbKey = normalizeIdentifier(dbName);
+     String tableKey = normalizeIdentifier(tableName);
+     boolean done = false;
+     try {
+       openTransaction();
+       MCreationMetadata stored = getCreationMetadata(catName, dbKey, tableKey);
+       pm.retrieve(stored);
+       if (stored != null) {
+         pm.deletePersistentAll(stored);
+       }
+       done = commitTransaction();
+     } finally {
+       if (!done) {
+         rollbackTransaction();
+       }
+     }
+     return done;
+   }
+ 
+   /**
+    * Lists the constraints in which the given table takes part, either as parent table or as
+    * child table, optionally restricted to one constraint name.
+    *
+    * <p>This runs two queries: the first collects the matching constraint names, the second
+    * loads every MConstraint whose name is in that set. Each query is closed as soon as its
+    * result has been copied, so neither is left open.
+    *
+    * @param catName catalog name (normalized before use)
+    * @param dbName database name (normalized before use)
+    * @param tableName table name (normalized before use)
+    * @param constraintname constraint name to filter on, or null for all constraints
+    * @return the matching constraints
+    */
+   private List<MConstraint> listAllTableConstraintsWithOptionalConstraintName(
+       String catName, String dbName, String tableName, String constraintname) {
+     catName = normalizeIdentifier(catName);
+     dbName = normalizeIdentifier(dbName);
+     tableName = normalizeIdentifier(tableName);
+     constraintname = constraintname!=null?normalizeIdentifier(constraintname):null;
+     List<String> constraintNames = new ArrayList<>();
+ 
+     // Step 1: collect the names of the constraints that reference the table.
+     Query namesQuery = null;
+     try {
+       namesQuery = pm.newQuery("select constraintName from org.apache.hadoop.hive.metastore.model.MConstraint  where "
+         + "((parentTable.tableName == ptblname && parentTable.database.name == pdbname && " +
+               "parentTable.database.catalogName == pcatname) || "
+         + "(childTable != null && childTable.tableName == ctblname &&" +
+               "childTable.database.name == cdbname && childTable.database.catalogName == ccatname)) " +
+           (constraintname != null ? " && constraintName == constraintname" : ""));
+       namesQuery.declareParameters("java.lang.String ptblname, java.lang.String pdbname,"
+           + "java.lang.String pcatname, java.lang.String ctblname, java.lang.String cdbname," +
+           "java.lang.String ccatname" +
+         (constraintname != null ? ", java.lang.String constraintname" : ""));
+       Collection<?> constraintNamesColl =
+         constraintname != null ?
+           ((Collection<?>) namesQuery.
+             executeWithArray(tableName, dbName, catName, tableName, dbName, catName, constraintname)):
+           ((Collection<?>) namesQuery.
+             executeWithArray(tableName, dbName, catName, tableName, dbName, catName));
+       for (Object currName : constraintNamesColl) {
+         constraintNames.add((String) currName);
+       }
+     } finally {
+       if (namesQuery != null) {
+         namesQuery.closeAll();
+       }
+     }
+ 
+     // Step 2: load the constraint objects for the collected names.
+     List<MConstraint> mConstraints = new ArrayList<>();
+     Query constraintsQuery = null;
+     try {
+       constraintsQuery = pm.newQuery(MConstraint.class);
+       constraintsQuery.setFilter("param.contains(constraintName)");
+       constraintsQuery.declareParameters("java.util.Collection param");
+       Collection<?> constraints = (Collection<?>) constraintsQuery.execute(constraintNames);
+       for (Object currConstraint : constraints) {
+         mConstraints.add((MConstraint) currConstraint);
+       }
+     } finally {
+       if (constraintsQuery != null) {
+         constraintsQuery.closeAll();
+       }
+     }
+     return mConstraints;
+   }
+ 
++  /**
++   * Builds a double-quoted table name, prefixed with the double-quoted database name when
++   * one is given: {@code "db"."tbl"}, or just {@code "tbl"} for a null or empty database.
++   *
++   * @param dbName database name, may be null or empty
++   * @param tblName table name
++   * @return the quoted, optionally qualified name
++   */
++  private static String getFullyQualifiedTableName(String dbName, String tblName) {
++    // The prefix ends with the separating dot only; the table's opening quote is added below
++    // (emitting it in both places produced "db".""tbl").
++    return ((dbName == null || dbName.isEmpty()) ? "" : "\"" + dbName + "\".")
++        + "\"" + tblName + "\"";
++  }
++
++  @Override
++  public Table
++  getTable(String catName, String dbName, String tableName)
++      throws MetaException {
++    return getTable(catName, dbName, tableName, -1, null);
++  }
++
+   /**
+    * Fetches a table and, when a write-id list is supplied, marks whether the stored basic
+    * statistics may be used for the caller's snapshot. Creation metadata is attached for
+    * materialized views.
+    *
+    * @param catName catalog name
+    * @param dbName database name
+    * @param tableName table name
+    * @param txnId transaction id handed to the statistics validity check
+    * @param writeIdList write-id list handed to the statistics validity check; null skips
+    *        the check entirely
+    * @return the converted table (the code below allows for it being null)
+    * @throws MetaException declared by the interface
+    */
+   @Override
 -  public Table getTable(String catName, String dbName, String tableName) throws MetaException {
++  public Table getTable(String catName, String dbName, String tableName,
++                        long txnId, String writeIdList)
++      throws MetaException {
+     boolean commited = false;
+     Table tbl = null;
+     try {
+       openTransaction();
 -      tbl = convertToTable(getMTable(catName, dbName, tableName));
++      MTable mtable = getMTable(catName, dbName, tableName);
++      tbl = convertToTable(mtable);
+       // Retrieve creation metadata if needed
+       if (tbl != null && TableType.MATERIALIZED_VIEW.toString().equals(tbl.getTableType())) {
+         tbl.setCreationMetadata(
 -            convertToCreationMetadata(getCreationMetadata(catName, dbName, tableName)));
++                convertToCreationMetadata(getCreationMetadata(catName, dbName, tableName)));
++      }
++
++      // If transactional non partitioned table,
++      // check whether the current version table statistics
++      // in the metastore comply with the client query's snapshot isolation.
++      // Note: a partitioned table has table stats and table snapshot in MPartition.
++      if (writeIdList != null) {
++        boolean isTxn = tbl != null && TxnUtils.isTransactionalTable(tbl);
++        if (isTxn && !areTxnStatsSupported) {
++          // Stats on transactional tables are not supported: report them as not accurate.
++          StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
++          LOG.info("Removed COLUMN_STATS_ACCURATE from Table's parameters.");
++        } else if (isTxn && tbl.getPartitionKeysSize() == 0) {
++          if (isCurrentStatsValidForTheQuery(mtable, txnId, writeIdList, false)) {
++            tbl.setIsStatsCompliant(true);
++          } else {
++            tbl.setIsStatsCompliant(false);
++            // Do not make persistent the following state since it is the query specific (not global).
++            StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
++            LOG.info("Removed COLUMN_STATS_ACCURATE from Table's parameters.");
++          }
++        }
+       }
+       commited = commitTransaction();
+     } finally {
+       if (!commited) {
+         rollbackTransaction();
+       }
+     }
+     return tbl;
+   }
+ 
+   @Override // Lists table names matching the pattern, with no table-type restriction.
+   public List<String> getTables(String catName, String dbName, String pattern)
+       throws MetaException {
+     return getTables(catName, dbName, pattern, null); // a null tableType adds no type condition on the JDO path
+   }
+ 
+   @Override // Lists table names matching the pattern and, optionally, a table type.
+   public List<String> getTables(String catName, String dbName, String pattern, TableType tableType)
+       throws MetaException {
+     try {
+       // We only support pattern matching via jdo since pattern matching in Java
+       // might be different than the one used by the metastore backends
+       return getTablesInternal(catName, dbName, pattern, tableType,
+           (pattern == null || pattern.equals(".*")), true); // direct SQL is allowed only for match-all, because the SQL path is never handed the pattern
+     } catch (NoSuchObjectException e) {
+       throw new MetaException(ExceptionUtils.getStackTrace(e)); // this method only declares MetaException; the stack trace text becomes the message
+     }
+   }
+ 
+   @Override // Returns whatever directSql.getTableNamesWithStats() reports; the JDO path is not implemented.
+   public List<TableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
+     return new GetListHelper<TableName>(null, null, null, true, false) { // trailing flags are presumably allowSql=true, allowJdo=false (cf. the call in getTablesInternal) - confirm
+       @Override
+       protected List<TableName> getSqlResult(
+           GetHelper<List<TableName>> ctx) throws MetaException {
+         return directSql.getTableNamesWithStats();
+       }
+ 
+       @Override
+       protected List<TableName> getJdoResult(
+           GetHelper<List<TableName>> ctx) throws MetaException {
+         throw new UnsupportedOperationException("UnsupportedOperationException"); // TODO: implement? (no JDO equivalent exists yet)
+       }
+     }.run(false);
+   }
+ 
+   @Override // Returns the map produced by directSql.getColAndPartNamesWithStats for one table; the JDO path is not implemented.
+   public Map<String, List<String>> getPartitionColsWithStats(String catName, String dbName, String tableName)
+       throws MetaException, NoSuchObjectException {
+     return new GetHelper<Map<String, List<String>>>(catName, dbName, null, true, false) { // table name is passed as null to the helper; tableName is only used in the SQL call below
+       @Override
+       protected Map<String, List<String>> getSqlResult(
+           GetHelper<Map<String, List<String>>> ctx) throws MetaException {
+         try {
+           return directSql.getColAndPartNamesWithStats(catName, dbName, tableName);
+         } catch (Throwable ex) {
+           LOG.error("DirectSQL failed", ex); // full cause is only available in the log
+           throw new MetaException(ex.getMessage()); // NOTE(review): the original cause is not attached to the rethrown exception
+         }
+       }
+ 
+       @Override
+       protected Map<String, List<String>> getJdoResult(
+           GetHelper<Map<String, List<String>>> ctx) throws MetaException {
+         throw new UnsupportedOperationException("UnsupportedOperationException"); // TODO: implement? (no JDO equivalent exists yet)
+       }
+ 
+       @Override
+       protected String describeResult() {
+         return results.size() + " partitions"; // counts map entries; whether keys are partitions or columns is not visible here - confirm the label
+       }
+     }.run(false);
+   }
+ 
+   @Override // Lists every table whose type is one of MetaStoreDirectSql.STATS_TABLE_TYPES, via direct SQL or a JDO query.
+   public List<TableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
+     return new GetListHelper<TableName>(null, null, null, true, false) {
+       @Override
+       protected List<TableName> getSqlResult(
+           GetHelper<List<TableName>> ctx) throws MetaException {
+         return directSql.getAllTableNamesForStats();
+       }
+ 
+       @Override
+       protected List<TableName> getJdoResult(
+           GetHelper<List<TableName>> ctx) throws MetaException {
+         boolean commited = false;
+         Query query = null;
+         List<TableName> result = new ArrayList<>();
+         openTransaction(); // opened before the try, unlike most sibling methods
+         try {
+           String paramStr = "", whereStr = ""; // parameter declarations and filter text, built in lockstep
+           for (int i = 0; i < MetaStoreDirectSql.STATS_TABLE_TYPES.length; ++i) {
+             if (i != 0) {
+               paramStr += ", ";
+               whereStr += "||"; // no spaces around the operator; each term below starts with a space
+             }
+             paramStr += "java.lang.String tt" + i; // one declared String parameter per table type: tt0, tt1, ...
+             whereStr += " tableType == tt" + i;
+           }
+           query = pm.newQuery(MTable.class, whereStr);
+           query.declareParameters(paramStr);
+           @SuppressWarnings("unchecked")
+           Collection<MTable> tbls = (Collection<MTable>) query.executeWithArray(
+               query, MetaStoreDirectSql.STATS_TABLE_TYPES); // NOTE(review): `query` itself is passed ahead of the type array although only tt0..ttN are declared - looks unintended, confirm
+           pm.retrieveAll(tbls);
+           for (MTable tbl : tbls) {
+             result.add(new TableName(
+                 tbl.getDatabase().getCatalogName(), tbl.getDatabase().getName(), tbl.getTableName()));
+           }
+           commited = commitTransaction();
+         } finally {
+           rollbackAndCleanup(commited, query);
+         }
+         return result;
+       }
+     }.run(false);
+   }
+ 
+   protected List<String> getTablesInternal(String catName, String dbName, String pattern, // Runs the table-name lookup through direct SQL and/or JDO as permitted by the two flags.
+                                            TableType tableType, boolean allowSql, boolean allowJdo)
+       throws MetaException, NoSuchObjectException {
+     final String db_name = normalizeIdentifier(dbName);
+     final String cat_name = normalizeIdentifier(catName);
+     return new GetListHelper<String>(cat_name, dbName, null, allowSql, allowJdo) { // NOTE(review): raw dbName is passed here while cat_name is normalized - confirm db_name was not intended
+       @Override
+       protected List<String> getSqlResult(GetHelper<List<String>> ctx)
+               throws MetaException {
+         return directSql.getTables(cat_name, db_name, tableType); // the pattern is not forwarded to direct SQL
+       }
+ 
+       @Override
+       protected List<String> getJdoResult(GetHelper<List<String>> ctx)
+               throws MetaException, NoSuchObjectException {
+         return getTablesInternalViaJdo(cat_name, db_name, pattern, tableType);
+       }
+     }.run(false);
+   }
+ 
+   private List<String> getTablesInternalViaJdo(String catName, String dbName, String pattern, // JDO implementation: returns matching table names in ascending order.
+                                                TableType tableType) throws MetaException {
+     boolean commited = false;
+     Query query = null;
+     List<String> tbls = null;
+     try {
+       openTransaction();
+       dbName = normalizeIdentifier(dbName);
+       // Build the filter incrementally; the pattern itself is split on '|' into
+       // its composing patterns inside appendPatternCondition.
+       List<String> parameterVals = new ArrayList<>(); // values bound positionally to the filter's parameters
+       StringBuilder filterBuilder = new StringBuilder();
+       //adds database.name == dbName to the filter
+       appendSimpleCondition(filterBuilder, "database.name", new String[] {dbName}, parameterVals);
+       appendSimpleCondition(filterBuilder, "database.catalogName", new String[] {catName}, parameterVals);
+       if(pattern != null) {
+         appendPatternCondition(filterBuilder, "tableName", pattern, parameterVals);
+       }
+       if(tableType != null) {
+         appendPatternCondition(filterBuilder, "tableType", new String[] {tableType.toString()}, parameterVals); // NOTE(review): uses the pattern (regex matches) form rather than equality - confirm intended
+       }
+ 
+       query = pm.newQuery(MTable.class, filterBuilder.toString());
+       query.setResult("tableName"); // project only the name column
+       query.setOrdering("tableName ascending");
+       Collection<String> names = (Collection<String>) query.executeWithArray(parameterVals.toArray(new String[0]));
+       tbls = new ArrayList<>(names); // copy out of the query result before the query is cleaned up
+       commited = commitTransaction();
+     } finally {
+       rollbackAndCleanup(commited, query);
+     }
+     return tbls;
+   }
+ 
+   @Override // Lists names of materialized views in the given database that have rewriteEnabled set.
+   public List<String> getMaterializedViewsForRewriting(String catName, String dbName)
+       throws MetaException, NoSuchObjectException {
+     final String db_name = normalizeIdentifier(dbName);
+     catName = normalizeIdentifier(catName);
+     boolean commited = false;
+     Query<?> query = null;
+     List<String> tbls = null;
+     try {
+       openTransaction();
+       dbName = normalizeIdentifier(dbName); // NOTE(review): result is never read; the query below binds db_name
+       query = pm.newQuery(MTable.class,
+           "database.name == db && database.catalogName == cat && tableType == tt && rewriteEnabled == re");
+       query.declareParameters(
+           "java.lang.String db, java.lang.String cat, java.lang.String tt, boolean re");
+       query.setResult("tableName");
+       Collection<String> names = (Collection<String>) query.executeWithArray(
+           db_name, catName, TableType.MATERIALIZED_VIEW.toString(), true); // bound in declaration order: db, cat, tt, re
+       tbls = new ArrayList<>(names); // copy out of the query result before the query is cleaned up
+       commited = commitTransaction();
+     } finally {
+       rollbackAndCleanup(commited, query);
+     }
+     return tbls;
+   }
+ 
+   @Override // Counts MDatabase rows by their "name" field.
+   public int getDatabaseCount() throws MetaException {
+     return getObjectCount("name", MDatabase.class.getName());
+   }
+ 
+   @Override // Counts MPartition rows by their "partitionName" field.
+   public int getPartitionCount() throws MetaException {
+     return getObjectCount("partitionName", MPartition.class.getName());
+   }
+ 
+   @Override // Counts MTable rows by their "tableName" field.
+   public int getTableCount() throws MetaException {
+     return getObjectCount("tableName", MTable.class.getName());
+   }
+ 
+   private int getObjectCount(String fieldName, String objName) { // Runs "select count(fieldName) from objName" and returns the result as an int.
+     Long result = 0L;
+     boolean commited = false;
+     Query query = null;
+     try {
+       openTransaction();
+       String queryStr =
+         "select count(" + fieldName + ") from " + objName; // both parts come from the fixed callers in this class, not from user input
+       query = pm.newQuery(queryStr);
+       result = (Long) query.execute();
+       commited = commitTransaction();
+     } finally {
+       rollbackAndCleanup(commited, query);
+     }
+     return result.intValue(); // narrows Long to int; counts above Integer.MAX_VALUE would wrap
+   }
+ 
+   @Override // Returns lightweight TableMeta entries for tables matching the catalog, db/table patterns and table types.
+   public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames,
+                                       List<String> tableTypes) throws MetaException {
+ 
+     boolean commited = false;
+     Query query = null;
+     List<TableMeta> metas = new ArrayList<>();
+     try {
+       openTransaction();
+       // Build the filter incrementally; '|'-separated patterns are split into
+       // their composing patterns inside appendPatternCondition.
+       StringBuilder filterBuilder = new StringBuilder();
+       List<String> parameterVals = new ArrayList<>(); // values bound positionally to the filter's parameters
+       appendSimpleCondition(filterBuilder, "database.catalogName", new String[] {catName}, parameterVals);
+       if (dbNames != null && !dbNames.equals("*")) { // null or "*" means no database restriction
+         appendPatternCondition(filterBuilder, "database.name", dbNames, parameterVals);
+       }
+       if (tableNames != null && !tableNames.equals("*")) { // null or "*" means no table-name restriction
+         appendPatternCondition(filterBuilder, "tableName", tableNames, parameterVals);
+       }
+       if (tableTypes != null && !tableTypes.isEmpty()) { // types are compared by equality, OR-ed together
+         appendSimpleCondition(filterBuilder, "tableType", tableTypes.toArray(new String[0]), parameterVals);
+       }
+ 
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("getTableMeta with filter " + filterBuilder.toString() + " params: " +
+             StringUtils.join(parameterVals, ", "));
+       }
+       query = pm.newQuery(MTable.class, filterBuilder.toString());
+       Collection<MTable> tables = (Collection<MTable>) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
+       for (MTable table : tables) {
+         TableMeta metaData = new TableMeta(
+             table.getDatabase().getName(), table.getTableName(), table.getTableType());
+         metaData.setComments(table.getParameters().get("comment")); // table comment is read from the "comment" parameter
+         metas.add(metaData);
+       }
+       commited = commitTransaction();
+     } finally {
+       rollbackAndCleanup(commited, query);
+     }
+     return metas;
+   }
+ 
+   private StringBuilder appendPatternCondition(StringBuilder filterBuilder, String fieldName, // Appends an OR-group of pattern (matches) conditions, one per element.
+       String[] elements, List<String> parameterVals) {
+     return appendCondition(filterBuilder, fieldName, elements, true, parameterVals); // true selects pattern mode
+   }
+ 
+   private StringBuilder appendPatternCondition(StringBuilder builder, // String form: normalizes the '|'-separated pattern list, then appends one pattern condition per part.
+       String fieldName, String elements, List<String> parameters) {
+       elements = normalizeIdentifier(elements);
+     return appendCondition(builder, fieldName, elements.split("\\|"), true, parameters); // split on a literal '|'
+   }
+ 
+   private StringBuilder appendSimpleCondition(StringBuilder builder, // Appends an OR-group of equality conditions, one per element.
+       String fieldName, String[] elements, List<String> parameters) {
+     return appendCondition(builder, fieldName, elements, false, parameters); // false selects equality mode
+   }
+ 
+   private StringBuilder appendCondition(StringBuilder builder, // Appends " ( f op p1 || f op p2 ... )" to the filter and records each bound value in parameters.
+       String fieldName, String[] elements, boolean pattern, List<String> parameters) {
+     if (builder.length() > 0) {
+       builder.append(" && "); // conditions from successive calls are AND-ed together
+     }
+     builder.append(" (");
+     int length = builder.length(); // remembered so the loop can tell whether a term was already written
+     for (String element : elements) {
+       if (pattern) {
+         element = "(?i)" + element.replaceAll("\\*", ".*"); // case-insensitive regex; each '*' wildcard becomes '.*'
+       }
+       parameters.add(element);
+       if (builder.length() > length) {
+         builder.append(" || ");
+       }
+       builder.append(fieldName);
+       if (pattern) {
+         builder.append(".matches(").append(JDO_PARAM).append(parameters.size()).append(")"); // suffix is the 1-based index of the value just added
+       } else {
+         builder.append(" == ").append(JDO_PARAM).append(parameters.size());
+       }
+     }
+     builder.append(" )"); // an empty elements array leaves an empty " ( )" group
+     return builder;
+   }
+ 
+   @Override // Lists every table name in the database by delegating with the match-all pattern.
+   public List<String> getAllTables(String catName, String dbName) throws MetaException {
+     return getTables(catName, dbName, ".*"); // ".*" is the pattern for which getTables permits direct SQL
+   }
+ 
+   class AttachedMTableInfo { // Simple holder pairing an MTable with its column descriptor.
+     MTable mtbl; // the table model object; null when the lookup found nothing
+     MColumnDescriptor mcd; // only filled by getMTable when retrieveCD was requested
+ 
+     public AttachedMTableInfo() {}
+ 
+     public AttachedMTableInfo(MTable mtbl, MColumnDescriptor mcd) {
+       this.mtbl = mtbl;
+       this.mcd = mcd;
+     }
+   }
+ 
+   private AttachedMTableInfo getMTable(String catName, String db, String table, // Looks up one MTable by normalized catalog/db/table name, optionally loading its column descriptor.
+                                        boolean retrieveCD) {
+     AttachedMTableInfo nmtbl = new AttachedMTableInfo();
+     MTable mtbl = null;
+     boolean commited = false;
+     Query query = null;
+     try {
+       openTransaction();
+       catName = normalizeIdentifier(catName);
+       db = normalizeIdentifier(db);
+       table = normalizeIdentifier(table);
+       query = pm.newQuery(MTable.class,
+           "tableName == table && database.name == db && database.catalogName == catname");
+       query.declareParameters(
+           "java.lang.String table, java.lang.String db, java.lang.String catname");
+       query.setUnique(true); // at most one result is expected, so execute returns the object itself
+       LOG.debug("Executing getMTable for " +
+           TableName.getQualified(catName, db, table));
+       mtbl = (MTable) query.execute(table, db, catName);
+       pm.retrieve(mtbl); // NOTE(review): called before the null check below - confirm retrieve tolerates a null argument
+       // Retrieving CD can be expensive and unnecessary, so do it only when required.
+       if (mtbl != null && retrieveCD) {
+         pm.retrieve(mtbl.getSd());
+         pm.retrieveAll(mtbl.getSd().getCD());
+         nmtbl.mcd = mtbl.getSd().getCD();
+       }
+       commited = commitTransaction();
+     } finally {
+       rollbackAndCleanup(commited, query);
+     }
+     nmtbl.mtbl = mtbl; // may be null; callers must check
+     return nmtbl;
+   }
+ 
+   private MCreationMetadata getCreationMetadata(String catName, String dbName, String tblName) { // Fetches the single MCreationMetadata row for the given table, or null.
+     boolean commited = false;
+     MCreationMetadata mcm = null;
+     Query query = null;
+     try {
+       openTransaction();
+       query = pm.newQuery(
+           MCreationMetadata.class, "tblName == table && dbName == db && catalogName == cat");
+       query.declareParameters("java.lang.String table, java.lang.String db, java.lang.String cat");
+       query.setUnique(true);
+       mcm = (MCreationMetadata) query.execute(tblName, dbName, catName); // names are used as given; no normalizeIdentifier is applied in this method
+       pm.retrieve(mcm); // NOTE(review): no null check on the query result - confirm retrieve tolerates null
+       commited = commitTransaction();
+     } finally {
+       rollbackAndCleanup(commited, query);
+     }
+     return mcm;
+   }
+ 
+   private MTable getMTable(String catName, String db, String table) { // Convenience overload: looks up the MTable without loading its column descriptor.
+     AttachedMTableInfo nmtbl = getMTable(catName, db, table, false);
+     return nmtbl.mtbl; // null when the table does not exist
+   }
+ 
+   @Override // Loads the Table objects for the given names in one database; throws UnknownDBException if nothing matched and the database is missing.
+   public List<Table> getTableObjectsByName(String catName, String db, List<String> tbl_names)
+       throws MetaException, UnknownDBException {
+     List<Table> tables = new ArrayList<>();
+     boolean committed = false;
+     Query dbExistsQuery = null;
+     Query query = null;
+     try {
+       openTransaction();
+       db = normalizeIdentifier(db);
+       catName = normalizeIdentifier(catName);
+ 
+       List<String> lowered_tbl_names = new ArrayList<>(tbl_names.size());
+       for (String t : tbl_names) {
+         lowered_tbl_names.add(normalizeIdentifier(t));
+       }
+       query = pm.newQuery(MTable.class);
+       query.setFilter("database.name == db && database.catalogName == cat && tbl_names.contains(tableName)");
+       query.declareParameters("java.lang.String db, java.lang.String cat, java.util.Collection tbl_names");
+       Collection mtables = (Collection) query.execute(db, catName, lowered_tbl_names);
+       if (mtables == null || mtables.isEmpty()) {
+         // Need to differentiate between an unmatched pattern and a non-existent database
+         dbExistsQuery = pm.newQuery(MDatabase.class, "name == db && catalogName == cat");
+         dbExistsQuery.declareParameters("java.lang.String db, java.lang.String cat");
+         dbExistsQuery.setUnique(true);
+         dbExistsQuery.setResult("name");
+         String dbNameIfExists = (String) dbExistsQuery.execute(db, catName);
+         if (org.apache.commons.lang.StringUtils.isEmpty(dbNameIfExists)) {
+           throw new UnknownDBException("Could not find database " +
+               DatabaseName.getQualified(catName, db));
+         } // otherwise the database exists and the empty list is returned
+       } else {
+         for (Iterator iter = mtables.iterator(); iter.hasNext(); ) {
+           Table tbl = convertToTable((MTable) iter.next());
+           // Retrieve creation metadata if needed
+           if (TableType.MATERIALIZED_VIEW.toString().equals(tbl.getTableType())) {
+             tbl.setCreationMetadata(
+                 convertToCreationMetadata(
+                     getCreationMetadata(tbl.getCatName(), tbl.getDbName(), tbl.getTableName())));
+           }
+           tables.add(tbl);
+         }
+       }
+       committed = commitTransaction();
+     } finally {
+       rollbackAndCleanup(committed, query);
+       if (dbExistsQuery != null) { // the second query is closed separately; rollbackAndCleanup only receives the first
+         dbExistsQuery.closeAll();
+       }
+     }
+     return tables;
+   }
+ 
+   /** Makes a shallow copy of a list to avoid DataNucleus mucking with our objects; a null list stays null. */
+   private <T> List<T> convertList(List<T> dnList) {
+     return (dnList == null) ? null : Lists.newArrayList(dnList);
+   }
+ 
+   /** Passes the map through MetaStoreUtils.trimMapNulls (presumably yielding a shallow copy, to avoid DataNucleus mucking with our objects - confirm in trimMapNulls). */
+   private Map<String, String> convertMap(Map<String, String> dnMap) {
+     return MetaStoreUtils.trimMapNulls(dnMap,
+         MetastoreConf.getBoolVar(getConf(), ConfVars.ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS)); // flag is read from configuration on every call
+   }
+ 
+   private Table convertToTable(MTable mtbl) throws MetaException { // Converts the persistence-model MTable into an API Table; returns null for null input.
+     if (mtbl == null) {
+       return null;
+     }
+     String tableType = mtbl.getTableType();
+     if (tableType == null) {
+       // for backwards compatibility with old metastore persistence
+       if (mtbl.getViewOriginalText() != null) { // view text present: treat as a virtual view
+         tableType = TableType.VIRTUAL_VIEW.toString();
+       } else if (Boolean.parseBoolean(mtbl.getParameters().get("EXTERNAL"))) {
+         tableType = TableType.EXTERNAL_TABLE.toString();
+       } else {
+         tableType = TableType.MANAGED_TABLE.toString();
+       }
+     }
+     final Table t = new Table(mtbl.getTableName(), mtbl.getDatabase().getName(), mtbl
+         .getOwner(), mtbl.getCreateTime(), mtbl.getLastAccessTime(), mtbl
+         .getRetention(), convertToStorageDescriptor(mtbl.getSd()),
+         convertToFieldSchemas(mtbl.getPartitionKeys()), convertMap(mtbl.getParameters()),
+         mtbl.getViewOriginalText(), mtbl.getViewExpandedText(), tableType);
+ 
+     if (Strings.isNullOrEmpty(mtbl.getOwnerType())) {
+       // Before the ownerType exists in an old Hive schema, USER was the default type for owner.
+       // Let's set the default to USER to keep backward compatibility.
+       t.setOwnerType(PrincipalType.USER);
+     } else {
+       t.setOwnerType(PrincipalType.valueOf(mtbl.getOwnerType()));
+     }
+ 
+     t.setRewriteEnabled(mtbl.isRewriteEnabled());
+     t.setCatName(mtbl.getDatabase().getCatalogName());
++    t.setWriteId(mtbl.getWriteId()); // expose the stored write ID to the caller
+     return t;
+   }
+ 
+   private MTable convertToMTable(Table tbl) throws InvalidObjectException, // Converts an API Table into a new MTable; returns null for null input.
+       MetaException {
++    // NOTE: we don't set writeId in this method. Write ID is only set after validating the
++    //       existing write ID against the caller's valid list.
+     if (tbl == null) {
+       return null;
+     }
+     MDatabase mdb = null;
+     String catName = tbl.isSetCatName() ? tbl.getCatName() : getDefaultCatalog(conf); // fall back to the configured default catalog
+     try {
+       mdb = getMDatabase(catName, tbl.getDbName());
+     } catch (NoSuchObjectException e) {
+       LOG.error("Could not convert to MTable", e);
+       throw new InvalidObjectException("Database " +
+           DatabaseName.getQualified(catName, tbl.getDbName()) + " doesn't exist.");
+     }
+ 
+     // If the table has property EXTERNAL set, update table type
+     // accordingly
+     String tableType = tbl.getTableType();
+     boolean isExternal = Boolean.parseBoolean(tbl.getParameters().get("EXTERNAL")); // assumes tbl.getParameters() is non-null - TODO confirm
+     if (TableType.MANAGED_TABLE.toString().equals(tableType)) {
+       if (isExternal) { // MANAGED type but EXTERNAL property set: the property wins
+         tableType = TableType.EXTERNAL_TABLE.toString();
+       }
+     }
+     if (TableType.EXTERNAL_TABLE.toString().equals(tableType)) {
+       if (!isExternal) { // EXTERNAL type without the property: downgraded to MANAGED
+         tableType = TableType.MANAGED_TABLE.toString();
+       }
+     }
+ 
+     PrincipalType ownerPrincipalType = tbl.getOwnerType();
+     String ownerType = (ownerPrincipalType == null) ? PrincipalType.USER.name() : ownerPrincipalType.name(); // USER is the default owner type
+ 
+     // A new table is always created with a new column descriptor
 -    return new MTable(normalizeIdentifier(tbl.getTableName()), mdb,
++    MTable mtable = new MTable(normalizeIdentifier(tbl.getTableName()), mdb,
+         convertToMStorageDescriptor(tbl.getSd()), tbl.getOwner(), ownerType, tbl
+         .getCreateTime(), tbl.getLastAccessTime(), tbl.getRetention(),
+         convertToMFieldSchemas(tbl.getPartitionKeys()), tbl.getParameters(),
+         tbl.getViewOriginalText(), tbl.getViewExpandedText(), tbl.isRewriteEnabled(),
+         tableType);
++    return mtable;
+   }
+ 
+   private List<MFieldSchema> convertToMFieldSchemas(List<FieldSchema> keys) { // Converts API field schemas to model objects; a null list stays null.
+     List<MFieldSchema> mkeys = null;
+     if (keys != null) {
+       mkeys = new ArrayList<>(keys.size());
+       for (FieldSchema part : keys) {
+         mkeys.add(new MFieldSchema(part.getName().toLowerCase(), // names are lower-cased directly here, not via normalizeIdentifier
+             part.getType(), part.getComment()));
+       }
+     }
+     return mkeys;
+   }
+ 
+   private List<FieldSchema> convertToFieldSchemas(List<MFieldSchema> mkeys) { // Converts model field schemas back to API objects; a null list stays null.
+     List<FieldSchema> keys = null;
+     if (mkeys != null) {
+       keys = new ArrayList<>(mkeys.size());
+       for (MFieldSchema part : mkeys) {
+         keys.add(new FieldSchema(part.getName(), part.getType(), part // name, type and comment are copied unchanged
+             .getComment()));
+       }
+     }
+     return keys;
+   }
+ 
+   private List<MOrder> convertToMOrders(List<Order> keys) { // Converts API sort orders to model objects; a null list stays null.
+     List<MOrder> mkeys = null;
+     if (keys != null) {
+       mkeys = new ArrayList<>(keys.size());
+       for (Order part : keys) {
+         mkeys.add(new MOrder(normalizeIdentifier(part.getCol()), part.getOrder())); // column name is normalized on the way in
+       }
+     }
+     return mkeys;
+   }
+ 
+   private List<Order> convertToOrders(List<MOrder> mkeys) { // Converts model sort orders back to API objects; a null list stays null.
+     List<Order> keys = null;
+     if (mkeys != null) {
+       keys = new ArrayList<>(mkeys.size());
+       for (MOrder part : mkeys) {
+         keys.add(new Order(part.getCol(), part.getOrder())); // column and order are copied unchanged
+       }
+     }
+     return keys;
+   }
+ 
+   private SerDeInfo convertToSerDeInfo(MSerDeInfo ms) throws MetaException { // Converts a model SerDe record to the API type; a null input is rejected with MetaException.
+     if (ms == null) {
+       throw new MetaException("Invalid SerDeInfo object");
+     }
+     SerDeInfo serde =
+         new SerDeInfo(ms.getName(), ms.getSerializationLib(), convertMap(ms.getParameters()));
+     if (ms.getDescription() != null) { // optional fields are only set when present, leaving them unset otherwise
+       serde.setDescription(ms.getDescription());
+     }
+     if (ms.getSerializerClass() != null) {
+       serde.setSerializerClass(ms.getSerializerClass());
+     }
+     if (ms.getDeserializerClass() != null) {
+       serde.setDeserializerClass(ms.getDeserializerClass());
+     }
+     if (ms.getSerdeType() > 0) { // values <= 0 leave the serde type unset
+       serde.setSerdeType(SerdeType.findByValue(ms.getSerdeType()));
+     }
+     return serde;
+   }
+ 
+   private MSerDeInfo convertToMSerDeInfo(SerDeInfo ms) throws MetaException {
+     if (ms == null) {
+       throw new MetaException("Invalid SerDeInfo object");
+     }
+     return new MSerDeInfo(ms.getName(), ms.getSerializationLib(), ms.getParameters(),
+         ms.getDescription(), ms.getSerializerClass(), ms.getDeserializerClass(),
+         ms.getSerdeType() == null ? 0 : ms.

<TRUNCATED>