You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2018/10/10 17:40:30 UTC

[1/3] hive git commit: HIVE-20306 : Addendum to remove .orig files

Repository: hive
Updated Branches:
  refs/heads/master 390afb5ef -> 64bef36a3


http://git-wip-us.apache.org/repos/asf/hive/blob/64bef36a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java.orig
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java.orig b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java.orig
deleted file mode 100644
index 5372714..0000000
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java.orig
+++ /dev/null
@@ -1,12514 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
-import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.SQLIntegrityConstraintViolationException;
-import java.sql.Statement;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
-import javax.jdo.JDOCanRetryException;
-import javax.jdo.JDODataStoreException;
-import javax.jdo.JDOException;
-import javax.jdo.JDOHelper;
-import javax.jdo.JDOObjectNotFoundException;
-import javax.jdo.PersistenceManager;
-import javax.jdo.PersistenceManagerFactory;
-import javax.jdo.Query;
-import javax.jdo.Transaction;
-import javax.jdo.datastore.DataStoreCache;
-import javax.jdo.datastore.JDOConnection;
-import javax.jdo.identity.IntIdentity;
-import javax.sql.DataSource;
-
-import com.google.common.base.Strings;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.*;
-import org.apache.hadoop.hive.metastore.MetaStoreDirectSql.SqlFilterForPushdown;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
-import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
-import org.apache.hadoop.hive.metastore.metrics.Metrics;
-import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
-import org.apache.hadoop.hive.metastore.model.*;
-import org.apache.hadoop.hive.metastore.model.MWMMapping.EntityType;
-import org.apache.hadoop.hive.metastore.model.MWMResourcePlan.Status;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder;
-import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.FileUtils;
-import org.apache.hadoop.hive.metastore.utils.JavaUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.ObjectPair;
-import org.apache.thrift.TException;
-import org.datanucleus.AbstractNucleusContext;
-import org.datanucleus.ClassLoaderResolver;
-import org.datanucleus.ClassLoaderResolverImpl;
-import org.datanucleus.NucleusContext;
-import org.datanucleus.PropertyNames;
-import org.datanucleus.api.jdo.JDOPersistenceManager;
-import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
-import org.datanucleus.store.rdbms.exceptions.MissingTableException;
-import org.datanucleus.store.scostore.Store;
-import org.datanucleus.util.WeakValueMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-
-/**
- * This class is the interface between the application logic and the database
- * store that contains the objects. Refrain putting any logic in mode.M* objects
- * or in this file as former could be auto generated and this class would need
- * to be made into a interface that can read both from a database and a
- * filestore.
- */
-public class ObjectStore implements RawStore, Configurable {
-  private static Properties prop = null;
-  private static PersistenceManagerFactory pmf = null;
-  private static boolean forTwoMetastoreTesting = false;
-  private int batchSize = Batchable.NO_BATCHING;
-
-  private static final DateTimeFormatter YMDHMS_FORMAT = DateTimeFormatter.ofPattern(
-      "yyyy_MM_dd_HH_mm_ss");
-
-  private static Lock pmfPropLock = new ReentrantLock();
-  /**
-  * Verify the schema only once per JVM since the db connection info is static
-  */
-  private final static AtomicBoolean isSchemaVerified = new AtomicBoolean(false);
-  private static final Logger LOG = LoggerFactory.getLogger(ObjectStore.class);
-
-  private enum TXN_STATUS {
-    NO_STATE, OPEN, COMMITED, ROLLBACK
-  }
-
-  private static final Map<String, Class<?>> PINCLASSMAP;
-  private static final String HOSTNAME;
-  private static final String USER;
-  private static final String JDO_PARAM = ":param";
-  static {
-    Map<String, Class<?>> map = new HashMap<>();
-    map.put("table", MTable.class);
-    map.put("storagedescriptor", MStorageDescriptor.class);
-    map.put("serdeinfo", MSerDeInfo.class);
-    map.put("partition", MPartition.class);
-    map.put("database", MDatabase.class);
-    map.put("type", MType.class);
-    map.put("fieldschema", MFieldSchema.class);
-    map.put("order", MOrder.class);
-    PINCLASSMAP = Collections.unmodifiableMap(map);
-    String hostname = "UNKNOWN";
-    try {
-      InetAddress clientAddr = InetAddress.getLocalHost();
-      hostname = clientAddr.getHostAddress();
-    } catch (IOException e) {
-    }
-    HOSTNAME = hostname;
-    String user = System.getenv("USER");
-    USER = org.apache.commons.lang.StringUtils.defaultString(user, "UNKNOWN");
-  }
-
-
-  private boolean isInitialized = false;
-  private PersistenceManager pm = null;
-  private SQLGenerator sqlGenerator = null;
-  private MetaStoreDirectSql directSql = null;
-  private DatabaseProduct dbType = null;
-  private PartitionExpressionProxy expressionProxy = null;
-  private Configuration conf;
-  private volatile int openTrasactionCalls = 0;
-  private Transaction currentTransaction = null;
-  private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE;
-  private Pattern partitionValidationPattern;
-  private Counter directSqlErrors;
-  private boolean areTxnStatsSupported = false;
-
-  /**
-   * A Autocloseable wrapper around Query class to pass the Query object to the caller and let the caller release
-   * the resources when the QueryWrapper goes out of scope
-   */
-  public static class QueryWrapper implements AutoCloseable {
-    public Query query;
-
-    /**
-     * Explicitly closes the query object to release the resources
-     */
-    @Override
-    public void close() {
-      if (query != null) {
-        query.closeAll();
-        query = null;
-      }
-    }
-  }
-
-  public ObjectStore() {
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  /**
-   * Called whenever this object is instantiated using ReflectionUtils, and also
-   * on connection retries. In cases of connection retries, conf will usually
-   * contain modified values.
-   */
-  @Override
-  @SuppressWarnings("nls")
-  public void setConf(Configuration conf) {
-    // Although an instance of ObjectStore is accessed by one thread, there may
-    // be many threads with ObjectStore instances. So the static variables
-    // pmf and prop need to be protected with locks.
-    pmfPropLock.lock();
-    try {
-      isInitialized = false;
-      this.conf = conf;
-      this.areTxnStatsSupported = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED);
-      configureSSL(conf);
-      Properties propsFromConf = getDataSourceProps(conf);
-      boolean propsChanged = !propsFromConf.equals(prop);
-
-      if (propsChanged) {
-        if (pmf != null){
-          clearOutPmfClassLoaderCache(pmf);
-          if (!forTwoMetastoreTesting) {
-            // close the underlying connection pool to avoid leaks
-            pmf.close();
-          }
-        }
-        pmf = null;
-        prop = null;
-      }
-
-      assert(!isActiveTransaction());
-      shutdown();
-      // Always want to re-create pm as we don't know if it were created by the
-      // most recent instance of the pmf
-      pm = null;
-      directSql = null;
-      expressionProxy = null;
-      openTrasactionCalls = 0;
-      currentTransaction = null;
-      transactionStatus = TXN_STATUS.NO_STATE;
-
-      initialize(propsFromConf);
-
-      String partitionValidationRegex =
-          MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN);
-      if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
-        partitionValidationPattern = Pattern.compile(partitionValidationRegex);
-      } else {
-        partitionValidationPattern = null;
-      }
-
-      // Note, if metrics have not been initialized this will return null, which means we aren't
-      // using metrics.  Thus we should always check whether this is non-null before using.
-      MetricRegistry registry = Metrics.getRegistry();
-      if (registry != null) {
-        directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS);
-      }
-
-      this.batchSize = MetastoreConf.getIntVar(conf, ConfVars.RAWSTORE_PARTITION_BATCH_SIZE);
-
-      if (!isInitialized) {
-        throw new RuntimeException(
-        "Unable to create persistence manager. Check dss.log for details");
-      } else {
-        LOG.debug("Initialized ObjectStore");
-      }
-    } finally {
-      pmfPropLock.unlock();
-    }
-  }
-
-  private ClassLoader classLoader;
-  {
-    classLoader = Thread.currentThread().getContextClassLoader();
-    if (classLoader == null) {
-      classLoader = ObjectStore.class.getClassLoader();
-    }
-  }
-
-  @SuppressWarnings("nls")
-  private void initialize(Properties dsProps) {
-    int retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
-    long retryInterval = MetastoreConf.getTimeVar(conf,
-        ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
-    int numTries = retryLimit;
-
-    while (numTries > 0){
-      try {
-        initializeHelper(dsProps);
-        return; // If we reach here, we succeed.
-      } catch (Exception e){
-        shutdown();
-        numTries--;
-        boolean retriable = isRetriableException(e);
-        if ((numTries > 0) && retriable){
-          LOG.info("Retriable exception while instantiating ObjectStore, retrying. " +
-              "{} tries left", numTries, e);
-          try {
-            Thread.sleep(retryInterval);
-          } catch (InterruptedException ie) {
-            // Restore the interrupted status, since we do not want to catch it.
-            LOG.debug("Interrupted while sleeping before retrying.", ie);
-            Thread.currentThread().interrupt();
-          }
-          // If we're here, we'll proceed down the next while loop iteration.
-        } else {
-          // we've reached our limit, throw the last one.
-          if (retriable){
-            LOG.warn("Exception retry limit reached, not retrying any longer.",
-              e);
-          } else {
-            LOG.debug("Non-retriable exception during ObjectStore initialize.", e);
-          }
-          throw e;
-        }
-      }
-    }
-  }
-
-  private static final Set<Class<? extends Throwable>> retriableExceptionClasses =
-      new HashSet<>(Arrays.asList(JDOCanRetryException.class));
-  /**
-   * Helper function for initialize to determine if we should retry an exception.
-   * We return true if the exception is of a known type of retriable exceptions, or if one
-   * of its recursive .getCause returns a known type of retriable exception.
-   */
-  private boolean isRetriableException(Throwable e) {
-    if (e == null){
-      return false;
-    }
-    if (retriableExceptionClasses.contains(e.getClass())){
-      return true;
-    }
-    for (Class<? extends Throwable> c : retriableExceptionClasses){
-      if (c.isInstance(e)){
-        return true;
-      }
-    }
-
-    if (e.getCause() == null){
-      return false;
-    }
-    return isRetriableException(e.getCause());
-  }
-
-  /**
-   * private helper to do initialization routine, so we can retry if needed if it fails.
-   * @param dsProps
-   */
-  private void initializeHelper(Properties dsProps) {
-    LOG.debug("ObjectStore, initialize called");
-    prop = dsProps;
-    pm = getPersistenceManager();
-    LOG.info("RawStore: {}, with PersistenceManager: {}" +
-            " created in the thread with id: {}", this, pm, Thread.currentThread().getId());
-    try {
-      String productName = MetaStoreDirectSql.getProductName(pm);
-      sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(productName), conf);
-    } catch (SQLException e) {
-      LOG.error("error trying to figure out the database product", e);
-      throw new RuntimeException(e);
-    }
-    isInitialized = pm != null;
-    if (isInitialized) {
-      dbType = determineDatabaseProduct();
-      expressionProxy = createExpressionProxy(conf);
-      if (MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL)) {
-        String schema = prop.getProperty("javax.jdo.mapping.Schema");
-        schema = org.apache.commons.lang.StringUtils.defaultIfBlank(schema, null);
-        directSql = new MetaStoreDirectSql(pm, conf, schema);
-      }
-    }
-  }
-
-  private DatabaseProduct determineDatabaseProduct() {
-    try {
-      return DatabaseProduct.determineDatabaseProduct(getProductName(pm));
-    } catch (SQLException e) {
-      LOG.warn("Cannot determine database product; assuming OTHER", e);
-      return DatabaseProduct.OTHER;
-    }
-  }
-
-  private static String getProductName(PersistenceManager pm) {
-    JDOConnection jdoConn = pm.getDataStoreConnection();
-    try {
-      return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName();
-    } catch (Throwable t) {
-      LOG.warn("Error retrieving product name", t);
-      return null;
-    } finally {
-      jdoConn.close(); // We must release the connection before we call other pm methods.
-    }
-  }
-
-  /**
-   * Creates the proxy used to evaluate expressions. This is here to prevent circular
-   * dependency - ql -&gt; metastore client &lt;-&gt metastore server -&gt ql. If server and
-   * client are split, this can be removed.
-   * @param conf Configuration.
-   * @return The partition expression proxy.
-   */
-  private static PartitionExpressionProxy createExpressionProxy(Configuration conf) {
-    String className = MetastoreConf.getVar(conf, ConfVars.EXPRESSION_PROXY_CLASS);
-    try {
-      Class<? extends PartitionExpressionProxy> clazz =
-           JavaUtils.getClass(className, PartitionExpressionProxy.class);
-      return JavaUtils.newInstance(clazz, new Class<?>[0], new Object[0]);
-    } catch (MetaException e) {
-      LOG.error("Error loading PartitionExpressionProxy", e);
-      throw new RuntimeException("Error loading PartitionExpressionProxy: " + e.getMessage());
-    }
-  }
-
-  /**
-   * Configure the SSL properties of the connection from provided config
-   * @param conf
-   */
-  private static void configureSSL(Configuration conf) {
-    // SSL support
-    String sslPropString = MetastoreConf.getVar(conf, ConfVars.DBACCESS_SSL_PROPS);
-    if (org.apache.commons.lang.StringUtils.isNotEmpty(sslPropString)) {
-      LOG.info("Metastore setting SSL properties of the connection to backed DB");
-      for (String sslProp : sslPropString.split(",")) {
-        String[] pair = sslProp.trim().split("=");
-        if (pair != null && pair.length == 2) {
-          System.setProperty(pair[0].trim(), pair[1].trim());
-        } else {
-          LOG.warn("Invalid metastore property value for {}", ConfVars.DBACCESS_SSL_PROPS);
-        }
-      }
-    }
-  }
-
-  /**
-   * Properties specified in hive-default.xml override the properties specified
-   * in jpox.properties.
-   */
-  @SuppressWarnings("nls")
-  private static Properties getDataSourceProps(Configuration conf) {
-    Properties prop = new Properties();
-    correctAutoStartMechanism(conf);
-
-    // First, go through and set all our values for datanucleus and javax.jdo parameters.  This
-    // has to be a separate first step because we don't set the default values in the config object.
-    for (ConfVars var : MetastoreConf.dataNucleusAndJdoConfs) {
-      String confVal = MetastoreConf.getAsString(conf, var);
-      String varName = var.getVarname();
-      Object prevVal = prop.setProperty(varName, confVal);
-      if (MetastoreConf.isPrintable(varName)) {
-        LOG.debug("Overriding {} value {} from jpox.properties with {}",
-          varName, prevVal, confVal);
-      }
-    }
-
-    // Now, we need to look for any values that the user set that MetastoreConf doesn't know about.
-    // TODO Commenting this out for now, as it breaks because the conf values aren't getting properly
-    // interpolated in case of variables.  See HIVE-17788.
-    /*
-    for (Map.Entry<String, String> e : conf) {
-      if (e.getKey().startsWith("datanucleus.") || e.getKey().startsWith("javax.jdo.")) {
-        // We have to handle this differently depending on whether it is a value known to
-        // MetastoreConf or not.  If it is, we need to get the default value if a value isn't
-        // provided.  If not, we just set whatever the user has set.
-        Object prevVal = prop.setProperty(e.getKey(), e.getValue());
-        if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(e.getKey())) {
-          LOG.debug("Overriding " + e.getKey() + " value " + prevVal
-              + " from  jpox.properties with " + e.getValue());
-        }
-      }
-    }
-    */
-
-    // Password may no longer be in the conf, use getPassword()
-    try {
-      String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
-      if (org.apache.commons.lang.StringUtils.isNotEmpty(passwd)) {
-        // We can get away with the use of varname here because varname == hiveName for PWD
-        prop.setProperty(ConfVars.PWD.getVarname(), passwd);
-      }
-    } catch (IOException err) {
-      throw new RuntimeException("Error getting metastore password: " + err.getMessage(), err);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      for (Entry<Object, Object> e : prop.entrySet()) {
-        if (MetastoreConf.isPrintable(e.getKey().toString())) {
-          LOG.debug("{} = {}", e.getKey(), e.getValue());
-        }
-      }
-    }
-
-    return prop;
-  }
-
-  /**
-   * Update conf to set datanucleus.autoStartMechanismMode=ignored.
-   * This is necessary to able to use older version of hive against
-   * an upgraded but compatible metastore schema in db from new version
-   * of hive
-   * @param conf
-   */
-  private static void correctAutoStartMechanism(Configuration conf) {
-    final String autoStartKey = "datanucleus.autoStartMechanismMode";
-    final String autoStartIgnore = "ignored";
-    String currentAutoStartVal = conf.get(autoStartKey);
-    if (!autoStartIgnore.equalsIgnoreCase(currentAutoStartVal)) {
-      LOG.warn("{} is set to unsupported value {} . Setting it to value: {}", autoStartKey,
-        conf.get(autoStartKey), autoStartIgnore);
-    }
-    conf.set(autoStartKey, autoStartIgnore);
-  }
-
-  private static synchronized PersistenceManagerFactory getPMF() {
-    if (pmf == null) {
-
-      Configuration conf = MetastoreConf.newMetastoreConf();
-      DataSourceProvider dsp = DataSourceProviderFactory.hasProviderSpecificConfigurations(conf) ?
-              DataSourceProviderFactory.getDataSourceProvider(conf) : null;
-
-      if (dsp == null) {
-        pmf = JDOHelper.getPersistenceManagerFactory(prop);
-      } else {
-        try {
-          DataSource ds = dsp.create(conf);
-          Map<Object, Object> dsProperties = new HashMap<>();
-          //Any preexisting datanucleus property should be passed along
-          dsProperties.putAll(prop);
-          dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds);
-          dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds);
-          dsProperties.put("javax.jdo.PersistenceManagerFactoryClass",
-              "org.datanucleus.api.jdo.JDOPersistenceManagerFactory");
-          pmf = JDOHelper.getPersistenceManagerFactory(dsProperties);
-        } catch (SQLException e) {
-          LOG.warn("Could not create PersistenceManagerFactory using " +
-              "connection pool properties, will fall back", e);
-          pmf = JDOHelper.getPersistenceManagerFactory(prop);
-        }
-      }
-      DataStoreCache dsc = pmf.getDataStoreCache();
-      if (dsc != null) {
-        String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES);
-        LOG.info("Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"", objTypes);
-        if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) {
-          String[] typeTokens = objTypes.toLowerCase().split(",");
-          for (String type : typeTokens) {
-            type = type.trim();
-            if (PINCLASSMAP.containsKey(type)) {
-              dsc.pinAll(true, PINCLASSMAP.get(type));
-            } else {
-              LOG.warn("{} is not one of the pinnable object types: {}", type,
-                org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " "));
-            }
-          }
-        }
-      } else {
-        LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes");
-      }
-    }
-    return pmf;
-  }
-
-  @InterfaceAudience.LimitedPrivate({"HCATALOG"})
-  @InterfaceStability.Evolving
-  public PersistenceManager getPersistenceManager() {
-    return getPMF().getPersistenceManager();
-  }
-
-  @Override
-  public void shutdown() {
-    LOG.info("RawStore: {}, with PersistenceManager: {} will be shutdown", this, pm);
-    if (pm != null) {
-      pm.close();
-      pm = null;
-    }
-  }
-
-  /**
-   * Opens a new one or the one already created Every call of this function must
-   * have corresponding commit or rollback function call
-   *
-   * @return an active transaction
-   */
-
-  @Override
-  public boolean openTransaction() {
-    openTrasactionCalls++;
-    if (openTrasactionCalls == 1) {
-      currentTransaction = pm.currentTransaction();
-      currentTransaction.begin();
-      transactionStatus = TXN_STATUS.OPEN;
-    } else {
-      // openTransactionCalls > 1 means this is an interior transaction
-      // We should already have a transaction created that is active.
-      if ((currentTransaction == null) || (!currentTransaction.isActive())){
-        throw new RuntimeException("openTransaction called in an interior"
-            + " transaction scope, but currentTransaction is not active.");
-      }
-    }
-
-    boolean result = currentTransaction.isActive();
-    debugLog("Open transaction: count = " + openTrasactionCalls + ", isActive = " + result);
-    return result;
-  }
-
-  /**
-   * if this is the commit of the first open call then an actual commit is
-   * called.
-   *
-   * @return Always returns true
-   */
-  @Override
-  @SuppressWarnings("nls")
-  public boolean commitTransaction() {
-    if (TXN_STATUS.ROLLBACK == transactionStatus) {
-      debugLog("Commit transaction: rollback");
-      return false;
-    }
-    if (openTrasactionCalls <= 0) {
-      RuntimeException e = new RuntimeException("commitTransaction was called but openTransactionCalls = "
-          + openTrasactionCalls + ". This probably indicates that there are unbalanced " +
-          "calls to openTransaction/commitTransaction");
-      LOG.error("Unbalanced calls to open/commit Transaction", e);
-      throw e;
-    }
-    if (!currentTransaction.isActive()) {
-      RuntimeException e = new RuntimeException("commitTransaction was called but openTransactionCalls = "
-          + openTrasactionCalls + ". This probably indicates that there are unbalanced " +
-          "calls to openTransaction/commitTransaction");
-      LOG.error("Unbalanced calls to open/commit Transaction", e);
-      throw e;
-    }
-    openTrasactionCalls--;
-    debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive());
-
-    if ((openTrasactionCalls == 0) && currentTransaction.isActive()) {
-      transactionStatus = TXN_STATUS.COMMITED;
-      currentTransaction.commit();
-    }
-    return true;
-  }
-
-  /**
-   * @return true if there is an active transaction. If the current transaction
-   *         is either committed or rolled back it returns false
-   */
-  @Override
-  public boolean isActiveTransaction() {
-    if (currentTransaction == null) {
-      return false;
-    }
-    return currentTransaction.isActive();
-  }
-
-  /**
-   * Rolls back the current transaction if it is active
-   */
-  @Override
-  public void rollbackTransaction() {
-    if (openTrasactionCalls < 1) {
-      debugLog("rolling back transaction: no open transactions: " + openTrasactionCalls);
-      return;
-    }
-    debugLog("Rollback transaction, isActive: " + currentTransaction.isActive());
-    try {
-      if (currentTransaction.isActive()
-          && transactionStatus != TXN_STATUS.ROLLBACK) {
-        currentTransaction.rollback();
-      }
-    } finally {
-      openTrasactionCalls = 0;
-      transactionStatus = TXN_STATUS.ROLLBACK;
-      // remove all detached objects from the cache, since the transaction is
-      // being rolled back they are no longer relevant, and this prevents them
-      // from reattaching in future transactions
-      pm.evictAll();
-    }
-  }
-
-  @Override
-  public void createCatalog(Catalog cat) throws MetaException {
-    LOG.debug("Creating catalog " + cat.getName());
-    boolean committed = false;
-    MCatalog mCat = catToMCat(cat);
-    try {
-      openTransaction();
-      pm.makePersistent(mCat);
-      committed = commitTransaction();
-    } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-    }
-  }
-
-  @Override
-  public void alterCatalog(String catName, Catalog cat)
-      throws MetaException, InvalidOperationException {
-    if (!cat.getName().equals(catName)) {
-      throw new InvalidOperationException("You cannot change a catalog's name");
-    }
-    boolean committed = false;
-    try {
-      MCatalog mCat = getMCatalog(catName);
-      if (org.apache.commons.lang.StringUtils.isNotBlank(cat.getLocationUri())) {
-        mCat.setLocationUri(cat.getLocationUri());
-      }
-      if (org.apache.commons.lang.StringUtils.isNotBlank(cat.getDescription())) {
-        mCat.setDescription(cat.getDescription());
-      }
-      openTransaction();
-      pm.makePersistent(mCat);
-      committed = commitTransaction();
-    } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-    }
-  }
-
-  @Override
-  public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException {
-    LOG.debug("Fetching catalog " + catalogName);
-    MCatalog mCat = getMCatalog(catalogName);
-    if (mCat == null) {
-      throw new NoSuchObjectException("No catalog " + catalogName);
-    }
-    return mCatToCat(mCat);
-  }
-
-  @Override
-  public List<String> getCatalogs() throws MetaException {
-    LOG.debug("Fetching all catalog names");
-    boolean commited = false;
-    List<String> catalogs = null;
-
-    String queryStr = "select name from org.apache.hadoop.hive.metastore.model.MCatalog";
-    Query query = null;
-
-    openTransaction();
-    try {
-      query = pm.newQuery(queryStr);
-      query.setResult("name");
-      catalogs = new ArrayList<>((Collection<String>) query.execute());
-      commited = commitTransaction();
-    } finally {
-      rollbackAndCleanup(commited, query);
-    }
-    Collections.sort(catalogs);
-    return catalogs;
-  }
-
-  @Override
-  public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException {
-    LOG.debug("Dropping catalog " + catalogName);
-    boolean committed = false;
-    try {
-      openTransaction();
-      MCatalog mCat = getMCatalog(catalogName);
-      pm.retrieve(mCat);
-      if (mCat == null) {
-        throw new NoSuchObjectException("No catalog " + catalogName);
-      }
-      pm.deletePersistent(mCat);
-      committed = commitTransaction();
-    } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-    }
-  }
-
-  /**
-   * Looks up the persistent catalog object by name. The name is normalized before it is
-   * used as the query parameter.
-   *
-   * @param catalogName name of the catalog to look up
-   * @return the unique query result; callers in this class check it for {@code null}
-   */
-  private MCatalog getMCatalog(String catalogName) throws MetaException {
-    Query lookup = null;
-    boolean done = false;
-    try {
-      openTransaction();
-      String normalizedName = normalizeIdentifier(catalogName);
-      lookup = pm.newQuery(MCatalog.class, "name == catname");
-      lookup.declareParameters("java.lang.String catname");
-      lookup.setUnique(true);
-      MCatalog found = (MCatalog) lookup.execute(normalizedName);
-      pm.retrieve(found);
-      done = commitTransaction();
-      return found;
-    } finally {
-      rollbackAndCleanup(done, lookup);
-    }
-  }
-
-  /**
-   * Builds a persistent {@code MCatalog} from an API {@code Catalog}. The name is normalized;
-   * the description is copied only when it is set on the source object.
-   */
-  private MCatalog catToMCat(Catalog cat) {
-    MCatalog converted = new MCatalog();
-    converted.setName(normalizeIdentifier(cat.getName()));
-    converted.setLocationUri(cat.getLocationUri());
-    if (cat.isSetDescription()) {
-      converted.setDescription(cat.getDescription());
-    }
-    return converted;
-  }
-
-  /**
-   * Builds an API {@code Catalog} from a persistent {@code MCatalog}. The description is
-   * copied only when it is non-null.
-   */
-  private Catalog mCatToCat(MCatalog mCat) {
-    Catalog result = new Catalog(mCat.getName(), mCat.getLocationUri());
-    String description = mCat.getDescription();
-    if (description != null) {
-      result.setDescription(description);
-    }
-    return result;
-  }
-
-  /**
-   * Persists a new database. The catalog name is normalized, the database name is
-   * lower-cased, and the owner type defaults to {@code USER} when none is supplied.
-   *
-   * @param db the database to store; its catalog name is asserted to be non-null
-   */
-  @Override
-  public void createDatabase(Database db) throws InvalidObjectException, MetaException {
-    MDatabase newDb = new MDatabase();
-    assert db.getCatalogName() != null;
-    newDb.setCatalogName(normalizeIdentifier(db.getCatalogName()));
-    assert newDb.getCatalogName() != null;
-    newDb.setName(db.getName().toLowerCase());
-    newDb.setLocationUri(db.getLocationUri());
-    newDb.setDescription(db.getDescription());
-    newDb.setParameters(db.getParameters());
-    newDb.setOwnerName(db.getOwnerName());
-
-    PrincipalType ownerType = db.getOwnerType();
-    String ownerTypeName;
-    if (ownerType == null) {
-      ownerTypeName = PrincipalType.USER.name();
-    } else {
-      ownerTypeName = ownerType.name();
-    }
-    newDb.setOwnerType(ownerTypeName);
-
-    boolean success = false;
-    try {
-      openTransaction();
-      pm.makePersistent(newDb);
-      success = commitTransaction();
-    } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-    }
-  }
-
-  /**
-   * Looks up the persistent database object for the given catalog and database name.
-   * Both names are normalized before the query is executed.
-   *
-   * @param catName catalog name
-   * @param name database name
-   * @return the matching database object, never {@code null}
-   * @throws NoSuchObjectException if the query returns no database
-   */
-  @SuppressWarnings("nls")
-  private MDatabase getMDatabase(String catName, String name) throws NoSuchObjectException {
-    boolean success = false;
-    Query dbQuery = null;
-    MDatabase found = null;
-    try {
-      openTransaction();
-      name = normalizeIdentifier(name);
-      catName = normalizeIdentifier(catName);
-      dbQuery = pm.newQuery(MDatabase.class, "name == dbname && catalogName == catname");
-      dbQuery.declareParameters("java.lang.String dbname, java.lang.String catname");
-      dbQuery.setUnique(true);
-      found = (MDatabase) dbQuery.execute(name, catName);
-      pm.retrieve(found);
-      success = commitTransaction();
-    } finally {
-      rollbackAndCleanup(success, dbQuery);
-    }
-    if (found != null) {
-      return found;
-    }
-    throw new NoSuchObjectException("There is no database " + catName + "." + name);
-  }
-
-  /**
-   * Returns the database with the given name, converting any failure of the internal
-   * lookup into a {@code NoSuchObjectException}.
-   *
-   * @param catalogName catalog name
-   * @param name database name
-   * @return the database, never {@code null}
-   * @throws NoSuchObjectException if the lookup returned nothing or failed with a MetaException
-   */
-  @Override
-  public Database getDatabase(String catalogName, String name) throws NoSuchObjectException {
-    Database found = null;
-    MetaException failure = null;
-    try {
-      found = getDatabaseInternal(catalogName, name);
-    } catch (MetaException e) {
-      // The signature only allows NSOE, and NSOE is a flat exception, so the MetaException
-      // cannot be attached as its cause. Keep it so it can be logged and its message
-      // appended to the NSOE below; it is very likely just an NSOE in disguise.
-      failure = e;
-    }
-    if (found != null) {
-      return found;
-    }
-    LOG.warn("Failed to get database {}.{}, returning NoSuchObjectException",
-        catalogName, name, failure);
-    String detail = (failure == null) ? "" : (": " + failure.getMessage());
-    throw new NoSuchObjectException(name + detail);
-  }
-
-  /**
-   * Fetches a database through a {@code GetDbHelper} that is constructed with both the
-   * direct-SQL and the JDO path enabled.
-   *
-   * @param catalogName catalog name
-   * @param name database name
-   * @return the result of running the helper
-   */
-  public Database getDatabaseInternal(String catalogName, String name)
-      throws MetaException, NoSuchObjectException {
-    GetDbHelper helper = new GetDbHelper(catalogName, name, true, true) {
-      @Override
-      protected Database getJdoResult(GetHelper<Database> ctx) throws MetaException, NoSuchObjectException {
-        return getJDODatabase(catalogName, dbName);
-      }
-
-      @Override
-      protected Database getSqlResult(GetHelper<Database> ctx) throws MetaException {
-        return directSql.getDatabase(catalogName, dbName);
-      }
-    };
-    return helper.run(false);
-  }
-
-  /**
-   * Reads a database through JDO and converts it to the API {@code Database} object.
-   * A blank stored owner type is mapped to a {@code null} owner type on the result.
-   *
-   * @param catName catalog name; also set verbatim as the catalog name of the result
-   * @param name database name
-   * @return the converted database
-   * @throws NoSuchObjectException if the database lookup finds nothing
-   */
-  public Database getJDODatabase(String catName, String name) throws NoSuchObjectException {
-    boolean success = false;
-    MDatabase stored = null;
-    try {
-      openTransaction();
-      stored = getMDatabase(catName, name);
-      success = commitTransaction();
-    } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-    }
-
-    Database result = new Database();
-    result.setName(stored.getName());
-    result.setDescription(stored.getDescription());
-    result.setLocationUri(stored.getLocationUri());
-    result.setParameters(convertMap(stored.getParameters()));
-    result.setOwnerName(stored.getOwnerName());
-
-    String ownerTypeName =
-        org.apache.commons.lang.StringUtils.defaultIfBlank(stored.getOwnerType(), null);
-    PrincipalType ownerType = null;
-    if (ownerTypeName != null) {
-      ownerType = PrincipalType.valueOf(ownerTypeName);
-    }
-    result.setOwnerType(ownerType);
-    result.setCatalogName(catName);
-    return result;
-  }
-
-  /**
-   * Alter the database object in metastore. The parameters and the owner name are always
-   * overwritten; the owner type is changed only when non-null, and the description and
-   * location URI only when non-blank.
-   * @param catName the catalog name
-   * @param dbName the database name
-   * @param db the Hive Database object carrying the new values
-   * @return the value reported by the transaction commit
-   * @throws MetaException
-   * @throws NoSuchObjectException if the database to alter does not exist
-   */
-  @Override
-  public boolean alterDatabase(String catName, String dbName, Database db)
-    throws MetaException, NoSuchObjectException {
-
-    boolean committed = false;
-    try {
-      // Open the transaction before the lookup so the rollback in the finally block is
-      // always paired with an openTransaction() issued by this method.
-      openTransaction();
-      MDatabase mdb = getMDatabase(catName, dbName);
-      mdb.setParameters(db.getParameters());
-      mdb.setOwnerName(db.getOwnerName());
-      if (db.getOwnerType() != null) {
-        mdb.setOwnerType(db.getOwnerType().name());
-      }
-      if (org.apache.commons.lang.StringUtils.isNotBlank(db.getDescription())) {
-        mdb.setDescription(db.getDescription());
-      }
-      if (org.apache.commons.lang.StringUtils.isNotBlank(db.getLocationUri())) {
-        mdb.setLocationUri(db.getLocationUri());
-      }
-      pm.makePersistent(mdb);
-      committed = commitTransaction();
-    } finally {
-      // Deliberately no "return" here: returning from a finally block would discard any
-      // exception in flight (for example the NoSuchObjectException from the lookup).
-      if (!committed) {
-        rollbackTransaction();
-      }
-    }
-    return committed;
-  }
-
-  /**
-   * Drops a database together with the database-level grants listed for it.
-   *
-   * @param catName catalog name (normalized before use)
-   * @param dbname database name (normalized before use)
-   * @return the value reported by the transaction commit
-   */
-  @Override
-  public boolean dropDatabase(String catName, String dbname)
-      throws NoSuchObjectException, MetaException {
-    boolean committed = false;
-    LOG.info("Dropping database {}.{} along with all tables", catName, dbname);
-    dbname = normalizeIdentifier(dbname);
-    catName = normalizeIdentifier(catName);
-    QueryWrapper grantsQuery = new QueryWrapper();
-    try {
-      openTransaction();
-
-      MDatabase target = getMDatabase(catName, dbname);
-      pm.retrieve(target);
-      if (target != null) {
-        // Remove the grants first, then the database itself.
-        List<MDBPrivilege> grants = this.listDatabaseGrants(catName, dbname, null, grantsQuery);
-        if (CollectionUtils.isNotEmpty(grants)) {
-          pm.deletePersistentAll(grants);
-        }
-        pm.deletePersistent(target);
-      }
-      committed = commitTransaction();
-    } finally {
-      rollbackAndCleanup(committed, grantsQuery);
-    }
-    return committed;
-  }
-
-  /**
-   * Lists database names in a catalog that match a pattern. A {@code null} pattern or
-   * {@code "*"} delegates to {@link #getAllDatabases(String)}; otherwise the pattern is
-   * split on {@code |} and each part becomes one sub-pattern of the name filter.
-   *
-   * @param catName catalog name
-   * @param pattern name pattern, possibly several alternatives separated by {@code |}
-   * @return matching database names, ordered ascending by the query
-   */
-  @Override
-  public List<String> getDatabases(String catName, String pattern) throws MetaException {
-    boolean matchAll = (pattern == null) || pattern.equals("*");
-    if (matchAll) {
-      return getAllDatabases(catName);
-    }
-
-    Query nameQuery = null;
-    boolean success = false;
-    List<String> matches = null;
-    try {
-      openTransaction();
-      String[] alternatives = pattern.trim().split("\\|");
-      List<String> filterArgs = new ArrayList<>(alternatives.length);
-      StringBuilder filter = new StringBuilder();
-      appendSimpleCondition(filter, "catalogName", new String[] {catName}, filterArgs);
-      appendPatternCondition(filter, "name", alternatives, filterArgs);
-
-      nameQuery = pm.newQuery(MDatabase.class, filter.toString());
-      nameQuery.setResult("name");
-      nameQuery.setOrdering("name ascending");
-      String[] args = filterArgs.toArray(new String[0]);
-      Collection<String> fetched = (Collection<String>) nameQuery.executeWithArray(args);
-      matches = new ArrayList<>(fetched);
-      success = commitTransaction();
-    } finally {
-      rollbackAndCleanup(success, nameQuery);
-    }
-    return matches;
-  }
-
-  /**
-   * Lists the names of all databases in a catalog, sorted in natural order.
-   *
-   * @param catName catalog name (normalized before use)
-   * @return the sorted database names
-   */
-  @Override
-  public List<String> getAllDatabases(String catName) throws MetaException {
-    String normalizedCat = normalizeIdentifier(catName);
-    Query nameQuery = null;
-    boolean success = false;
-    List<String> names = null;
-
-    openTransaction();
-    try {
-      nameQuery = pm.newQuery("select name from org.apache.hadoop.hive.metastore.model.MDatabase " +
-          "where catalogName == catname");
-      nameQuery.declareParameters("java.lang.String catname");
-      nameQuery.setResult("name");
-      Collection<String> fetched = (Collection<String>) nameQuery.execute(normalizedCat);
-      names = new ArrayList<>(fetched);
-      success = commitTransaction();
-    } finally {
-      rollbackAndCleanup(success, nameQuery);
-    }
-    Collections.sort(names);
-    return names;
-  }
-
-  /**
-   * Converts an API {@code Type} to its persistent {@code MType} form. A type without
-   * fields is converted with an empty field list.
-   */
-  private MType getMType(Type type) {
-    List<FieldSchema> sourceFields = type.getFields();
-    List<MFieldSchema> converted = new ArrayList<>();
-    if (sourceFields != null) {
-      for (FieldSchema source : sourceFields) {
-        MFieldSchema copy =
-            new MFieldSchema(source.getName(), source.getType(), source.getComment());
-        converted.add(copy);
-      }
-    }
-    return new MType(type.getName(), type.getType1(), type.getType2(), converted);
-  }
-
-  /**
-   * Converts a persistent {@code MType} to the API {@code Type}. A type without fields
-   * is converted with an empty field list.
-   */
-  private Type getType(MType mtype) {
-    List<MFieldSchema> storedFields = mtype.getFields();
-    List<FieldSchema> converted = new ArrayList<>();
-    if (storedFields != null) {
-      for (MFieldSchema stored : storedFields) {
-        FieldSchema copy =
-            new FieldSchema(stored.getName(), stored.getType(), stored.getComment());
-        converted.add(copy);
-      }
-    }
-    Type result = new Type();
-    result.setName(mtype.getName());
-    result.setType1(mtype.getType1());
-    result.setType2(mtype.getType2());
-    result.setFields(converted);
-    return result;
-  }
-
-  /**
-   * Persists a new type.
-   *
-   * @param type the type to store
-   * @return the value reported by the transaction commit, so a commit that reports
-   *         failure is no longer returned as success
-   */
-  @Override
-  public boolean createType(Type type) {
-    MType mtype = getMType(type);
-    boolean committed = false;
-    try {
-      openTransaction();
-      pm.makePersistent(mtype);
-      committed = commitTransaction();
-    } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-    }
-    return committed;
-  }
-
-  /**
-   * Looks up a type by name.
-   *
-   * @param typeName name of the type; trimmed before it is used as the query parameter
-   * @return the converted type, or {@code null} if the query finds none
-   */
-  @Override
-  public Type getType(String typeName) {
-    Type type = null;
-    boolean commited = false;
-    Query query = null;
-    try {
-      openTransaction();
-      query = pm.newQuery(MType.class, "name == typeName");
-      query.declareParameters("java.lang.String typeName");
-      query.setUnique(true);
-      MType mtype = (MType) query.execute(typeName.trim());
-      if (mtype != null) {
-        // Retrieve the fetched persistent object (not the still-null result variable)
-        // before converting it.
-        pm.retrieve(mtype);
-        type = getType(mtype);
-      }
-      commited = commitTransaction();
-    } finally {
-      rollbackAndCleanup(commited, query);
-    }
-    return type;
-  }
-
-  /**
-   * Deletes the type with the given name if the lookup finds one.
-   *
-   * @param typeName name of the type; trimmed before it is used as the query parameter
-   * @return the value reported by the transaction commit
-   */
-  @Override
-  public boolean dropType(String typeName) {
-    Query typeQuery = null;
-    boolean committed = false;
-    try {
-      openTransaction();
-      typeQuery = pm.newQuery(MType.class, "name == typeName");
-      typeQuery.declareParameters("java.lang.String typeName");
-      typeQuery.setUnique(true);
-      MType stored = (MType) typeQuery.execute(typeName.trim());
-      pm.retrieve(stored);
-      if (stored != null) {
-        pm.deletePersistent(stored);
-      }
-      committed = commitTransaction();
-    } catch (JDOObjectNotFoundException e) {
-      // A missing object is not treated as an error: commit and just log it.
-      committed = commitTransaction();
-      LOG.debug("type not found {}", typeName, e);
-    } finally {
-      rollbackAndCleanup(committed, typeQuery);
-    }
-    return committed;
-  }
-
-  /**
-   * Creates a table and all supplied constraints in one transaction. Each constraint list
-   * may be {@code null}, in which case that kind of constraint is skipped.
-   *
-   * @return the constraint names returned by the individual add-constraint calls, in the
-   *         order foreign keys, primary keys, unique, not-null, default, check
-   */
-  @Override
-  public List<String> createTableWithConstraints(Table tbl,
-    List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
-    List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints,
-    List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints)
-    throws InvalidObjectException, MetaException {
-    boolean committed = false;
-    List<String> createdNames = new ArrayList<>();
-    try {
-      openTransaction();
-      createTable(tbl);
-      // The add* calls are passed "false": a deep retrieval of the table column descriptor
-      // is not needed because the transaction creating the table is not yet committed.
-      if (foreignKeys != null) {
-        List<String> fkNames = addForeignKeys(foreignKeys, false, primaryKeys, uniqueConstraints);
-        createdNames.addAll(fkNames);
-      }
-      if (primaryKeys != null) {
-        List<String> pkNames = addPrimaryKeys(primaryKeys, false);
-        createdNames.addAll(pkNames);
-      }
-      if (uniqueConstraints != null) {
-        List<String> uniqueNames = addUniqueConstraints(uniqueConstraints, false);
-        createdNames.addAll(uniqueNames);
-      }
-      if (notNullConstraints != null) {
-        List<String> notNullNames = addNotNullConstraints(notNullConstraints, false);
-        createdNames.addAll(notNullNames);
-      }
-      if (defaultConstraints != null) {
-        List<String> defaultNames = addDefaultConstraints(defaultConstraints, false);
-        createdNames.addAll(defaultNames);
-      }
-      if (checkConstraints != null) {
-        List<String> checkNames = addCheckConstraints(checkConstraints, false);
-        createdNames.addAll(checkNames);
-      }
-      committed = commitTransaction();
-    } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-    }
-    return createdNames;
-  }
-
-  /**
-   * Persists a new table, its creation metadata (when present) and the user, group and
-   * role privileges attached to it. The generated id is written back onto {@code tbl}.
-   *
-   * @param tbl the table to create
-   */
-  @Override
-  public void createTable(Table tbl) throws InvalidObjectException, MetaException {
-    boolean success = false;
-    try {
-      openTransaction();
-
-      MTable persistentTable = convertToMTable(tbl);
-      if (TxnUtils.isTransactionalTable(tbl)) {
-        persistentTable.setWriteId(tbl.getWriteId());
-      }
-      pm.makePersistent(persistentTable);
-
-      if (tbl.getCreationMetadata() != null) {
-        MCreationMetadata creationMetadata = convertToMCreationMetadata(tbl.getCreationMetadata());
-        pm.makePersistent(creationMetadata);
-      }
-      tbl.setId(persistentTable.getId());
-
-      List<Object> privilegeObjects = new ArrayList<>();
-      PrincipalPrivilegeSet grants = tbl.getPrivileges();
-      if (grants != null) {
-        // Grant time in seconds, shared by every privilege created here.
-        int now = (int)(System.currentTimeMillis()/1000);
-
-        putPersistentPrivObjects(persistentTable, privilegeObjects, now,
-            grants.getUserPrivileges(), PrincipalType.USER, "SQL");
-        putPersistentPrivObjects(persistentTable, privilegeObjects, now,
-            grants.getGroupPrivileges(), PrincipalType.GROUP, "SQL");
-        putPersistentPrivObjects(persistentTable, privilegeObjects, now,
-            grants.getRolePrivileges(), PrincipalType.ROLE, "SQL");
-      }
-      pm.makePersistentAll(privilegeObjects);
-      success = commitTransaction();
-    } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-    }
-  }
-
-  /**
-   * Converts every non-null PrivilegeGrantInfo in privMap to an MTablePrivilege and appends
-   * it to toPersistPrivObjs. Nothing is persisted here; the caller persists the list.
-   *
-   * @param mtbl the table the privileges apply to
-   * @param toPersistPrivObjs output list receiving the created privilege objects
-   * @param now creation time stored on each privilege
-   * @param privMap principal name to granted privileges; may be null, in which case nothing is added
-   * @param type principal type recorded on each privilege
-   * @param authorizer authorizer value recorded on each privilege
-   */
-  private void putPersistentPrivObjects(MTable mtbl, List<Object> toPersistPrivObjs,
-      int now, Map<String, List<PrivilegeGrantInfo>> privMap, PrincipalType type, String authorizer) {
-    if (privMap == null) {
-      return;
-    }
-    for (Map.Entry<String, List<PrivilegeGrantInfo>> grant : privMap.entrySet()) {
-      String principalName = grant.getKey();
-      for (PrivilegeGrantInfo priv : grant.getValue()) {
-        if (priv != null) {
-          toPersistPrivObjs.add(new MTablePrivilege(
-              principalName, type.toString(), mtbl, priv.getPrivilege(),
-              now, priv.getGrantor(), priv.getGrantorType().toString(),
-              priv.isGrantOption(), authorizer));
-        }
-      }
-    }
-  }
-
-  /**
-   * Drops a table if it exists. Before the table itself is deleted, the method removes its
-   * table, column, partition and partition-column grants, its table-level column
-   * statistics, its constraints and, for a materialized view, its creation metadata.
-   *
-   * @param catName catalog name
-   * @param dbName database name
-   * @param tableName table name
-   * @return the value reported by the transaction commit
-   */
-  @Override
-  public boolean dropTable(String catName, String dbName, String tableName)
-      throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
-    boolean committed = false;
-    try {
-      openTransaction();
-      MTable target = getMTable(catName, dbName, tableName);
-      pm.retrieve(target);
-      if (target != null) {
-        boolean isMaterializedView =
-            TableType.MATERIALIZED_VIEW.toString().equals(target.getTableType());
-
-        // Grants go first: table, column, partition, partition-column.
-        List<MTablePrivilege> tableGrants = listAllTableGrants(catName, dbName, tableName);
-        if (CollectionUtils.isNotEmpty(tableGrants)) {
-          pm.deletePersistentAll(tableGrants);
-        }
-        List<MTableColumnPrivilege> columnGrants =
-            listTableAllColumnGrants(catName, dbName, tableName);
-        if (CollectionUtils.isNotEmpty(columnGrants)) {
-          pm.deletePersistentAll(columnGrants);
-        }
-        List<MPartitionPrivilege> partitionGrants =
-            this.listTableAllPartitionGrants(catName, dbName, tableName);
-        if (CollectionUtils.isNotEmpty(partitionGrants)) {
-          pm.deletePersistentAll(partitionGrants);
-        }
-        List<MPartitionColumnPrivilege> partitionColumnGrants =
-            listTableAllPartitionColumnGrants(catName, dbName, tableName);
-        if (CollectionUtils.isNotEmpty(partitionColumnGrants)) {
-          pm.deletePersistentAll(partitionColumnGrants);
-        }
-
-        // Column statistics are optional; their absence is only logged.
-        try {
-          deleteTableColumnStatistics(catName, dbName, tableName, null);
-        } catch (NoSuchObjectException e) {
-          LOG.info("Found no table level column statistics associated with {} to delete",
-              TableName.getQualified(catName, dbName, tableName));
-        }
-
-        List<MConstraint> constraints =
-            listAllTableConstraintsWithOptionalConstraintName(catName, dbName, tableName, null);
-        if (CollectionUtils.isNotEmpty(constraints)) {
-          pm.deletePersistentAll(constraints);
-        }
-
-        preDropStorageDescriptor(target.getSd());
-
-        if (isMaterializedView) {
-          dropCreationMetadata(target.getDatabase().getCatalogName(),
-              target.getDatabase().getName(), target.getTableName());
-        }
-
-        // Finally remove the table itself.
-        pm.deletePersistentAll(target);
-      }
-      committed = commitTransaction();
-    } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-    }
-    return committed;
-  }
-
-  /**
-   * Deletes the creation metadata stored for the given table, if any is found.
-   *
-   * @return the value reported by the transaction commit
-   */
-  private boolean dropCreationMetadata(String catName, String dbName, String tableName) throws MetaException,
-      NoSuchObjectException, InvalidObjectException, InvalidInputException {
-    boolean committed = false;
-    try {
-      openTransaction();
-      MCreationMetadata stored = getCreationMetadata(catName, dbName, tableName);
-      pm.retrieve(stored);
-      if (stored != null) {
-        pm.deletePersistentAll(stored);
-      }
-      committed = commitTransaction();
-    } finally {
-      if (!committed) {
-        rollbackTransaction();
-      }
-    }
-    return committed;
-  }
-
-  /**
-   * Lists the constraints in which the given table is the parent or the child table,
-   * optionally restricted to a single constraint name. All identifiers are normalized.
-   * Two queries are run: the first collects the matching constraint names, the second
-   * loads the MConstraint objects whose name is in that set. Both queries are closed
-   * before returning.
-   *
-   * @param catName catalog name
-   * @param dbName database name
-   * @param tableName table name
-   * @param constraintname constraint name to restrict to, or null for all constraints
-   * @return the matching constraints
-   */
-  private List<MConstraint> listAllTableConstraintsWithOptionalConstraintName(
-      String catName, String dbName, String tableName, String constraintname) {
-    catName = normalizeIdentifier(catName);
-    dbName = normalizeIdentifier(dbName);
-    tableName = normalizeIdentifier(tableName);
-    constraintname = constraintname!=null?normalizeIdentifier(constraintname):null;
-    List<MConstraint> mConstraints = null;
-    List<String> constraintNames = new ArrayList<>();
-    // Separate variables so that neither query is lost (and left unclosed) when the
-    // second one is created.
-    Query namesQuery = null;
-    Query constraintsQuery = null;
-
-    try {
-      namesQuery = pm.newQuery("select constraintName from org.apache.hadoop.hive.metastore.model.MConstraint  where "
-        + "((parentTable.tableName == ptblname && parentTable.database.name == pdbname && " +
-              "parentTable.database.catalogName == pcatname) || "
-        + "(childTable != null && childTable.tableName == ctblname &&" +
-              "childTable.database.name == cdbname && childTable.database.catalogName == ccatname)) " +
-          (constraintname != null ? " && constraintName == constraintname" : ""));
-      namesQuery.declareParameters("java.lang.String ptblname, java.lang.String pdbname,"
-          + "java.lang.String pcatname, java.lang.String ctblname, java.lang.String cdbname," +
-          "java.lang.String ccatname" +
-        (constraintname != null ? ", java.lang.String constraintname" : ""));
-      Collection<?> constraintNamesColl =
-        constraintname != null ?
-          ((Collection<?>) namesQuery.
-            executeWithArray(tableName, dbName, catName, tableName, dbName, catName, constraintname)):
-          ((Collection<?>) namesQuery.
-            executeWithArray(tableName, dbName, catName, tableName, dbName, catName));
-      for (Object currName : constraintNamesColl) {
-        constraintNames.add((String) currName);
-      }
-
-      constraintsQuery = pm.newQuery(MConstraint.class);
-      constraintsQuery.setFilter("param.contains(constraintName)");
-      constraintsQuery.declareParameters("java.util.Collection param");
-      Collection<?> constraints = (Collection<?>) constraintsQuery.execute(constraintNames);
-      mConstraints = new ArrayList<>();
-      for (Object currConstraint : constraints) {
-        mConstraints.add((MConstraint) currConstraint);
-      }
-    } finally {
-      if (constraintsQuery != null) {
-        constraintsQuery.closeAll();
-      }
-      if (namesQuery != null) {
-        namesQuery.closeAll();
-      }
-    }
-    return mConstraints;
-  }
-
-  /**
-   * Builds a double-quoted table reference: {@code "db"."tbl"} when a database name is
-   * given, or just {@code "tbl"} when the database name is null or empty.
-   *
-   * @param dbName database name; may be null or empty
-   * @param tblName table name
-   * @return the quoted, optionally database-qualified table name
-   */
-  private static String getFullyQualifiedTableName(String dbName, String tblName) {
-    String quotedTable = "\"" + tblName + "\"";
-    if (dbName == null || dbName.isEmpty()) {
-      return quotedTable;
-    }
-    // The table part already carries its own opening quote, so the database prefix must
-    // end right after the dot.
-    return "\"" + dbName + "\"." + quotedTable;
-  }
-
-  /**
-   * Fetches a table without a write-id list; delegates to the four-argument overload.
-   */
-  @Override
-  public Table getTable(String catName, String dbName, String tableName) throws MetaException {
-    final String noWriteIdList = null;
-    return getTable(catName, dbName, tableName, noWriteIdList);
-  }
-
-  /**
-   * Fetches a table and, for a materialized view, attaches its creation metadata. When a
-   * write-id list is supplied and the table is transactional, the basic-stats state on
-   * the returned object is adjusted as described inline. The adjustment is made on the
-   * returned object only.
-   *
-   * @param catName catalog name
-   * @param dbName database name
-   * @param tableName table name
-   * @param writeIdList write-id list of the querying client, or {@code null}
-   * @return the table as produced by the conversion of the stored object
-   */
-  @Override
-  public Table getTable(String catName, String dbName, String tableName,
-                        String writeIdList)
-      throws MetaException {
-    boolean success = false;
-    Table result = null;
-    try {
-      openTransaction();
-      MTable stored = getMTable(catName, dbName, tableName);
-      result = convertToTable(stored);
-      // Retrieve creation metadata if needed
-      if (result != null && TableType.MATERIALIZED_VIEW.toString().equals(result.getTableType())) {
-        result.setCreationMetadata(
-                convertToCreationMetadata(getCreationMetadata(catName, dbName, tableName)));
-      }
-
-      // For a transactional table, check whether the table statistics currently in the
-      // metastore comply with the client query's snapshot isolation. Only non-partitioned
-      // tables are checked here; a partitioned table keeps its stats and snapshot in
-      // MPartition.
-      if (writeIdList != null && result != null && TxnUtils.isTransactionalTable(result)) {
-        if (!areTxnStatsSupported) {
-          StatsSetupConst.setBasicStatsState(result.getParameters(), StatsSetupConst.FALSE);
-          LOG.info("Removed COLUMN_STATS_ACCURATE from Table's parameters.");
-        } else if (result.getPartitionKeysSize() == 0) {
-          boolean statsValid = isCurrentStatsValidForTheQuery(stored, writeIdList, false);
-          result.setIsStatsCompliant(statsValid);
-          if (!statsValid) {
-            // Do not make persistent the following state since it is the query specific (not global).
-            StatsSetupConst.setBasicStatsState(result.getParameters(), StatsSetupConst.FALSE);
-            LOG.info("Removed COLUMN_STATS_ACCURATE from Table's parameters.");
-          }
-        }
-      }
-      success = commitTransaction();
-    } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Lists table names matching a pattern without restricting the table type; delegates
-   * to the four-argument overload.
-   */
-  @Override
-  public List<String> getTables(String catName, String dbName, String pattern)
-      throws MetaException {
-    final TableType anyTableType = null;
-    return getTables(catName, dbName, pattern, anyTableType);
-  }
-
-  /**
-   * Lists table names matching a pattern and, optionally, a table type. Direct SQL is
-   * allowed only when the pattern is {@code null} or {@code ".*"}; JDO is always allowed.
-   *
-   * @throws MetaException also used to wrap a NoSuchObjectException from the internal
-   *         lookup, with that exception's stack trace as the message
-   */
-  @Override
-  public List<String> getTables(String catName, String dbName, String pattern, TableType tableType)
-      throws MetaException {
-    // We only support pattern matching via jdo since pattern matching in Java
-    // might be different than the one used by the metastore backends
-    boolean sqlAllowed = (pattern == null || pattern.equals(".*"));
-    try {
-      return getTablesInternal(catName, dbName, pattern, tableType, sqlAllowed, true);
-    } catch (NoSuchObjectException e) {
-      throw new MetaException(ExceptionUtils.getStackTrace(e));
-    }
-  }
-
-  /**
-   * Returns the table names reported by {@code directSql.getTableNamesWithStats()}. The
-   * helper is constructed with direct SQL allowed and JDO disallowed; the JDO path is
-   * not implemented and throws {@code UnsupportedOperationException}.
-   */
-  @Override
-  public List<TableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
-    GetListHelper<TableName> helper = new GetListHelper<TableName>(null, null, null, true, false) {
-      @Override
-      protected List<TableName> getJdoResult(
-          GetHelper<List<TableName>> ctx) throws MetaException {
-        throw new UnsupportedOperationException("UnsupportedOperationException"); // TODO: implement?
-      }
-
-      @Override
-      protected List<TableName> getSqlResult(
-          GetHelper<List<TableName>> ctx) throws MetaException {
-        return directSql.getTableNamesWithStats();
-      }
-    };
-    return helper.run(false);
-  }
-
-  /**
-   * Returns the result of {@code directSql.getColAndPartNamesWithStats} for the given
-   * table. The helper is constructed with direct SQL allowed and JDO disallowed; the JDO
-   * path is not implemented. Any failure of the direct-SQL call is logged and rethrown
-   * as a MetaException carrying the original message.
-   */
-  @Override
-  public Map<String, List<String>> getPartitionColsWithStats(String catName, String dbName, String tableName)
-      throws MetaException, NoSuchObjectException {
-    GetHelper<Map<String, List<String>>> helper =
-        new GetHelper<Map<String, List<String>>>(catName, dbName, null, true, false) {
-      @Override
-      protected String describeResult() {
-        return results.size() + " partitions";
-      }
-
-      @Override
-      protected Map<String, List<String>> getJdoResult(
-          GetHelper<Map<String, List<String>>> ctx) throws MetaException {
-        throw new UnsupportedOperationException("UnsupportedOperationException"); // TODO: implement?
-      }
-
-      @Override
-      protected Map<String, List<String>> getSqlResult(
-          GetHelper<Map<String, List<String>>> ctx) throws MetaException {
-        try {
-          return directSql.getColAndPartNamesWithStats(catName, dbName, tableName);
-        } catch (Throwable ex) {
-          LOG.error("DirectSQL failed", ex);
-          throw new MetaException(ex.getMessage());
-        }
-      }
-    };
-    return helper.run(false);
-  }
-
-  /**
-   * Lists the names of all tables whose table type is one of
-   * {@code MetaStoreDirectSql.STATS_TABLE_TYPES}. The direct-SQL path delegates to
-   * {@code directSql.getAllTableNamesForStats()}; the JDO path builds one query
-   * parameter per table type and ORs the comparisons together.
-   */
-  @Override
-  public List<TableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
-    return new GetListHelper<TableName>(null, null, null, true, false) {
-      @Override
-      protected List<TableName> getSqlResult(
-          GetHelper<List<TableName>> ctx) throws MetaException {
-        return directSql.getAllTableNamesForStats();
-      }
-
-      @Override
-      protected List<TableName> getJdoResult(
-          GetHelper<List<TableName>> ctx) throws MetaException {
-        boolean commited = false;
-        Query query = null;
-        List<TableName> result = new ArrayList<>();
-        openTransaction();
-        try {
-          // Declare one String parameter (tt0, tt1, ...) per table type and OR the
-          // "tableType == ttN" comparisons.
-          StringBuilder paramStr = new StringBuilder();
-          StringBuilder whereStr = new StringBuilder();
-          for (int i = 0; i < MetaStoreDirectSql.STATS_TABLE_TYPES.length; ++i) {
-            if (i != 0) {
-              paramStr.append(", ");
-              whereStr.append("||");
-            }
-            paramStr.append("java.lang.String tt").append(i);
-            whereStr.append(" tableType == tt").append(i);
-          }
-          query = pm.newQuery(MTable.class, whereStr.toString());
-          query.declareParameters(paramStr.toString());
-          // Pass the table types themselves as the parameter values. The explicit
-          // Object[] cast makes the array the varargs array instead of a single element.
-          @SuppressWarnings("unchecked")
-          Collection<MTable> tbls = (Collection<MTable>) query.executeWithArray(
-              (Object[]) MetaStoreDirectSql.STATS_TABLE_TYPES);
-          pm.retrieveAll(tbls);
-          for (MTable tbl : tbls) {
-            result.add(new TableName(
-                tbl.getDatabase().getCatalogName(), tbl.getDatabase().getName(), tbl.getTableName()));
-          }
-          commited = commitTransaction();
-        } finally {
-          rollbackAndCleanup(commited, query);
-        }
-        return result;
-      }
-    }.run(false);
-  }
-
-  /**
-   * Lists table names through a {@code GetListHelper}. The direct-SQL path ignores the
-   * pattern and filters only by catalog, database and table type; the JDO path also
-   * applies the pattern. Catalog and database names are normalized once and the
-   * normalized values are used for the helper and for both paths.
-   *
-   * @param catName catalog name
-   * @param dbName database name
-   * @param pattern table name pattern, used by the JDO path only
-   * @param tableType table type to restrict to; passed through to both paths
-   * @param allowSql whether the direct-SQL path may be used
-   * @param allowJdo whether the JDO path may be used
-   * @return the table names produced by the helper
-   */
-  protected List<String> getTablesInternal(String catName, String dbName, String pattern,
-                                           TableType tableType, boolean allowSql, boolean allowJdo)
-      throws MetaException, NoSuchObjectException {
-    final String db_name = normalizeIdentifier(dbName);
-    final String cat_name = normalizeIdentifier(catName);
-    // Hand the helper the normalized database name, matching the normalized catalog name.
-    return new GetListHelper<String>(cat_name, db_name, null, allowSql, allowJdo) {
-      @Override
-      protected List<String> getSqlResult(GetHelper<List<String>> ctx)
-              throws MetaException {
-        return directSql.getTables(cat_name, db_name, tableType);
-      }
-
-      @Override
-      protected List<String> getJdoResult(GetHelper<List<String>> ctx)
-              throws MetaException, NoSuchObjectException {
-        return getTablesInternalViaJdo(cat_name, db_name, pattern, tableType);
-      }
-    }.run(false);
-  }
-
-  /**
-   * JDO implementation of the table-name listing. The filter always restricts by database
-   * and catalog name; the name pattern and the table type are added only when non-null.
-   *
-   * @return the matching table names, ordered ascending by the query
-   */
-  private List<String> getTablesInternalViaJdo(String catName, String dbName, String pattern,
-                                               TableType tableType) throws MetaException {
-    Query tableQuery = null;
-    boolean success = false;
-    List<String> tableNames = null;
-    try {
-      openTransaction();
-      dbName = normalizeIdentifier(dbName);
-
-      StringBuilder filter = new StringBuilder();
-      List<String> filterArgs = new ArrayList<>();
-      // database.name == dbName and database.catalogName == catName are always part of the filter
-      appendSimpleCondition(filter, "database.name", new String[] {dbName}, filterArgs);
-      appendSimpleCondition(filter, "database.catalogName", new String[] {catName}, filterArgs);
-      if (pattern != null) {
-        appendPatternCondition(filter, "tableName", pattern, filterArgs);
-      }
-      if (tableType != null) {
-        appendPatternCondition(filter, "tableType", new String[] {tableType.toString()}, filterArgs);
-      }
-
-      tableQuery = pm.newQuery(MTable.class, filter.toString());
-      tableQuery.setResult("tableName");
-      tableQuery.setOrdering("tableName ascending");
-      String[] args = filterArgs.toArray(new String[0]);
-      Collection<String> fetched = (Collection<String>) tableQuery.executeWithArray(args);
-      tableNames = new ArrayList<>(fetched);
-      success = commitTransaction();
-    } finally {
-      rollbackAndCleanup(success, tableQuery);
-    }
-    return tableNames;
-  }
-
-  /**
-   * Lists the names of the materialized views in a database that have rewriting enabled.
-   *
-   * @param catName catalog name (normalized before use)
-   * @param dbName database name (normalized before use)
-   * @return names of tables whose type is MATERIALIZED_VIEW and whose rewriteEnabled flag is true
-   */
-  @Override
-  public List<String> getMaterializedViewsForRewriting(String catName, String dbName)
-      throws MetaException, NoSuchObjectException {
-    final String db_name = normalizeIdentifier(dbName);
-    catName = normalizeIdentifier(catName);
-    boolean commited = false;
-    Query<?> query = null;
-    List<String> tbls = null;
-    try {
-      openTransaction();
-      query = pm.newQuery(MTable.class,
-          "database.name == db && database.catalogName == cat && tableType == tt && rewriteEnabled == re");
-      query.declareParameters(
-          "java.lang.String db, java.lang.String cat, java.lang.String tt, boolean re");
-      query.setResult("tableName");
-      Collection<String> names = (Collection<String>) query.executeWithArray(
-          db_name, catName, TableType.MATERIALIZED_VIEW.toString(), true);
-      tbls = new ArrayList<>(names);
-      commited = commitTransaction();
-    } finally {
-      rollbackAndCleanup(commited, query);
-    }
-    return tbls;
-  }
-
-  /** Counts the stored databases by counting the {@code name} field of MDatabase. */
-  @Override
-  public int getDatabaseCount() throws MetaException {
-    final String entityName = MDatabase.class.getName();
-    return getObjectCount("name", entityName);
-  }
-
-  /** Counts the stored partitions by counting the {@code partitionName} field of MPartition. */
-  @Override
-  public int getPartitionCount() throws MetaException {
-    final String entityName = MPartition.class.getName();
-    return getObjectCount("partitionName", entityName);
-  }
-
-  /** Counts the stored tables by counting the {@code tableName} field of MTable. */
-  @Override
-  public int getTableCount() throws MetaException {
-    final String entityName = MTable.class.getName();
-    return getObjectCount("tableName", entityName);
-  }
-
-  private int getObjectCount(String fieldName, String objName) {
-    Long result = 0L;
-    boolean commited = false;
-    Query query = null;
-    try {
-      openTransaction();
-      String queryStr =
-        "select count(" + fieldName + ") from " + objName;
-      query = pm.newQuery(queryStr);
-      result = (Long) query.execute();
-      commited = commitTransaction();
-    } finally {
-      rollbackAndCleanup(commited, query);
-    }
-    return result.intValue();
-  }
-
-  @Override
-  public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames,
-                                      List<String> tableTypes) throws MetaException {
-
-    boolean commited = false;
-    Query query = null;
-    List<TableMeta> metas = new ArrayList<>();
-    try {
-      openTransaction();
-      // Take the pattern and split it on the | to get all the composing
-      // patterns
-      StringBuilder filterBuilder = new StringBuilder();
-      List<String> parameterVals = new ArrayList<>();
-      appendSimpleCondition(filterBuilder, "database.catalogName", new String[] {catName}, parameterVals);
-      if (dbNames != null && !dbNames.equals("*")) {
-        appendPatternCondition(filterBuilder, "database.name", dbNames, parameterVals);
-      }
-      if (tableNames != null && !tableNames.equals("*")) {
-        appendPatternCondition(filterBuilder, "tableName", tableNames, parameterVals);
-      }
-      if (tableTypes != null && !tableTypes.isEmpty()) {
-        appendSimpleCondition(filterBuilder, "tableType", tableTypes.toArray(new String[0]), parameterVals);
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("getTableMeta with filter " + filterBuilder.toString() + " params: " +
-            StringUtils.join(parameterVals, ", "));
-      }
-      query = pm.newQuery(MTable.class, filterBuilder.toString());
-      Collection<MTable> tables = (Collection<MTable>) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
-      for (MTable table : tables) {
-        TableMeta metaData = new TableMeta(
-            table.getDatabase().getName(), table.getTableName(), table.getTableType());
-        metaData.setComments(table.getParameters().get("comment"));
-        metas.add(metaData);
-      }
-      commited = commitTransaction();
-    } finally {
-      rollbackAndCleanup(commited, query);
-    }
-    return metas;
-  }
-
-  private StringBuilder appendPatternCondition(StringBuilder filterBuilder, String fieldName,
-      String[] elements, List<String> parameterVals) {
-    return appendCondition(filterBuilder, fieldName, elements, true, parameterVals);
-  }
-
-  private StringBuilder appendPatternCondition(StringBuilder builder,
-      String fieldName, String elements, List<String> parameters) {
-      elements = normalizeIdentifier(elements);
-    return appendCondition(builder, fieldName, elements.split("\\|"), true, parameters);
-  }
-
-  private StringBuilder appendSimpleCondition(StringBuilder builder,
-      String fieldName, String[] elements, List<String> parameters) {
-    return appendCondition(builder, fieldName, elements, false, parameters);
-  }
-
-  private StringBuilder appendCondition(StringBuilder builder,
-      String fieldName, String[] elements, boolean pattern, List<String> parameters) {
-    if (builder.length() > 0) {
-      builder.append(" && ");
-    }
-    builder.append(" (");
-    int length = builder.length();
-    for (String element : elements) {
-      if (pattern) {
-        element = "(?i)" + element.replaceAll("\\*", ".*");
-      }
-      parameters.add(element);
-      if (builder.length() > length) {
-        builder.append(" || ");
-      }
-      builder.append(fieldName);
-      if (pattern) {
-        builder.append(".matches(").append(JDO_PARAM).append(parameters.size()).append(")");
-      } else {
-        builder.append(" == ").append(JDO_PARAM).append(parameters.size());
-      }
-    }
-    builder.append(" )");
-    return builder;
-  }
-
-  @Override
-  public List<String> getAllTables(String catName, String dbName) throws MetaException {
-    return getTables(catName, dbName, ".*");
-  }
-
-  class AttachedMTableInfo {
-    MTable mtbl;
-    MColumnDescriptor mcd;
-
-    public AttachedMTableInfo() {}
-
-    public AttachedMTableInfo(MTable mtbl, MColumnDescriptor mcd) {
-      this.mtbl = mtbl;
-      this.mcd = mcd;
-    }
-  }
-
-  private AttachedMTableInfo getMTable(String catName, String db, String table,
-                                       boolean retrieveCD) {
-    AttachedMTableInfo nmtbl = new AttachedMTableInfo();
-    MTable mtbl = null;
-    boolean commited = false;
-    Query query = null;
-    try {
-      openTransaction();
-      catName = normalizeIdentifier(catName);
-      db = normalizeIdentifier(db);
-      table = normalizeIdentifier(table);
-      query = pm.newQuery(MTable.class,
-          "tableName == table && database.name == db && database.catalogName == catname");
-      query.declareParameters(
-          "java.lang.String table, java.lang.String db, java.lang.String catname");
-      query.setUnique(true);
-      LOG.debug("Executing getMTable for " +
-          TableName.getQualified(catName, db, table));
-      mtbl = (MTable) query.execute(table, db, catName);
-      pm.retrieve(mtbl);
-      // Retrieving CD can be expensive and unnecessary, so do it only when required.
-      if (mtbl != null && retrieveCD) {
-        pm.retrieve(mtbl.getSd());
-        pm.retrieveAll(mtbl.getSd().getCD());
-        nmtbl.mcd = mtbl.getSd().getCD();
-      }
-      commited = commitTransaction();
-    } finally {
-      rollbackAndCleanup(commited, query);
-    }
-    nmtbl.mtbl = mtbl;
-    return nmtbl;
-  }
-
-  private MCreationMetadata getCreationMetadata(String catName, String dbName, String tblName) {
-    boolean commited = false;
-    MCreationMetadata mcm = null;
-    Query query = null;
-    catName = normalizeIdentifier(catName);
-    dbName = normalizeIdentifier(dbName);
-    tblName = normalizeIdentifier(tblName);
-    try {
-      openTransaction();
-      query = pm.newQuery(
-          MCreationMetadata.class, "tblName == table && dbName == db && catalogName == cat");
-      query.declareParameters("java.lang.String table, java.lang.String db, java.lang.String cat");
-      query.setUnique(true);
-      mcm = (MCreationMetadata) query.execute(tblName, dbName, catName);
-      pm.retrieve(mcm);
-      commited = commitTransaction();
-    } finally {
-      rollbackAndCleanup(commited, query);
-    }
-    return mcm;
-  }
-
-  private MTable getMTable(String catName, String db, String table) {
-    AttachedMTableInfo nmtbl = getMTable(catName, db, table, false);
-    return nmtbl.mtbl;
-  }
-
-  @Override
-  public List<Table> getTableObjectsByName(String catName, String db, List<String> tbl_names)
-      throws MetaException, UnknownDBException {
-    List<Table> tables = new ArrayList<>();
-    boolean committed = false;
-    Query dbExistsQuery = null;
-    Query query = null;
-    try {
-      openTransaction();
-      db = normalizeIdentifier(db);
-      catName = normalizeIdentifier(catName);
-
-      List<String> lowered_tbl_names = new ArrayList<>(tbl_names.size());
-      for (String t : tbl_names) {
-        lowered_tbl_names.add(normalizeIdentifier(t));
-      }
-      query = pm.newQuery(MTable.class);
-      query.setFilter("database.name == db && database.catalogName == cat && tbl_names.contains(tableName)");
-      query.declareParameters("java.lang.String db, java.lang.String cat, java.util.Collection tbl_names");
-      Collection mtables = (Collection) query.execute(db, catName, lowered_tbl_names);
-      if (mtables == null || mtables.isEmpty()) {
-        // Need to differentiate between an unmatched pattern and a non-existent database
-        dbExistsQuery = pm.newQuery(MDatabase.class, "name == db && catalogName == cat");
-        dbExistsQuery.declareParameters("java.lang.String db, java.lang.String cat");
-        dbExistsQuery.setUnique(true);
-        dbExistsQuery.setResult("name");
-        String dbNameIfExists = (String) dbExistsQuery.execute(db, catName);
-        if (org.apache.commons.lang.StringUtils.isEmpty(dbNameIfExists)) {
-          throw new UnknownDBException("Could not find database " +
-              DatabaseName.getQualified(catName, db));
-        }
-      } else {
-        for (Iterator iter = mtables.iterator(); iter.hasNext(); ) {
-          Table tbl = convertToTable((MTable) iter.next());
-          // Retrieve creation metadata if needed
-          if (TableType.MATERIALIZED_VIEW.toString().equals(tbl.getTableType())) {
-            tbl.setCreationMetadata(
-                convertToCreationMetadata(
-                    getCreationMetadata(tbl.getCatName(), tbl.getDbName(), tbl.getTableName())));
-          }
-          tables.add(tbl);
-        }
-      }
-      committed = commitTransaction();
-    } finally {
-      rollbackAndCleanup(committed, query);
-      if (dbExistsQuery != null) {
-        dbExistsQuery.closeAll();
-      }
-    }
-    return tables;
-  }
-
-  /** Makes shallow copy of a list to avoid DataNucleus mucking with our objects. */
-  private <T> List<T> convertList(List<T> dnList) {
-    return (dnList == null) ? null : Lists.newArrayList(dnList);
-  }
-
-  /** Makes shallow copy of a map to avoid DataNucleus mucking with our objects. */
-  private Map<String, String> convertMap(Map<String, String> dnMap) {
-    return MetaStoreServerUtils.trimMapNulls(dnMap,
-        MetastoreConf.getBoolVar(getConf(), ConfVars.ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS));
-  }
-
-  private Table convertToTable(MTable mtbl) throws MetaException {
-    if (mtbl == null) {
-      return null;
-    }
-    String tableType = mtbl.getTableType();
-    if (tableType == null) {
-      // for backwards compatibility with old metastore persistence
-      if (mtbl.getViewOriginalText() != null) {
-        tableType = TableType.VIRTUAL_VIEW.toString();
-      } else if (Boolean.parseBoolean(mtbl.getParameters().get("EXTERNAL"))) {
-        tableType = TableType.EXTERNAL_TABLE.toString();
-      } else {
-        tableType = TableType.MANAGED_TABLE.toString();
-      }
-    }
-    final Table t = new Table(mtbl.getTableName(), mtbl.getDatabase().getName(), mtbl
-        .getOwner(), mtbl.getCreateTime(), mtbl.getLastAccessTime(), mtbl
-        .getRetention(), convertToStorageDescriptor(mtbl.getSd()),
-        convertToFieldSchemas(mtbl.getPartitionKeys()), convertMap(mtbl.getParameters()),
-        mtbl.getViewOriginalText(), mtbl.getViewExpandedText(), tableType);
-
-    if (Strings.isNullOrEmpty(mtbl.getOwnerType())) {
-      // Before the ownerType exists in an old Hive schema, USER was the default type for owner.
-      // Let's set the default to USER to keep backward compatibility.
-      t.setOwnerType(PrincipalType.USER);
-    } else {
-      t.setOwnerType(PrincipalType.valueOf(mtbl.getOwnerType()));
-    }
-
-    t.setId(mtbl.getId());
-    t.setRewriteEnabled(mtbl.isRewriteEnabled());
-    t.setCatName(mtbl.getDatabase().getCatalogName());
-    t.setWriteId(mtbl.getWriteId());
-    return t;
-  }
-
-  private MTable convertToMTable(Table tbl) throws InvalidObjectException,
-      MetaException {
-    // NOTE: we don't set writeId in this method. Write ID is only set after validating the
-    //       existing write ID against the caller's valid list.
-    if (tbl == null) {
-      return null;
-    }
-    MDatabase mdb = null;
-    String catName = tbl.isSetCatName() ? tbl.getCatName() : getDefaultCatalog(conf);
-    try {
-      mdb = getMDatabase(catName, tbl.getDbName());
-    } catch (NoSuchObjectException e) {
-      LOG.error("Could not convert to MTable", e);
-      throw new InvalidObjectException("Database " +
-          DatabaseName.getQualified(catName, tbl.getDbName()) + " doesn't exist.");
-    }
-
-    // If the table has property EXTERNAL set, update table type
-    // accordingly
-    String tableType = tbl.getTableType();
-    boolean isExternal = Boolean.parseBoolean(tbl.getParameters().get("EXTERNAL"));
-    if (TableType.MANAGED_TABLE.toString().equals(tableType)) {
-      if (isExternal) {
-        tableType = TableType.EXTERNAL_TABLE.toString();
-      }
-    }
-    if (TableType.EXTERNAL_TABLE.toString().equals(tableType)) {
-      if (!isExternal) {
-        tableType = TableType.MANAGED_TABLE.toString();
-      }
-    }
-
-    PrincipalType ownerPrincipalType = tbl.getOwnerType();
-    String ownerType = (ownerPrincipalType == null) ? PrincipalType.USER.name() : ownerPrincipalType.name();
-
-    // A new table is always created with a new column descriptor
-    MTable mtable = new MTable(normalizeIdentifier(tbl.getTableName()), mdb,
-        convertToMStorageDescriptor(tbl.getSd()), tbl.getOwner(), ownerType, tbl
-        .getCreateTime(), tbl.getLastAccessTime(), tbl.getRetention(),
-        convertToMFieldSchemas(tbl.getPartitionKeys()), tbl.getParameters(),
-        tbl.getViewOriginalText(), tbl.getViewExpandedText(), tbl.isRewriteEnabled(),
-        tableType);
-    return mtable;
-  }
-
-  private List<MFieldSchema> convertToMFieldSchemas(List<FieldSchema> keys) {
-    List<MFieldSchema> mkeys = null;
-    if (keys != null) {
-      mkeys = new ArrayList<>(keys.size());
-      for (FieldSchema part : keys) {
-        mkeys.add(new MFieldSchema(part.getName().toLowerCase(),
-            part.getType(), part.getComment()));
-      }
-    }
-    return mkeys;
-  }
-
-  private List<FieldSchema> convertToFieldSchemas(List<MFieldSchema> mkeys) {
-    List<FieldSchema> keys = null;
-    if (mkeys != null) {
-      keys = new ArrayList<>(mkeys.size());
-      for (MFieldSchema part : mkeys) {
-        keys.add(new FieldSchema(part.getName(), part.getType(), part
-            .getComment()));
-      }
-    }
-    return keys;
-  }
-
-  private List<MOrder> convertToMOrders(List<Order> keys) {
-    List<MOrder> mkeys = null;
-    if (keys != null) {
-      mkeys = new ArrayList<>(keys.size());
-      for (Order part : keys) {
-        mkeys.add(new MOrder(normalizeIdentifier(part.getCol()), part.getOrder()));
-      }
-    }
-    return mkeys;
-  }
-
-  private List<Order> convertToOrders(List<MOrder> mkeys) {
-    List<Order> keys = null;
-    if (mkeys != null) {
-      keys = new ArrayList<>(mkeys.size());
-      for (MOrder part : mkeys) {
-        keys.add(new Order(part.getCol(), part.getOrder()));
-      }
-    }
-    return keys;
-  }
-
-  private SerDeInfo convertToSerDeInfo(MSerDeInfo ms) throws MetaException {
-    if (ms == null) {
-      throw new MetaException("Invalid SerDeInfo object");
-    }
-    SerDeInfo serde =
-        new SerDeInfo(ms.getName(), ms.getSerializationLib(), convertMap(ms.getParameters()));
-    if (ms.getDescription() != null) {
-      serde.setDescription(ms.getDescription());
-    }
-    if (ms.getSerializerClass() != null) {
-      serde.setSerializerClass(ms.getSerializerClass());
-    }
-    if (ms.getDeserializerClass() != null) {
-      serde.setDeserializerClass(ms.getDeserializerClass());
-    }
-    if (ms.getSerdeType() > 0) {
-      serde.setSerdeType(SerdeType.findByValue(ms.getSerdeType()));
-    }
-    return serde;
-  }
-
-  private MSerDeInfo convertToMSerDeInfo(SerDeInfo ms) throws MetaException {
-    if (ms == null) {
-      throw new MetaException("Invalid SerDeInfo object");
-    }
-    return new MSerDeInfo(ms.getName(), ms.getSerializationLib(), ms.getParameters(),
-        ms.getDescription(), ms.getSerializerClass(), ms.getDeserializerClass(),
-        ms.getSerdeType() == null ? 0 : ms.getSerdeType().getValue());
-  }
-
-  /**
-   * Given a list of model field schemas, create a new model column descriptor.
-   * @param cols the columns the column descriptor contains
-   * @return a new column descriptor db-backed object
-   */
-  private MColumnDescriptor createNewMColumnDescriptor(List<MFieldSchema> cols) {
-    if (cols == null) {
-      return null;
-    }
-    return new MColumnDescriptor(cols);
-  }
-
-  // MSD and SD should be same objects. Not sure how to make then same right now
-  // MSerdeInfo *& SerdeInfo should be same as well
-  private StorageDescriptor convertToStorageDescriptor(
-      MStorageDescriptor msd,
-      boolean noFS) throws MetaException {
-    if (msd == null) {
-      return null;
-    }
-    List<MFieldSchema> mFieldSchemas = msd.getCD() == null ? null : msd.getCD().getCols();
-
-    StorageDescriptor sd = new StorageDescriptor(noFS ? null : convertToFieldSchemas(mFieldSchemas),
-        msd.getLocation(), msd.getInputFormat(), msd.getOutputFormat(), msd
-        .isCompressed(), msd.getNumBuckets(), convertToSerDeInfo(msd
-        .getSerDeInfo()), convertList(msd.getBucketCols()), convertToOrders(msd
-        .getSortCols()), convertMap(msd.getParameters()));
-    SkewedInfo skewedInfo = new SkewedInfo(convertList(msd.getSkewedColNames()),
-        convertToSkewedValues(msd.getSkewedColValues()),
-        covertToSkewedMap(msd.getSkewedColValueLocationMaps()));
-    sd.setSkewedInfo(skewedInfo);
-    sd.setStoredAsSubDirectories(msd.isStoredAsSubDirectories());
-    return sd;
-  }
-
-  private StorageDescriptor convertToStorageDescriptor(MStorageDescriptor msd)
-      throws MetaException {
-    return convertToStorageDescriptor(msd, false);
-  }
-
-  /**
-   * Convert a list of MStringList to a list of list string
-   *
-   * @param mLists
-   * @return
-   */
-  private List<List<String>> convertToSkewedValues(List<MStringList> mLists) {
-    List<List<String>> lists = null;
-    if (mLists != null) {
-      lists = new ArrayList<>(mLists.size());
-      for (MStringList element : mLists) {
-        lists.add(new ArrayList<>(element.getInternalList()));
-      }
-    }
-    return lists;
-  }
-
-  private List<MStringList> convertToMStringLists(List<List<String>> mLists) {
-    List<MStringList> lists = null ;
-    if (null != mLists) {
-      lists = new ArrayList<>();
-      for (List<String> mList : mLists) {
-        lists.add(new MStringList(mList));
-      }
-    }
-    return lists;
-  }
-
-  /**
-   * Convert a MStringList Map to a Map
-   * @param mMap
-   * @return
-   */
-  private Map<List<String>, String> covertToSkewedMap(Map<MStringList, String> mMap) {
-    Map<List<String>, String> map = null;
-    if (mMap != null) {
-      map = new HashMap<>(mMap.size());
-      Set<MStringList> keys = mMap.keySet();
-      for (MStringList key : keys) {
-        map.put(new ArrayList<>(key.getInternalList()), mMap.get(key));
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Covert a Map to a MStringList Map
-   * @param mMap
-   * @return
-   */
-  private Map<MStringList, String> covertToMapMStringList(Map<List<String>, String> mMap) {
-    Map<MStringList, String> map = null;
-    if (mMap != null) {
-      map = new HashMap<>(mMap.size());
-      Set<List<String>> keys = mMap.keySet();
-      for (List<String> key : keys) {
-        map.put(new MStringList(key), mMap.get(key));
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Converts a storage descriptor to a db-backed storage descriptor.  Creates a
-   *   new db-backed column descriptor object for this SD.
-   * @param sd the storage descriptor to wrap in a db-backed object
-   * @return the storage descriptor db-backed object
-   * @throws MetaException
-   */
-  private MStorageDescriptor convertToMStorageDescriptor(StorageDescriptor sd)
-      throws MetaException {
-    if (sd == null) {
-      return null;
-    }
-    MColumnDescriptor mcd = createNewMColumnDescriptor(convertToMFieldSchemas(sd.getCols()));
-    return convertToMStorageDescriptor(sd, mcd);
-  }
-
-  /**
-   * Converts a storage descriptor to a db-backed storage descriptor.  It points the
-   * storage descriptor's column descriptor to the one passed as an argument,
-   * so it does not create a new mcolumn descriptor object.
-   * @param sd the storage descriptor to wrap in a db-backed object
-   * @param mcd the db-backed column descriptor
-   * @return the db-backed storage descriptor object
-   * @throws MetaException
-   */
-  private MStorageDescriptor convertToMStorageDescriptor(StorageDescriptor sd,
-      MColumnDescriptor mcd) throws MetaException {
-    if (sd == null) {
-      return null;
-    }
-    return new MStorageDescriptor(mcd, sd
-        .getLocation(), sd.getInputFormat(), sd.getOutputFormat(), sd
-        .isCompressed(), sd.getNumBuckets(), convertToMSerDeInfo(sd
-        .getSerdeInfo()), sd.getBucketCols(),
-        convertToMOrders(sd.getSortCols()), sd.getParameters(),
-        (null == sd.getSkewedInfo()) ? null
-            : sd.getSkewedInfo().getSkewedColNames(),
-        convertToMStringLists((null == sd.getSkewedInfo()) ? null : sd.getSkewedInfo()
-            .getSkewedColValues()),
-        covertToMapMStringList((null == sd.getSkewedInfo()) ? null : sd.getSkewedInfo()
-            .getSkewedColValueLocationMaps()), sd.isStoredAsSubDirectories());
-  }
-
-  private MCreationMetadata convertToMCreationMetadata(
-      CreationMetadata m) throws MetaException {
-    if (m == null) {
-      return null;
-    }
-    assert !m.isSetMaterializationTime();
-    Set<MTable> tablesUsed = new HashSet<>();
-    for (String fullyQualifiedName : m.getTablesUsed()) {
-      String[] names =  fullyQualifiedName.split("\\.");
-      tablesUsed.add(getMTable(m.getCatName(), names[0], names[1], false).mtbl);
-    }
-    return new MCreationMetadata(normalizeIdentifier(m.getCatName()),
-            normalizeIdentifier(m.getDbName()), normalizeIdentifier(m.getTblName()),
-        tablesUsed, m.getValidTxnList(), System.currentTimeMillis());
-  }
-
-  private CreationMetadata convertToCreationMetadata(
-      MCreationMetadata s) throws MetaException {
-    if (s == null) {
-      return null;
-    }
-    Set<String> tablesUsed = new HashSet<>();
-    for (MTable mtbl : s.getTables()) {
-      tablesUsed.add(
-          Warehouse.getQualifiedName(
-              mtbl.getDatabase().getName(), mtbl.getTableName()));
-    }
-    CreationMetadata r = new CreationMetadata(s.getCatalogName(),
-        s.getDbName(), s.getTblName(), tablesUsed);
-    r.setMaterializationTime(s.getMaterializationTime());
-    if (s.getTxnList() != null) {
-      r.setValidTxnList(s.getTxnList());
-    }
-    return r;
-  }
-
-  @Override
-  public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
-      throws InvalidObjectException, MetaException {
-    boolean success = false;
-    openTransaction();
-    try {
-      List<MTablePrivilege> tabGrants = null;
-      List<MTableColumnPrivilege> tabColumnGrants = null;
-      MTable table = this.getMTable(catName, dbName, tblName);
-      if ("TRUE".equalsIgnoreCase(table.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) {
-        tabGrants = this.listAllTableGrants(catName, dbName, tblName);
-        tabColumnGrants = this.listTableAllColumnGrants(catName, dbName, tblName);
-      }
-      List<Object> toPersist = new ArrayList<>();
-      for (Partition part : parts) {
-        if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
-          throw new MetaException("Partition does not belong to target table "
-              + dbName + "." + tblName + ": " + part);
-        }
-        MPartition mpart = convertToMPart(part, table, true);
-
-        toPersist.add(mpart);
-        int now = (int)(System.currentTimeMillis()/1000);
-        if (tabGrants != null) {
-          for (MTablePrivilege tab: tabGrants) {
-            toPersist.add(new MPartitionPrivilege(tab.getPrincipalName(),
-                tab.getPrincipalType(), mpart, tab.getPrivilege(), now,
-                tab.getGrantor(), tab.getGrantorType(), tab.getGrantOption(),
-                tab.getAuthorizer()));
-          }
-        }
-
-        if (tabColumnGrants != null) {
-          for (MTableColumnPrivilege col : tabColumnGrants) {
-            toPersist.add(new MPartitionColumnPrivilege(col.getPrincipalName(),
-                col.getPrincipalType(), mpart, col.getColumnName(), col.getPrivilege(),
-                now, col.getGrantor(), col.getGrantorType(), col.getGrantOption(),
-                col.getAuthorizer()));
-          }
-        }
-      }
-      if (CollectionUtils.isNotEmpty(toPersist)) {
-        pm.makePersistentAll(toPersist);
-        pm.flush();
-      }
-
-      success = commitTransaction();
-    } finally {
-      if (!success) {
-        rollbackTransaction();
-      }
-    }
-    return success;
-  }
-
-  private boolean isValidPartition(
-      Partition part, List<FieldSchema> partitionKeys, boolean ifNotExists) throws MetaException {
-    MetaStoreServerUtils.validatePartitionNameCharacters(part.getValues(),
-        partitionValidationPattern);
-    boolean doesExist = doesPartitionExist(part.getCatName(),
-        part.getDbName(), part.getTableName(), partitionKeys, part.getValues());
-    if (doesExist && !ifNotExists) {
-      throw new MetaException("Partition already exists: " + part);
-    }
-    return !doesExist;
-  }
-
-  @Override
-  public boolean addPartitions(String catName, String dbName, String tblName,
-                               PartitionSpecProxy partitionSpec, boolean ifNotExists)
-      throws InvalidObjectException, MetaException {
-    boolean success = false;
-    openTransaction();
-    try {
-      List<MTablePrivilege> tabGrants = null;
-      List<MTableColumnPrivilege> tabColumnGrants = null;
-      MTable table = this.getMTable(catName, dbName, tblName);
-     

<TRUNCATED>

[3/3] hive git commit: HIVE-20306 : Addendum to remove .orig files

Posted by vi...@apache.org.
HIVE-20306 : Addendum to remove .orig files


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/64bef36a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/64bef36a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/64bef36a

Branch: refs/heads/master
Commit: 64bef36a36bcbb3dd644ffdc394673fdad41eb93
Parents: 390afb5
Author: Vihang Karajgaonkar <vi...@apache.org>
Authored: Wed Oct 10 10:28:31 2018 -0700
Committer: Vihang Karajgaonkar <vi...@apache.org>
Committed: Wed Oct 10 10:40:13 2018 -0700

----------------------------------------------------------------------
 .../hive/metastore/MetaStoreDirectSql.java.orig |  2845 ----
 .../hadoop/hive/metastore/ObjectStore.java.orig | 12514 -----------------
 2 files changed, 15359 deletions(-)
----------------------------------------------------------------------



[2/3] hive git commit: HIVE-20306 : Addendum to remove .orig files

Posted by vi...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/64bef36a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java.orig
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java.orig b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java.orig
deleted file mode 100644
index 0a25b77..0000000
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java.orig
+++ /dev/null
@@ -1,2845 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.commons.lang.StringUtils.normalizeSpace;
-import static org.apache.commons.lang.StringUtils.repeat;
-import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
-
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
-
-import javax.jdo.PersistenceManager;
-import javax.jdo.Query;
-import javax.jdo.Transaction;
-import javax.jdo.datastore.JDOConnection;
-
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
-import org.apache.hadoop.hive.metastore.api.AggrStats;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
-import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
-import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
-import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
-import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SkewedInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.model.MConstraint;
-import org.apache.hadoop.hive.metastore.model.MCreationMetadata;
-import org.apache.hadoop.hive.metastore.model.MDatabase;
-import org.apache.hadoop.hive.metastore.model.MNotificationLog;
-import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
-import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
-import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
-import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
-import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
-import org.apache.hadoop.hive.metastore.model.MWMResourcePlan;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LogicalOperator;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.ColStatsObjWithSourceInfo;
-import org.apache.hive.common.util.BloomFilter;
-import org.datanucleus.store.rdbms.query.ForwardQueryResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * This class contains the optimizations for MetaStore that rely on direct SQL access to
- * the underlying database. It should use ANSI SQL and be compatible with common databases
- * such as MySQL (note that MySQL doesn't use full ANSI mode by default), Postgres, etc.
- *
- * As of now, only the partition retrieval is done this way to improve job startup time;
- * JDOQL partition retrieval is still present so as not to limit the ORM solution we have
- * to SQL stores only. There's always a way to do without direct SQL.
- */
-class MetaStoreDirectSql {
-  private static final int NO_BATCHING = -1, DETECT_BATCHING = 0;
-
-  private static final Logger LOG = LoggerFactory.getLogger(MetaStoreDirectSql.class);
-  private final PersistenceManager pm;
-  private final Configuration conf;
-  private final String schema;
-
-  /**
-   * We want to avoid db-specific code in this class and stick with ANSI SQL. However:
-   * 1) mysql and postgres are differently ansi-incompatible (mysql by default doesn't support
-   * quoted identifiers, and postgres contravenes ANSI by coercing unquoted ones to lower case).
-   * MySQL's way of working around this is simpler (just set ansi quotes mode on), so we will
-   * use that. MySQL detection is done by actually issuing the set-ansi-quotes command;
-   *
-   * Use sparingly, we don't want to devolve into another DataNucleus...
-   */
-  private final DatabaseProduct dbType;
-  private final int batchSize;
-  private final boolean convertMapNullsToEmptyStrings;
-  private final String defaultPartName;
-
-  /**
-   * Whether direct SQL can be used with the current datastore backing {@link #pm}.
-   */
-  private final boolean isCompatibleDatastore;
-  private final boolean isAggregateStatsCacheEnabled;
-  private AggregateStatsCache aggrStatsCache;
-
-  @java.lang.annotation.Target(java.lang.annotation.ElementType.FIELD)
-  @java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
-  private @interface TableName {}
-
-  // Table names with schema name, if necessary
-  @TableName
-  private String DBS, TBLS, PARTITIONS, DATABASE_PARAMS, PARTITION_PARAMS, SORT_COLS, SD_PARAMS,
-      SDS, SERDES, SKEWED_STRING_LIST_VALUES, SKEWED_VALUES, BUCKETING_COLS, SKEWED_COL_NAMES,
-      SKEWED_COL_VALUE_LOC_MAP, COLUMNS_V2, PARTITION_KEYS, SERDE_PARAMS, PART_COL_STATS, KEY_CONSTRAINTS,
-      TAB_COL_STATS, PARTITION_KEY_VALS, PART_PRIVS, PART_COL_PRIVS, SKEWED_STRING_LIST, CDS;
-
-
-  public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String schema) {
-    this.pm = pm;
-    this.conf = conf;
-    this.schema = schema;
-    DatabaseProduct dbType = null;
-    try {
-      dbType = DatabaseProduct.determineDatabaseProduct(getProductName(pm));
-    } catch (SQLException e) {
-      LOG.warn("Cannot determine database product; assuming OTHER", e);
-      dbType = DatabaseProduct.OTHER;
-    }
-    this.dbType = dbType;
-    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE);
-    if (batchSize == DETECT_BATCHING) {
-      batchSize = DatabaseProduct.needsInBatching(dbType) ? 1000 : NO_BATCHING;
-    }
-    this.batchSize = batchSize;
-
-    for (java.lang.reflect.Field f : this.getClass().getDeclaredFields()) {
-      if (f.getAnnotation(TableName.class) == null) continue;
-      try {
-        f.set(this, getFullyQualifiedName(schema, f.getName()));
-      } catch (IllegalArgumentException | IllegalAccessException e) {
-        throw new RuntimeException("Internal error, cannot set " + f.getName());
-      }
-    }
-
-    convertMapNullsToEmptyStrings =
-        MetastoreConf.getBoolVar(conf, ConfVars.ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS);
-    defaultPartName = MetastoreConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
-
-    String jdoIdFactory = MetastoreConf.getVar(conf, ConfVars.IDENTIFIER_FACTORY);
-    if (! ("datanucleus1".equalsIgnoreCase(jdoIdFactory))){
-      LOG.warn("Underlying metastore does not use 'datanucleus1' for its ORM naming scheme."
-          + " Disabling directSQL as it uses hand-hardcoded SQL with that assumption.");
-      isCompatibleDatastore = false;
-    } else {
-      boolean isInTest = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
-      isCompatibleDatastore = (!isInTest || ensureDbInit()) && runTestQuery();
-      if (isCompatibleDatastore) {
-        LOG.debug("Using direct SQL, underlying DB is " + dbType);
-      }
-    }
-
-    isAggregateStatsCacheEnabled = MetastoreConf.getBoolVar(
-        conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED);
-    if (isAggregateStatsCacheEnabled) {
-      aggrStatsCache = AggregateStatsCache.getInstance(conf);
-    }
-  }
-
-  private static String getFullyQualifiedName(String schema, String tblName) {
-    return ((schema == null || schema.isEmpty()) ? "" : "\"" + schema + "\".\"")
-        + "\"" + tblName + "\"";
-  }
-
-
-  public MetaStoreDirectSql(PersistenceManager pm, Configuration conf) {
-    this(pm, conf, "");
-  }
-
-  static String getProductName(PersistenceManager pm) {
-    JDOConnection jdoConn = pm.getDataStoreConnection();
-    try {
-      return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName();
-    } catch (Throwable t) {
-      LOG.warn("Error retrieving product name", t);
-      return null;
-    } finally {
-      jdoConn.close(); // We must release the connection before we call other pm methods.
-    }
-  }
-
-  private boolean ensureDbInit() {
-    Transaction tx = pm.currentTransaction();
-    boolean doCommit = false;
-    if (!tx.isActive()) {
-      tx.begin();
-      doCommit = true;
-    }
-    LinkedList<Query> initQueries = new LinkedList<>();
-
-    try {
-      // Force the underlying db to initialize.
-      initQueries.add(pm.newQuery(MDatabase.class, "name == ''"));
-      initQueries.add(pm.newQuery(MTableColumnStatistics.class, "dbName == ''"));
-      initQueries.add(pm.newQuery(MPartitionColumnStatistics.class, "dbName == ''"));
-      initQueries.add(pm.newQuery(MConstraint.class, "childIntegerIndex < 0"));
-      initQueries.add(pm.newQuery(MNotificationLog.class, "dbName == ''"));
-      initQueries.add(pm.newQuery(MNotificationNextId.class, "nextEventId < -1"));
-      initQueries.add(pm.newQuery(MWMResourcePlan.class, "name == ''"));
-      initQueries.add(pm.newQuery(MCreationMetadata.class, "dbName == ''"));
-      initQueries.add(pm.newQuery(MPartitionPrivilege.class, "principalName == ''"));
-      initQueries.add(pm.newQuery(MPartitionColumnPrivilege.class, "principalName == ''"));
-      Query q;
-      while ((q = initQueries.peekFirst()) != null) {
-        q.execute();
-        initQueries.pollFirst();
-      }
-
-      return true;
-    } catch (Exception ex) {
-      doCommit = false;
-      LOG.warn("Database initialization failed; direct SQL is disabled", ex);
-      tx.rollback();
-      return false;
-    } finally {
-      if (doCommit) {
-        tx.commit();
-      }
-      for (Query q : initQueries) {
-        try {
-          q.closeAll();
-        } catch (Throwable t) {
-        }
-      }
-    }
-  }
-
-  private boolean runTestQuery() {
-    Transaction tx = pm.currentTransaction();
-    boolean doCommit = false;
-    if (!tx.isActive()) {
-      tx.begin();
-      doCommit = true;
-    }
-    Query query = null;
-    // Run a self-test query. If it doesn't work, we will self-disable. What a PITA...
-    String selfTestQuery = "select \"DB_ID\" from " + DBS + "";
-    try {
-      prepareTxn();
-      query = pm.newQuery("javax.jdo.query.SQL", selfTestQuery);
-      query.execute();
-      return true;
-    } catch (Throwable t) {
-      doCommit = false;
-      LOG.warn("Self-test query [" + selfTestQuery + "] failed; direct SQL is disabled", t);
-      tx.rollback();
-      return false;
-    } finally {
-      if (doCommit) {
-        tx.commit();
-      }
-      if (query != null) {
-        query.closeAll();
-      }
-    }
-  }
-
-  public String getSchema() {
-    return schema;
-  }
-
-  public boolean isCompatibleDatastore() {
-    return isCompatibleDatastore;
-  }
-
-  private void executeNoResult(final String queryText) throws SQLException {
-    JDOConnection jdoConn = pm.getDataStoreConnection();
-    Statement statement = null;
-    boolean doTrace = LOG.isDebugEnabled();
-    try {
-      long start = doTrace ? System.nanoTime() : 0;
-      statement = ((Connection)jdoConn.getNativeConnection()).createStatement();
-      statement.execute(queryText);
-      timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0);
-    } finally {
-      if(statement != null){
-          statement.close();
-      }
-      jdoConn.close(); // We must release the connection before we call other pm methods.
-    }
-  }
-
-  public Database getDatabase(String catName, String dbName) throws MetaException{
-    Query queryDbSelector = null;
-    Query queryDbParams = null;
-    try {
-      dbName = dbName.toLowerCase();
-      catName = catName.toLowerCase();
-
-      String queryTextDbSelector= "select "
-          + "\"DB_ID\", \"NAME\", \"DB_LOCATION_URI\", \"DESC\", "
-          + "\"OWNER_NAME\", \"OWNER_TYPE\", \"CTLG_NAME\" "
-          + "FROM "+ DBS
-          + " where \"NAME\" = ? and \"CTLG_NAME\" = ? ";
-      Object[] params = new Object[] { dbName, catName };
-      queryDbSelector = pm.newQuery("javax.jdo.query.SQL", queryTextDbSelector);
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("getDatabase:query instantiated : " + queryTextDbSelector
-            + " with param [" + params[0] + "]");
-      }
-
-      List<Object[]> sqlResult = executeWithArray(
-          queryDbSelector, params, queryTextDbSelector);
-      if ((sqlResult == null) || sqlResult.isEmpty()) {
-        return null;
-      }
-
-      assert(sqlResult.size() == 1);
-      if (sqlResult.get(0) == null) {
-        return null;
-      }
-
-      Object[] dbline = sqlResult.get(0);
-      Long dbid = extractSqlLong(dbline[0]);
-
-      String queryTextDbParams = "select \"PARAM_KEY\", \"PARAM_VALUE\" "
-          + " from " + DATABASE_PARAMS + " "
-          + " WHERE \"DB_ID\" = ? "
-          + " AND \"PARAM_KEY\" IS NOT NULL";
-      params[0] = dbid;
-      queryDbParams = pm.newQuery("javax.jdo.query.SQL", queryTextDbParams);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("getDatabase:query2 instantiated : " + queryTextDbParams
-            + " with param [" + params[0] + "]");
-      }
-
-      Map<String,String> dbParams = new HashMap<String,String>();
-      List<Object[]> sqlResult2 = ensureList(executeWithArray(
-          queryDbParams, params, queryTextDbParams));
-      if (!sqlResult2.isEmpty()) {
-        for (Object[] line : sqlResult2) {
-          dbParams.put(extractSqlString(line[0]), extractSqlString(line[1]));
-        }
-      }
-      Database db = new Database();
-      db.setName(extractSqlString(dbline[1]));
-      db.setLocationUri(extractSqlString(dbline[2]));
-      db.setDescription(extractSqlString(dbline[3]));
-      db.setOwnerName(extractSqlString(dbline[4]));
-      String type = extractSqlString(dbline[5]);
-      db.setOwnerType(
-          (null == type || type.trim().isEmpty()) ? null : PrincipalType.valueOf(type));
-      db.setCatalogName(extractSqlString(dbline[6]));
-      db.setParameters(MetaStoreServerUtils.trimMapNulls(dbParams,convertMapNullsToEmptyStrings));
-      if (LOG.isDebugEnabled()){
-        LOG.debug("getDatabase: directsql returning db " + db.getName()
-            + " locn["+db.getLocationUri()  +"] desc [" +db.getDescription()
-            + "] owner [" + db.getOwnerName() + "] ownertype ["+ db.getOwnerType() +"]");
-      }
-      return db;
-    } finally {
-      if (queryDbSelector != null){
-        queryDbSelector.closeAll();
-      }
-      if (queryDbParams != null){
-        queryDbParams.closeAll();
-      }
-    }
-  }
-
-  /**
-   * Get table names by using direct SQL queries.
-   * @param catName catalog name
-   * @param dbName Metastore database namme
-   * @param tableType Table type, or null if we want to get all tables
-   * @return list of table names
-   */
-  public List<String> getTables(String catName, String dbName, TableType tableType)
-      throws MetaException {
-    String queryText = "SELECT " + TBLS + ".\"TBL_NAME\""
-      + " FROM " + TBLS + " "
-      + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
-      + " WHERE " + DBS + ".\"NAME\" = ? AND " + DBS + ".\"CTLG_NAME\" = ? "
-      + (tableType == null ? "" : "AND " + TBLS + ".\"TBL_TYPE\" = ? ") ;
-
-    List<String> pms = new ArrayList<>();
-    pms.add(dbName);
-    pms.add(catName);
-    if (tableType != null) {
-      pms.add(tableType.toString());
-    }
-
-    Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
-    return executeWithArray(
-        queryParams, pms.toArray(), queryText);
-  }
-
-  /**
-   * Get table names by using direct SQL queries.
-   *
-   * @param dbName Metastore database namme
-   * @return list of table names
-   */
-  public List<String> getMaterializedViewsForRewriting(String dbName) throws MetaException {
-    String queryText = "SELECT " + TBLS + ".\"TBL_NAME\""
-      + " FROM " + TBLS + " "
-      + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
-      + " WHERE " + DBS + ".\"NAME\" = ? AND " + TBLS + ".\"TBL_TYPE\" = ? " ;
-
-    List<String> pms = new ArrayList<String>();
-    pms.add(dbName);
-    pms.add(TableType.MATERIALIZED_VIEW.toString());
-
-    Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
-    return executeWithArray(
-        queryParams, pms.toArray(), queryText);
-  }
-
-  /**
-   * Gets partitions by using direct SQL queries.
-   * @param catName Metastore catalog name.
-   * @param dbName Metastore db name.
-   * @param tblName Metastore table name.
-   * @param partNames Partition names to get.
-   * @return List of partitions.
-   */
-  public List<Partition> getPartitionsViaSqlFilter(final String catName, final String dbName,
-                                                   final String tblName, List<String> partNames)
-      throws MetaException {
-    if (partNames.isEmpty()) {
-      return Collections.emptyList();
-    }
-    return Batchable.runBatched(batchSize, partNames, new Batchable<String, Partition>() {
-      @Override
-      public List<Partition> run(List<String> input) throws MetaException {
-        String filter = "" + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
-        List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName, tblName,
-            filter, input, Collections.<String>emptyList(), null);
-        if (partitionIds.isEmpty()) {
-          return Collections.emptyList(); // no partitions, bail early.
-        }
-        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, partitionIds);
-      }
-    });
-  }
-
-  /**
-   * Gets partitions by using direct SQL queries.
-   * @param filter The filter.
-   * @param max The maximum number of partitions to return.
-   * @return List of partitions.
-   */
-  public List<Partition> getPartitionsViaSqlFilter(
-      SqlFilterForPushdown filter, Integer max) throws MetaException {
-    Boolean isViewTable = isViewTable(filter.table);
-    String catName = filter.table.isSetCatName() ? filter.table.getCatName() :
-        DEFAULT_CATALOG_NAME;
-    List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName,
-        filter.table.getDbName(), filter.table.getTableName(), filter.filter, filter.params,
-        filter.joins, max);
-    if (partitionIds.isEmpty()) {
-      return Collections.emptyList(); // no partitions, bail early.
-    }
-    return Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() {
-      @Override
-      public List<Partition> run(List<Object> input) throws MetaException {
-        return getPartitionsFromPartitionIds(catName, filter.table.getDbName(),
-            filter.table.getTableName(), isViewTable, input);
-      }
-    });
-  }
-
-  public static class SqlFilterForPushdown {
-    private final List<Object> params = new ArrayList<>();
-    private final List<String> joins = new ArrayList<>();
-    private String filter;
-    private Table table;
-  }
-
-  public boolean generateSqlFilterForPushdown(
-      Table table, ExpressionTree tree, SqlFilterForPushdown result) throws MetaException {
-    return generateSqlFilterForPushdown(table, tree, null, result);
-  }
-
-  public boolean generateSqlFilterForPushdown(Table table, ExpressionTree tree, String defaultPartitionName,
-                                              SqlFilterForPushdown result) throws MetaException {
-    // Derby and Oracle do not interpret filters ANSI-properly in some cases and need a workaround.
-    boolean dbHasJoinCastBug = DatabaseProduct.hasJoinOperationOrderBug(dbType);
-    result.table = table;
-    result.filter = PartitionFilterGenerator.generateSqlFilter(table, tree, result.params,
-            result.joins, dbHasJoinCastBug, ((defaultPartitionName == null) ? defaultPartName : defaultPartitionName),
-            dbType, schema);
-    return result.filter != null;
-  }
-
-  /**
-   * Gets all partitions of a table by using direct SQL queries.
-   * @param catName Metastore catalog name.
-   * @param dbName Metastore db name.
-   * @param tblName Metastore table name.
-   * @param max The maximum number of partitions to return.
-   * @return List of partitions.
-   */
-  public List<Partition> getPartitions(String catName,
-      String dbName, String tblName, Integer max) throws MetaException {
-    List<Object> partitionIds = getPartitionIdsViaSqlFilter(catName, dbName,
-        tblName, null, Collections.<String>emptyList(), Collections.<String>emptyList(), max);
-    if (partitionIds.isEmpty()) {
-      return Collections.emptyList(); // no partitions, bail early.
-    }
-
-    // Get full objects. For Oracle/etc. do it in batches.
-    List<Partition> result = Batchable.runBatched(batchSize, partitionIds, new Batchable<Object, Partition>() {
-      @Override
-      public List<Partition> run(List<Object> input) throws MetaException {
-        return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input);
-      }
-    });
-    return result;
-  }
-
-  private static Boolean isViewTable(Table t) {
-    return t.isSetTableType() ?
-        t.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) : null;
-  }
-
-  private boolean isViewTable(String catName, String dbName, String tblName) throws MetaException {
-    Query query = null;
-    try {
-      String queryText = "select \"TBL_TYPE\" from " + TBLS + "" +
-          " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" " +
-          " where " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + DBS + ".\"CTLG_NAME\" = ?";
-      Object[] params = new Object[] { tblName, dbName, catName };
-      query = pm.newQuery("javax.jdo.query.SQL", queryText);
-      query.setUnique(true);
-      Object result = executeWithArray(query, params, queryText);
-      return (result != null) && result.toString().equals(TableType.VIRTUAL_VIEW.toString());
-    } finally {
-      if (query != null) {
-        query.closeAll();
-      }
-    }
-  }
-
-  /**
-   * Get partition ids for the query using direct SQL queries, to avoid bazillion
-   * queries created by DN retrieving stuff for each object individually.
-   * @param catName MetaStore catalog name
-   * @param dbName MetaStore db name
-   * @param tblName MetaStore table name
-   * @param sqlFilter SQL filter to use. Better be SQL92-compliant.
-   * @param paramsForFilter params for ?-s in SQL filter text. Params must be in order.
-   * @param joinsForFilter if the filter needs additional join statement, they must be in
-   *                       this list. Better be SQL92-compliant.
-   * @param max The maximum number of partitions to return.
-   * @return List of partition objects.
-   */
-  private List<Object> getPartitionIdsViaSqlFilter(
-      String catName, String dbName, String tblName, String sqlFilter,
-      List<? extends Object> paramsForFilter, List<String> joinsForFilter, Integer max)
-      throws MetaException {
-    boolean doTrace = LOG.isDebugEnabled();
-    final String dbNameLcase = dbName.toLowerCase();
-    final String tblNameLcase = tblName.toLowerCase();
-    final String catNameLcase = normalizeSpace(catName).toLowerCase();
-
-    // We have to be mindful of order during filtering if we are not returning all partitions.
-    String orderForFilter = (max != null) ? " order by \"PART_NAME\" asc" : "";
-
-    String queryText =
-        "select " + PARTITIONS + ".\"PART_ID\" from " + PARTITIONS + ""
-      + "  inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
-      + "    and " + TBLS + ".\"TBL_NAME\" = ? "
-      + "  inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
-      + "     and " + DBS + ".\"NAME\" = ? "
-      + join(joinsForFilter, ' ')
-      + " where " + DBS + ".\"CTLG_NAME\" = ? "
-      + (StringUtils.isBlank(sqlFilter) ? "" : (" and " + sqlFilter)) + orderForFilter;
-    Object[] params = new Object[paramsForFilter.size() + 3];
-    params[0] = tblNameLcase;
-    params[1] = dbNameLcase;
-    params[2] = catNameLcase;
-    for (int i = 0; i < paramsForFilter.size(); ++i) {
-      params[i + 3] = paramsForFilter.get(i);
-    }
-
-    long start = doTrace ? System.nanoTime() : 0;
-    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    if (max != null) {
-      query.setRange(0, max.shortValue());
-    }
-    List<Object> sqlResult = executeWithArray(query, params, queryText);
-    long queryTime = doTrace ? System.nanoTime() : 0;
-    timingTrace(doTrace, queryText, start, queryTime);
-    if (sqlResult.isEmpty()) {
-      return Collections.emptyList(); // no partitions, bail early.
-    }
-
-    List<Object> result = new ArrayList<Object>(sqlResult.size());
-    for (Object fields : sqlResult) {
-      result.add(extractSqlLong(fields));
-    }
-    query.closeAll();
-    return result;
-  }
-
-  /** Should be called with the list short enough to not trip up Oracle/etc. */
-  private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName, String tblName,
-      Boolean isView, List<Object> partIdList) throws MetaException {
-    boolean doTrace = LOG.isDebugEnabled();
-
-    int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma
-    int sbCapacity = partIdList.size() * idStringWidth;
-
-    String partIds = getIdListForIn(partIdList);
-
-    // Get most of the fields for the IDs provided.
-    // Assume db and table names are the same for all partition, as provided in arguments.
-    String queryText =
-      "select " + PARTITIONS + ".\"PART_ID\", " + SDS + ".\"SD_ID\", " + SDS + ".\"CD_ID\","
-    + " " + SERDES + ".\"SERDE_ID\", " + PARTITIONS + ".\"CREATE_TIME\","
-    + " " + PARTITIONS + ".\"LAST_ACCESS_TIME\", " + SDS + ".\"INPUT_FORMAT\", " + SDS + ".\"IS_COMPRESSED\","
-    + " " + SDS + ".\"IS_STOREDASSUBDIRECTORIES\", " + SDS + ".\"LOCATION\", " + SDS + ".\"NUM_BUCKETS\","
-    + " " + SDS + ".\"OUTPUT_FORMAT\", " + SERDES + ".\"NAME\", " + SERDES + ".\"SLIB\", " + PARTITIONS
-    + ".\"WRITE_ID\"" + " from " + PARTITIONS + ""
-    + "  left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" "
-    + "  left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES + ".\"SERDE_ID\" "
-    + "where \"PART_ID\" in (" + partIds + ") order by \"PART_NAME\" asc";
-    long start = doTrace ? System.nanoTime() : 0;
-    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    List<Object[]> sqlResult = executeWithArray(query, null, queryText);
-    long queryTime = doTrace ? System.nanoTime() : 0;
-    Deadline.checkTimeout();
-
-    // Read all the fields and create partitions, SDs and serdes.
-    TreeMap<Long, Partition> partitions = new TreeMap<Long, Partition>();
-    TreeMap<Long, StorageDescriptor> sds = new TreeMap<Long, StorageDescriptor>();
-    TreeMap<Long, SerDeInfo> serdes = new TreeMap<Long, SerDeInfo>();
-    TreeMap<Long, List<FieldSchema>> colss = new TreeMap<Long, List<FieldSchema>>();
-    // Keep order by name, consistent with JDO.
-    ArrayList<Partition> orderedResult = new ArrayList<Partition>(partIdList.size());
-
-    // Prepare StringBuilder-s for "in (...)" lists to use in one-to-many queries.
-    StringBuilder sdSb = new StringBuilder(sbCapacity), serdeSb = new StringBuilder(sbCapacity);
-    StringBuilder colsSb = new StringBuilder(7); // We expect that there's only one field schema.
-    tblName = tblName.toLowerCase();
-    dbName = dbName.toLowerCase();
-    catName = normalizeSpace(catName).toLowerCase();
-    for (Object[] fields : sqlResult) {
-      // Here comes the ugly part...
-      long partitionId = extractSqlLong(fields[0]);
-      Long sdId = extractSqlLong(fields[1]);
-      Long colId = extractSqlLong(fields[2]);
-      Long serdeId = extractSqlLong(fields[3]);
-      // A partition must have at least sdId and serdeId set, or nothing set if it's a view.
-      if (sdId == null || serdeId == null) {
-        if (isView == null) {
-          isView = isViewTable(catName, dbName, tblName);
-        }
-        if ((sdId != null || colId != null || serdeId != null) || !isView) {
-          throw new MetaException("Unexpected null for one of the IDs, SD " + sdId +
-                  ", serde " + serdeId + " for a " + (isView ? "" : "non-") + " view");
-        }
-      }
-
-      Partition part = new Partition();
-      orderedResult.add(part);
-      // Set the collection fields; some code might not check presence before accessing them.
-      part.setParameters(new HashMap<>());
-      part.setValues(new ArrayList<String>());
-      part.setCatName(catName);
-      part.setDbName(dbName);
-      part.setTableName(tblName);
-      if (fields[4] != null) part.setCreateTime(extractSqlInt(fields[4]));
-      if (fields[5] != null) part.setLastAccessTime(extractSqlInt(fields[5]));
-      Long writeId = extractSqlLong(fields[14]);
-      if (writeId != null) {
-        part.setWriteId(writeId);
-      }
-      partitions.put(partitionId, part);
-
-
-      if (sdId == null) continue; // Probably a view.
-      assert serdeId != null;
-
-      // We assume each partition has an unique SD.
-      StorageDescriptor sd = new StorageDescriptor();
-      StorageDescriptor oldSd = sds.put(sdId, sd);
-      if (oldSd != null) {
-        throw new MetaException("Partitions reuse SDs; we don't expect that");
-      }
-      // Set the collection fields; some code might not check presence before accessing them.
-      sd.setSortCols(new ArrayList<Order>());
-      sd.setBucketCols(new ArrayList<String>());
-      sd.setParameters(new HashMap<String, String>());
-      sd.setSkewedInfo(new SkewedInfo(new ArrayList<String>(),
-          new ArrayList<List<String>>(), new HashMap<List<String>, String>()));
-      sd.setInputFormat((String)fields[6]);
-      Boolean tmpBoolean = extractSqlBoolean(fields[7]);
-      if (tmpBoolean != null) sd.setCompressed(tmpBoolean);
-      tmpBoolean = extractSqlBoolean(fields[8]);
-      if (tmpBoolean != null) sd.setStoredAsSubDirectories(tmpBoolean);
-      sd.setLocation((String)fields[9]);
-      if (fields[10] != null) sd.setNumBuckets(extractSqlInt(fields[10]));
-      sd.setOutputFormat((String)fields[11]);
-      sdSb.append(sdId).append(",");
-      part.setSd(sd);
-
-      if (colId != null) {
-        List<FieldSchema> cols = colss.get(colId);
-        // We expect that colId will be the same for all (or many) SDs.
-        if (cols == null) {
-          cols = new ArrayList<FieldSchema>();
-          colss.put(colId, cols);
-          colsSb.append(colId).append(",");
-        }
-        sd.setCols(cols);
-      }
-
-      // We assume each SD has an unique serde.
-      SerDeInfo serde = new SerDeInfo();
-      SerDeInfo oldSerde = serdes.put(serdeId, serde);
-      if (oldSerde != null) {
-        throw new MetaException("SDs reuse serdes; we don't expect that");
-      }
-      serde.setParameters(new HashMap<String, String>());
-      serde.setName((String)fields[12]);
-      serde.setSerializationLib((String)fields[13]);
-      serdeSb.append(serdeId).append(",");
-      sd.setSerdeInfo(serde);
-
-      Deadline.checkTimeout();
-    }
-    query.closeAll();
-    timingTrace(doTrace, queryText, start, queryTime);
-
-    // Now get all the one-to-many things. Start with partitions.
-    queryText = "select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + PARTITION_PARAMS + ""
-        + " where \"PART_ID\" in (" + partIds + ") and \"PARAM_KEY\" is not null"
-        + " order by \"PART_ID\" asc";
-    loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
-      @Override
-      public void apply(Partition t, Object[] fields) {
-        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
-      }});
-    // Perform conversion of null map values
-    for (Partition t : partitions.values()) {
-      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
-    }
-
-    queryText = "select \"PART_ID\", \"PART_KEY_VAL\" from " + PARTITION_KEY_VALS + ""
-        + " where \"PART_ID\" in (" + partIds + ")"
-        + " order by \"PART_ID\" asc, \"INTEGER_IDX\" asc";
-    loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
-      @Override
-      public void apply(Partition t, Object[] fields) {
-        t.addToValues((String)fields[1]);
-      }});
-
-    // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
-    if (sdSb.length() == 0) {
-      assert serdeSb.length() == 0 && colsSb.length() == 0;
-      return orderedResult; // No SDs, probably a view.
-    }
-
-    String sdIds = trimCommaList(sdSb);
-    String serdeIds = trimCommaList(serdeSb);
-    String colIds = trimCommaList(colsSb);
-
-    // Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs.
-    queryText = "select \"SD_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SD_PARAMS + ""
-        + " where \"SD_ID\" in (" + sdIds + ") and \"PARAM_KEY\" is not null"
-        + " order by \"SD_ID\" asc";
-    loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-      @Override
-      public void apply(StorageDescriptor t, Object[] fields) {
-        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
-      }});
-    // Perform conversion of null map values
-    for (StorageDescriptor t : sds.values()) {
-      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
-    }
-
-    queryText = "select \"SD_ID\", \"COLUMN_NAME\", " + SORT_COLS + ".\"ORDER\""
-        + " from " + SORT_COLS + ""
-        + " where \"SD_ID\" in (" + sdIds + ")"
-        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
-    loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-      @Override
-      public void apply(StorageDescriptor t, Object[] fields) {
-        if (fields[2] == null) return;
-        t.addToSortCols(new Order((String)fields[1], extractSqlInt(fields[2])));
-      }});
-
-    queryText = "select \"SD_ID\", \"BUCKET_COL_NAME\" from " + BUCKETING_COLS + ""
-        + " where \"SD_ID\" in (" + sdIds + ")"
-        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
-    loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-      @Override
-      public void apply(StorageDescriptor t, Object[] fields) {
-        t.addToBucketCols((String)fields[1]);
-      }});
-
-    // Skewed columns stuff.
-    queryText = "select \"SD_ID\", \"SKEWED_COL_NAME\" from " + SKEWED_COL_NAMES + ""
-        + " where \"SD_ID\" in (" + sdIds + ")"
-        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
-    boolean hasSkewedColumns =
-      loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-        @Override
-        public void apply(StorageDescriptor t, Object[] fields) {
-          if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
-          t.getSkewedInfo().addToSkewedColNames((String)fields[1]);
-        }}) > 0;
-
-    // Assume we don't need to fetch the rest of the skewed column data if we have no columns.
-    if (hasSkewedColumns) {
-      // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
-      queryText =
-            "select " + SKEWED_VALUES + ".\"SD_ID_OID\","
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\","
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
-          + "from " + SKEWED_VALUES + " "
-          + "  left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_VALUES + "."
-          + "\"STRING_LIST_ID_EID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
-          + "where " + SKEWED_VALUES + ".\"SD_ID_OID\" in (" + sdIds + ") "
-          + "  and " + SKEWED_VALUES + ".\"STRING_LIST_ID_EID\" is not null "
-          + "  and " + SKEWED_VALUES + ".\"INTEGER_IDX\" >= 0 "
-          + "order by " + SKEWED_VALUES + ".\"SD_ID_OID\" asc, " + SKEWED_VALUES + ".\"INTEGER_IDX\" asc,"
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
-      loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-        private Long currentListId;
-        private List<String> currentList;
-        @Override
-        public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
-          if (!t.isSetSkewedInfo()) t.setSkewedInfo(new SkewedInfo());
-          // Note that this is not a typical list accumulator - there's no call to finalize
-          // the last list. Instead we add list to SD first, as well as locally to add elements.
-          if (fields[1] == null) {
-            currentList = null; // left outer join produced a list with no values
-            currentListId = null;
-            t.getSkewedInfo().addToSkewedColValues(Collections.<String>emptyList());
-          } else {
-            long fieldsListId = extractSqlLong(fields[1]);
-            if (currentListId == null || fieldsListId != currentListId) {
-              currentList = new ArrayList<String>();
-              currentListId = fieldsListId;
-              t.getSkewedInfo().addToSkewedColValues(currentList);
-            }
-            currentList.add((String)fields[2]);
-          }
-        }});
-
-      // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
-      queryText =
-            "select " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\","
-          + " " + SKEWED_STRING_LIST_VALUES + ".STRING_LIST_ID,"
-          + " " + SKEWED_COL_VALUE_LOC_MAP + ".\"LOCATION\","
-          + " " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_VALUE\" "
-          + "from " + SKEWED_COL_VALUE_LOC_MAP + ""
-          + "  left outer join " + SKEWED_STRING_LIST_VALUES + " on " + SKEWED_COL_VALUE_LOC_MAP + "."
-          + "\"STRING_LIST_ID_KID\" = " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" "
-          + "where " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" in (" + sdIds + ")"
-          + "  and " + SKEWED_COL_VALUE_LOC_MAP + ".\"STRING_LIST_ID_KID\" is not null "
-          + "order by " + SKEWED_COL_VALUE_LOC_MAP + ".\"SD_ID\" asc,"
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"STRING_LIST_ID\" asc,"
-          + "  " + SKEWED_STRING_LIST_VALUES + ".\"INTEGER_IDX\" asc";
-
-      loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
-        private Long currentListId;
-        private List<String> currentList;
-        @Override
-        public void apply(StorageDescriptor t, Object[] fields) throws MetaException {
-          if (!t.isSetSkewedInfo()) {
-            SkewedInfo skewedInfo = new SkewedInfo();
-            skewedInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
-            t.setSkewedInfo(skewedInfo);
-          }
-          Map<List<String>, String> skewMap = t.getSkewedInfo().getSkewedColValueLocationMaps();
-          // Note that this is not a typical list accumulator - there's no call to finalize
-          // the last list. Instead we add list to SD first, as well as locally to add elements.
-          if (fields[1] == null) {
-            currentList = new ArrayList<String>(); // left outer join produced a list with no values
-            currentListId = null;
-          } else {
-            long fieldsListId = extractSqlLong(fields[1]);
-            if (currentListId == null || fieldsListId != currentListId) {
-              currentList = new ArrayList<String>();
-              currentListId = fieldsListId;
-            } else {
-              skewMap.remove(currentList); // value based compare.. remove first
-            }
-            currentList.add((String)fields[3]);
-          }
-          skewMap.put(currentList, (String)fields[2]);
-        }});
-    } // if (hasSkewedColumns)
-
-    // Get FieldSchema stuff if any.
-    if (!colss.isEmpty()) {
-      // We are skipping the CDS table here, as it seems to be totally useless.
-      queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", \"TYPE_NAME\""
-          + " from " + COLUMNS_V2 + " where \"CD_ID\" in (" + colIds + ")"
-          + " order by \"CD_ID\" asc, \"INTEGER_IDX\" asc";
-      loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
-        @Override
-        public void apply(List<FieldSchema> t, Object[] fields) {
-          t.add(new FieldSchema((String)fields[2], extractSqlClob(fields[3]), (String)fields[1]));
-        }});
-    }
-
-    // Finally, get all the stuff for serdes - just the params.
-    queryText = "select \"SERDE_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from " + SERDE_PARAMS + ""
-        + " where \"SERDE_ID\" in (" + serdeIds + ") and \"PARAM_KEY\" is not null"
-        + " order by \"SERDE_ID\" asc";
-    loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
-      @Override
-      public void apply(SerDeInfo t, Object[] fields) {
-        t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
-      }});
-    // Perform conversion of null map values
-    for (SerDeInfo t : serdes.values()) {
-      t.setParameters(MetaStoreServerUtils.trimMapNulls(t.getParameters(), convertMapNullsToEmptyStrings));
-    }
-
-    return orderedResult;
-  }
-
-  public int getNumPartitionsViaSqlFilter(SqlFilterForPushdown filter) throws MetaException {
-    boolean doTrace = LOG.isDebugEnabled();
-    String catName = filter.table.getCatName().toLowerCase();
-    String dbName = filter.table.getDbName().toLowerCase();
-    String tblName = filter.table.getTableName().toLowerCase();
-
-    // Get number of partitions by doing count on PART_ID.
-    String queryText = "select count(" + PARTITIONS + ".\"PART_ID\") from " + PARTITIONS + ""
-      + "  inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
-      + "    and " + TBLS + ".\"TBL_NAME\" = ? "
-      + "  inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
-      + "     and " + DBS + ".\"NAME\" = ? "
-      + join(filter.joins, ' ')
-      + " where " + DBS + ".\"CTLG_NAME\" = ? "
-      + (filter.filter == null || filter.filter.trim().isEmpty() ? "" : (" and " + filter.filter));
-
-    Object[] params = new Object[filter.params.size() + 3];
-    params[0] = tblName;
-    params[1] = dbName;
-    params[2] = catName;
-    for (int i = 0; i < filter.params.size(); ++i) {
-      params[i + 3] = filter.params.get(i);
-    }
-
-    long start = doTrace ? System.nanoTime() : 0;
-    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    query.setUnique(true);
-    int sqlResult = extractSqlInt(query.executeWithArray(params));
-    long queryTime = doTrace ? System.nanoTime() : 0;
-    timingTrace(doTrace, queryText, start, queryTime);
-    return sqlResult;
-  }
-
-
-  private void timingTrace(boolean doTrace, String queryText, long start, long queryTime) {
-    if (!doTrace) return;
-    LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
-        (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [" + queryText + "]");
-  }
-
-  static Long extractSqlLong(Object obj) throws MetaException {
-    if (obj == null) return null;
-    if (!(obj instanceof Number)) {
-      throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
-    }
-    return ((Number)obj).longValue();
-  }
-
-  /**
-   * Convert a boolean value returned from the RDBMS to a Java Boolean object.
-   * MySQL has booleans, but e.g. Derby uses 'Y'/'N' mapping.
-   *
-   * @param value
-   *          column value from the database
-   * @return The Boolean value of the database column value, null if the column
-   *         value is null
-   * @throws MetaException
-   *           if the column value cannot be converted into a Boolean object
-   */
-  private static Boolean extractSqlBoolean(Object value) throws MetaException {
-    if (value == null) {
-      return null;
-    }
-    if (value instanceof Boolean) {
-      return (Boolean)value;
-    }
-    if (value instanceof String) {
-      try {
-        return BooleanUtils.toBooleanObject((String) value, "Y", "N", null);
-      } catch (IllegalArgumentException iae) {
-        // NOOP
-      }
-    }
-    throw new MetaException("Cannot extract boolean from column value " + value);
-  }
-
-  private int extractSqlInt(Object field) {
-    return ((Number)field).intValue();
-  }
-
-  private String extractSqlString(Object value) {
-    if (value == null) return null;
-    return value.toString();
-  }
-
-  static Double extractSqlDouble(Object obj) throws MetaException {
-    if (obj == null)
-      return null;
-    if (!(obj instanceof Number)) {
-      throw new MetaException("Expected numeric type but got " + obj.getClass().getName());
-    }
-    return ((Number) obj).doubleValue();
-  }
-
-  private String extractSqlClob(Object value) {
-    if (value == null) return null;
-    try {
-      if (value instanceof Clob) {
-        // we trim the Clob value to a max length an int can hold
-        int maxLength = (((Clob)value).length() < Integer.MAX_VALUE - 2) ? (int)((Clob)value).length() : Integer.MAX_VALUE - 2;
-        return ((Clob)value).getSubString(1L, maxLength);
-      } else {
-        return value.toString();
-      }
-    } catch (SQLException sqle) {
-      return null;
-    }
-  }
-
-  static byte[] extractSqlBlob(Object value) throws MetaException {
-    if (value == null)
-      return null;
-    if (value instanceof Blob) {
-      //derby, oracle
-      try {
-        // getBytes function says: pos the ordinal position of the first byte in
-        // the BLOB value to be extracted; the first byte is at position 1
-        return ((Blob) value).getBytes(1, (int) ((Blob) value).length());
-      } catch (SQLException e) {
-        throw new MetaException("Encounter error while processing blob.");
-      }
-    }
-    else if (value instanceof byte[]) {
-      // mysql, postgres, sql server
-      return (byte[]) value;
-    }
-	else {
-      // this may happen when enablebitvector is false
-      LOG.debug("Expected blob type but got " + value.getClass().getName());
-      return null;
-    }
-  }
-
-  /**
-   * Helper method for preparing for "SOMETHING_ID in (...)" to use in future queries.
-   * @param objectIds the objectId collection
-   * @return The concatenated list
-   * @throws MetaException If the list contains wrong data
-   */
-  private static String getIdListForIn(List<Object> objectIds) throws MetaException {
-    return objectIds.stream()
-               .map(i -> i.toString())
-               .collect(Collectors.joining(","));
-  }
-
-  private static String trimCommaList(StringBuilder sb) {
-    if (sb.length() > 0) {
-      sb.setLength(sb.length() - 1);
-    }
-    return sb.toString();
-  }
-
-  private abstract class ApplyFunc<Target> {
-    public abstract void apply(Target t, Object[] fields) throws MetaException;
-  }
-
-  /**
-   * Merges applies the result of a PM SQL query into a tree of object.
-   * Essentially it's an object join. DN could do this for us, but it issues queries
-   * separately for every object, which is suboptimal.
-   * @param tree The object tree, by ID.
-   * @param queryText The query text.
-   * @param keyIndex Index of the Long column corresponding to the map ID in query result rows.
-   * @param func The function that is called on each (object,row) pair with the same id.
-   * @return the count of results returned from the query.
-   */
-  private <T> int loopJoinOrderedResult(TreeMap<Long, T> tree,
-      String queryText, int keyIndex, ApplyFunc<T> func) throws MetaException {
-    boolean doTrace = LOG.isDebugEnabled();
-    long start = doTrace ? System.nanoTime() : 0;
-    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    Object result = query.execute();
-    long queryTime = doTrace ? System.nanoTime() : 0;
-    if (result == null) {
-      query.closeAll();
-      return 0;
-    }
-    List<Object[]> list = ensureList(result);
-    Iterator<Object[]> iter = list.iterator();
-    Object[] fields = null;
-    for (Map.Entry<Long, T> entry : tree.entrySet()) {
-      if (fields == null && !iter.hasNext()) break;
-      long id = entry.getKey();
-      while (fields != null || iter.hasNext()) {
-        if (fields == null) {
-          fields = iter.next();
-        }
-        long nestedId = extractSqlLong(fields[keyIndex]);
-        if (nestedId < id) throw new MetaException("Found entries for unknown ID " + nestedId);
-        if (nestedId > id) break; // fields belong to one of the next entries
-        func.apply(entry.getValue(), fields);
-        fields = null;
-      }
-      Deadline.checkTimeout();
-    }
-    int rv = list.size();
-    query.closeAll();
-    timingTrace(doTrace, queryText, start, queryTime);
-    return rv;
-  }
-
-  private static class PartitionFilterGenerator extends TreeVisitor {
-    private final Table table;
-    private final FilterBuilder filterBuffer;
-    private final List<Object> params;
-    private final List<String> joins;
-    private final boolean dbHasJoinCastBug;
-    private final String defaultPartName;
-    private final DatabaseProduct dbType;
-    private final String PARTITION_KEY_VALS, PARTITIONS, DBS, TBLS;
-
-    private PartitionFilterGenerator(Table table, List<Object> params, List<String> joins,
-        boolean dbHasJoinCastBug, String defaultPartName, DatabaseProduct dbType, String schema) {
-      this.table = table;
-      this.params = params;
-      this.joins = joins;
-      this.dbHasJoinCastBug = dbHasJoinCastBug;
-      this.filterBuffer = new FilterBuilder(false);
-      this.defaultPartName = defaultPartName;
-      this.dbType = dbType;
-      this.PARTITION_KEY_VALS = getFullyQualifiedName(schema, "PARTITION_KEY_VALS");
-      this.PARTITIONS = getFullyQualifiedName(schema, "PARTITIONS");
-      this.DBS = getFullyQualifiedName(schema, "DBS");
-      this.TBLS = getFullyQualifiedName(schema, "TBLS");
-    }
-
-    /**
-     * Generate the ANSI SQL92 filter for the given expression tree
-     * @param table the table being queried
-     * @param params the ordered parameters for the resulting expression
-     * @param joins the joins necessary for the resulting expression
-     * @return the string representation of the expression tree
-     */
-    private static String generateSqlFilter(Table table, ExpressionTree tree, List<Object> params,
-        List<String> joins, boolean dbHasJoinCastBug, String defaultPartName,
-        DatabaseProduct dbType, String schema) throws MetaException {
-      assert table != null;
-      if (tree == null) {
-        // consistent with other APIs like makeExpressionTree, null is returned to indicate that
-        // the filter could not pushed down due to parsing issue etc
-        return null;
-      }
-      if (tree.getRoot() == null) {
-        return "";
-      }
-      PartitionFilterGenerator visitor = new PartitionFilterGenerator(
-          table, params, joins, dbHasJoinCastBug, defaultPartName, dbType, schema);
-      tree.accept(visitor);
-      if (visitor.filterBuffer.hasError()) {
-        LOG.info("Unable to push down SQL filter: " + visitor.filterBuffer.getErrorMessage());
-        return null;
-      }
-
-      // Some joins might be null (see processNode for LeafNode), clean them up.
-      for (int i = 0; i < joins.size(); ++i) {
-        if (joins.get(i) != null) continue;
-        joins.remove(i--);
-      }
-      return "(" + visitor.filterBuffer.getFilter() + ")";
-    }
-
-    @Override
-    protected void beginTreeNode(TreeNode node) throws MetaException {
-      filterBuffer.append(" (");
-    }
-
-    @Override
-    protected void midTreeNode(TreeNode node) throws MetaException {
-      filterBuffer.append((node.getAndOr() == LogicalOperator.AND) ? " and " : " or ");
-    }
-
-    @Override
-    protected void endTreeNode(TreeNode node) throws MetaException {
-      filterBuffer.append(") ");
-    }
-
-    @Override
-    protected boolean shouldStop() {
-      return filterBuffer.hasError();
-    }
-
-    private static enum FilterType {
-      Integral,
-      String,
-      Date,
-
-      Invalid;
-
-      static FilterType fromType(String colTypeStr) {
-        if (colTypeStr.equals(ColumnType.STRING_TYPE_NAME)) {
-          return FilterType.String;
-        } else if (colTypeStr.equals(ColumnType.DATE_TYPE_NAME)) {
-          return FilterType.Date;
-        } else if (ColumnType.IntegralTypes.contains(colTypeStr)) {
-          return FilterType.Integral;
-        }
-        return FilterType.Invalid;
-      }
-
-      public static FilterType fromClass(Object value) {
-        if (value instanceof String) {
-          return FilterType.String;
-        } else if (value instanceof Long) {
-          return FilterType.Integral;
-        } else if (value instanceof java.sql.Date) {
-          return FilterType.Date;
-        }
-        return FilterType.Invalid;
-      }
-    }
-
-    @Override
-    public void visit(LeafNode node) throws MetaException {
-      if (node.operator == Operator.LIKE) {
-        filterBuffer.setError("LIKE is not supported for SQL filter pushdown");
-        return;
-      }
-      int partColCount = table.getPartitionKeys().size();
-      int partColIndex = node.getPartColIndexForFilter(table, filterBuffer);
-      if (filterBuffer.hasError()) return;
-
-      // We skipped 'like', other ops should all work as long as the types are right.
-      String colTypeStr = table.getPartitionKeys().get(partColIndex).getType();
-      FilterType colType = FilterType.fromType(colTypeStr);
-      if (colType == FilterType.Invalid) {
-        filterBuffer.setError("Filter pushdown not supported for type " + colTypeStr);
-        return;
-      }
-      FilterType valType = FilterType.fromClass(node.value);
-      Object nodeValue = node.value;
-      if (valType == FilterType.Invalid) {
-        filterBuffer.setError("Filter pushdown not supported for value " + node.value.getClass());
-        return;
-      }
-
-      // if Filter.g does date parsing for quoted strings, we'd need to verify there's no
-      // type mismatch when string col is filtered by a string that looks like date.
-      if (colType == FilterType.Date && valType == FilterType.String) {
-        // Filter.g cannot parse a quoted date; try to parse date here too.
-        try {
-          nodeValue = MetaStoreUtils.PARTITION_DATE_FORMAT.get().parse((String)nodeValue);
-          valType = FilterType.Date;
-        } catch (ParseException pe) { // do nothing, handled below - types will mismatch
-        }
-      }
-
-      // We format it so we are sure we are getting the right value
-      if (valType == FilterType.Date) {
-        // Format
-        nodeValue = MetaStoreUtils.PARTITION_DATE_FORMAT.get().format(nodeValue);
-      }
-
-      boolean isDefaultPartition = (valType == FilterType.String) && defaultPartName.equals(nodeValue);
-      if ((colType != valType) && (!isDefaultPartition)) {
-        // It's not clear how filtering for e.g. "stringCol > 5" should work (which side is
-        // to be coerced?). Let the expression evaluation sort this one out, not metastore.
-        filterBuffer.setError("Cannot push down filter for "
-            + colTypeStr + " column and value " + nodeValue.getClass());
-        return;
-      }
-
-      if (joins.isEmpty()) {
-        // There's a fixed number of partition cols that we might have filters on. To avoid
-        // joining multiple times for one column (if there are several filters on it), we will
-        // keep numCols elements in the list, one for each column; we will fill it with nulls,
-        // put each join at a corresponding index when necessary, and remove nulls in the end.
-        for (int i = 0; i < partColCount; ++i) {
-          joins.add(null);
-        }
-      }
-      if (joins.get(partColIndex) == null) {
-        joins.set(partColIndex, "inner join " + PARTITION_KEY_VALS + " \"FILTER" + partColIndex
-            + "\" on \"FILTER"  + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\""
-            + " and \"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
-      }
-
-      // Build the filter and add parameters linearly; we are traversing leaf nodes LTR.
-      String tableValue = "\"FILTER" + partColIndex + "\".\"PART_KEY_VAL\"";
-
-      if (node.isReverseOrder) {
-        params.add(nodeValue);
-      }
-      String tableColumn = tableValue;
-      if ((colType != FilterType.String) && (!isDefaultPartition)) {
-        // The underlying database field is varchar, we need to compare numbers.
-        if (colType == FilterType.Integral) {
-          tableValue = "cast(" + tableValue + " as decimal(21,0))";
-        } else if (colType == FilterType.Date) {
-          if (dbType == DatabaseProduct.ORACLE) {
-            // Oracle requires special treatment... as usual.
-            tableValue = "TO_DATE(" + tableValue + ", 'YYYY-MM-DD')";
-          } else {
-            tableValue = "cast(" + tableValue + " as date)";
-          }
-        }
-
-        // Workaround for HIVE_DEFAULT_PARTITION - ignore it like JDO does, for now.
-        String tableValue0 = tableValue;
-        tableValue = "(case when " + tableColumn + " <> ?";
-        params.add(defaultPartName);
-
-        if (dbHasJoinCastBug) {
-          // This is a workaround for DERBY-6358 and Oracle bug; it is pretty horrible.
-          tableValue += (" and " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and "
-              + DBS + ".\"CTLG_NAME\" = ? and "
-              + "\"FILTER" + partColIndex + "\".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\" and "
-                + "\"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
-          params.add(table.getTableName().toLowerCase());
-          params.add(table.getDbName().toLowerCase());
-          params.add(table.getCatName().toLowerCase());
-        }
-        tableValue += " then " + tableValue0 + " else null end)";
-      }
-      if (!node.isReverseOrder) {
-        params.add(nodeValue);
-      }
-
-      filterBuffer.append(node.isReverseOrder
-          ? "(? " + node.operator.getSqlOp() + " " + tableValue + ")"
-          : "(" + tableValue + " " + node.operator.getSqlOp() + " ?)");
-    }
-  }
-
-  /**
-   * Retrieve the column statistics for the specified columns of the table. NULL
-   * is returned if the columns are not provided.
-   * @param catName     the catalog name of the table
-   * @param dbName      the database name of the table
-   * @param tableName   the table name
-   * @param colNames    the list of the column names
-   * @return            the column statistics for the specified columns
-   * @throws MetaException
-   */
-  public ColumnStatistics getTableStats(final String catName, final String dbName,
-                                        final String tableName, List<String> colNames,
-                                        boolean enableBitVector) throws MetaException {
-    if (colNames == null || colNames.isEmpty()) {
-      return null;
-    }
-    final boolean doTrace = LOG.isDebugEnabled();
-    final String queryText0 = "select " + getStatsList(enableBitVector) + " from " + TAB_COL_STATS
-          + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? and \"COLUMN_NAME\" in (";
-    Batchable<String, Object[]> b = new Batchable<String, Object[]>() {
-      @Override
-      public List<Object[]> run(List<String> input) throws MetaException {
-        String queryText = queryText0 + makeParams(input.size()) + ")";
-        Object[] params = new Object[input.size() + 3];
-        params[0] = catName;
-        params[1] = dbName;
-        params[2] = tableName;
-        for (int i = 0; i < input.size(); ++i) {
-          params[i + 3] = input.get(i);
-        }
-        long start = doTrace ? System.nanoTime() : 0;
-        Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
-        Object qResult = executeWithArray(query, params, queryText);
-        timingTrace(doTrace, queryText0 + "...)", start, (doTrace ? System.nanoTime() : 0));
-        if (qResult == null) {
-          query.closeAll();
-          return null;
-        }
-        addQueryAfterUse(query);
-        return ensureList(qResult);
-      }
-    };
-    List<Object[]> list = Batchable.runBatched(batchSize, colNames, b);
-    if (list.isEmpty()) {
-      return null;
-    }
-    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
-    csd.setCatName(catName);
-    ColumnStatistics result = makeColumnStats(list, csd, 0);
-    b.closeAllQueries();
-    return result;
-  }
-
-  /**
-   * Computes aggregated column statistics for the given columns over the given partitions.
-   *
-   * <p>If either {@code colNames} or {@code partNames} is empty, an {@link AggrStats} holding an
-   * empty list and a parts-found count of 0 is returned. When the aggregate stats cache is enabled
-   * and the number of partitions is below the cache's max-parts-per-node value, every column is
-   * looked up in the cache first; only cache misses are read from the database and then added to
-   * the cache. Otherwise all columns are read from the database in one go.
-   *
-   * @param catName catalog name
-   * @param dbName database name
-   * @param tableName table name
-   * @param partNames names of the partitions to aggregate over
-   * @param colNames names of the columns to aggregate
-   * @param useDensityFunctionForNDVEstimation passed through to the database aggregation
-   * @param ndvTuner passed through to the database aggregation
-   * @param enableBitVector passed through to the database aggregation
-   * @return the aggregated column statistics together with the parts-found count
-   * @throws MetaException propagated from the underlying lookups
-   */
-  public AggrStats aggrColStatsForPartitions(String catName, String dbName, String tableName,
-      List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation,
-      double ndvTuner, boolean enableBitVector) throws MetaException {
-    if (colNames.isEmpty() || partNames.isEmpty()) {
-      LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval");
-      // Nothing to aggregate
-      return new AggrStats(Collections.<ColumnStatisticsObj>emptyList(), 0);
-    }
-    // The cache is only consulted when it is enabled and the request is small enough for one node.
-    final boolean consultCache = isAggregateStatsCacheEnabled
-        && (partNames.size() < aggrStatsCache.getMaxPartsPerCacheNode());
-    long partsFound = 0;
-    final List<ColumnStatisticsObj> colStatsList;
-    if (!consultCache) {
-      partsFound = partsFoundForPartitions(catName, dbName, tableName, partNames, colNames);
-      colStatsList =
-          columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, colNames, partsFound,
-              useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
-    } else {
-      final int maxPartsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode();
-      final double fpp = aggrStatsCache.getFalsePositiveProbability();
-      colStatsList = new ArrayList<ColumnStatisticsObj>();
-      // Bloom filter for the new node(s) that may be added to the cache below
-      final BloomFilter bloomFilter = createPartsBloomFilter(maxPartsPerCacheNode, fpp, partNames);
-      // The parts-found query covers all columns, so it is issued at most once per call.
-      boolean partsAlreadyCounted = false;
-      for (String colName : colNames) {
-        final AggrColStats cached =
-            aggrStatsCache.get(catName, dbName, tableName, colName, partNames);
-        if (cached != null) {
-          // Cache hit: take the stats and the parts count recorded with the cached node.
-          colStatsList.add(cached.getColStats());
-          partsFound = cached.getNumPartsCached();
-          continue;
-        }
-        if (!partsAlreadyCounted) {
-          partsFound = partsFoundForPartitions(catName, dbName, tableName, partNames, colNames);
-          partsAlreadyCounted = true;
-        }
-        // Cache miss: read aggregated stats for just this one column
-        final List<String> singleColumn = new ArrayList<>();
-        singleColumn.add(colName);
-        final List<ColumnStatisticsObj> fromDb =
-            columnStatisticsObjForPartitions(catName, dbName, tableName, partNames, singleColumn,
-                partsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
-        if (fromDb.isEmpty()) {
-          continue;
-        }
-        final ColumnStatisticsObj aggregated = fromDb.get(0);
-        colStatsList.add(aggregated);
-        // Remember the freshly computed aggregate in the cache
-        aggrStatsCache.add(catName, dbName, tableName, colName, partsFound, aggregated, bloomFilter);
-      }
-    }
-    LOG.info("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation
-        + "\npartsFound = " + partsFound + "\nColumnStatisticsObj = "
-        + Arrays.toString(colStatsList.toArray()));
-    return new AggrStats(colStatsList, partsFound);
-  }
-
-  /**
-   * Creates a {@link BloomFilter} from {@code maxPartsPerCacheNode} and {@code fpp} and adds the
-   * bytes of every name in {@code partNames} to it.
-   *
-   * @param maxPartsPerCacheNode first constructor argument of the filter
-   * @param fpp second constructor argument of the filter
-   * @param partNames partition names to add
-   * @return the populated filter
-   */
-  private BloomFilter createPartsBloomFilter(int maxPartsPerCacheNode, double fpp,
-      List<String> partNames) {
-    final BloomFilter filter = new BloomFilter(maxPartsPerCacheNode, fpp);
-    final Iterator<String> names = partNames.iterator();
-    while (names.hasNext()) {
-      filter.add(names.next().getBytes());
-    }
-    return filter;
-  }
-
-  /**
-   * Counts partitions for which statistics exist for all requested columns.
-   *
-   * <p>The column names and the partition names are both processed in batches. For every
-   * (column batch, partition batch) pair one query counts the stats rows per partition; a
-   * partition is tallied when that count equals the number of columns in the column batch. The
-   * tallies of all batch pairs are summed up and returned.
-   *
-   * @return the sum of the per-batch tallies
-   * @throws MetaException propagated from query execution
-   */
-  private long partsFoundForPartitions(
-      final String catName, final String dbName, final String tableName,
-      final List<String> partNames, List<String> colNames) throws MetaException {
-    assert !colNames.isEmpty() && !partNames.isEmpty();
-    final boolean doTrace = LOG.isDebugEnabled();
-    // %1$s / %2$s are replaced by the parameter placeholders of the current batches.
-    final String countQueryTemplate  = "select count(\"COLUMN_NAME\") from " + PART_COL_STATS + ""
-        + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
-        + " and \"COLUMN_NAME\" in (%1$s) and \"PARTITION_NAME\" in (%2$s)"
-        + " group by \"PARTITION_NAME\"";
-    final List<Long> perBatchCounts =
-        Batchable.runBatched(batchSize, colNames, new Batchable<String, Long>() {
-      @Override
-      public List<Long> run(final List<String> colBatch) throws MetaException {
-        return Batchable.runBatched(batchSize, partNames, new Batchable<String, Long>() {
-          @Override
-          public List<Long> run(List<String> partBatch) throws MetaException {
-            final String sql = String.format(countQueryTemplate,
-                makeParams(colBatch.size()), makeParams(partBatch.size()));
-            final long start = doTrace ? System.nanoTime() : 0;
-            final Query query = pm.newQuery("javax.jdo.query.SQL", sql);
-            try {
-              final Object qResult = executeWithArray(query, prepareParams(
-                  catName, dbName, tableName, partBatch, colBatch), sql);
-              timingTrace(doTrace, sql, start, doTrace ? System.nanoTime() : 0);
-              final ForwardQueryResult<?> rows = (ForwardQueryResult<?>) qResult;
-              long fullyCovered = 0;
-              for (Iterator<?> it = rows.iterator(); it.hasNext();) {
-                // One result row per partition; it counts the columns that have stats there.
-                if (extractSqlLong(it.next()) == colBatch.size()) {
-                  fullyCovered++;
-                }
-              }
-              return Lists.<Long>newArrayList(fullyCovered);
-            } finally {
-              query.closeAll();
-            }
-          }
-        });
-      }
-    });
-    long total = 0;
-    for (Long batchCount : perBatchCounts) {
-      total += batchCount;
-    }
-    return total;
-  }
-
-  /**
-   * Reads aggregated column statistics, batching over both the column names and the partition
-   * names and delegating every (column batch, partition batch) pair to
-   * {@code columnStatisticsObjForPartitionsBatch}.
-   *
-   * @param partsFound compared with {@code partNames.size()} to decide whether statistics were
-   *        found for all partitions
-   * @return the results of all batches as returned by {@code Batchable.runBatched}
-   * @throws MetaException propagated from the batch calls
-   */
-  private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(
-      final String catName, final String dbName,
-    final String tableName, final List<String> partNames, List<String> colNames, long partsFound,
-    final boolean useDensityFunctionForNDVEstimation, final double ndvTuner, final boolean enableBitVector) throws MetaException {
-    final boolean areAllPartsFound = (partsFound == partNames.size());
-    // Outer level: one invocation per batch of column names.
-    final Batchable<String, ColumnStatisticsObj> perColumnBatch =
-        new Batchable<String, ColumnStatisticsObj>() {
-      @Override
-      public List<ColumnStatisticsObj> run(final List<String> colBatch) throws MetaException {
-        // Inner level: one invocation per batch of partition names.
-        final Batchable<String, ColumnStatisticsObj> perPartitionBatch =
-            new Batchable<String, ColumnStatisticsObj>() {
-          @Override
-          public List<ColumnStatisticsObj> run(List<String> partBatch) throws MetaException {
-            return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName, partBatch,
-                colBatch, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector);
-          }
-        };
-        return Batchable.runBatched(batchSize, partNames, perPartitionBatch);
-      }
-    };
-    return Batchable.runBatched(batchSize, colNames, perColumnBatch);
-  }
-
-  /**
-   * Reads the partition column statistics of all tables in the given catalog and database.
-   *
-   * <p>Each result row is converted with {@code prepareCSObj} and wrapped, together with its
-   * table and partition name, into a {@link ColStatsObjWithSourceInfo}.
-   * {@code Deadline.checkTimeout()} is invoked after every row.
-   *
-   * @param catName catalog name
-   * @param dbName database name
-   * @param enableBitVector passed to {@code getStatsList} when building the select list
-   * @return the collected statistics; empty if the query yields a {@code null} result
-   * @throws MetaException propagated from query execution or row conversion
-   */
-  public List<ColStatsObjWithSourceInfo> getColStatsForAllTablePartitions(String catName, String dbName,
-      boolean enableBitVector) throws MetaException {
-    String queryText = "select \"TABLE_NAME\", \"PARTITION_NAME\", " + getStatsList(enableBitVector)
-        + " from " + " " + PART_COL_STATS + " where \"DB_NAME\" = ? and \"CAT_NAME\" = ?";
-    boolean doTrace = LOG.isDebugEnabled();
-    long start = doTrace ? System.nanoTime() : 0;
-    List<ColStatsObjWithSourceInfo> colStatsForDB = new ArrayList<ColStatsObjWithSourceInfo>();
-    Query query = null;
-    try {
-      query = pm.newQuery("javax.jdo.query.SQL", queryText);
-      Object qResult = executeWithArray(query, new Object[] { dbName, catName }, queryText);
-      if (qResult == null) {
-        // The finally block closes the query; no separate closeAll() is needed here.
-        return colStatsForDB;
-      }
-      long end = doTrace ? System.nanoTime() : 0;
-      timingTrace(doTrace, queryText, start, end);
-      List<Object[]> list = ensureList(qResult);
-      for (Object[] row : list) {
-        String tblName = (String) row[0];
-        String partName = (String) row[1];
-        // The statistics columns start right after table name and partition name.
-        ColumnStatisticsObj colStatObj = prepareCSObj(row, 2);
-        colStatsForDB.add(new ColStatsObjWithSourceInfo(colStatObj, catName, dbName, tblName, partName));
-        Deadline.checkTimeout();
-      }
-    } finally {
-      // query is still null if pm.newQuery itself failed; guard so that the original
-      // exception is not replaced by a NullPointerException.
-      if (query != null) {
-        query.closeAll();
-      }
-    }
-    return colStatsForDB;
-  }
-
-  /**
-   * Aggregates column statistics for one batch of partitions and columns. Should be called with
-   * the lists short enough to not trip up Oracle/etc.
-   *
-   * <p>With {@code enableBitVector} set the aggregation goes through {@code aggrStatsUseJava},
-   * otherwise through {@code aggrStatsUseDB}.
-   */
-  private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String catName, String dbName,
-      String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
-      boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector)
-      throws MetaException {
-    if (!enableBitVector) {
-      return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, areAllPartsFound,
-          useDensityFunctionForNDVEstimation, ndvTuner);
-    }
-    return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, areAllPartsFound,
-        useDensityFunctionForNDVEstimation, ndvTuner);
-  }
-
-  /**
-   * Aggregates partition column statistics on the Java side: fetches the statistics of
-   * {@code colNames} in {@code partNames} via {@code getPartitionStats} and hands them to
-   * {@code MetaStoreServerUtils.aggrPartitionStats}.
-   */
-  private List<ColumnStatisticsObj> aggrStatsUseJava(String catName, String dbName, String tableName,
-      List<String> partNames, List<String> colNames, boolean areAllPartsFound,
-      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
-    // The per-partition stats are fetched first and fed straight into the aggregation utility.
-    return MetaStoreServerUtils.aggrPartitionStats(
-        getPartitionStats(catName, dbName, tableName, partNames, colNames, true),
-        catName, dbName, tableName, partNames, colNames,
-        areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
-  }
-
-  /**
-   * Aggregates partition column statistics with SQL aggregate queries.
-   *
-   * <p>If {@code areAllPartsFound} is set, a single grouped query over all requested columns and
-   * partitions produces the result. Otherwise the columns are first classified by the number of
-   * partitions that have statistics for them: columns with statistics for all partitions, or for
-   * fewer than two partitions, are aggregated directly; for the remaining columns the values are
-   * extrapolated (sums are scaled up, min/max are extrapolated from the border partitions,
-   * averages are taken over the existing partitions).
-   *
-   * @param areAllPartsFound whether statistics were found for all partitions
-   * @param useDensityFunctionForNDVEstimation passed to {@code prepareCSObjWithAdjustedNDV}
-   * @param ndvTuner passed to {@code prepareCSObjWithAdjustedNDV}
-   * @return the aggregated statistics; an empty list if any of the queries yields a
-   *         {@code null} result
-   * @throws MetaException propagated from query execution or row conversion
-   */
-  private List<ColumnStatisticsObj> aggrStatsUseDB(String catName, String dbName,
-      String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
-      boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
-    // TODO: all the extrapolation logic should be moved out of this class,
-    // only mechanical data retrieval should remain here.
-    String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
-        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
-        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
-        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
-        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
-        // The following data is used to compute a partitioned table's NDV based
-        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
-        // accurately derived from partition NDVs, because the domain of column value two partitions
-        // can overlap. If there is no overlap then global NDV is just the sum
-        // of partition NDVs (UpperBound). But if there is some overlay then
-        // global NDV can be anywhere between sum of partition NDVs (no overlap)
-        // and same as one of the partition NDV (domain of column value in all other
-        // partitions is subset of the domain value in one of the partition)
-        // (LowerBound).But under uniform distribution, we can roughly estimate the global
-        // NDV by leveraging the min/max values.
-        // And, we also guarantee that the estimation makes sense by comparing it to the
-        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
-        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
-        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
-        + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
-        + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + ""
-        + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? ";
-    String queryText = null;
-    long start = 0;
-    long end = 0;
-    Query query = null;
-    boolean doTrace = LOG.isDebugEnabled();
-    Object qResult = null;
-    ForwardQueryResult<?> fqr = null;
-    // Check if the status of all the columns of all the partitions exists
-    // Extrapolation is not needed.
-    if (areAllPartsFound) {
-      queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
-          + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
-          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-      start = doTrace ? System.nanoTime() : 0;
-      query = pm.newQuery("javax.jdo.query.SQL", queryText);
-      qResult = executeWithArray(query, prepareParams(catName, dbName, tableName, partNames, colNames),
-          queryText);
-      if (qResult == null) {
-        query.closeAll();
-        return Collections.emptyList();
-      }
-      end = doTrace ? System.nanoTime() : 0;
-      timingTrace(doTrace, queryText, start, end);
-      List<Object[]> list = ensureList(qResult);
-      List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size());
-      for (Object[] row : list) {
-        colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
-        Deadline.checkTimeout();
-      }
-      query.closeAll();
-      return colStats;
-    } else {
-      // Extrapolation is needed for some columns.
-      // In this case, at least a column status for a partition is missing.
-      // We need to extrapolate this partition based on the other partitions
-      List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
-      // Step 1: per column, count the partitions that have statistics.
-      queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PARTITION_NAME\") "
-          + " from " + PART_COL_STATS
-          + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
-          + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")"
-          + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
-          + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-      start = doTrace ? System.nanoTime() : 0;
-      query = pm.newQuery("javax.jdo.query.SQL", queryText);
-      qResult = executeWithArray(query, prepareParams(catName, dbName, tableName, partNames, colNames),
-          queryText);
-      end = doTrace ? System.nanoTime() : 0;
-      timingTrace(doTrace, queryText, start, end);
-      if (qResult == null) {
-        query.closeAll();
-        return Collections.emptyList();
-      }
-      List<String> noExtraColumnNames = new ArrayList<String>();
-      // column name -> { column type, number of partitions with stats (as string) }
-      Map<String, String[]> extraColumnNameTypeParts = new HashMap<String, String[]>();
-      List<Object[]> list = ensureList(qResult);
-      for (Object[] row : list) {
-        String colName = (String) row[0];
-        String colType = (String) row[1];
-        // Extrapolation is not needed for this column if
-        // count(\"PARTITION_NAME\")==partNames.size()
-        // Or, extrapolation is not possible for this column if
-        // count(\"PARTITION_NAME\")<2
-        Long count = extractSqlLong(row[2]);
-        if (count == partNames.size() || count < 2) {
-          noExtraColumnNames.add(colName);
-        } else {
-          extraColumnNameTypeParts.put(colName, new String[] { colType, String.valueOf(count) });
-        }
-        Deadline.checkTimeout();
-      }
-      query.closeAll();
-      // Step 2: Extrapolation is not needed for columns noExtraColumnNames
-      if (noExtraColumnNames.size() != 0) {
-        queryText = commonPrefix + " and \"COLUMN_NAME\" in ("
-            + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in ("
-            + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\"";
-        start = doTrace ? System.nanoTime() : 0;
-        query = pm.newQuery("javax.jdo.query.SQL", queryText);
-        qResult = executeWithArray(query,
-            prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames), queryText);
-        if (qResult == null) {
-          query.closeAll();
-          return Collections.emptyList();
-        }
-        list = ensureList(qResult);
-        for (Object[] row : list) {
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
-          Deadline.checkTimeout();
-        }
-        end = doTrace ? System.nanoTime() : 0;
-        timingTrace(doTrace, queryText, start, end);
-        query.closeAll();
-      }
-      // Step 3: Extrapolation is needed for extraColumnNames.
-      // give a sequence number for all the partitions
-      if (extraColumnNameTypeParts.size() != 0) {
-        Map<String, Integer> indexMap = new HashMap<String, Integer>();
-        for (int index = 0; index < partNames.size(); index++) {
-          indexMap.put(partNames.get(index), index);
-        }
-        // get sum for all columns to reduce the number of queries
-        Map<String, Map<Integer, Object>> sumMap = new HashMap<String, Map<Integer, Object>>();
-        queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")"
-            + " from " + PART_COL_STATS + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ? "
-            + " and \"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size())
-            + ") and \"PARTITION_NAME\" in (" + makeParams(partNames.size())
-            + ") group by \"COLUMN_NAME\"";
-        start = doTrace ? System.nanoTime() : 0;
-        query = pm.newQuery("javax.jdo.query.SQL", queryText);
-        List<String> extraColumnNames = new ArrayList<String>();
-        extraColumnNames.addAll(extraColumnNameTypeParts.keySet());
-        qResult = executeWithArray(query,
-            prepareParams(catName, dbName, tableName, partNames, extraColumnNames), queryText);
-        if (qResult == null) {
-          query.closeAll();
-          return Collections.emptyList();
-        }
-        list = ensureList(qResult);
-        // see the indexes for colstats in IExtrapolatePartStatus
-        Integer[] sumIndex = new Integer[] { 6, 10, 11, 15 };
-        for (Object[] row : list) {
-          Map<Integer, Object> indexToObject = new HashMap<Integer, Object>();
-          // row[1..] hold the four sums in the order of sumIndex
-          for (int ind = 1; ind < row.length; ind++) {
-            indexToObject.put(sumIndex[ind - 1], row[ind]);
-          }
-          // row[0] is the column name
-          sumMap.put((String) row[0], indexToObject);
-          Deadline.checkTimeout();
-        }
-        end = doTrace ? System.nanoTime() : 0;
-        timingTrace(doTrace, queryText, start, end);
-        query.closeAll();
-        for (Map.Entry<String, String[]> entry : extraColumnNameTypeParts.entrySet()) {
-          Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2];
-          String colName = entry.getKey();
-          String colType = entry.getValue()[0];
-          // number of partitions that have statistics for this column (always >= 2 here)
-          Long partsWithStats = Long.parseLong(entry.getValue()[1]);
-          // fill in colname
-          row[0] = colName;
-          // fill in coltype
-          row[1] = colType;
-          // use linear extrapolation. more complicated one can be added in the
-          // future.
-          IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus();
-          // fill in colstatus
-          Integer[] index = null;
-          boolean decimal = false;
-          if (colType.toLowerCase().startsWith("decimal")) {
-            index = IExtrapolatePartStatus.indexMaps.get("decimal");
-            decimal = true;
-          } else {
-            index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase());
-          }
-          // if the colType is not the known type, long, double, etc, then get
-          // all index.
-          if (index == null) {
-            index = IExtrapolatePartStatus.indexMaps.get("default");
-          }
-          for (int colStatIndex : index) {
-            String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex];
-            // if the aggregation type is sum, we do a scale-up
-            if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) {
-              Object o = sumMap.get(colName).get(colStatIndex);
-              if (o == null) {
-                row[2 + colStatIndex] = null;
-              } else {
-                Long val = extractSqlLong(o);
-                // Multiply before dividing: both operands are integral, so dividing first
-                // would truncate the per-partition average before it is scaled up.
-                row[2 + colStatIndex] = val * partNames.size() / partsWithStats;
-              }
-            } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min
-                || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) {
-              // if the aggregation type is min/max, we extrapolate from the
-              // left/right borders
-              if (!decimal) {
-                queryText = "select \"" + colStatName
-                    + "\",\"PARTITION_NAME\" from " + PART_COL_STATS
-                    + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
-                    + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
-                    + " order by \"" + colStatName + "\"";
-              } else {
-                queryText = "select \"" + colStatName
-                    + "\",\"PARTITION_NAME\" from " + PART_COL_STATS
-                    + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?" + " and \"COLUMN_NAME\" = ?"
-                    + " and \"PARTITION_NAME\" in (" + makeParams(partNames.size()) + ")"
-                    + " order by cast(\"" + colStatName + "\" as decimal)";
-              }
-              start = doTrace ? System.nanoTime() : 0;
-              query = pm.newQuery("javax.jdo.query.SQL", queryText);
-              qResult = executeWithArray(query,
-                  prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName)), queryText);
-              if (qResult == null) {
-                query.closeAll();
-                return Collections.emptyList();
-              }
-              fqr = (ForwardQueryResult<?>) qResult;
-              // The result is ordered by the stat value: first row is the min, last row the max.
-              Object[] min = (Object[]) (fqr.get(0));
-              Object[] max = (Object[]) (fqr.get(fqr.size() - 1));
-              end = doTrace ? System.nanoTime() : 0;
-              timingTrace(doTrace, queryText, start, end);
-              query.closeAll();
-              if (min[0] == null || max[0] == null) {
-                row[2 + colStatIndex] = null;
-              } else {
-                row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, max, colStatIndex,
-                    indexMap);
-              }
-            } else {
-              // if the aggregation type is avg, we use the average on the existing ones.
-              queryText = "select "
-                  + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
-                  + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-                  + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")"
-                  + " from " + PART_COL_STATS + "" + " where \"CAT_NAME\" = ? and \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
-                  + " and \"COLUMN_NAME\" = ?" + " and \"PARTITION_NAME\" in ("
-                  + makeParams(partNames.size()) + ")" + " group by \"COLUMN_NAME\"";
-              start = doTrace ? System.nanoTime() : 0;
-              query = pm.newQuery("javax.jdo.query.SQL", queryText);
-              qResult = executeWithArray(query,
-                  prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName)), queryText);
-              if (qResult == null) {
-                query.closeAll();
-                return Collections.emptyList();
-              }
-              fqr = (ForwardQueryResult<?>) qResult;
-              Object[] avg = (Object[]) (fqr.get(0));
-              // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE",
-              // "AVG_DECIMAL"
-              row[2 + colStatIndex] = avg[colStatIndex - 12];
-              end = doTrace ? System.nanoTime() : 0;
-              timingTrace(doTrace, queryText, start, end);
-              query.closeAll();
-            }
-          }
-          colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
-          Deadline.checkTimeout();
-        }
-      }
-      return colStats;
-    }
-  }
-
-  /**
-   * Builds a {@link ColumnStatisticsObj} from fifteen consecutive cells of {@code row}, starting
-   * at index {@code i}: column name, column type, and then the thirteen statistic values that
-   * are handed to {@code StatObjectConverter.fillColumnStatisticsData}.
-   *
-   * @param row the result row
-   * @param i index of the column-name cell within {@code row}
-   * @return the populated statistics object
-   * @throws MetaException propagated from {@code fillColumnStatisticsData}
-   */
-  private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaException {
-    final int base = i;
-    final ColumnStatisticsData data = new ColumnStatisticsData();
-    final String colName = (String) row[base];
-    final String colType = (String) row[base + 1];
-    final ColumnStatisticsObj cso = new ColumnStatisticsObj(colName, colType, data);
-    // Statistic cells follow the name/type pair in this fixed order.
-    final Object llow = row[base + 2];
-    final Object lhigh = row[base + 3];
-    final Object dlow = row[base + 4];
-    final Object dhigh = row[base + 5];
-    final Object declow = row[base + 6];
-    final Object dechigh = row[base + 7];
-    final Object nulls = row[base + 8];
-    final Object dist = row[base + 9];
-    final Object bitVector = row[base + 10];
-    final Object avglen = row[base + 11];
-    final Object maxlen = row[base + 12];
-    final Object trues = row[base + 13];
-    final Object falses = row[base + 14];
-    StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data,
-        llow, lhigh, dlow, dhigh, declow, dechigh, nulls, dist, bitVector, avglen, maxlen, trues, falses);
-    return cso;
-  }
-


<TRUNCATED>