You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2017/05/01 20:53:17 UTC
[2/2] hive git commit: HIVE-16520: Cache hive metadata in metastore
(Daniel Dai, Vaibhav Gumashta, reviewed by Thejas Nair)
HIVE-16520: Cache hive metadata in metastore (Daniel Dai, Vaibhav Gumashta, reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f79bd63
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f79bd63
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f79bd63
Branch: refs/heads/master
Commit: 2f79bd63a71c32c029992b0336726e248e4e905d
Parents: 2190960
Author: Daniel Dai <da...@hortonworks.com>
Authored: Mon May 1 13:52:58 2017 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Mon May 1 13:52:58 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +
.../listener/DummyRawStoreFailEvent.java | 7 +
.../hive/metastore/MetaStoreDirectSql.java | 60 +
.../hadoop/hive/metastore/ObjectStore.java | 33 +-
.../apache/hadoop/hive/metastore/RawStore.java | 12 +
.../hive/metastore/cache/ByteArrayWrapper.java | 45 +
.../hadoop/hive/metastore/cache/CacheUtils.java | 113 ++
.../hive/metastore/cache/CachedStore.java | 1579 ++++++++++++++++++
.../hive/metastore/cache/SharedCache.java | 356 ++++
.../hadoop/hive/metastore/hbase/HBaseStore.java | 7 +
.../hadoop/hive/metastore/hbase/HBaseUtils.java | 2 +-
.../DummyRawStoreControlledCommit.java | 8 +
.../DummyRawStoreForJdoConnection.java | 8 +
.../hive/metastore/VerifyingObjectStore.java | 2 +-
.../hive/metastore/cache/TestCachedStore.java | 238 +++
15 files changed, 2473 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2f79bd63/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 92cc9bd..3400560 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -899,6 +899,12 @@ public class HiveConf extends Configuration {
METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore",
"Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" +
"This class is used to store and retrieval of raw metadata objects such as table, database"),
+ METASTORE_CACHED_RAW_STORE_IMPL("hive.metastore.cached.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore",
+ "Name of the wrapped RawStore class"),
+ METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY(
+ "hive.metastore.cached.rawstore.cache.update.frequency", "60", new TimeValidator(
+ TimeUnit.SECONDS),
+ "The time after which metastore cache is updated from metastore DB."),
METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl",
"org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler",
"Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " +
http://git-wip-us.apache.org/repos/asf/hive/blob/2f79bd63/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 5282a5a..88b9faf 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -911,4 +912,10 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
public void addForeignKeys(List<SQLForeignKey> fks)
throws InvalidObjectException, MetaException {
}
+
+ /**
+  * Delegates straight to {@code objectStore}; this dummy store adds no behavior of its own
+  * for this call.
+  */
+ @Override
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException {
+ return objectStore.getAggrColStatsForTablePartitions(dbName, tableName);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/2f79bd63/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 22c1a33..b96c27e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.cache.CacheUtils;
+import org.apache.hadoop.hive.metastore.cache.CachedStore;
import org.apache.hadoop.hive.metastore.model.MConstraint;
import org.apache.hadoop.hive.metastore.model.MDatabase;
import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
@@ -79,6 +81,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* This class contains the optimizations for MetaStore that rely on direct SQL access to
@@ -1338,6 +1341,63 @@ class MetaStoreDirectSql {
});
}
+ /**
+  * Get aggregated column stats for a table per partition for all columns in the partition,
+  * using a single direct-SQL query against PART_COL_STATS.
+  * This is primarily used to populate stats object when using CachedStore
+  * (Check CachedStore#prewarm).
+  *
+  * @param dbName value bound to the DB_NAME predicate of the query
+  * @param tblName value bound to the TABLE_NAME predicate of the query
+  * @param useDensityFunctionForNDVEstimation forwarded to prepareCSObjWithAdjustedNDV
+  * @param ndvTuner forwarded to prepareCSObjWithAdjustedNDV
+  * @return map whose keys are built with
+  *         CacheUtils.buildKey(dbName, tblName, partVals, colName); an empty map when the
+  *         query produces no result object
+  * @throws MetaException propagated from query execution, row conversion or the deadline check
+  */
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+     String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
+   String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+       + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+       + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+       + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+       + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
+       // The following data is used to compute a partitioned table's NDV based
+       // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
+       // accurately derived from partition NDVs, because the domains of column values in two
+       // partitions can overlap. If there is no overlap then global NDV is just the sum
+       // of partition NDVs (UpperBound). But if there is some overlap then
+       // global NDV can be anywhere between sum of partition NDVs (no overlap)
+       // and same as one of the partition NDV (domain of column value in all other
+       // partitions is subset of the domain value in one of the partition)
+       // (LowerBound). But under uniform distribution, we can roughly estimate the global
+       // NDV by leveraging the min/max values.
+       // And, we also guarantee that the estimation makes sense by comparing it to the
+       // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
+       // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+       + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+       + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+       + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+       + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\""
+       + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+   boolean doTrace = LOG.isDebugEnabled();
+   long start = doTrace ? System.nanoTime() : 0;
+   Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+   try {
+     Object qResult = executeWithArray(query,
+         prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), queryText);
+     if (qResult == null) {
+       return Maps.newHashMap();
+     }
+     long end = doTrace ? System.nanoTime() : 0;
+     timingTrace(doTrace, queryText, start, end);
+     List<Object[]> list = ensureList(qResult);
+     Map<String, ColumnStatisticsObj> partColStatsMap = new HashMap<String, ColumnStatisticsObj>();
+     for (Object[] row : list) {
+       String partName = (String) row[0];
+       String colName = (String) row[1];
+       // Stats columns start at index 1 of the row (index 0 is the partition name).
+       partColStatsMap.put(
+           CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName),
+           prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner));
+       Deadline.checkTimeout();
+     }
+     return partColStatsMap;
+   } finally {
+     // Release the query on every exit path, including exceptions raised while
+     // converting rows or by the deadline check.
+     query.closeAll();
+   }
+ }
+
+
/** Should be called with the list short enough to not trip up Oracle/etc. */
private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String dbName,
String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
http://git-wip-us.apache.org/repos/asf/hive/blob/2f79bd63/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 495e7eb..58b7730 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -7370,6 +7370,38 @@ public class ObjectStore implements RawStore, Configurable {
}
@Override
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException {
+ // Returns the per-partition aggregated column stats of a table. Only the direct-SQL
+ // path is implemented; the JDO path below always throws.
+ // Both NDV settings are read from the configuration once and captured by the helper.
+ final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(),
+ HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
+ final double ndvTuner = HiveConf.getFloatVar(getConf(),
+ HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
+ // NOTE(review): the (true, false) arguments presumably mean allowSql / allowJdo - confirm
+ // against the GetHelper constructor.
+ return new GetHelper<Map<String, ColumnStatisticsObj>>(dbName, tableName, true, false) {
+ @Override
+ protected Map<String, ColumnStatisticsObj> getSqlResult(
+ GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException {
+ // Forward to the direct-SQL implementation with the settings captured above.
+ return directSql.getAggrColStatsForTablePartitions(dbName, tblName,
+ useDensityFunctionForNDVEstimation, ndvTuner);
+ }
+
+ @Override
+ protected Map<String, ColumnStatisticsObj> getJdoResult(
+ GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException,
+ NoSuchObjectException {
+ // This is the fast path for query optimizations: if we can find this info
+ // quickly using directSql, do it. No point in falling back to the slow path
+ // here.
+ throw new MetaException("Jdo path is not implemented for stats aggr.");
+ }
+
+ @Override
+ protected String describeResult() {
+ // No result description is provided for this helper.
+ return null;
+ }
+ }.run(true);
+ }
+
+ @Override
public void flushCache() {
// NOP as there's no caching
}
@@ -8787,5 +8819,4 @@ public class ObjectStore implements RawStore, Configurable {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f79bd63/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index 6f4f031..c22a1db 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -578,6 +579,17 @@ public interface RawStore extends Configurable {
List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
/**
+ * Get aggregated column statistics for all partitions of a table.
+ * @param dbName name of the database the table belongs to
+ * @param tableName name of the table whose partition column statistics are requested
+ * @return Map of partition column statistics. NOTE(review): the direct-SQL implementation
+ * builds each key with CacheUtils.buildKey(dbName, tableName, partVals, colName) - confirm
+ * that other implementations use the same key format.
+ * @throws MetaException
+ * @throws NoSuchObjectException
+ */
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException;
+
+ /**
* Get the next notification event.
* @param rqst Request containing information on the last processed notification.
* @return list of notifications, sorted by eventId
http://git-wip-us.apache.org/repos/asf/hive/blob/2f79bd63/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java
new file mode 100644
index 0000000..45ed1e7
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/ByteArrayWrapper.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.cache;
+
+import java.util.Arrays;
+
+/**
+ * Holder for a byte array whose equality and hash code are computed from the array
+ * contents instead of the array identity.
+ */
+public class ByteArrayWrapper {
+  byte[] wrapped;
+
+  ByteArrayWrapper(byte[] b) {
+    this.wrapped = b;
+  }
+
+  /** Two wrappers are equal exactly when their arrays have equal contents. */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof ByteArrayWrapper)) {
+      return false;
+    }
+    ByteArrayWrapper that = (ByteArrayWrapper) other;
+    return Arrays.equals(that.wrapped, this.wrapped);
+  }
+
+  /** Content-based hash, consistent with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(this.wrapped);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/2f79bd63/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
new file mode 100644
index 0000000..b438479
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.cache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
+import org.apache.hive.common.util.HiveStringUtils;
+
+/**
+ * Static helpers for the metastore cache: building cache keys, reassembling Table and
+ * Partition objects from their cached wrappers, and matching names against patterns.
+ */
+public class CacheUtils {
+  private static final String delimit = "\u0001";
+
+  /**
+   * Builds the key for a table: database name and table name joined by the delimiter.
+   */
+  public static String buildKey(String dbName, String tableName) {
+    return dbName + delimit + tableName;
+  }
+
+  /**
+   * Builds the key for a partition: the table key followed by every partition value, each
+   * preceded by the delimiter. The delimiter between the table name and the first value
+   * keeps keys unambiguous (table "t" with value "ab" must not equal table "ta" with
+   * value "b").
+   *
+   * @param partVals partition values; when null or empty the plain table key is returned
+   */
+  public static String buildKey(String dbName, String tableName, List<String> partVals) {
+    String tableKey = buildKey(dbName, tableName);
+    if (partVals == null || partVals.isEmpty()) {
+      return tableKey;
+    }
+    StringBuilder key = new StringBuilder(tableKey);
+    for (String partVal : partVals) {
+      key.append(delimit).append(partVal);
+    }
+    return key.toString();
+  }
+
+  /**
+   * Builds the key for a partition column: the partition key, the delimiter, then the
+   * column name.
+   */
+  public static String buildKey(String dbName, String tableName, List<String> partVals, String colName) {
+    return buildKey(dbName, tableName, partVals) + delimit + colName;
+  }
+
+  /**
+   * Rebuilds a Table from its wrapper. The table is deep-copied; when the wrapper carries
+   * a storage descriptor hash, a copy of the shared descriptor is attached with the
+   * wrapper's own location and parameters.
+   */
+  public static Table assemble(TableWrapper wrapper) {
+    Table t = wrapper.getTable().deepCopy();
+    if (wrapper.getSdHash() != null) {
+      StorageDescriptor sdCopy = copySharedSd(wrapper.getSdHash());
+      sdCopy.setLocation(wrapper.getLocation());
+      sdCopy.setParameters(wrapper.getParameters());
+      t.setSd(sdCopy);
+    }
+    return t;
+  }
+
+  /**
+   * Rebuilds a Partition from its wrapper, following the same steps as the Table overload.
+   */
+  public static Partition assemble(PartitionWrapper wrapper) {
+    Partition p = wrapper.getPartition().deepCopy();
+    if (wrapper.getSdHash() != null) {
+      StorageDescriptor sdCopy = copySharedSd(wrapper.getSdHash());
+      sdCopy.setLocation(wrapper.getLocation());
+      sdCopy.setParameters(wrapper.getParameters());
+      p.setSd(sdCopy);
+    }
+    return p;
+  }
+
+  /**
+   * Deep-copies the shared storage descriptor stored under the given hash and replaces
+   * null bucket columns, sort columns and skewed info with empty values.
+   */
+  private static StorageDescriptor copySharedSd(byte[] sdHash) {
+    StorageDescriptor sdCopy = SharedCache.getSdFromCache(sdHash).deepCopy();
+    if (sdCopy.getBucketCols() == null) {
+      sdCopy.setBucketCols(new ArrayList<String>());
+    }
+    if (sdCopy.getSortCols() == null) {
+      sdCopy.setSortCols(new ArrayList<Order>());
+    }
+    if (sdCopy.getSkewedInfo() == null) {
+      sdCopy.setSkewedInfo(new SkewedInfo(new ArrayList<String>(),
+          new ArrayList<List<String>>(), new HashMap<List<String>, String>()));
+    }
+    return sdCopy;
+  }
+
+  /**
+   * Tests a name against a pattern made of alternatives separated by '|'. In each
+   * alternative '?' stands for exactly one character and '*' for any run of characters;
+   * '^' and '$' are taken literally. Matching is case-insensitive and is applied to the
+   * normalized form of the name.
+   *
+   * @return true as soon as one alternative matches the whole name, false otherwise
+   */
+  public static boolean matches(String name, String pattern) {
+    String[] subpatterns = pattern.trim().split("\\|");
+    for (String subpattern : subpatterns) {
+      String regex = "(?i)" + subpattern.replaceAll("\\?", ".{1}").replaceAll("\\*", ".*")
+          .replaceAll("\\^", "\\\\^").replaceAll("\\$", "\\\\$");
+      if (Pattern.matches(regex, HiveStringUtils.normalizeIdentifier(name))) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f79bd63/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
new file mode 100644
index 0000000..39b1676
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -0,0 +1,1579 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.cache;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.FileMetadataHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.PartFilterExprUtil;
+import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+// TODO filter->expr
+// TODO functionCache
+// TODO constraintCache
+// TODO need sd nested copy?
+// TODO String intern
+// TODO restructure HBaseStore
+// TODO monitor event queue
+// TODO initial load slow?
+// TODO size estimation
+// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
+// TODO factor in NDV estimation (density based estimation) logic when merging NDVs from 2 colStats object
+// TODO refactor to use same common code with StatObjectConverter (for merging 2 col stats objects)
+
+public class CachedStore implements RawStore, Configurable {
+ private static ScheduledExecutorService cacheUpdateMaster = null;
+ private static AtomicReference<Thread> runningMasterThread = new AtomicReference<Thread>(null);
+ RawStore rawStore;
+ Configuration conf;
+ private PartitionExpressionProxy expressionProxy = null;
+ static boolean firstTime = true;
+
+ static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
+
+ /**
+  * Cache entry for a table: the Table object together with a storage descriptor hash,
+  * a location and a parameter map that are kept alongside it.
+  */
+ static class TableWrapper {
+   Table t;
+   String location;
+   Map<String, String> parameters;
+   byte[] sdHash;
+
+   TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) {
+     this.t = t;
+     this.location = location;
+     this.parameters = parameters;
+     this.sdHash = sdHash;
+   }
+
+   /** @return the wrapped table */
+   public Table getTable() {
+     return this.t;
+   }
+
+   /** @return the storage descriptor hash given at construction (may be null) */
+   public byte[] getSdHash() {
+     return this.sdHash;
+   }
+
+   /** @return the location given at construction */
+   public String getLocation() {
+     return this.location;
+   }
+
+   /** @return the parameter map given at construction */
+   public Map<String, String> getParameters() {
+     return this.parameters;
+   }
+ }
+
+ /**
+  * Cache entry for a partition: the Partition object together with a storage descriptor
+  * hash, a location and a parameter map that are kept alongside it.
+  */
+ static class PartitionWrapper {
+   Partition p;
+   String location;
+   Map<String, String> parameters;
+   byte[] sdHash;
+
+   PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
+     this.p = p;
+     this.location = location;
+     this.parameters = parameters;
+     this.sdHash = sdHash;
+   }
+
+   /** @return the wrapped partition */
+   public Partition getPartition() {
+     return this.p;
+   }
+
+   /** @return the storage descriptor hash given at construction (may be null) */
+   public byte[] getSdHash() {
+     return this.sdHash;
+   }
+
+   /** @return the location given at construction */
+   public String getLocation() {
+     return this.location;
+   }
+
+   /** @return the parameter map given at construction */
+   public Map<String, String> getParameters() {
+     return this.parameters;
+   }
+ }
+
+ /**
+  * Pairs a StorageDescriptor with a reference count.
+  */
+ static class StorageDescriptorWrapper {
+   StorageDescriptor sd;
+   int refCount = 0;
+
+   StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
+     this.refCount = refCount;
+     this.sd = sd;
+   }
+
+   /** @return the wrapped storage descriptor */
+   public StorageDescriptor getSd() {
+     return this.sd;
+   }
+
+   /** @return the reference count given at construction */
+   public int getRefCount() {
+     return this.refCount;
+   }
+ }
+
+ /** No-arg constructor; the wrapped store and expression proxy are created in setConf. */
+ public CachedStore() {
+ }
+
+ /**
+  * Instantiates and configures the wrapped RawStore named by METASTORE_CACHED_RAW_STORE_IMPL,
+  * (re)creates the partition expression proxy when none exists or the configuration object
+  * changed, and prewarms the cache the first time any instance is configured.
+  *
+  * @param conf configuration handed to the wrapped store and kept by this instance
+  * @throws RuntimeException when the wrapped store cannot be instantiated or prewarming fails
+  */
+ @Override
+ public void setConf(Configuration conf) {
+   String rawStoreClassName = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL,
+       ObjectStore.class.getName());
+   try {
+     Class<? extends RawStore> rawStoreClass =
+         (Class<? extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName);
+     rawStore = rawStoreClass.newInstance();
+   } catch (Exception e) {
+     throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
+   }
+   rawStore.setConf(conf);
+
+   Configuration previousConf = this.conf;
+   this.conf = conf;
+   // Identity comparison: only a different Configuration object counts as a change.
+   boolean confChanged = conf != previousConf;
+   if (expressionProxy != null && confChanged) {
+     LOG.warn("Unexpected setConf when we were already configured");
+   }
+   if (expressionProxy == null || confChanged) {
+     expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
+   }
+
+   // The flag is static, so the prewarm is skipped once any instance has completed it.
+   if (!firstTime) {
+     return;
+   }
+   try {
+     LOG.info("Prewarming CachedStore");
+     prewarm();
+     LOG.info("CachedStore initialized");
+   } catch (Exception e) {
+     throw new RuntimeException(e);
+   }
+   firstTime = false;
+ }
+
+ /**
+  * Loads every database, table, partition and per-partition aggregated column statistic
+  * from the wrapped store into SharedCache, then starts the periodic cache update service.
+  * Names are normalized before being used as cache identifiers; the wrapped store is
+  * always queried with the names it returned.
+  */
+ private void prewarm() throws Exception {
+   for (String dbName : rawStore.getAllDatabases()) {
+     Database db = rawStore.getDatabase(dbName);
+     SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+     for (String tblName : rawStore.getAllTables(dbName)) {
+       Table table = rawStore.getTable(dbName, tblName);
+       String cachedDbName = HiveStringUtils.normalizeIdentifier(dbName);
+       String cachedTblName = HiveStringUtils.normalizeIdentifier(tblName);
+       SharedCache.addTableToCache(cachedDbName, cachedTblName, table);
+       for (Partition partition : rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE)) {
+         SharedCache.addPartitionToCache(cachedDbName, cachedTblName, partition);
+       }
+       Map<String, ColumnStatisticsObj> aggrStatsPerPartition =
+           rawStore.getAggrColStatsForTablePartitions(dbName, tblName);
+       SharedCache.addPartitionColStatsToCache(aggrStatsPerPartition);
+     }
+   }
+   // Start the cache update master-worker threads
+   startCacheUpdateService();
+ }
+
+ /**
+  * Creates the single-threaded daemon scheduler that runs CacheUpdateMasterWork at the
+  * interval configured by METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, unless a
+  * scheduler already exists.
+  */
+ private void startCacheUpdateService() {
+   // cacheUpdateMaster is static, so the guard must be the class monitor: a lock on
+   // "this" would let two CachedStore instances each create their own scheduler.
+   synchronized (CachedStore.class) {
+     if (cacheUpdateMaster == null) {
+       cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+         @Override
+         public Thread newThread(Runnable r) {
+           Thread t = Executors.defaultThreadFactory().newThread(r);
+           // Daemon thread, so the updater never keeps the JVM alive.
+           t.setDaemon(true);
+           return t;
+         }
+       });
+       cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), 0, HiveConf
+           .getTimeVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
+               TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+     }
+   }
+ }
+
+ /**
+  * Periodic task that re-reads databases, tables and partitions from the wrapped RawStore
+  * and hands them to SharedCache. The task stops early when its thread is interrupted.
+  */
+ static class CacheUpdateMasterWork implements Runnable {
+
+   private CachedStore cachedStore;
+
+   public CacheUpdateMasterWork(CachedStore cachedStore) {
+     this.cachedStore = cachedStore;
+   }
+
+   /**
+    * Runs one refresh pass: databases first, then tables and partitions.
+    * NOTE(review): an unchecked exception escaping this method makes
+    * scheduleAtFixedRate suppress all later runs - confirm that is acceptable.
+    */
+   @Override
+   public void run() {
+     // Publish the executing thread so interruptCacheUpdateMaster() can reach it.
+     runningMasterThread.set(Thread.currentThread());
+     RawStore rawStore = cachedStore.getRawStore();
+     try {
+       List<String> dbNames = rawStore.getAllDatabases();
+       // Update the databases in cache; tables are only refreshed when that completed
+       if (updateDatabases(rawStore, dbNames)) {
+         // Update the tables and their partitions in cache
+         updateTables(rawStore, dbNames);
+       }
+     } catch (MetaException e) {
+       LOG.error("Updating CachedStore: error getting database names", e);
+     }
+   }
+
+   /**
+    * Reloads every named database and refreshes the cached database objects.
+    *
+    * @return false when the thread was interrupted before finishing, true otherwise
+    */
+   private boolean updateDatabases(RawStore rawStore, List<String> dbNames) {
+     if (dbNames == null) {
+       return true;
+     }
+     List<Database> databases = new ArrayList<Database>();
+     for (String dbName : dbNames) {
+       // If a preemption of this thread was requested, simply return before proceeding
+       if (Thread.interrupted()) {
+         return false;
+       }
+       try {
+         databases.add(rawStore.getDatabase(dbName));
+       } catch (NoSuchObjectException e) {
+         // The database is simply left out of the refreshed list.
+         LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
+       }
+     }
+     // Update the cached database objects
+     SharedCache.refreshDatabases(databases);
+     return true;
+   }
+
+   /**
+    * Reloads, database by database, all tables and then all partitions of each table.
+    *
+    * @return false when the thread was interrupted or a read failed, true otherwise
+    */
+   private boolean updateTables(RawStore rawStore, List<String> dbNames) {
+     if (dbNames == null) {
+       return true;
+     }
+     for (String dbName : dbNames) {
+       try {
+         List<String> tblNames = rawStore.getAllTables(dbName);
+         // A fresh list per database: refreshTables(dbName, ...) must only receive the
+         // tables of this database, not those collected for earlier ones.
+         List<Table> tables = new ArrayList<Table>();
+         for (String tblName : tblNames) {
+           // If a preemption of this thread was requested, simply return before proceeding
+           if (Thread.interrupted()) {
+             return false;
+           }
+           tables.add(rawStore.getTable(dbName, tblName));
+         }
+         // Update the cached table objects
+         SharedCache.refreshTables(dbName, tables);
+         for (String tblName : tblNames) {
+           // If a preemption of this thread was requested, simply return before proceeding
+           if (Thread.interrupted()) {
+             return false;
+           }
+           List<Partition> partitions =
+               rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+           SharedCache.refreshPartitions(dbName, tblName, partitions);
+         }
+       } catch (MetaException | NoSuchObjectException e) {
+         LOG.error("Updating CachedStore: unable to read table", e);
+         return false;
+       }
+     }
+     return true;
+   }
+ }
+
+ /**
+  * Interrupts the thread that last ran the background cache update, if one was recorded.
+  * Fire and forget: the updater checks its interrupt flag between store reads and gives up
+  * the current round when it finds the flag set. As noted by the original author, all writes
+  * to the cache go through synchronized methods, so not waiting for the updater is fine.
+  */
+ private void interruptCacheUpdateMaster() {
+   Thread master = runningMasterThread.get();
+   if (master == null) {
+     return;
+   }
+   master.interrupt();
+ }
+
+ /** Returns the configuration reported by the wrapped RawStore. */
+ @Override
+ public Configuration getConf() {
+   return rawStore.getConf();
+ }
+
+ /** Shuts down the wrapped RawStore; the shared cache is not touched here. */
+ @Override
+ public void shutdown() {
+   rawStore.shutdown();
+ }
+
+ /** Pass-through: opens a transaction on the wrapped RawStore and returns its result. */
+ @Override
+ public boolean openTransaction() {
+   return rawStore.openTransaction();
+ }
+
+ /** Pass-through: commits on the wrapped RawStore and returns its result. */
+ @Override
+ public boolean commitTransaction() {
+   return rawStore.commitTransaction();
+ }
+
+ /** Pass-through: rolls back on the wrapped RawStore; cached entries are not reverted here. */
+ @Override
+ public void rollbackTransaction() {
+   rawStore.rollbackTransaction();
+ }
+
+ /**
+  * Creates the database in the wrapped store, interrupts the background cache update, and
+  * then stores a deep copy of {@code db} in the shared cache under its normalized name.
+  */
+ @Override
+ public void createDatabase(Database db) throws InvalidObjectException, MetaException {
+   rawStore.createDatabase(db);
+   interruptCacheUpdateMaster();
+   String cacheKey = HiveStringUtils.normalizeIdentifier(db.getName());
+   Database cachedCopy = db.deepCopy();
+   SharedCache.addDatabaseToCache(cacheKey, cachedCopy);
+ }
+
+ /**
+  * Looks up a database in the shared cache by its normalized name.
+  *
+  * @param dbName database name; normalized before the lookup
+  * @return the cached database object
+  * @throws NoSuchObjectException if the cache holds no database under that name
+  */
+ @Override
+ public Database getDatabase(String dbName) throws NoSuchObjectException {
+   Database db = SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName));
+   if (db == null) {
+     throw new NoSuchObjectException("Database " + dbName + " does not exist in the cache");
+   }
+   // Return the object that was null-checked; a second lookup could race with a concurrent
+   // removal and hand null back to the caller.
+   return db;
+ }
+
+ /**
+  * Drops the database from the wrapped store. Only when the store reports success is the
+  * background cache update interrupted and the entry removed from the shared cache.
+  *
+  * @return the result reported by the wrapped store
+  */
+ @Override
+ public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException {
+   if (!rawStore.dropDatabase(dbname)) {
+     return false;
+   }
+   interruptCacheUpdateMaster();
+   SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname));
+   return true;
+ }
+
+ /**
+  * Alters the database in the wrapped store. Only when the store reports success is the
+  * background cache update interrupted and the cached entry altered to {@code db}.
+  *
+  * @return the result reported by the wrapped store
+  */
+ @Override
+ public boolean alterDatabase(String dbName, Database db)
+     throws NoSuchObjectException, MetaException {
+   if (!rawStore.alterDatabase(dbName, db)) {
+     return false;
+   }
+   interruptCacheUpdateMaster();
+   SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+   return true;
+ }
+
+ @Override
+ public List<String> getDatabases(String pattern) throws MetaException {
+ List<String> results = new ArrayList<String>();
+ for (String dbName : SharedCache.listCachedDatabases()) {
+ dbName = HiveStringUtils.normalizeIdentifier(dbName);
+ if (CacheUtils.matches(dbName, pattern)) {
+ results.add(dbName);
+ }
+ }
+ return results;
+ }
+
+ /** Returns the database names currently held by the shared cache, exactly as it lists them. */
+ @Override
+ public List<String> getAllDatabases() throws MetaException {
+   List<String> cachedNames = SharedCache.listCachedDatabases();
+   return cachedNames;
+ }
+
+ /** Pass-through to the wrapped RawStore; types are not kept in the shared cache here. */
+ @Override
+ public boolean createType(Type type) {
+   return rawStore.createType(type);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not consulted. */
+ @Override
+ public Type getType(String typeName) {
+   return rawStore.getType(typeName);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not touched. */
+ @Override
+ public boolean dropType(String typeName) {
+   return rawStore.dropType(typeName);
+ }
+
+ private void validateTableType(Table tbl) {
+ // If the table has property EXTERNAL set, update table type
+ // accordingly
+ String tableType = tbl.getTableType();
+ boolean isExternal = "TRUE".equals(tbl.getParameters().get("EXTERNAL"));
+ if (TableType.MANAGED_TABLE.toString().equals(tableType)) {
+ if (isExternal) {
+ tableType = TableType.EXTERNAL_TABLE.toString();
+ }
+ }
+ if (TableType.EXTERNAL_TABLE.toString().equals(tableType)) {
+ if (!isExternal) {
+ tableType = TableType.MANAGED_TABLE.toString();
+ }
+ }
+ tbl.setTableType(tableType);
+ }
+
+ /**
+  * Creates the table in the wrapped store, interrupts the background cache update, adjusts
+  * the table type via validateTableType, and adds {@code tbl} to the shared cache under the
+  * normalized database and table names.
+  */
+ @Override
+ public void createTable(Table tbl) throws InvalidObjectException, MetaException {
+   rawStore.createTable(tbl);
+   interruptCacheUpdateMaster();
+   validateTableType(tbl);
+   String dbKey = HiveStringUtils.normalizeIdentifier(tbl.getDbName());
+   String tblKey = HiveStringUtils.normalizeIdentifier(tbl.getTableName());
+   SharedCache.addTableToCache(dbKey, tblKey, tbl);
+ }
+
+ /**
+  * Drops the table from the wrapped store. Only when the store reports success is the
+  * background cache update interrupted and the table removed from the shared cache.
+  *
+  * @return the result reported by the wrapped store
+  */
+ @Override
+ public boolean dropTable(String dbName, String tableName)
+     throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
+   if (!rawStore.dropTable(dbName, tableName)) {
+     return false;
+   }
+   interruptCacheUpdateMaster();
+   String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+   String tblKey = HiveStringUtils.normalizeIdentifier(tableName);
+   SharedCache.removeTableFromCache(dbKey, tblKey);
+   return true;
+ }
+
+ /**
+  * Fetches a table from the shared cache by normalized database and table name.
+  *
+  * @return the cached table with its privileges unset, or null when the cache has no entry
+  */
+ @Override
+ public Table getTable(String dbName, String tableName) throws MetaException {
+   String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+   String tblKey = HiveStringUtils.normalizeIdentifier(tableName);
+   Table cached = SharedCache.getTableFromCache(dbKey, tblKey);
+   if (cached == null) {
+     return null;
+   }
+   cached.unsetPrivileges();
+   // NOTE(review): writes the current value back through the setter; presumably this is
+   // done to mark the field as explicitly set on the Thrift object - confirm.
+   cached.setRewriteEnabled(cached.isRewriteEnabled());
+   return cached;
+ }
+
+ /**
+  * Adds the partition to the wrapped store. Only when the store reports success is the
+  * background cache update interrupted and the partition added to the shared cache.
+  *
+  * @return the result reported by the wrapped store
+  */
+ @Override
+ public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
+   if (!rawStore.addPartition(part)) {
+     return false;
+   }
+   interruptCacheUpdateMaster();
+   String dbKey = HiveStringUtils.normalizeIdentifier(part.getDbName());
+   String tblKey = HiveStringUtils.normalizeIdentifier(part.getTableName());
+   SharedCache.addPartitionToCache(dbKey, tblKey, part);
+   return true;
+ }
+
+ @Override
+ public boolean addPartitions(String dbName, String tblName,
+ List<Partition> parts) throws InvalidObjectException, MetaException {
+ boolean succ = rawStore.addPartitions(dbName, tblName, parts);
+ if (succ) {
+ interruptCacheUpdateMaster();
+ for (Partition part : parts) {
+ SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), part);
+ }
+ }
+ return succ;
+ }
+
+ /**
+  * Adds the partitions described by {@code partitionSpec} to the wrapped store. Only when
+  * the store reports success is the background cache update interrupted and every partition
+  * produced by the spec's iterator added to the shared cache.
+  *
+  * @return the result reported by the wrapped store
+  */
+ @Override
+ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec,
+     boolean ifNotExists) throws InvalidObjectException, MetaException {
+   if (!rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists)) {
+     return false;
+   }
+   interruptCacheUpdateMaster();
+   for (PartitionSpecProxy.PartitionIterator it = partitionSpec.getPartitionIterator();
+       it.hasNext();) {
+     Partition added = it.next();
+     String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+     String tblKey = HiveStringUtils.normalizeIdentifier(tblName);
+     SharedCache.addPartitionToCache(dbKey, tblKey, added);
+   }
+   return true;
+ }
+
+ @Override
+ public Partition getPartition(String dbName, String tableName,
+ List<String> part_vals) throws MetaException, NoSuchObjectException {
+ Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ if (part != null) {
+ part.unsetPrivileges();
+ }
+ return part;
+ }
+
+ @Override
+ public boolean doesPartitionExist(String dbName, String tableName,
+ List<String> part_vals) throws MetaException, NoSuchObjectException {
+ return SharedCache.existPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ }
+
+ @Override
+ public boolean dropPartition(String dbName, String tableName,
+ List<String> part_vals) throws MetaException, NoSuchObjectException,
+ InvalidObjectException, InvalidInputException {
+ boolean succ = rawStore.dropPartition(dbName, tableName, part_vals);
+ if (succ) {
+ interruptCacheUpdateMaster();
+ SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ }
+ return succ;
+ }
+
+ /**
+  * Lists cached partitions of a table, passing {@code max} on to the shared cache, and
+  * unsets the privileges on each returned partition.
+  *
+  * @return whatever the shared cache returned (possibly null)
+  */
+ @Override
+ public List<Partition> getPartitions(String dbName, String tableName, int max)
+     throws MetaException, NoSuchObjectException {
+   String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+   String tblKey = HiveStringUtils.normalizeIdentifier(tableName);
+   List<Partition> cached = SharedCache.listCachedPartitions(dbKey, tblKey, max);
+   if (cached == null) {
+     return null;
+   }
+   for (Partition each : cached) {
+     each.unsetPrivileges();
+   }
+   return cached;
+ }
+
+ /**
+  * Alters the table in the wrapped store, interrupts the background cache update, adjusts
+  * the new table's type via validateTableType, and alters the cached entry to match.
+  */
+ @Override
+ public void alterTable(String dbName, String tblName, Table newTable)
+     throws InvalidObjectException, MetaException {
+   rawStore.alterTable(dbName, tblName, newTable);
+   interruptCacheUpdateMaster();
+   validateTableType(newTable);
+   String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+   String tblKey = HiveStringUtils.normalizeIdentifier(tblName);
+   SharedCache.alterTableInCache(dbKey, tblKey, newTable);
+ }
+
+ /**
+  * Returns the names of the cached tables of a database whose name CacheUtils.matches
+  * accepts for {@code pattern}.
+  */
+ @Override
+ public List<String> getTables(String dbName, String pattern) throws MetaException {
+   List<String> matching = new ArrayList<String>();
+   String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+   for (Table candidate : SharedCache.listCachedTables(dbKey)) {
+     if (!CacheUtils.matches(candidate.getTableName(), pattern)) {
+       continue;
+     }
+     matching.add(candidate.getTableName());
+   }
+   return matching;
+ }
+
+ @Override
+ public List<String> getTables(String dbName, String pattern,
+ TableType tableType) throws MetaException {
+ List<String> tableNames = new ArrayList<String>();
+ for (Table table : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) {
+ if (CacheUtils.matches(table.getTableName(), pattern) &&
+ table.getTableType().equals(tableType.toString())) {
+ tableNames.add(table.getTableName());
+ }
+ }
+ return tableNames;
+ }
+
+ @Override
+ public List<TableMeta> getTableMeta(String dbNames, String tableNames,
+ List<String> tableTypes) throws MetaException {
+ return SharedCache.getTableMeta(HiveStringUtils.normalizeIdentifier(dbNames),
+ HiveStringUtils.normalizeIdentifier(tableNames), tableTypes);
+ }
+
+ @Override
+ public List<Table> getTableObjectsByName(String dbName,
+ List<String> tblNames) throws MetaException, UnknownDBException {
+ List<Table> tables = new ArrayList<Table>();
+ for (String tblName : tblNames) {
+ tables.add(SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName)));
+ }
+ return tables;
+ }
+
+ @Override
+ public List<String> getAllTables(String dbName) throws MetaException {
+ List<String> tblNames = new ArrayList<String>();
+ for (Table tbl : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) {
+ tblNames.add(HiveStringUtils.normalizeIdentifier(tbl.getTableName()));
+ }
+ return tblNames;
+ }
+
+ @Override
+ public List<String> listTableNamesByFilter(String dbName, String filter,
+ short max_tables) throws MetaException, UnknownDBException {
+ List<String> tableNames = new ArrayList<String>();
+ int count = 0;
+ for (Table table : SharedCache.listCachedTables(HiveStringUtils.normalizeIdentifier(dbName))) {
+ if (CacheUtils.matches(table.getTableName(), filter)
+ && (max_tables == -1 || count < max_tables)) {
+ tableNames.add(table.getTableName());
+ count++;
+ }
+ }
+ return tableNames;
+ }
+
+ @Override
+ public List<String> listPartitionNames(String dbName, String tblName,
+ short max_parts) throws MetaException {
+ List<String> partitionNames = new ArrayList<String>();
+ Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName));
+ int count = 0;
+ for (Partition part : SharedCache.listCachedPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), max_parts)) {
+ if (max_parts == -1 || count < max_parts) {
+ partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+ }
+ }
+ return partitionNames;
+ }
+
+ /**
+  * Lists partition names matching a filter string.
+  * Filter evaluation against the cache is not implemented, so the request is answered by
+  * the wrapped RawStore instead of returning null.
+  */
+ @Override
+ public List<String> listPartitionNamesByFilter(String db_name, String tbl_name, String filter,
+     short max_parts) throws MetaException {
+   // TODO Translate filter -> expr so that this can be served from the cache
+   return rawStore.listPartitionNamesByFilter(db_name, tbl_name, filter, max_parts);
+ }
+
+ @Override
+ public void alterPartition(String dbName, String tblName,
+ List<String> partVals, Partition newPart)
+ throws InvalidObjectException, MetaException {
+ rawStore.alterPartition(dbName, tblName, partVals, newPart);
+ interruptCacheUpdateMaster();
+ SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ }
+
+ /**
+  * Alters several partitions in the wrapped store, interrupts the background cache update,
+  * and then alters the cached partitions pairwise: the i-th entry of {@code partValsList}
+  * identifies the partition replaced by the i-th entry of {@code newParts}.
+  */
+ @Override
+ public void alterPartitions(String dbName, String tblName, List<List<String>> partValsList,
+     List<Partition> newParts) throws InvalidObjectException, MetaException {
+   rawStore.alterPartitions(dbName, tblName, partValsList, newParts);
+   interruptCacheUpdateMaster();
+   int idx = 0;
+   while (idx < partValsList.size()) {
+     List<String> oldValues = partValsList.get(idx);
+     Partition replacement = newParts.get(idx);
+     String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+     String tblKey = HiveStringUtils.normalizeIdentifier(tblName);
+     SharedCache.alterPartitionInCache(dbKey, tblKey, oldValues, replacement);
+     idx++;
+   }
+ }
+
+ /** Pass-through to the wrapped RawStore; indexes are not kept in the shared cache here. */
+ @Override
+ public boolean addIndex(Index index) throws InvalidObjectException, MetaException {
+   return rawStore.addIndex(index);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not consulted. */
+ @Override
+ public Index getIndex(String dbName, String origTableName, String indexName)
+     throws MetaException {
+   return rawStore.getIndex(dbName, origTableName, indexName);
+ }
+
+ @Override
+ public boolean dropIndex(String dbName, String origTableName,
+ String indexName) throws MetaException {
+ return rawStore.dropIndex(dbName, origTableName, indexName);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not consulted. */
+ @Override
+ public List<Index> getIndexes(String dbName, String origTableName, int max)
+     throws MetaException {
+   return rawStore.getIndexes(dbName, origTableName, max);
+ }
+
+ @Override
+ public List<String> listIndexNames(String dbName, String origTableName,
+ short max) throws MetaException {
+ return rawStore.listIndexNames(dbName, origTableName, max);
+ }
+
+ @Override
+ public void alterIndex(String dbname, String baseTblName, String name,
+ Index newIndex) throws InvalidObjectException, MetaException {
+ rawStore.alterIndex(dbname, baseTblName, name, newIndex);
+ }
+
+ /**
+  * Fills {@code result} with the names of the table's cached partitions (at most
+  * {@code maxParts} are requested from the cache) and then hands the list to
+  * expressionProxy.filterPartitionsByExpr together with the partition column names and types.
+  * When {@code defaultPartName} is null or empty, the configured DEFAULTPARTITIONNAME is used.
+  *
+  * @return the value returned by expressionProxy.filterPartitionsByExpr
+  */
+ private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr,
+     String defaultPartName, short maxParts, List<String> result)
+     throws MetaException, NoSuchObjectException {
+   String dbKey = HiveStringUtils.normalizeIdentifier(table.getDbName());
+   String tblKey = HiveStringUtils.normalizeIdentifier(table.getTableName());
+   List<Partition> cachedParts = SharedCache.listCachedPartitions(dbKey, tblKey, maxParts);
+   for (Partition cached : cachedParts) {
+     String partName = Warehouse.makePartName(table.getPartitionKeys(), cached.getValues());
+     result.add(partName);
+   }
+   List<String> partColNames = new ArrayList<String>();
+   List<PrimitiveTypeInfo> partColTypes = new ArrayList<PrimitiveTypeInfo>();
+   for (FieldSchema partKey : table.getPartitionKeys()) {
+     partColNames.add(partKey.getName());
+     partColTypes.add(TypeInfoFactory.getPrimitiveTypeInfo(partKey.getType()));
+   }
+   boolean defaultNameGiven = defaultPartName != null && !defaultPartName.isEmpty();
+   if (!defaultNameGiven) {
+     defaultPartName = HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+   }
+   return expressionProxy.filterPartitionsByExpr(
+       partColNames, partColTypes, expr, defaultPartName, result);
+ }
+
+ /**
+  * Returns the partitions of a table that match a filter string.
+  * Filter evaluation against the cache is not implemented, so the request is answered by
+  * the wrapped RawStore instead of returning null.
+  */
+ @Override
+ public List<Partition> getPartitionsByFilter(String dbName, String tblName, String filter,
+     short maxParts) throws MetaException, NoSuchObjectException {
+   // TODO serve from the cache once filters can be translated to expressions
+   return rawStore.getPartitionsByFilter(dbName, tblName, filter, maxParts);
+ }
+
+ /**
+  * Prunes the cached partition names of a table with the given expression and appends the
+  * cached partition for each surviving name to {@code result}.
+  *
+  * @return the flag produced by getPartitionNamesPrunedByExprNoTxn
+  */
+ @Override
+ public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr,
+     String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
+   List<String> prunedNames = new LinkedList<String>();
+   Table cachedTable = SharedCache.getTableFromCache(
+       HiveStringUtils.normalizeIdentifier(dbName),
+       HiveStringUtils.normalizeIdentifier(tblName));
+   boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(
+       cachedTable, expr, defaultPartitionName, maxParts, prunedNames);
+   for (String prunedName : prunedNames) {
+     String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+     String tblKey = HiveStringUtils.normalizeIdentifier(tblName);
+     List<String> values = partNameToVals(prunedName);
+     result.add(SharedCache.getPartitionFromCache(dbKey, tblKey, values));
+   }
+   return hasUnknownPartitions;
+ }
+
+ /**
+  * Counts the partitions of a table that match a filter string.
+  * Filter evaluation against the cache is not implemented, so the count comes from the
+  * wrapped RawStore instead of being a hard-coded 0.
+  */
+ @Override
+ public int getNumPartitionsByFilter(String dbName, String tblName, String filter)
+     throws MetaException, NoSuchObjectException {
+   // TODO filter -> expr so that this can be served from the cache
+   return rawStore.getNumPartitionsByFilter(dbName, tblName, filter);
+ }
+
+ /**
+  * Counts the cached partitions of a table that survive pruning by {@code expr}, using the
+  * configured DEFAULTPARTITIONNAME and requesting at most Short.MAX_VALUE partitions.
+  */
+ @Override
+ public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr)
+     throws MetaException, NoSuchObjectException {
+   String configuredDefault =
+       HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+   List<String> prunedNames = new LinkedList<String>();
+   String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+   String tblKey = HiveStringUtils.normalizeIdentifier(tblName);
+   Table cachedTable = SharedCache.getTableFromCache(dbKey, tblKey);
+   getPartitionNamesPrunedByExprNoTxn(
+       cachedTable, expr, configuredDefault, Short.MAX_VALUE, prunedNames);
+   return prunedNames.size();
+ }
+
+ public static List<String> partNameToVals(String name) {
+ if (name == null) return null;
+ List<String> vals = new ArrayList<String>();
+ String[] kvp = name.split("/");
+ for (String kv : kvp) {
+ vals.add(FileUtils.unescapePathName(kv.substring(kv.indexOf('=') + 1)));
+ }
+ return vals;
+ }
+
+ /**
+  * Fetches the cached partitions for the given partition names; names with no cached
+  * partition are skipped.
+  */
+ @Override
+ public List<Partition> getPartitionsByNames(String dbName, String tblName,
+     List<String> partNames) throws MetaException, NoSuchObjectException {
+   List<Partition> found = new ArrayList<Partition>();
+   for (String requestedName : partNames) {
+     String dbKey = HiveStringUtils.normalizeIdentifier(dbName);
+     String tblKey = HiveStringUtils.normalizeIdentifier(tblName);
+     Partition cached =
+         SharedCache.getPartitionFromCache(dbKey, tblKey, partNameToVals(requestedName));
+     if (cached == null) {
+       continue;
+     }
+     found.add(cached);
+   }
+   return found;
+ }
+
+ @Override
+ public Table markPartitionForEvent(String dbName, String tblName,
+ Map<String, String> partVals, PartitionEventType evtType)
+ throws MetaException, UnknownTableException, InvalidPartitionException,
+ UnknownPartitionException {
+ return rawStore.markPartitionForEvent(dbName, tblName, partVals, evtType);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not consulted. */
+ @Override
+ public boolean isPartitionMarkedForEvent(String dbName, String tblName,
+     Map<String, String> partName, PartitionEventType evtType) throws MetaException,
+     UnknownTableException, InvalidPartitionException, UnknownPartitionException {
+   return rawStore.isPartitionMarkedForEvent(dbName, tblName, partName, evtType);
+ }
+
+ /** Pass-through to the wrapped RawStore; roles are not kept in the shared cache here. */
+ @Override
+ public boolean addRole(String rowName, String ownerName)
+     throws InvalidObjectException, MetaException, NoSuchObjectException {
+   return rawStore.addRole(rowName, ownerName);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not touched. */
+ @Override
+ public boolean removeRole(String roleName) throws MetaException, NoSuchObjectException {
+   return rawStore.removeRole(roleName);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not touched. */
+ @Override
+ public boolean grantRole(Role role, String userName, PrincipalType principalType,
+     String grantor, PrincipalType grantorType, boolean grantOption)
+     throws MetaException, NoSuchObjectException, InvalidObjectException {
+   return rawStore.grantRole(
+       role, userName, principalType, grantor, grantorType, grantOption);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not touched. */
+ @Override
+ public boolean revokeRole(Role role, String userName, PrincipalType principalType,
+     boolean grantOption) throws MetaException, NoSuchObjectException {
+   return rawStore.revokeRole(role, userName, principalType, grantOption);
+ }
+
+ @Override
+ public PrincipalPrivilegeSet getUserPrivilegeSet(String userName,
+ List<String> groupNames) throws InvalidObjectException, MetaException {
+ return rawStore.getUserPrivilegeSet(userName, groupNames);
+ }
+
+ /** Pass-through to the wrapped RawStore; privileges are read from the store, not the cache. */
+ @Override
+ public PrincipalPrivilegeSet getDBPrivilegeSet(String dbName, String userName,
+     List<String> groupNames) throws InvalidObjectException, MetaException {
+   return rawStore.getDBPrivilegeSet(dbName, userName, groupNames);
+ }
+
+ /** Pass-through to the wrapped RawStore; privileges are read from the store, not the cache. */
+ @Override
+ public PrincipalPrivilegeSet getTablePrivilegeSet(String dbName, String tableName,
+     String userName, List<String> groupNames) throws InvalidObjectException, MetaException {
+   return rawStore.getTablePrivilegeSet(dbName, tableName, userName, groupNames);
+ }
+
+ /** Pass-through to the wrapped RawStore; privileges are read from the store, not the cache. */
+ @Override
+ public PrincipalPrivilegeSet getPartitionPrivilegeSet(String dbName, String tableName,
+     String partition, String userName, List<String> groupNames)
+     throws InvalidObjectException, MetaException {
+   return rawStore.getPartitionPrivilegeSet(
+       dbName, tableName, partition, userName, groupNames);
+ }
+
+ /** Pass-through to the wrapped RawStore; privileges are read from the store, not the cache. */
+ @Override
+ public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName, String tableName,
+     String partitionName, String columnName, String userName, List<String> groupNames)
+     throws InvalidObjectException, MetaException {
+   return rawStore.getColumnPrivilegeSet(
+       dbName, tableName, partitionName, columnName, userName, groupNames);
+ }
+
+ /** Pass-through to the wrapped RawStore; grants are not kept in the shared cache here. */
+ @Override
+ public List<HiveObjectPrivilege> listPrincipalGlobalGrants(String principalName,
+     PrincipalType principalType) {
+   return rawStore.listPrincipalGlobalGrants(principalName, principalType);
+ }
+
+ /** Pass-through to the wrapped RawStore; grants are not kept in the shared cache here. */
+ @Override
+ public List<HiveObjectPrivilege> listPrincipalDBGrants(String principalName,
+     PrincipalType principalType, String dbName) {
+   return rawStore.listPrincipalDBGrants(principalName, principalType, dbName);
+ }
+
+ /** Pass-through to the wrapped RawStore; grants are not kept in the shared cache here. */
+ @Override
+ public List<HiveObjectPrivilege> listAllTableGrants(String principalName,
+     PrincipalType principalType, String dbName, String tableName) {
+   return rawStore.listAllTableGrants(principalName, principalType, dbName, tableName);
+ }
+
+ /** Pass-through to the wrapped RawStore; grants are not kept in the shared cache here. */
+ @Override
+ public List<HiveObjectPrivilege> listPrincipalPartitionGrants(String principalName,
+     PrincipalType principalType, String dbName, String tableName, List<String> partValues,
+     String partName) {
+   return rawStore.listPrincipalPartitionGrants(
+       principalName, principalType, dbName, tableName, partValues, partName);
+ }
+
+ /** Pass-through to the wrapped RawStore; grants are not kept in the shared cache here. */
+ @Override
+ public List<HiveObjectPrivilege> listPrincipalTableColumnGrants(String principalName,
+     PrincipalType principalType, String dbName, String tableName, String columnName) {
+   return rawStore.listPrincipalTableColumnGrants(
+       principalName, principalType, dbName, tableName, columnName);
+ }
+
+ /** Pass-through to the wrapped RawStore; grants are not kept in the shared cache here. */
+ @Override
+ public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrants(String principalName,
+     PrincipalType principalType, String dbName, String tableName, List<String> partValues,
+     String partName, String columnName) {
+   return rawStore.listPrincipalPartitionColumnGrants(
+       principalName, principalType, dbName, tableName, partValues, partName, columnName);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not touched. */
+ @Override
+ public boolean grantPrivileges(PrivilegeBag privileges)
+     throws InvalidObjectException, MetaException, NoSuchObjectException {
+   return rawStore.grantPrivileges(privileges);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not touched. */
+ @Override
+ public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption)
+     throws InvalidObjectException, MetaException, NoSuchObjectException {
+   return rawStore.revokePrivileges(privileges, grantOption);
+ }
+
+ /** Pass-through to the wrapped RawStore; roles are not kept in the shared cache here. */
+ @Override
+ public Role getRole(String roleName) throws NoSuchObjectException {
+   return rawStore.getRole(roleName);
+ }
+
+ /** Pass-through to the wrapped RawStore; roles are not kept in the shared cache here. */
+ @Override
+ public List<String> listRoleNames() {
+   return rawStore.listRoleNames();
+ }
+
+ /** Pass-through to the wrapped RawStore; roles are not kept in the shared cache here. */
+ @Override
+ public List<Role> listRoles(String principalName, PrincipalType principalType) {
+   return rawStore.listRoles(principalName, principalType);
+ }
+
+ /** Pass-through to the wrapped RawStore; roles are not kept in the shared cache here. */
+ @Override
+ public List<RolePrincipalGrant> listRolesWithGrants(String principalName,
+     PrincipalType principalType) {
+   return rawStore.listRolesWithGrants(principalName, principalType);
+ }
+
+ /** Pass-through to the wrapped RawStore; roles are not kept in the shared cache here. */
+ @Override
+ public List<RolePrincipalGrant> listRoleMembers(String roleName) {
+   return rawStore.listRoleMembers(roleName);
+ }
+
+ /**
+  * Fetches a partition from the shared cache and attaches the privilege set that the
+  * wrapped store reports for the given user and groups.
+  *
+  * @return the cached partition with privileges set
+  * @throws NoSuchObjectException if the cache holds no such partition; previously null was
+  *         returned even though this exception is part of the declared signature
+  */
+ @Override
+ public Partition getPartitionWithAuth(String dbName, String tblName, List<String> partVals,
+     String userName, List<String> groupNames)
+     throws MetaException, NoSuchObjectException, InvalidObjectException {
+   Partition p = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+       HiveStringUtils.normalizeIdentifier(tblName), partVals);
+   if (p == null) {
+     throw new NoSuchObjectException("partition values=" + partVals);
+   }
+   Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+       HiveStringUtils.normalizeIdentifier(tblName));
+   // The privilege lookup is keyed by partition name, which is derived from the table's keys.
+   String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals);
+   PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
+       userName, groupNames);
+   p.setPrivileges(privs);
+   return p;
+ }
+
+ /**
+  * Lists up to {@code maxParts} cached partitions of a table (-1 means no limit) and sets
+  * on each the privilege set that getPartitionPrivilegeSet returns for the user and groups.
+  */
+ @Override
+ public List<Partition> getPartitionsWithAuth(String dbName, String tblName, short maxParts,
+     String userName, List<String> groupNames)
+     throws MetaException, NoSuchObjectException, InvalidObjectException {
+   Table cachedTable = SharedCache.getTableFromCache(
+       HiveStringUtils.normalizeIdentifier(dbName),
+       HiveStringUtils.normalizeIdentifier(tblName));
+   List<Partition> authorized = new ArrayList<Partition>();
+   int taken = 0;
+   List<Partition> cachedParts = SharedCache.listCachedPartitions(
+       HiveStringUtils.normalizeIdentifier(dbName),
+       HiveStringUtils.normalizeIdentifier(tblName), maxParts);
+   for (Partition cached : cachedParts) {
+     boolean limitReached = maxParts != -1 && taken >= maxParts;
+     if (limitReached) {
+       continue;
+     }
+     String partName =
+         Warehouse.makePartName(cachedTable.getPartitionKeys(), cached.getValues());
+     cached.setPrivileges(
+         getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames));
+     authorized.add(cached);
+     taken++;
+   }
+   return authorized;
+ }
+
+ /**
+  * Lists the names of cached partitions that match a partial partition specification.
+  * A partition matches when, for every position in {@code partVals} holding a non-null,
+  * non-empty value, the partition has the same value at that position.
+  *
+  * @param maxParts maximum number of names to return, or -1 for no limit
+  * @return names of the matching partitions
+  */
+ @Override
+ public List<String> listPartitionNamesPs(String dbName, String tblName, List<String> partVals,
+     short maxParts) throws MetaException, NoSuchObjectException {
+   List<String> partNames = new ArrayList<String>();
+   int count = 0;
+   Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+       HiveStringUtils.normalizeIdentifier(tblName));
+   // NOTE(review): maxParts is also handed to listCachedPartitions; if that call truncates
+   // before this filter runs, fewer than maxParts matches may come back - confirm.
+   for (Partition part : SharedCache.listCachedPartitions(
+       HiveStringUtils.normalizeIdentifier(dbName),
+       HiveStringUtils.normalizeIdentifier(tblName), maxParts)) {
+     boolean psMatch = true;
+     for (int i = 0; i < partVals.size(); i++) {
+       String psVal = partVals.get(i);
+       String partVal = part.getValues().get(i);
+       if (psVal != null && !psVal.isEmpty() && !psVal.equals(partVal)) {
+         psMatch = false;
+         break;
+       }
+     }
+     if (!psMatch) {
+       // Skip only this partition. The previous 'break' ended the whole scan at the first
+       // mismatch and dropped every later matching partition.
+       continue;
+     }
+     if (maxParts == -1 || count < maxParts) {
+       partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+       count++;
+     }
+   }
+   return partNames;
+ }
+
+ /**
+  * Lists the cached partitions that match a partial partition specification and sets on
+  * each the privilege set reported for the given user and groups.
+  * A partition matches when, for every position in {@code partVals} holding a non-null,
+  * non-empty value, the partition has the same value at that position.
+  *
+  * @param maxParts maximum number of partitions to return, or -1 for no limit
+  * @return the matching partitions with privileges set
+  */
+ @Override
+ public List<Partition> listPartitionsPsWithAuth(String dbName, String tblName,
+     List<String> partVals, short maxParts, String userName, List<String> groupNames)
+     throws MetaException, InvalidObjectException, NoSuchObjectException {
+   List<Partition> partitions = new ArrayList<Partition>();
+   Table t = SharedCache.getTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+       HiveStringUtils.normalizeIdentifier(tblName));
+   int count = 0;
+   // NOTE(review): maxParts is also handed to listCachedPartitions; if that call truncates
+   // before this filter runs, fewer than maxParts matches may come back - confirm.
+   for (Partition part : SharedCache.listCachedPartitions(
+       HiveStringUtils.normalizeIdentifier(dbName),
+       HiveStringUtils.normalizeIdentifier(tblName), maxParts)) {
+     boolean psMatch = true;
+     for (int i = 0; i < partVals.size(); i++) {
+       String psVal = partVals.get(i);
+       String partVal = part.getValues().get(i);
+       if (psVal != null && !psVal.isEmpty() && !psVal.equals(partVal)) {
+         psMatch = false;
+         break;
+       }
+     }
+     if (!psMatch) {
+       continue;
+     }
+     if (maxParts == -1 || count < maxParts) {
+       String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+       PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
+           userName, groupNames);
+       part.setPrivileges(privs);
+       partitions.add(part);
+       // The counter was never advanced before, so the maxParts check never took effect.
+       count++;
+     }
+   }
+   return partitions;
+ }
+
+ /**
+  * Updates table column statistics in the wrapped store and, only when the store reports
+  * success, applies the same statistics objects to the shared cache.
+  *
+  * @return the result reported by the wrapped store
+  */
+ @Override
+ public boolean updateTableColumnStatistics(ColumnStatistics colStats)
+     throws NoSuchObjectException, MetaException, InvalidObjectException,
+     InvalidInputException {
+   if (!rawStore.updateTableColumnStatistics(colStats)) {
+     return false;
+   }
+   String dbKey = HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName());
+   String tblKey = HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName());
+   SharedCache.updateTableColumnStatistics(dbKey, tblKey, colStats.getStatsObj());
+   return true;
+ }
+
+ /**
+  * Updates partition column statistics in the wrapped store and, only when the store
+  * reports success, applies the same statistics objects to the shared cache for the
+  * partition identified by {@code partVals}.
+  *
+  * @return the result reported by the wrapped store
+  */
+ @Override
+ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats,
+     List<String> partVals) throws NoSuchObjectException, MetaException,
+     InvalidObjectException, InvalidInputException {
+   if (!rawStore.updatePartitionColumnStatistics(colStats, partVals)) {
+     return false;
+   }
+   String dbKey = HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName());
+   String tblKey = HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName());
+   SharedCache.updatePartitionColumnStatistics(dbKey, tblKey, partVals, colStats.getStatsObj());
+   return true;
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not consulted. */
+ @Override
+ public ColumnStatistics getTableColumnStatistics(String dbName, String tableName,
+     List<String> colName) throws MetaException, NoSuchObjectException {
+   return rawStore.getTableColumnStatistics(dbName, tableName, colName);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not consulted. */
+ @Override
+ public List<ColumnStatistics> getPartitionColumnStatistics(String dbName, String tblName,
+     List<String> partNames, List<String> colNames)
+     throws MetaException, NoSuchObjectException {
+   return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
+ }
+
+ /** Pass-through to the wrapped RawStore; the shared cache is not updated here. */
+ @Override
+ public boolean deletePartitionColumnStatistics(String dbName, String tableName,
+     String partName, List<String> partVals, String colName) throws NoSuchObjectException,
+     MetaException, InvalidObjectException, InvalidInputException {
+   return rawStore.deletePartitionColumnStatistics(
+       dbName, tableName, partName, partVals, colName);
+ }
+
+ @Override
+ public boolean deleteTableColumnStatistics(String dbName, String tableName,
+ String colName) throws NoSuchObjectException, MetaException,
+ InvalidObjectException, InvalidInputException {
+ return rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
+ }
+
+ /** Pass-through to the wrapped RawStore; returns whatever count it reports. */
+ @Override
+ public long cleanupEvents() {
+   return rawStore.cleanupEvents();
+ }
+
+ /** Pass-through to the wrapped RawStore; tokens are not kept in the shared cache here. */
+ @Override
+ public boolean addToken(String tokenIdentifier, String delegationToken) {
+   return rawStore.addToken(tokenIdentifier, delegationToken);
+ }
+
+ /** Pass-through to the wrapped RawStore; tokens are not kept in the shared cache here. */
+ @Override
+ public boolean removeToken(String tokenIdentifier) {
+   return rawStore.removeToken(tokenIdentifier);
+ }
+
+ /** Pass-through to the wrapped RawStore; tokens are not kept in the shared cache here. */
+ @Override
+ public String getToken(String tokenIdentifier) {
+   return rawStore.getToken(tokenIdentifier);
+ }
+
+ /** Pass-through to the wrapped RawStore; tokens are not kept in the shared cache here. */
+ @Override
+ public List<String> getAllTokenIdentifiers() {
+   return rawStore.getAllTokenIdentifiers();
+ }
+
+ /** Pass-through to the wrapped RawStore; master keys are not kept in the shared cache here. */
+ @Override
+ public int addMasterKey(String key) throws MetaException {
+   return rawStore.addMasterKey(key);
+ }
+
+ /** Pass-through to the wrapped RawStore; master keys are not kept in the shared cache here. */
+ @Override
+ public void updateMasterKey(Integer seqNo, String key)
+     throws NoSuchObjectException, MetaException {
+   rawStore.updateMasterKey(seqNo, key);
+ }
+
+ /** Pass-through to the wrapped RawStore; master keys are not kept in the shared cache here. */
+ @Override
+ public boolean removeMasterKey(Integer keySeq) {
+   return rawStore.removeMasterKey(keySeq);
+ }
+
+ /** Pass-through to the wrapped RawStore; master keys are not kept in the shared cache here. */
+ @Override
+ public String[] getMasterKeys() {
+   return rawStore.getMasterKeys();
+ }
+
+ /** Pass-through: schema verification is performed by the wrapped RawStore. */
+ @Override
+ public void verifySchema() throws MetaException {
+   rawStore.verifySchema();
+ }
+
+ /** Pass-through: returns the schema version reported by the wrapped RawStore. */
+ @Override
+ public String getMetaStoreSchemaVersion() throws MetaException {
+   return rawStore.getMetaStoreSchemaVersion();
+ }
+
+ /** Pass-through: the schema version and comment are written by the wrapped RawStore. */
+ @Override
+ public void setMetaStoreSchemaVersion(String version, String comment) throws MetaException {
+   rawStore.setMetaStoreSchemaVersion(version, comment);
+ }
+
+ /**
+  * Drops the named partitions from the backing store and then evicts each of
+  * them from {@code SharedCache}.
+  * <p>
+  * The backing store is updated first; if it throws, the cache is left
+  * untouched. Cache keys use the normalized database and table names.
+  *
+  * @param dbName    database that owns the table
+  * @param tblName   table that owns the partitions
+  * @param partNames partition names to drop; each is converted to its value
+  *                  list with {@code partNameToVals} before eviction
+  */
+ @Override
+ public void dropPartitions(String dbName, String tblName,
+     List<String> partNames) throws MetaException, NoSuchObjectException {
+   rawStore.dropPartitions(dbName, tblName, partNames);
+   interruptCacheUpdateMaster();
+   for (String droppedPartName : partNames) {
+     List<String> partVals = partNameToVals(droppedPartName);
+     String cacheDbName = HiveStringUtils.normalizeIdentifier(dbName);
+     String cacheTblName = HiveStringUtils.normalizeIdentifier(tblName);
+     SharedCache.removePartitionFromCache(cacheDbName, cacheTblName, partVals);
+   }
+ }
+
+ @Override // grants are always read from rawStore; the cache is not consulted
+ public List<HiveObjectPrivilege> listPrincipalDBGrantsAll(
+ String principalName, PrincipalType principalType) {
+ return rawStore.listPrincipalDBGrantsAll(principalName, principalType);
+ }
+
+ @Override // grants are always read from rawStore; the cache is not consulted
+ public List<HiveObjectPrivilege> listPrincipalTableGrantsAll(
+ String principalName, PrincipalType principalType) {
+ return rawStore.listPrincipalTableGrantsAll(principalName, principalType);
+ }
+
+ @Override // grants are always read from rawStore; the cache is not consulted
+ public List<HiveObjectPrivilege> listPrincipalPartitionGrantsAll(
+ String principalName, PrincipalType principalType) {
+ return rawStore.listPrincipalPartitionGrantsAll(principalName, principalType);
+ }
+
+ @Override // grants are always read from rawStore; the cache is not consulted
+ public List<HiveObjectPrivilege> listPrincipalTableColumnGrantsAll(
+ String principalName, PrincipalType principalType) {
+ return rawStore.listPrincipalTableColumnGrantsAll(principalName, principalType);
+ }
+
+ @Override // grants are always read from rawStore; the cache is not consulted
+ public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrantsAll(
+ String principalName, PrincipalType principalType) {
+ return rawStore.listPrincipalPartitionColumnGrantsAll(principalName, principalType);
+ }
+
+ @Override // grants are always read from rawStore; the cache is not consulted
+ public List<HiveObjectPrivilege> listGlobalGrantsAll() {
+ return rawStore.listGlobalGrantsAll();
+ }
+
+ @Override // grants are always read from rawStore; dbName is forwarded as given (not normalized here)
+ public List<HiveObjectPrivilege> listDBGrantsAll(String dbName) {
+ return rawStore.listDBGrantsAll(dbName);
+ }
+
+ @Override // grants are always read from rawStore; arguments are forwarded as given
+ public List<HiveObjectPrivilege> listPartitionColumnGrantsAll(String dbName,
+ String tableName, String partitionName, String columnName) {
+ return rawStore.listPartitionColumnGrantsAll(dbName, tableName, partitionName, columnName);
+ }
+
+ @Override // grants are always read from rawStore; arguments are forwarded as given
+ public List<HiveObjectPrivilege> listTableGrantsAll(String dbName,
+ String tableName) {
+ return rawStore.listTableGrantsAll(dbName, tableName);
+ }
+
+ @Override // grants are always read from rawStore; arguments are forwarded as given
+ public List<HiveObjectPrivilege> listPartitionGrantsAll(String dbName,
+ String tableName, String partitionName) {
+ return rawStore.listPartitionGrantsAll(dbName, tableName, partitionName);
+ }
+
+ @Override // grants are always read from rawStore; arguments are forwarded as given
+ public List<HiveObjectPrivilege> listTableColumnGrantsAll(String dbName,
+ String tableName, String columnName) {
+ return rawStore.listTableColumnGrantsAll(dbName, tableName, columnName);
+ }
+
+ @Override
+ public void createFunction(Function func)
+ throws InvalidObjectException, MetaException {
+ // TODO functionCache: functions are not cached yet, so the call goes straight to rawStore
+ rawStore.createFunction(func);
+ }
+
+ @Override
+ public void alterFunction(String dbName, String funcName,
+ Function newFunction) throws InvalidObjectException, MetaException {
+ // TODO functionCache: functions are not cached yet, so the call goes straight to rawStore
+ rawStore.alterFunction(dbName, funcName, newFunction);
+ }
+
+ @Override
+ public void dropFunction(String dbName, String funcName) throws MetaException,
+ NoSuchObjectException, InvalidObjectException, InvalidInputException {
+ // TODO functionCache: functions are not cached yet, so the call goes straight to rawStore
+ rawStore.dropFunction(dbName, funcName);
+ }
+
+ @Override
+ public Function getFunction(String dbName, String funcName)
+ throws MetaException {
+ // TODO functionCache: functions are not cached yet, so every lookup hits rawStore
+ return rawStore.getFunction(dbName, funcName);
+ }
+
+ @Override
+ public List<Function> getAllFunctions() throws MetaException {
+ // TODO functionCache: functions are not cached yet, so every lookup hits rawStore
+ return rawStore.getAllFunctions();
+ }
+
+ @Override
+ public List<String> getFunctions(String dbName, String pattern)
+ throws MetaException {
+ // TODO functionCache: functions are not cached yet, so every lookup hits rawStore
+ return rawStore.getFunctions(dbName, pattern);
+ }
+
+ /**
+  * Builds aggregate column statistics for the given partitions from the
+  * per-partition column stats held in {@code SharedCache}; the backing store
+  * is not queried.
+  * <p>
+  * A column for which no statistics could be merged is left out of the
+  * result instead of being added as a {@code null} element (a null entry
+  * would presumably fail when the response is serialized -- TODO confirm).
+  *
+  * @param dbName    database name; normalized before it is used in cache keys
+  * @param tblName   table name; normalized before it is used in cache keys
+  * @param partNames names of the partitions to aggregate over
+  * @param colNames  columns to aggregate
+  * @return the merged stats together with {@code partNames.size()} as the
+  *         partitions-found count
+  */
+ @Override
+ public AggrStats get_aggr_stats_for(String dbName, String tblName,
+     List<String> partNames, List<String> colNames)
+     throws MetaException, NoSuchObjectException {
+   List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
+   // The normalized names are the same for every column, so compute them once.
+   String normalizedDbName = HiveStringUtils.normalizeIdentifier(dbName);
+   String normalizedTblName = HiveStringUtils.normalizeIdentifier(tblName);
+   for (String colName : colNames) {
+     ColumnStatisticsObj merged =
+         mergeColStatsForPartitions(normalizedDbName, normalizedTblName, partNames, colName);
+     if (merged != null) {
+       colStats.add(merged);
+     }
+   }
+   // TODO: revisit the partitions not found case for extrapolation
+   return new AggrStats(colStats, partNames.size());
+ }
+
+ /**
+  * Looks up the cached stats of one column for each listed partition and
+  * folds them into a single object with {@code mergeColStatsObj}.
+  * <p>
+  * Partitions for which the cache lookup yields {@code null} are skipped;
+  * previously such a lookup was handed to {@code mergeColStatsObj}, which
+  * dereferences both arguments.
+  *
+  * @param dbName    database name, used as-is in the cache key
+  * @param tblName   table name, used as-is in the cache key
+  * @param partNames partitions whose stats should be merged
+  * @param colName   column whose stats should be merged
+  * @return the merged stats; the object obtained from the cache itself when
+  *         exactly one partition contributed; {@code null} when none did
+  * @throws MetaException if two contributing entries cannot be merged
+  */
+ private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
+     List<String> partNames, String colName) throws MetaException {
+   ColumnStatisticsObj colStats = null;
+   for (String partName : partNames) {
+     String colStatsCacheKey =
+         CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
+     ColumnStatisticsObj colStatsForPart =
+         SharedCache.getCachedPartitionColStats(colStatsCacheKey);
+     if (colStatsForPart == null) {
+       // Nothing available for this partition; it contributes nothing to the merge.
+       continue;
+     }
+     colStats = (colStats == null)
+         ? colStatsForPart
+         : mergeColStatsObj(colStats, colStatsForPart);
+   }
+   return colStats;
+ }
+
+ /**
+  * Combines the statistics of the same column from two partitions into a
+  * newly created {@code ColumnStatisticsObj}.
+  * <p>
+  * The merge rule is chosen by the lower-cased column type of
+  * {@code colStats1}: null counts (and boolean true/false counts) are summed,
+  * high/low values take the max/min of the two inputs, and avgColLen,
+  * maxColLen and numDVs take the larger of the two inputs. For a column type
+  * that matches none of the branches the returned object carries an empty
+  * {@code ColumnStatisticsData}.
+  *
+  * @param colStats1 stats from the first partition; supplies the name and
+  *                  type of the result
+  * @param colStats2 stats from the second partition
+  * @return a new object holding the merged stats
+  * @throws MetaException if the two inputs differ in column type or column name
+  */
+ private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj colStats1,
+     ColumnStatisticsObj colStats2) throws MetaException {
+   // Reject the pair when EITHER the type or the name differs. The branches
+   // below dispatch on colStats1's type and read the same stats variant from
+   // both inputs, so a mismatch in just one of the two must not slip through.
+   if (!colStats1.getColType().equalsIgnoreCase(colStats2.getColType())
+       || !colStats1.getColName().equalsIgnoreCase(colStats2.getColName())) {
+     throw new MetaException("Can't merge column stats for two partitions for different columns.");
+   }
+   // csd is populated after cso is constructed; as in the original code this
+   // presumes cso keeps a reference to csd rather than a copy.
+   ColumnStatisticsData csd = new ColumnStatisticsData();
+   ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(),
+       colStats1.getColType(), csd);
+   ColumnStatisticsData csData1 = colStats1.getStatsData();
+   ColumnStatisticsData csData2 = colStats2.getStatsData();
+   String colType = colStats1.getColType().toLowerCase();
+   if (colType.equals("boolean")) {
+     BooleanColumnStatsData bool1 = csData1.getBooleanStats();
+     BooleanColumnStatsData bool2 = csData2.getBooleanStats();
+     BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
+     boolStats.setNumFalses(bool1.getNumFalses() + bool2.getNumFalses());
+     boolStats.setNumTrues(bool1.getNumTrues() + bool2.getNumTrues());
+     boolStats.setNumNulls(bool1.getNumNulls() + bool2.getNumNulls());
+     csd.setBooleanStats(boolStats);
+   } else if (colType.equals("string") || colType.startsWith("varchar")
+       || colType.startsWith("char")) {
+     StringColumnStatsData str1 = csData1.getStringStats();
+     StringColumnStatsData str2 = csData2.getStringStats();
+     StringColumnStatsData stringStats = new StringColumnStatsData();
+     stringStats.setNumNulls(str1.getNumNulls() + str2.getNumNulls());
+     stringStats.setAvgColLen(Math.max(str1.getAvgColLen(), str2.getAvgColLen()));
+     stringStats.setMaxColLen(Math.max(str1.getMaxColLen(), str2.getMaxColLen()));
+     stringStats.setNumDVs(Math.max(str1.getNumDVs(), str2.getNumDVs()));
+     csd.setStringStats(stringStats);
+   } else if (colType.equals("binary")) {
+     BinaryColumnStatsData bin1 = csData1.getBinaryStats();
+     BinaryColumnStatsData bin2 = csData2.getBinaryStats();
+     BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
+     binaryStats.setNumNulls(bin1.getNumNulls() + bin2.getNumNulls());
+     binaryStats.setAvgColLen(Math.max(bin1.getAvgColLen(), bin2.getAvgColLen()));
+     binaryStats.setMaxColLen(Math.max(bin1.getMaxColLen(), bin2.getMaxColLen()));
+     csd.setBinaryStats(binaryStats);
+   } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint")
+       || colType.equals("tinyint") || colType.equals("timestamp")) {
+     LongColumnStatsData long1 = csData1.getLongStats();
+     LongColumnStatsData long2 = csData2.getLongStats();
+     LongColumnStatsData longStats = new LongColumnStatsData();
+     longStats.setNumNulls(long1.getNumNulls() + long2.getNumNulls());
+     longStats.setHighValue(Math.max(long1.getHighValue(), long2.getHighValue()));
+     longStats.setLowValue(Math.min(long1.getLowValue(), long2.getLowValue()));
+     longStats.setNumDVs(Math.max(long1.getNumDVs(), long2.getNumDVs()));
+     csd.setLongStats(longStats);
+   } else if (colType.equals("date")) {
+     DateColumnStatsData date1 = csData1.getDateStats();
+     DateColumnStatsData date2 = csData2.getDateStats();
+     DateColumnStatsData dateStats = new DateColumnStatsData();
+     dateStats.setNumNulls(date1.getNumNulls() + date2.getNumNulls());
+     dateStats.setHighValue(new Date(Math.max(date1.getHighValue().getDaysSinceEpoch(),
+         date2.getHighValue().getDaysSinceEpoch())));
+     // The smaller of the two low values is the merged LOW value; this used to
+     // be written with setHighValue, clobbering the high value computed above.
+     dateStats.setLowValue(new Date(Math.min(date1.getLowValue().getDaysSinceEpoch(),
+         date2.getLowValue().getDaysSinceEpoch())));
+     dateStats.setNumDVs(Math.max(date1.getNumDVs(), date2.getNumDVs()));
+     csd.setDateStats(dateStats);
+   } else if (colType.equals("double") || colType.equals("float")) {
+     DoubleColumnStatsData dbl1 = csData1.getDoubleStats();
+     DoubleColumnStatsData dbl2 = csData2.getDoubleStats();
+     DoubleColumnStatsData doubleStats = new DoubleColumnStatsData();
+     doubleStats.setNumNulls(dbl1.getNumNulls() + dbl2.getNumNulls());
+     doubleStats.setHighValue(Math.max(dbl1.getHighValue(), dbl2.getHighValue()));
+     doubleStats.setLowValue(Math.min(dbl1.getLowValue(), dbl2.getLowValue()));
+     doubleStats.setNumDVs(Math.max(dbl1.getNumDVs(), dbl2.getNumDVs()));
+     csd.setDoubleStats(doubleStats);
+   } else if (colType.startsWith("decimal")) {
+     DecimalColumnStatsData dec1 = csData1.getDecimalStats();
+     DecimalColumnStatsData dec2 = csData2.getDecimalStats();
+     DecimalColumnStatsData decimalStats = new DecimalColumnStatsData();
+     decimalStats.setNumNulls(dec1.getNumNulls() + dec2.getNumNulls());
+     Decimal high = (dec1.getHighValue().compareTo(dec2.getHighValue()) > 0)
+         ? dec1.getHighValue() : dec2.getHighValue();
+     decimalStats.setHighValue(high);
+     Decimal low = (dec1.getLowValue().compareTo(dec2.getLowValue()) < 0)
+         ? dec1.getLowValue() : dec2.getLowValue();
+     decimalStats.setLowValue(low);
+     decimalStats.setNumDVs(Math.max(dec1.getNumDVs(), dec2.getNumDVs()));
+     csd.setDecimalStats(decimalStats);
+   }
+   return cso;
+ }
+
+ @Override // always answered by rawStore; the cache is not consulted
+ public NotificationEventResponse getNextNotification(
+ NotificationEventRequest rqst) {
+ return rawStore.getNextNotification(rqst);
+ }
+
+ @Override // pass-through to rawStore; no cache state is touched here
+ public void addNotificationEvent(NotificationEvent event) {
+ rawStore.addNotificationEvent(event);
+ }
+
+ @Override // pass-through to rawStore; olderThan is forwarded unchanged
+ public void cleanNotificationEvents(int olderThan) {
+ rawStore.cleanNotificationEvents(olderThan);
+ }
+
+ @Override // always answered by rawStore; the cache is not consulted
+ public CurrentNotificationEventId getCurrentNotificationEventId() {
+ return rawStore.getCurrentNotificationEventId();
+ }
+
+ @Override // only forwards to rawStore.flushCache(); SharedCache is not cleared by this method
+ public void flushCache() {
+ rawStore.flushCache();
+ }
+
+ @Override // always answered by rawStore; the cache is not consulted
+ public ByteBuffer[] getFileMetadata(List<Long> fileIds) throws MetaException {
+ return rawStore.getFileMetadata(fileIds);
+ }
+
+ @Override // pass-through to rawStore; no cache state is touched here
+ public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata,
+ FileMetadataExprType type) throws MetaException {
+ rawStore.putFileMetadata(fileIds, metadata, type);
+ }
+
+ @Override // the answer is whatever the wrapped rawStore reports
+ public boolean isFileMetadataSupported() {
+ return rawStore.isFileMetadataSupported();
+ }
+
+ @Override // pass-through to rawStore; all arguments, including the caller's arrays, are forwarded as given
+ public void getFileMetadataByExpr(List<Long> fileIds,
+ FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas,
+ ByteBuffer[] exprResults, boolean[] eliminated) throws MetaException {
+ rawStore.getFileMetadataByExpr(fileIds, type, expr, metadatas, exprResults, eliminated);
+ }
+
+ @Override // the handler is obtained from rawStore on every call
+ public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
+ return rawStore.getFileMetadataHandler(type);
+ }
+
+ @Override // answered from SharedCache only; rawStore is not queried
+ public int getTableCount() throws MetaException {
+ return SharedCache.getCachedTableCount();
+ }
+
+ @Override // answered from SharedCache only; rawStore is not queried
+ public int getPartitionCount() throws MetaException {
+ return SharedCache.getCachedPartitionCount();
+ }
+
+ @Override // answered from SharedCache only; rawStore is not queried
+ public int getDatabaseCount() throws MetaException {
+ return SharedCache.getCachedDatabaseCount();
+ }
+
+ @Override
+ public List<SQLPrimaryKey> getPrimaryKeys(String db_name, String tbl_name)
+ throws MetaException {
+ // TODO constraintCache: constraints are not cached yet, so every lookup hits rawStore
+ return rawStore.getPrimaryKeys(db_name, tbl_name);
+ }
+
+ @Override
+ public List<SQLForeignKey> getForeignKeys(String parent_db_name,
+ String parent_tbl_name, String foreign_db_name, String foreign_tbl_name)
+ throws MetaException {
+ // TODO constraintCache: constraints are not cached yet, so every lookup hits rawStore
+ return rawStore.getForeignKeys(parent_db_name, parent_tbl_name, foreign_db_name, foreign_tbl_name);
+ }
+
+ /**
+  * Creates the table (with its constraints) in the backing store and then
+  * registers the table in {@code SharedCache} under its normalized database
+  * and table names. The constraints themselves are not cached.
+  * <p>
+  * NOTE(review): unlike {@code dropPartitions}, this method does not call
+  * {@code interruptCacheUpdateMaster()} before touching the cache -- confirm
+  * whether that is intentional.
+  *
+  * @param tbl         table to create; also the object placed in the cache
+  * @param primaryKeys primary keys, forwarded to the backing store
+  * @param foreignKeys foreign keys, forwarded to the backing store
+  */
+ @Override
+ public void createTableWithConstraints(Table tbl,
+     List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
+     throws InvalidObjectException, MetaException {
+   // TODO constraintCache
+   rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys);
+   String cacheDbName = HiveStringUtils.normalizeIdentifier(tbl.getDbName());
+   String cacheTblName = HiveStringUtils.normalizeIdentifier(tbl.getTableName());
+   SharedCache.addTableToCache(cacheDbName, cacheTblName, tbl);
+ }
+
+ @Override
+ public void dropConstraint(String dbName, String tableName,
+ String constraintName) throws NoSuchObjectException {
+ // TODO constraintCache: constraints are not cached yet, so the call goes straight to rawStore
+ rawStore.dropConstraint(dbName, tableName, constraintName);
+ }
+
+ @Override
+ public void addPrimaryKeys(List<SQLPrimaryKey> pks)
+ throws InvalidObjectException, MetaException {
+ // TODO constraintCache: constraints are not cached yet, so the call goes straight to rawStore
+ rawStore.addPrimaryKeys(pks);
+ }
+
+ @Override
+ public void addForeignKeys(List<SQLForeignKey> fks)
+ throws InvalidObjectException, MetaException {
+ // TODO constraintCache: constraints are not cached yet, so the call goes straight to rawStore
+ rawStore.addForeignKeys(fks);
+ }
+
+ @Override // always answered by rawStore; the cached partition column stats are not used here
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(
+ String dbName, String tableName)
+ throws MetaException, NoSuchObjectException {
+ return rawStore.getAggrColStatsForTablePartitions(dbName, tableName);
+ }
+
+ public RawStore getRawStore() { // exposes the store instance this class delegates to
+ return rawStore;
+ }
+
+ @VisibleForTesting // annotation marks this setter as a hook for tests
+ public void setRawStore(RawStore rawStore) {
+ this.rawStore = rawStore; // swaps the delegate only; cache contents are not touched here
+ }
+}