You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:00 UTC
[13/51] [partial] hive git commit: HIVE-14671 : merge master into
hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index c8b0d4c..3670de1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore;
import static org.apache.commons.lang.StringUtils.join;
import static org.apache.commons.lang.StringUtils.repeat;
+import java.sql.Clob;
import java.sql.Connection;
import java.sql.Statement;
import java.sql.SQLException;
@@ -60,6 +61,8 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.cache.CacheUtils;
+import org.apache.hadoop.hive.metastore.cache.CachedStore;
import org.apache.hadoop.hive.metastore.model.MConstraint;
import org.apache.hadoop.hive.metastore.model.MDatabase;
import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
@@ -78,6 +81,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* This class contains the optimizations for MetaStore that rely on direct SQL access to
@@ -648,7 +652,7 @@ class MetaStoreDirectSql {
loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
@Override
public void apply(StorageDescriptor t, Object[] fields) {
- t.putToParameters((String)fields[1], (String)fields[2]);
+ t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
}});
// Perform conversion of null map values
for (StorageDescriptor t : sds.values()) {
@@ -779,7 +783,7 @@ class MetaStoreDirectSql {
loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
@Override
public void apply(List<FieldSchema> t, Object[] fields) {
- t.add(new FieldSchema((String)fields[2], (String)fields[3], (String)fields[1]));
+ t.add(new FieldSchema((String)fields[2], extractSqlClob(fields[3]), (String)fields[1]));
}});
}
@@ -790,7 +794,7 @@ class MetaStoreDirectSql {
loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
@Override
public void apply(SerDeInfo t, Object[] fields) {
- t.putToParameters((String)fields[1], (String)fields[2]);
+ t.putToParameters((String)fields[1], extractSqlClob(fields[2]));
}});
// Perform conversion of null map values
for (SerDeInfo t : serdes.values()) {
@@ -878,6 +882,21 @@ class MetaStoreDirectSql {
return ((Number) obj).doubleValue();
}
+ private String extractSqlClob(Object value) {
+ if (value == null) return null;
+ try {
+ if (value instanceof Clob) {
+ // we trim the Clob value to a max length an int can hold
+ int maxLength = (((Clob)value).length() < Integer.MAX_VALUE - 2) ? (int)((Clob)value).length() : Integer.MAX_VALUE - 2;
+ return ((Clob)value).getSubString(1L, maxLength);
+ } else {
+ return value.toString();
+ }
+ } catch (SQLException sqle) {
+ return null;
+ }
+ }
+
private static String trimCommaList(StringBuilder sb) {
if (sb.length() > 0) {
sb.setLength(sb.length() - 1);
@@ -1190,7 +1209,7 @@ class MetaStoreDirectSql {
}
public AggrStats aggrColStatsForPartitions(String dbName, String tableName,
- List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation)
+ List<String> partNames, List<String> colNames, boolean useDensityFunctionForNDVEstimation, double ndvTuner)
throws MetaException {
if (colNames.isEmpty() || partNames.isEmpty()) {
LOG.debug("Columns is empty or partNames is empty : Short-circuiting stats eval");
@@ -1225,7 +1244,7 @@ class MetaStoreDirectSql {
// Read aggregated stats for one column
colStatsAggrFromDB =
columnStatisticsObjForPartitions(dbName, tableName, partNames, colNamesForDB,
- partsFound, useDensityFunctionForNDVEstimation);
+ partsFound, useDensityFunctionForNDVEstimation, ndvTuner);
if (!colStatsAggrFromDB.isEmpty()) {
ColumnStatisticsObj colStatsAggr = colStatsAggrFromDB.get(0);
colStatsList.add(colStatsAggr);
@@ -1238,7 +1257,7 @@ class MetaStoreDirectSql {
partsFound = partsFoundForPartitions(dbName, tableName, partNames, colNames);
colStatsList =
columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames, partsFound,
- useDensityFunctionForNDVEstimation);
+ useDensityFunctionForNDVEstimation, ndvTuner);
}
LOG.info("useDensityFunctionForNDVEstimation = " + useDensityFunctionForNDVEstimation
+ "\npartsFound = " + partsFound + "\nColumnStatisticsObj = "
@@ -1301,24 +1320,81 @@ class MetaStoreDirectSql {
private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(final String dbName,
final String tableName, final List<String> partNames, List<String> colNames, long partsFound,
- final boolean useDensityFunctionForNDVEstimation) throws MetaException {
+ final boolean useDensityFunctionForNDVEstimation, final double ndvTuner) throws MetaException {
final boolean areAllPartsFound = (partsFound == partNames.size());
return runBatched(colNames, new Batchable<String, ColumnStatisticsObj>() {
public List<ColumnStatisticsObj> run(final List<String> inputColNames) throws MetaException {
return runBatched(partNames, new Batchable<String, ColumnStatisticsObj>() {
public List<ColumnStatisticsObj> run(List<String> inputPartNames) throws MetaException {
return columnStatisticsObjForPartitionsBatch(dbName, tableName, inputPartNames,
- inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation);
+ inputColNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner);
}
});
}
});
}
+ // Get aggregated column stats for a table per partition for all columns in the partition
+ // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm)
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
+ String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", "
+ + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
+ + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
+ + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
+ + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
+ // The following data is used to compute a partitioned table's NDV based
+ // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
+ // accurately derived from partition NDVs, because the domain of column value two partitions
+ // can overlap. If there is no overlap then global NDV is just the sum
+ // of partition NDVs (UpperBound). But if there is some overlay then
+ // global NDV can be anywhere between sum of partition NDVs (no overlap)
+ // and same as one of the partition NDV (domain of column value in all other
+ // partitions is subset of the domain value in one of the partition)
+ // (LowerBound).But under uniform distribution, we can roughly estimate the global
+ // NDV by leveraging the min/max values.
+ // And, we also guarantee that the estimation makes sense by comparing it to the
+ // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
+ // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
+ + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
+ + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
+ + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
+ + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\""
+ + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+ long start = 0;
+ long end = 0;
+ Query query = null;
+ boolean doTrace = LOG.isDebugEnabled();
+ Object qResult = null;
+ ForwardQueryResult fqr = null;
+ start = doTrace ? System.nanoTime() : 0;
+ query = pm.newQuery("javax.jdo.query.SQL", queryText);
+ qResult = executeWithArray(query,
+ prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), queryText);
+ if (qResult == null) {
+ query.closeAll();
+ return Maps.newHashMap();
+ }
+ end = doTrace ? System.nanoTime() : 0;
+ timingTrace(doTrace, queryText, start, end);
+ List<Object[]> list = ensureList(qResult);
+ Map<String, ColumnStatisticsObj> partColStatsMap = new HashMap<String, ColumnStatisticsObj>();
+ for (Object[] row : list) {
+ String partName = (String) row[0];
+ String colName = (String) row[1];
+ partColStatsMap.put(
+ CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName),
+ prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner));
+ Deadline.checkTimeout();
+ }
+ query.closeAll();
+ return partColStatsMap;
+ }
+
/** Should be called with the list short enough to not trip up Oracle/etc. */
private List<ColumnStatisticsObj> columnStatisticsObjForPartitionsBatch(String dbName,
String tableName, List<String> partNames, List<String> colNames, boolean areAllPartsFound,
- boolean useDensityFunctionForNDVEstimation) throws MetaException {
+ boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
// TODO: all the extrapolation logic should be moved out of this class,
// only mechanical data retrieval should remain here.
String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
@@ -1370,7 +1446,7 @@ class MetaStoreDirectSql {
List<Object[]> list = ensureList(qResult);
List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(list.size());
for (Object[] row : list) {
- colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
+ colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
Deadline.checkTimeout();
}
query.closeAll();
@@ -1429,7 +1505,7 @@ class MetaStoreDirectSql {
}
list = ensureList(qResult);
for (Object[] row : list) {
- colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
+ colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
Deadline.checkTimeout();
}
end = doTrace ? System.nanoTime() : 0;
@@ -1576,7 +1652,7 @@ class MetaStoreDirectSql {
query.closeAll();
}
}
- colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation));
+ colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner));
Deadline.checkTimeout();
}
}
@@ -1596,13 +1672,13 @@ class MetaStoreDirectSql {
}
private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i,
- boolean useDensityFunctionForNDVEstimation) throws MetaException {
+ boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
ColumnStatisticsData data = new ColumnStatisticsData();
ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++], (String) row[i++], data);
Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++], declow = row[i++], dechigh = row[i++], nulls = row[i++], dist = row[i++], avglen = row[i++], maxlen = row[i++], trues = row[i++], falses = row[i++], avgLong = row[i++], avgDouble = row[i++], avgDecimal = row[i++], sumDist = row[i++];
StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh,
declow, dechigh, nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble,
- avgDecimal, sumDist, useDensityFunctionForNDVEstimation);
+ avgDecimal, sumDist, useDensityFunctionForNDVEstimation, ndvTuner);
return cso;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
index b0defb5..868e5a5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
@@ -75,19 +75,17 @@ public abstract class MetaStoreEventListener implements Configurable {
}
/**
- * @param add partition event
- * @throws MetaException
- */
-
- /**
* @param tableEvent alter table event
* @throws MetaException
*/
public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
}
- public void onAddPartition (AddPartitionEvent partitionEvent)
- throws MetaException {
+ /**
+ * @param partitionEvent add partition event
+ * @throws MetaException
+ */
+ public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException {
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
new file mode 100644
index 0000000..20011cc
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience.Private;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+
+/**
+ * This class is used to notify a list of listeners about specific MetaStore events.
+ */
+@Private
+public class MetaStoreListenerNotifier {
+ private interface EventNotifier {
+ void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException;
+ }
+
+ private static Map<EventType, EventNotifier> notificationEvents = Maps.newHashMap(
+ ImmutableMap.<EventType, EventNotifier>builder()
+ .put(EventType.CREATE_DATABASE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onCreateDatabase((CreateDatabaseEvent)event);
+ }
+ })
+ .put(EventType.DROP_DATABASE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropDatabase((DropDatabaseEvent)event);
+ }
+ })
+ .put(EventType.CREATE_TABLE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onCreateTable((CreateTableEvent)event);
+ }
+ })
+ .put(EventType.DROP_TABLE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropTable((DropTableEvent)event);
+ }
+ })
+ .put(EventType.ADD_PARTITION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAddPartition((AddPartitionEvent)event);
+ }
+ })
+ .put(EventType.DROP_PARTITION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropPartition((DropPartitionEvent)event);
+ }
+ })
+ .put(EventType.ALTER_TABLE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAlterTable((AlterTableEvent)event);
+ }
+ })
+ .put(EventType.ALTER_PARTITION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAlterPartition((AlterPartitionEvent)event);
+ }
+ })
+ .put(EventType.INSERT, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onInsert((InsertEvent)event);
+ }
+ })
+ .put(EventType.CREATE_FUNCTION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onCreateFunction((CreateFunctionEvent)event);
+ }
+ })
+ .put(EventType.DROP_FUNCTION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropFunction((DropFunctionEvent)event);
+ }
+ })
+ .put(EventType.CREATE_INDEX, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAddIndex((AddIndexEvent)event);
+ }
+ })
+ .put(EventType.DROP_INDEX, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropIndex((DropIndexEvent)event);
+ }
+ })
+ .put(EventType.ALTER_INDEX, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAlterIndex((AlterIndexEvent)event);
+ }
+ })
+ .build()
+ );
+
+ /**
+ * Notify a list of listeners about a specific metastore event. Each listener notified might update
+ * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will
+ * be returned to the caller.
+ *
+ * @param listeners List of MetaStoreEventListener listeners.
+ * @param eventType Type of the notification event.
+ * @param event The ListenerEvent with information about the event.
+ * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty
+ * map if no parameters were updated or if no listeners were notified.
+ * @throws MetaException If an error occurred while calling the listeners.
+ */
+ public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+ EventType eventType,
+ ListenerEvent event) throws MetaException {
+
+ Preconditions.checkNotNull(listeners, "Listeners must not be null.");
+ Preconditions.checkNotNull(event, "The event must not be null.");
+
+ for (MetaStoreEventListener listener : listeners) {
+ notificationEvents.get(eventType).notify(listener, event);
+ }
+
+ // Each listener called above might set a different parameter on the event.
+ // This write permission is allowed on the listener side to avoid breaking compatibility if we change the API
+ // method calls.
+ return event.getParameters();
+ }
+
+ /**
+ * Notify a list of listeners about a specific metastore event. Each listener notified might update
+ * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will
+ * be returned to the caller.
+ *
+ * @param listeners List of MetaStoreEventListener listeners.
+ * @param eventType Type of the notification event.
+ * @param event The ListenerEvent with information about the event.
+ * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client.
+ * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty
+ * map if no parameters were updated or if no listeners were notified.
+ * @throws MetaException If an error occurred while calling the listeners.
+ */
+ public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+ EventType eventType,
+ ListenerEvent event,
+ EnvironmentContext environmentContext) throws MetaException {
+
+ Preconditions.checkNotNull(event, "The event must not be null.");
+
+ event.setEnvironmentContext(environmentContext);
+ return notifyEvent(listeners, eventType, event);
+ }
+
+ /**
+ * Notify a list of listeners about a specific metastore event. Each listener notified might update
+ * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will
+ * be returned to the caller.
+ *
+ * @param listeners List of MetaStoreEventListener listeners.
+ * @param eventType Type of the notification event.
+ * @param event The ListenerEvent with information about the event.
+ * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client.
+ * @param parameters A list of key/value pairs with the new parameters to add.
+ * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty
+ * map if no parameters were updated or if no listeners were notified.
+ * @throws MetaException If an error occurred while calling the listeners.
+ */
+ public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+ EventType eventType,
+ ListenerEvent event,
+ EnvironmentContext environmentContext,
+ Map<String, String> parameters) throws MetaException {
+
+ Preconditions.checkNotNull(event, "The event must not be null.");
+
+ event.putParameters(parameters);
+ return notifyEvent(listeners, eventType, event, environmentContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java
index 9c30ee7..320902b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreSchemaInfo.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.metastore;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
@@ -27,21 +26,19 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.common.util.HiveVersionInfo;
import com.google.common.collect.ImmutableMap;
public class MetaStoreSchemaInfo {
- private static String SQL_FILE_EXTENSION=".sql";
- private static String UPGRADE_FILE_PREFIX="upgrade-";
- private static String INIT_FILE_PREFIX="hive-schema-";
- private static String VERSION_UPGRADE_LIST = "upgrade.order";
- private static String PRE_UPGRADE_PREFIX = "pre-";
+ private static final String SQL_FILE_EXTENSION = ".sql";
+ private static final String UPGRADE_FILE_PREFIX = "upgrade-";
+ private static final String INIT_FILE_PREFIX = "hive-schema-";
+ private static final String VERSION_UPGRADE_LIST = "upgrade.order";
+ private static final String PRE_UPGRADE_PREFIX = "pre-";
private final String dbType;
private final String hiveSchemaVersions[];
- private final HiveConf hiveConf;
private final String hiveHome;
// Some version upgrades often don't change schema. So they are equivalent to
@@ -55,10 +52,9 @@ public class MetaStoreSchemaInfo {
"1.2.1", "1.2.0"
);
- public MetaStoreSchemaInfo(String hiveHome, HiveConf hiveConf, String dbType) throws HiveMetaException {
+ public MetaStoreSchemaInfo(String hiveHome, String dbType) throws HiveMetaException {
this.hiveHome = hiveHome;
this.dbType = dbType;
- this.hiveConf = hiveConf;
// load upgrade order for the given dbType
List<String> upgradeOrderList = new ArrayList<String>();
String upgradeListFile = getMetaStoreScriptDir() + File.separator +
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 6259cda..3ee7977 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -47,6 +47,7 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -86,6 +87,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
+import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.ReflectionUtil;
@@ -638,14 +640,6 @@ public class MetaStoreUtils {
}
}
- static boolean isCascadeNeededInAlterTable(Table oldTable, Table newTable) {
- //currently cascade only supports add/replace columns and
- //changing column type/position/name/comments
- List<FieldSchema> oldCols = oldTable.getSd().getCols();
- List<FieldSchema> newCols = newTable.getSd().getCols();
- return !areSameColumns(oldCols, newCols);
- }
-
static boolean areSameColumns(List<FieldSchema> oldCols, List<FieldSchema> newCols) {
if (oldCols.size() != newCols.size()) {
return false;
@@ -696,8 +690,6 @@ public class MetaStoreUtils {
TypeInfoUtils.getTypeInfoFromTypeString(newType));
}
- public static final int MAX_MS_TYPENAME_LENGTH = 2000; // 4000/2, for an unlikely unicode case
-
public static final String TYPE_FROM_DESERIALIZER = "<derived from deserializer>";
/**
* validate column type
@@ -708,9 +700,6 @@ public class MetaStoreUtils {
*/
static public String validateColumnType(String type) {
if (type.equals(TYPE_FROM_DESERIALIZER)) return null;
- if (type.length() > MAX_MS_TYPENAME_LENGTH) {
- return "type name is too long: " + type;
- }
int last = 0;
boolean lastAlphaDigit = isValidTypeChar(type.charAt(last));
for (int i = 1; i <= type.length(); i++) {
@@ -1769,8 +1758,19 @@ public class MetaStoreUtils {
* @param conf
* @return The SASL configuration
*/
- public static Map<String, String> getMetaStoreSaslProperties(HiveConf conf) {
+ public static Map<String, String> getMetaStoreSaslProperties(HiveConf conf, boolean useSSL) {
// As of now Hive Meta Store uses the same configuration as Hadoop SASL configuration
+
+ // If SSL is enabled, override the given value of "hadoop.rpc.protection" and set it to "authentication"
+ // This disables any encryption provided by SASL, since SSL already provides it
+ String hadoopRpcProtectionVal = conf.get(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION);
+ String hadoopRpcProtectionAuth = SaslRpcServer.QualityOfProtection.AUTHENTICATION.toString();
+
+ if (useSSL && hadoopRpcProtectionVal != null && !hadoopRpcProtectionVal.equals(hadoopRpcProtectionAuth)) {
+ LOG.warn("Overriding value of " + CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION + " setting it from "
+ + hadoopRpcProtectionVal + " to " + hadoopRpcProtectionAuth + " because SSL is enabled");
+ conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, hadoopRpcProtectionAuth);
+ }
return ShimLoader.getHadoopThriftAuthBridge().getHadoopSaslProperties(conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 51bc6d0..c351ffd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -160,9 +160,13 @@ import org.apache.hive.common.util.HiveStringUtils;
import org.apache.thrift.TException;
import org.datanucleus.AbstractNucleusContext;
import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.ClassLoaderResolverImpl;
import org.datanucleus.NucleusContext;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
import org.datanucleus.store.rdbms.exceptions.MissingTableException;
+import org.datanucleus.store.scostore.Store;
+import org.datanucleus.util.WeakValueMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,6 +199,7 @@ public class ObjectStore implements RawStore, Configurable {
private static final Map<String, Class> PINCLASSMAP;
private static final String HOSTNAME;
private static final String USER;
+ private static final String JDO_PARAM = ":param";
static {
Map<String, Class> map = new HashMap<String, Class>();
map.put("table", MTable.class);
@@ -234,26 +239,22 @@ public class ObjectStore implements RawStore, Configurable {
private Pattern partitionValidationPattern;
/**
- * A class to pass the Query object to the caller to let the caller release
- * resources by calling QueryWrapper.query.closeAll() after consuming all the query results.
+ * A Autocloseable wrapper around Query class to pass the Query object to the caller and let the caller release
+ * the resources when the QueryWrapper goes out of scope
*/
- public static class QueryWrapper {
+ public static class QueryWrapper implements AutoCloseable {
public Query query;
/**
* Explicitly closes the query object to release the resources
*/
+ @Override
public void close() {
if (query != null) {
query.closeAll();
query = null;
}
}
-
- @Override
- protected void finalize() {
- this.close();
- }
}
public ObjectStore() {
@@ -284,6 +285,9 @@ public class ObjectStore implements RawStore, Configurable {
boolean propsChanged = !propsFromConf.equals(prop);
if (propsChanged) {
+ if (pmf != null){
+ clearOutPmfClassLoaderCache(pmf);
+ }
pmf = null;
prop = null;
}
@@ -748,12 +752,7 @@ public class ObjectStore implements RawStore, Configurable {
pm.retrieve(mdb);
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
if (mdb == null) {
throw new NoSuchObjectException("There is no database named " + name);
@@ -872,10 +871,7 @@ public class ObjectStore implements RawStore, Configurable {
}
success = commitTransaction();
} finally {
- if (!success) {
- rollbackTransaction();
- }
- queryWrapper.close();
+ rollbackAndCleanup(success, queryWrapper);
}
return success;
}
@@ -893,33 +889,20 @@ public class ObjectStore implements RawStore, Configurable {
// Take the pattern and split it on the | to get all the composing
// patterns
String[] subpatterns = pattern.trim().split("\\|");
- String queryStr = "select name from org.apache.hadoop.hive.metastore.model.MDatabase where (";
- boolean first = true;
- for (String subpattern : subpatterns) {
- subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
- if (!first) {
- queryStr = queryStr + " || ";
- }
- queryStr = queryStr + " name.matches(\"" + subpattern + "\")";
- first = false;
- }
- queryStr = queryStr + ")";
- query = pm.newQuery(queryStr);
+ StringBuilder filterBuilder = new StringBuilder();
+ List<String> parameterVals = new ArrayList<>(subpatterns.length);
+ appendPatternCondition(filterBuilder, "name", subpatterns, parameterVals);
+ query = pm.newQuery(MDatabase.class, filterBuilder.toString());
query.setResult("name");
query.setOrdering("name ascending");
- Collection names = (Collection) query.execute();
+ Collection names = (Collection) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
databases = new ArrayList<String>();
for (Iterator i = names.iterator(); i.hasNext();) {
databases.add((String) i.next());
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return databases;
}
@@ -939,12 +922,7 @@ public class ObjectStore implements RawStore, Configurable {
databases = new ArrayList<String>((Collection<String>) query.execute());
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
Collections.sort(databases);
return databases;
@@ -1012,12 +990,7 @@ public class ObjectStore implements RawStore, Configurable {
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return type;
}
@@ -1041,12 +1014,7 @@ public class ObjectStore implements RawStore, Configurable {
success = commitTransaction();
LOG.debug("type not found " + typeName, e);
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return success;
}
@@ -1206,6 +1174,9 @@ public class ObjectStore implements RawStore, Configurable {
private List<MConstraint> listAllTableConstraintsWithOptionalConstraintName
(String dbName, String tableName, String constraintname) {
+ dbName = HiveStringUtils.normalizeIdentifier(dbName);
+ tableName = HiveStringUtils.normalizeIdentifier(tableName);
+ constraintname = constraintname!=null?HiveStringUtils.normalizeIdentifier(constraintname):null;
List<MConstraint> mConstraints = null;
List<String> constraintNames = new ArrayList<String>();
Query query = null;
@@ -1296,40 +1267,28 @@ public class ObjectStore implements RawStore, Configurable {
dbName = HiveStringUtils.normalizeIdentifier(dbName);
// Take the pattern and split it on the | to get all the composing
// patterns
- String[] subpatterns = pattern.trim().split("\\|");
- String queryStr =
- "select tableName from org.apache.hadoop.hive.metastore.model.MTable "
- + "where database.name == dbName && (";
- boolean first = true;
- for (String subpattern : subpatterns) {
- subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
- if (!first) {
- queryStr = queryStr + " || ";
- }
- queryStr = queryStr + " tableName.matches(\"" + subpattern + "\")";
- first = false;
+ List<String> parameterVals = new ArrayList<>();
+ StringBuilder filterBuilder = new StringBuilder();
+ //adds database.name == dbName to the filter
+ appendSimpleCondition(filterBuilder, "database.name", new String[] {dbName}, parameterVals);
+ if(pattern != null) {
+ appendPatternCondition(filterBuilder, "tableName", pattern, parameterVals);
}
- queryStr = queryStr + ")";
- if (tableType != null) {
- queryStr = queryStr + " && tableType.matches(\"" + tableType.toString() + "\")";
+ if(tableType != null) {
+ appendPatternCondition(filterBuilder, "tableType", new String[] {tableType.toString()}, parameterVals);
}
- query = pm.newQuery(queryStr);
- query.declareParameters("java.lang.String dbName");
+
+ query = pm.newQuery(MTable.class, filterBuilder.toString());
query.setResult("tableName");
query.setOrdering("tableName ascending");
- Collection names = (Collection) query.execute(dbName);
+ Collection names = (Collection) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
tbls = new ArrayList<String>();
for (Iterator i = names.iterator(); i.hasNext();) {
tbls.add((String) i.next());
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return tbls;
}
@@ -1361,12 +1320,7 @@ public class ObjectStore implements RawStore, Configurable {
result = (Long) query.execute();
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return result.intValue();
}
@@ -1382,19 +1336,20 @@ public class ObjectStore implements RawStore, Configurable {
openTransaction();
// Take the pattern and split it on the | to get all the composing
// patterns
- StringBuilder builder = new StringBuilder();
+ StringBuilder filterBuilder = new StringBuilder();
+ List<String> parameterVals = new ArrayList<>();
if (dbNames != null && !dbNames.equals("*")) {
- appendPatternCondition(builder, "database.name", dbNames);
+ appendPatternCondition(filterBuilder, "database.name", dbNames, parameterVals);
}
if (tableNames != null && !tableNames.equals("*")) {
- appendPatternCondition(builder, "tableName", tableNames);
+ appendPatternCondition(filterBuilder, "tableName", tableNames, parameterVals);
}
if (tableTypes != null && !tableTypes.isEmpty()) {
- appendSimpleCondition(builder, "tableType", tableTypes.toArray(new String[0]));
+ appendSimpleCondition(filterBuilder, "tableType", tableTypes.toArray(new String[0]), parameterVals);
}
- query = pm.newQuery(MTable.class, builder.toString());
- Collection<MTable> tables = (Collection<MTable>) query.execute();
+ query = pm.newQuery(MTable.class, filterBuilder.toString());
+ Collection<MTable> tables = (Collection<MTable>) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
for (MTable table : tables) {
TableMeta metaData = new TableMeta(
table.getDatabase().getName(), table.getTableName(), table.getTableType());
@@ -1403,29 +1358,29 @@ public class ObjectStore implements RawStore, Configurable {
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return metas;
}
+ private StringBuilder appendPatternCondition(StringBuilder filterBuilder, String fieldName,
+ String[] elements, List<String> parameterVals) {
+ return appendCondition(filterBuilder, fieldName, elements, true, parameterVals);
+ }
+
private StringBuilder appendPatternCondition(StringBuilder builder,
- String fieldName, String elements) {
+ String fieldName, String elements, List<String> parameters) {
elements = HiveStringUtils.normalizeIdentifier(elements);
- return appendCondition(builder, fieldName, elements.split("\\|"), true);
+ return appendCondition(builder, fieldName, elements.split("\\|"), true, parameters);
}
private StringBuilder appendSimpleCondition(StringBuilder builder,
- String fieldName, String[] elements) {
- return appendCondition(builder, fieldName, elements, false);
+ String fieldName, String[] elements, List<String> parameters) {
+ return appendCondition(builder, fieldName, elements, false, parameters);
}
private StringBuilder appendCondition(StringBuilder builder,
- String fieldName, String[] elements, boolean pattern) {
+ String fieldName, String[] elements, boolean pattern, List<String> parameters) {
if (builder.length() > 0) {
builder.append(" && ");
}
@@ -1435,14 +1390,15 @@ public class ObjectStore implements RawStore, Configurable {
if (pattern) {
element = "(?i)" + element.replaceAll("\\*", ".*");
}
+ parameters.add(element);
if (builder.length() > length) {
builder.append(" || ");
}
builder.append(fieldName);
if (pattern) {
- builder.append(".matches(\"").append(element).append("\")");
+ builder.append(".matches(").append(JDO_PARAM).append(parameters.size()).append(")");
} else {
- builder.append(" == \"").append(element).append("\"");
+ builder.append(" == ").append(JDO_PARAM).append(parameters.size());
}
}
builder.append(" )");
@@ -1488,12 +1444,7 @@ public class ObjectStore implements RawStore, Configurable {
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
nmtbl.mtbl = mtbl;
return nmtbl;
@@ -1536,15 +1487,10 @@ public class ObjectStore implements RawStore, Configurable {
}
committed = commitTransaction();
} finally {
- if (!committed) {
- rollbackTransaction();
- }
+ rollbackAndCleanup(committed, query);
if (dbExistsQuery != null) {
dbExistsQuery.closeAll();
}
- if (query != null) {
- query.closeAll();
- }
}
return tables;
}
@@ -2065,12 +2011,7 @@ public class ObjectStore implements RawStore, Configurable {
}
}
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return ret;
}
@@ -2302,10 +2243,7 @@ public class ObjectStore implements RawStore, Configurable {
success = commitTransaction();
return parts;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- queryWrapper.close();
+ rollbackAndCleanup(success, queryWrapper);
}
}
@@ -2407,6 +2345,7 @@ public class ObjectStore implements RawStore, Configurable {
for (Iterator i = names.iterator(); i.hasNext();) {
pns.add((String) i.next());
}
+
if (query != null) {
query.closeAll();
}
@@ -2501,10 +2440,7 @@ public class ObjectStore implements RawStore, Configurable {
}
success = commitTransaction();
} finally {
- if (!success) {
- rollbackTransaction();
- }
- queryWrapper.close();
+ rollbackAndCleanup(success, queryWrapper);
}
return partitions;
}
@@ -2526,10 +2462,7 @@ public class ObjectStore implements RawStore, Configurable {
}
success = commitTransaction();
} finally {
- if (!success) {
- rollbackTransaction();
- }
- queryWrapper.close();
+ rollbackAndCleanup(success, queryWrapper);
}
return partitionNames;
}
@@ -3294,12 +3227,7 @@ public class ObjectStore implements RawStore, Configurable {
success = commitTransaction();
LOG.debug("Done retrieving all objects for listTableNamesByFilter");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return tableNames;
}
@@ -3345,12 +3273,7 @@ public class ObjectStore implements RawStore, Configurable {
success = commitTransaction();
LOG.debug("Done retrieving all objects for listMPartitionNamesByFilter");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return partNames;
}
@@ -3571,10 +3494,7 @@ public class ObjectStore implements RawStore, Configurable {
success = commitTransaction();
LOG.debug("successfully deleted a CD in removeUnusedColumnDescriptor");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- queryWrapper.close();
+ rollbackAndCleanup(success, queryWrapper);
}
}
@@ -3658,12 +3578,7 @@ public class ObjectStore implements RawStore, Configurable {
constraintNameIfExists = (String) constraintExistsQuery.execute(name);
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (constraintExistsQuery != null) {
- constraintExistsQuery.closeAll();
- }
+ rollbackAndCleanup(commited, constraintExistsQuery);
}
return constraintNameIfExists != null && !constraintNameIfExists.isEmpty();
}
@@ -3911,12 +3826,7 @@ public class ObjectStore implements RawStore, Configurable {
pm.retrieve(midx);
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return midx;
}
@@ -3979,12 +3889,7 @@ public class ObjectStore implements RawStore, Configurable {
return indexes;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -4011,12 +3916,7 @@ public class ObjectStore implements RawStore, Configurable {
}
success = commitTransaction();
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return pns;
}
@@ -4139,12 +4039,7 @@ public class ObjectStore implements RawStore, Configurable {
pm.retrieve(mRoleMember);
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return mRoleMember;
}
@@ -4213,11 +4108,7 @@ public class ObjectStore implements RawStore, Configurable {
}
success = commitTransaction();
} finally {
- if (!success) {
- rollbackTransaction();
- }
-
- queryWrapper.close();
+ rollbackAndCleanup(success, queryWrapper);
}
return success;
}
@@ -4287,12 +4178,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listRoles");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
if (principalType == PrincipalType.USER) {
@@ -4358,7 +4244,6 @@ public class ObjectStore implements RawStore, Configurable {
mRoleMemebership = (List<MRoleMap>) query.execute(roleName, principalType.toString());
pm.retrieveAll(mRoleMemebership);
success = commitTransaction();
-
LOG.debug("Done retrieving all objects for listMSecurityPrincipalMembershipRole");
} finally {
if (!success) {
@@ -4392,12 +4277,7 @@ public class ObjectStore implements RawStore, Configurable {
pm.retrieve(mrole);
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return mrole;
}
@@ -4419,12 +4299,7 @@ public class ObjectStore implements RawStore, Configurable {
success = commitTransaction();
return roleNames;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -5250,12 +5125,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listRoleMembers");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mRoleMemeberList;
}
@@ -5306,12 +5176,7 @@ public class ObjectStore implements RawStore, Configurable {
userNameDbPriv.addAll(mPrivs);
}
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return userNameDbPriv;
}
@@ -5351,12 +5216,7 @@ public class ObjectStore implements RawStore, Configurable {
commited = commitTransaction();
return convertGlobal(userNameDbPriv);
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
}
@@ -5399,12 +5259,7 @@ public class ObjectStore implements RawStore, Configurable {
mSecurityDBList.addAll(mPrivs);
LOG.debug("Done retrieving all objects for listPrincipalDBGrants");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mSecurityDBList;
}
@@ -5527,12 +5382,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listAllTableGrants");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mSecurityTabList;
}
@@ -5559,12 +5409,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listTableAllPartitionGrants");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mSecurityTabPartList;
}
@@ -5592,12 +5437,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listTableAllColumnGrants");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mTblColPrivilegeList;
}
@@ -5626,12 +5466,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listTableAllPartitionColumnGrants");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mSecurityColList;
}
@@ -5674,7 +5509,6 @@ public class ObjectStore implements RawStore, Configurable {
private List<MDBPrivilege> listDatabaseGrants(String dbName, QueryWrapper queryWrapper) {
dbName = HiveStringUtils.normalizeIdentifier(dbName);
boolean success = false;
-
try {
LOG.debug("Executing listDatabaseGrants");
@@ -5782,12 +5616,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listAllTableGrants");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mSecurityTabPartList;
}
@@ -5847,12 +5676,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalPartitionGrants");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mSecurityTabPartList;
}
@@ -5916,12 +5740,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalTableColumnGrants");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mSecurityColList;
}
@@ -5983,12 +5802,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalPartitionColumnGrants");
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
return mSecurityColList;
}
@@ -6050,12 +5864,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalPartitionColumnGrantsAll");
return result;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -6083,12 +5892,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPartitionColumnGrantsAll");
return result;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -6163,12 +5967,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalAllTableGrants");
return result;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -6191,12 +5990,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalAllTableGrants");
return result;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -6236,7 +6030,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalAllPartitionGrants");
} finally {
if (!success) {
- rollbackTransaction();
+ rollbackTransaction();
}
}
return mSecurityTabPartList;
@@ -6268,12 +6062,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalPartitionGrantsAll");
return result;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -6299,12 +6088,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalPartitionGrantsAll");
return result;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -6382,12 +6166,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalTableColumnGrantsAll");
return result;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -6412,12 +6191,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done retrieving all objects for listPrincipalTableColumnGrantsAll");
return result;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -6494,12 +6268,7 @@ public class ObjectStore implements RawStore, Configurable {
LOG.debug("Done executing isPartitionMarkedForEvent");
return (partEvents != null && !partEvents.isEmpty()) ? true : false;
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
}
@@ -6553,7 +6322,6 @@ public class ObjectStore implements RawStore, Configurable {
public Collection<?> executeJDOQLSelect(String queryStr, QueryWrapper queryWrapper) {
boolean committed = false;
Collection<?> result = null;
-
try {
openTransaction();
Query query = queryWrapper.query = pm.newQuery(queryStr);
@@ -6594,12 +6362,7 @@ public class ObjectStore implements RawStore, Configurable {
return -1;
}
} finally {
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -6629,12 +6392,7 @@ public class ObjectStore implements RawStore, Configurable {
return null;
}
} finally {
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -6745,12 +6503,7 @@ public class ObjectStore implements RawStore, Configurable {
}
return retVal;
} finally {
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -6838,12 +6591,7 @@ public class ObjectStore implements RawStore, Configurable {
}
return retVal;
} finally {
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -6877,12 +6625,7 @@ public class ObjectStore implements RawStore, Configurable {
}
return retVal;
} finally {
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -6973,12 +6716,7 @@ public class ObjectStore implements RawStore, Configurable {
}
return retVal;
} finally {
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -7055,12 +6793,7 @@ public class ObjectStore implements RawStore, Configurable {
}
return retVal;
} finally {
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -7271,7 +7004,6 @@ public class ObjectStore implements RawStore, Configurable {
}
boolean committed = false;
-
try {
openTransaction();
@@ -7318,7 +7050,7 @@ public class ObjectStore implements RawStore, Configurable {
for (String colName : colNames) {
boolean foundCol = false;
for (FieldSchema mCol : colList) {
- if (mCol.getName().equals(colName.trim())) {
+ if (mCol.getName().equals(colName)) {
foundCol = true;
break;
}
@@ -7430,13 +7162,16 @@ public class ObjectStore implements RawStore, Configurable {
@Override
public AggrStats get_aggr_stats_for(String dbName, String tblName,
final List<String> partNames, final List<String> colNames) throws MetaException, NoSuchObjectException {
- final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(), HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
+ final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(),
+ HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
+ final double ndvTuner = HiveConf.getFloatVar(getConf(),
+ HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
return new GetHelper<AggrStats>(dbName, tblName, true, false) {
@Override
protected AggrStats getSqlResult(GetHelper<AggrStats> ctx)
throws MetaException {
return directSql.aggrColStatsForPartitions(dbName, tblName, partNames,
- colNames, useDensityFunctionForNDVEstimation);
+ colNames, useDensityFunctionForNDVEstimation, ndvTuner);
}
@Override
protected AggrStats getJdoResult(GetHelper<AggrStats> ctx)
@@ -7454,6 +7189,38 @@ public class ObjectStore implements RawStore, Configurable {
}
@Override
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException {
+ final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(),
+ HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
+ final double ndvTuner = HiveConf.getFloatVar(getConf(),
+ HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
+ return new GetHelper<Map<String, ColumnStatisticsObj>>(dbName, tableName, true, false) {
+ @Override
+ protected Map<String, ColumnStatisticsObj> getSqlResult(
+ GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException {
+ return directSql.getAggrColStatsForTablePartitions(dbName, tblName,
+ useDensityFunctionForNDVEstimation, ndvTuner);
+ }
+
+ @Override
+ protected Map<String, ColumnStatisticsObj> getJdoResult(
+ GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException,
+ NoSuchObjectException {
+ // This is fast path for query optimizations, if we can find this info
+ // quickly using directSql, do it. No point in failing back to slow path
+ // here.
+ throw new MetaException("Jdo path is not implemented for stats aggr.");
+ }
+
+ @Override
+ protected String describeResult() {
+ return null;
+ }
+ }.run(true);
+ }
+
+ @Override
public void flushCache() {
// NOP as there's no caching
}
@@ -7466,7 +7233,12 @@ public class ObjectStore implements RawStore, Configurable {
try {
openTransaction();
// We are not going to verify SD for each partition. Just verify for the table.
- validateTableCols(table, colNames);
+ // ToDo: we need verify the partition column instead
+ try {
+ validateTableCols(table, colNames);
+ } catch (MetaException me) {
+ LOG.warn("The table does not have the same column definition as its partition.");
+ }
Query query = queryWrapper.query = pm.newQuery(MPartitionColumnStatistics.class);
String paramStr = "java.lang.String t1, java.lang.String t2";
String filter = "tableName == t1 && dbName == t2 && (";
@@ -7593,12 +7365,7 @@ public class ObjectStore implements RawStore, Configurable {
rollbackTransaction();
throw e;
} finally {
- if (!ret) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(ret, query);
}
return ret;
}
@@ -7668,12 +7435,7 @@ public class ObjectStore implements RawStore, Configurable {
rollbackTransaction();
throw e;
} finally {
- if (!ret) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(ret, query);
}
return ret;
}
@@ -7695,12 +7457,7 @@ public class ObjectStore implements RawStore, Configurable {
delCnt = query.deletePersistentAll(curTime, expiryTime);
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
LOG.debug("Done executing cleanupEvents");
}
return delCnt;
@@ -7804,12 +7561,7 @@ public class ObjectStore implements RawStore, Configurable {
return tokenIdents;
} finally {
LOG.debug("Done executing getAllTokenIdentifers with status : " + committed);
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -7852,12 +7604,7 @@ public class ObjectStore implements RawStore, Configurable {
}
committed = commitTransaction();
} finally {
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
LOG.debug("Done executing updateMasterKey with status : " + committed);
if (null == masterKey) {
@@ -7885,12 +7632,7 @@ public class ObjectStore implements RawStore, Configurable {
}
success = commitTransaction();
} finally {
- if (!success) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(success, query);
}
LOG.debug("Done executing removeMasterKey with status : " + success);
return (null != masterKey) && success;
@@ -7916,12 +7658,7 @@ public class ObjectStore implements RawStore, Configurable {
return masterKeys;
} finally {
LOG.debug("Done executing getMasterKeys with status : " + committed);
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -8033,12 +7770,7 @@ public class ObjectStore implements RawStore, Configurable {
}
return mVerTables.get(0);
} finally {
- if (!committed) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(committed, query);
}
}
@@ -8264,12 +7996,7 @@ public class ObjectStore implements RawStore, Configurable {
pm.retrieve(mfunc);
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return mfunc;
}
@@ -8317,37 +8044,23 @@ public class ObjectStore implements RawStore, Configurable {
dbName = HiveStringUtils.normalizeIdentifier(dbName);
// Take the pattern and split it on the | to get all the composing
// patterns
- String[] subpatterns = pattern.trim().split("\\|");
- String queryStr =
- "select functionName from org.apache.hadoop.hive.metastore.model.MFunction "
- + "where database.name == dbName && (";
- boolean first = true;
- for (String subpattern : subpatterns) {
- subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
- if (!first) {
- queryStr = queryStr + " || ";
- }
- queryStr = queryStr + " functionName.matches(\"" + subpattern + "\")";
- first = false;
+ List<String> parameterVals = new ArrayList<>();
+ StringBuilder filterBuilder = new StringBuilder();
+ appendSimpleCondition(filterBuilder, "database.name", new String[] { dbName }, parameterVals);
+ if(pattern != null) {
+ appendPatternCondition(filterBuilder, "functionName", pattern, parameterVals);
}
- queryStr = queryStr + ")";
- query = pm.newQuery(queryStr);
- query.declareParameters("java.lang.String dbName");
+ query = pm.newQuery(MFunction.class, filterBuilder.toString());
query.setResult("functionName");
query.setOrdering("functionName ascending");
- Collection names = (Collection) query.execute(dbName);
+ Collection names = (Collection) query.executeWithArray(parameterVals.toArray(new String[parameterVals.size()]));
funcs = new ArrayList<String>();
for (Iterator i = names.iterator(); i.hasNext();) {
funcs.add((String) i.next());
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return funcs;
}
@@ -8356,6 +8069,9 @@ public class ObjectStore implements RawStore, Configurable {
public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
boolean commited = false;
Query query = null;
+
+ NotificationEventResponse result = new NotificationEventResponse();
+ result.setEvents(new ArrayList<NotificationEvent>());
try {
openTransaction();
long lastEvent = rqst.getLastEvent();
@@ -8365,11 +8081,9 @@ public class ObjectStore implements RawStore, Configurable {
Collection<MNotificationLog> events = (Collection) query.execute(lastEvent);
commited = commitTransaction();
if (events == null) {
- return null;
+ return result;
}
Iterator<MNotificationLog> i = events.iterator();
- NotificationEventResponse result = new NotificationEventResponse();
- result.setEvents(new ArrayList<NotificationEvent>());
int maxEvents = rqst.getMaxEvents() > 0 ? rqst.getMaxEvents() : Integer.MAX_VALUE;
int numEvents = 0;
while (i.hasNext() && numEvents++ < maxEvents) {
@@ -8377,11 +8091,8 @@ public class ObjectStore implements RawStore, Configurable {
}
return result;
} finally {
- if (query != null) {
- query.closeAll();
- }
if (!commited) {
- rollbackTransaction();
+ rollbackAndCleanup(commited, query);
return null;
}
}
@@ -8411,12 +8122,7 @@ public class ObjectStore implements RawStore, Configurable {
pm.makePersistent(translateThriftToDb(entry));
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
}
@@ -8436,12 +8142,7 @@ public class ObjectStore implements RawStore, Configurable {
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
}
@@ -8460,12 +8161,7 @@ public class ObjectStore implements RawStore, Configurable {
commited = commitTransaction();
return new CurrentNotificationEventId(id);
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
}
@@ -8533,20 +8229,99 @@ public class ObjectStore implements RawStore, Configurable {
*/
public static void unCacheDataNucleusClassLoaders() {
PersistenceManagerFactory pmf = ObjectStore.getPMF();
- if ((pmf != null) && (pmf instanceof JDOPersistenceManagerFactory)) {
- JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
- NucleusContext nc = jdoPmf.getNucleusContext();
- try {
- Field classLoaderResolverMap = AbstractNucleusContext.class.getDeclaredField(
- "classLoaderResolverMap");
- classLoaderResolverMap.setAccessible(true);
- classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>());
- LOG.debug("Removed cached classloaders from DataNucleus NucleusContext");
- } catch (Exception e) {
- LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext ", e);
+ clearOutPmfClassLoaderCache(pmf);
+ }
+
+ private static void clearOutPmfClassLoaderCache(PersistenceManagerFactory pmf) {
+ if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) {
+ return;
+ }
+ // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames
+ // so it's likely to stop working at some time in the future, especially if we upgrade DN
+ // versions, so we actively need to find a better way to make sure the leak doesn't happen
+ // instead of just clearing out the cache after every call.
+ JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
+ NucleusContext nc = jdoPmf.getNucleusContext();
+ try {
+ Field pmCache = pmf.getClass().getDeclaredField("pmCache");
+ pmCache.setAccessible(true);
+ Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>)pmCache.get(pmf);
+ for (JDOPersistenceManager pm : pmSet) {
+ org.datanucleus.ExecutionContext ec = (org.datanucleus.ExecutionContext)pm.getExecutionContext();
+ if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) {
+ ClassLoaderResolver clr = ((org.datanucleus.ExecutionContextThreadedImpl)ec).getClassLoaderResolver();
+ clearClr(clr);
+ }
+ }
+ org.datanucleus.plugin.PluginManager pluginManager = jdoPmf.getNucleusContext().getPluginManager();
+ Field registryField = pluginManager.getClass().getDeclaredField("registry");
+ registryField.setAccessible(true);
+ org.datanucleus.plugin.PluginRegistry registry = (org.datanucleus.plugin.PluginRegistry)registryField.get(pluginManager);
+ if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) {
+ org.datanucleus.plugin.NonManagedPluginRegistry nRegistry = (org.datanucleus.plugin.NonManagedPluginRegistry)registry;
+ Field clrField = nRegistry.getClass().getDeclaredField("clr");
+ clrField.setAccessible(true);
+ ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(nRegistry);
+ clearClr(clr);
+ }
+ if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) {
+ org.datanucleus.PersistenceNucleusContextImpl pnc = (org.datanucleus.PersistenceNucleusContextImpl)nc;
+ org.datanucleus.store.types.TypeManagerImpl tm = (org.datanucleus.store.types.TypeManagerImpl)pnc.getTypeManager();
+ Field clrField = tm.getClass().getDeclaredField("clr");
+ clrField.setAccessible(true);
+ ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(tm);
+ clearClr(clr);
+ Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr");
+ storeMgrField.setAccessible(true);
+ org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr = (org.datanucleus.store.rdbms.RDBMSStoreManager)storeMgrField.get(pnc);
+ Field backingStoreField = storeMgr.getClass().getDeclaredField("backingStoreByMemberName");
+ backingStoreField.setAccessible(true);
+ Map<String, Store> backingStoreByMemberName = (Map<String, Store>)backingStoreField.get(storeMgr);
+ for (Store store : backingStoreByMemberName.values()) {
+ org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore = (org.datanucleus.store.rdbms.scostore.BaseContainerStore)store;
+ clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class.getDeclaredField("clr");
+ clrField.setAccessible(true);
+ clr = (ClassLoaderResolver)clrField.get(baseStore);
+ clearClr(clr);
+ }
+ }
+ Field classLoaderResolverMap = AbstractNucleusContext.class.getDeclaredField(
+ "classLoaderResolverMap");
+ classLoaderResolverMap.setAccessible(true);
+ Map<String,ClassLoaderResolver> loaderMap =
+ (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc);
+ for (ClassLoaderResolver clr : loaderMap.values()){
+ clearClr(clr);
+ }
+ classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>());
+ LOG.debug("Removed cached classloaders from DataNucleus NucleusContext");
+ } catch (Exception e) {
+ LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext ", e);
+ }
+ }
+
+ private static void clearClr(ClassLoaderResolver clr) throws Exception {
+ if (clr != null){
+ if (clr instanceof ClassLoaderResolverImpl){
+ ClassLoaderResolverImpl clri = (ClassLoaderResolverImpl) clr;
+ long resourcesCleared = clearFieldMap(clri,"resources");
+ long loadedClassesCleared = clearFieldMap(clri,"loadedClasses");
+ long unloadedClassesCleared = clearFieldMap(clri, "unloadedClasses");
+ LOG.debug("Cleared ClassLoaderResolverImpl: " +
+ resourcesCleared + "," + loadedClassesCleared + "," + unloadedClassesCleared);
}
}
}
+ private static long clearFieldMap(ClassLoaderResolverImpl clri, String mapFieldName) throws Exception {
+ Field mapField = ClassLoaderResolverImpl.class.getDeclaredField(mapFieldName);
+ mapField.setAccessible(true);
+
+ Map<String,Class> map = (Map<String, Class>) mapField.get(clri);
+ long sz = map.size();
+ mapField.set(clri, Collections.synchronizedMap(new WeakValueMap()));
+ return sz;
+ }
+
@Override
public List<SQLPrimaryKey> getPrimaryKeys(String db_name, String tbl_name) throws MetaException {
@@ -8557,10 +8332,12 @@ public class ObjectStore implements RawStore, Configurable {
}
}
- protected List<SQLPrimaryKey> getPrimaryKeysInternal(final String db_name,
- final String tbl_name,
+ protected List<SQLPrimaryKey> getPrimaryKeysInternal(final String db_name_input,
+ final String tbl_name_input,
boolean allowSql, boolean allowJdo)
throws MetaException, NoSuchObjectException {
+ final String db_name = HiveStringUtils.normalizeIdentifier(db_name_input);
+ final String tbl_name = HiveStringUtils.normalizeIdentifier(tbl_name_input);
return new GetListHelper<SQLPrimaryKey>(db_name, tbl_name, allowSql, allowJdo) {
@Override
@@ -8603,12 +8380,7 @@ public class ObjectStore implements RawStore, Configurable {
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return primaryKeys;
}
@@ -8633,12 +8405,7 @@ public class ObjectStore implements RawStore, Configurable {
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return ret;
}
@@ -8654,9 +8421,13 @@ public class ObjectStore implements RawStore, Configurable {
}
}
- protected List<SQLForeignKey> getForeignKeysInternal(final String parent_db_name,
- final String parent_tbl_name, final String foreign_db_name, final String foreign_tbl_name,
- boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException {
+ protected List<SQLForeignKey> getForeignKeysInternal(final String parent_db_name_input,
+ final String parent_tbl_name_input, final String foreign_db_name_input,
+ final String foreign_tbl_name_input, boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException {
+ final String parent_db_name = parent_db_name_input;
+ final String parent_tbl_name = parent_tbl_name_input;
+ final String foreign_db_name = foreign_db_name_input;
+ final String foreign_tbl_name = foreign_tbl_name_input;
return new GetListHelper<SQLForeignKey>(foreign_db_name, foreign_tbl_name, allowSql, allowJdo) {
@Override
@@ -8757,12 +8528,7 @@ public class ObjectStore implements RawStore, Configurable {
}
commited = commitTransaction();
} finally {
- if (!commited) {
- rollbackTransaction();
- }
- if (query != null) {
- query.closeAll();
- }
+ rollbackAndCleanup(commited, query);
}
return foreignKeys;
}
@@ -8790,6 +8556,46 @@ public class ObjectStore implements RawStore, Configurable {
}
}
+ /**
+ * This is a cleanup method which is used to rollback a active transaction
+ * if the success flag is false and close the associated Query object. This method is used
+ * internally and visible for testing purposes only
+ * @param success Rollback the current active transaction if false
+ * @param query Query object which needs to be closed
+ */
+ @VisibleForTesting
+ void rollbackAndCleanup(boolean success, Query query) {
+ try {
+ if (!success) {
+ rollbackTransaction();
+ }
+ } finally {
+ if (query != null) {
+ query.closeAll();
+ }
+ }
+ }
+
+ /**
+ * This is a cleanup method which is used to rollback a active transaction
+ * if the success flag is false and close the associated QueryWrapper object. This method is used
+ * internally and visible for testing purposes only
+ * @param success Rollback the current active transaction if false
+ * @param queryWrapper QueryWrapper object which needs to be closed
+ */
+ @VisibleForTesting
+ void rollbackAndCleanup(boolean success, QueryWrapper queryWrapper) {
+ try {
+ if (!success) {
+ rollbackTransaction();
+ }
+ } finally {
+ if (queryWrapper != null) {
+ queryWrapper.close();
+ }
+ }
+ }
+
@Override
public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) {
boolean success = false;
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index 63b696d..ded978c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -589,6 +590,17 @@ public interface RawStore extends Configurable {
List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
/**
+ * Get all partition column statistics for a table
+ * @param dbName
+ * @param tableName
+ * @return Map of partition column statistics
+ * @throws MetaException
+ * @throws NoSuchObjectException
+ */
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException;
+
+ /**
* Get the next notification event.
* @param rqst Request containing information on the last processed notification.
* @return list of notifications, sorted by eventId