You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:58 UTC
[11/51] [partial] hive git commit: HIVE-14671 : merge master into
hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
new file mode 100644
index 0000000..7beee42
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.cache;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
+import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
+import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class SharedCache {
+ // Cached databases, keyed by the name string handed to addDatabaseToCache.
+ private static Map<String, Database> databaseCache = new TreeMap<String, Database>();
+ // Cached tables (stored without their storage descriptor), keyed by CacheUtils.buildKey(dbName, tblName).
+ private static Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>();
+ // Cached partitions (stored without their storage descriptor), keyed by CacheUtils.buildKey(dbName, tblName, values).
+ private static Map<String, PartitionWrapper> partitionCache = new TreeMap<String, PartitionWrapper>();
+ // Column statistics objects, keyed by whatever strings addPartitionColStatsToCache is given.
+ private static Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<String, ColumnStatisticsObj>();
+ // Reference-counted storage descriptors shared by tables and partitions, keyed by the descriptor hash.
+ private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
+ // Digest instance passed to HBaseUtils.hashStorageDescriptor when hashing storage descriptors.
+ private static MessageDigest md;
+
+ // Create the shared MD5 digest once; a missing algorithm is rethrown as an unchecked exception.
+ static {
+ try {
+ md = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException("should not happen", e);
+ }
+ }
+
+ public static synchronized Database getDatabaseFromCache(String name) {
+ return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null;
+ }
+
+ /**
+  * Stores a deep copy of the given database in the cache.
+  * The copy's name is normalized; the map key is {@code dbName} exactly as passed in.
+  *
+  * @param dbName key to store the database under
+  * @param db database to copy into the cache
+  */
+ public static synchronized void addDatabaseToCache(String dbName, Database db) {
+   Database cachedCopy = db.deepCopy();
+   String normalizedName = HiveStringUtils.normalizeIdentifier(dbName);
+   cachedCopy.setName(normalizedName);
+   databaseCache.put(dbName, cachedCopy);
+ }
+
+ /**
+  * Removes the database stored under the given key; a key that is not present is ignored.
+  *
+  * @param dbName cache key of the database to drop
+  */
+ public static synchronized void removeDatabaseFromCache(String dbName) {
+ databaseCache.remove(dbName);
+ }
+
+ /**
+  * Lists the keys of all cached databases.
+  *
+  * @return a new list holding a snapshot of the database cache keys
+  */
+ public static synchronized List<String> listCachedDatabases() {
+   List<String> names = new ArrayList<String>(databaseCache.keySet());
+   return names;
+ }
+
+ public static synchronized void alterDatabaseInCache(String dbName, Database newDb) {
+ removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName));
+ addDatabaseToCache(HiveStringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy());
+ }
+
+ /**
+  * @return the number of entries currently in the database cache
+  */
+ public static synchronized int getCachedDatabaseCount() {
+ return databaseCache.size();
+ }
+
+ /**
+  * Looks up a table by database and table name.
+  *
+  * @param dbName database name used to build the cache key
+  * @param tableName table name used to build the cache key
+  * @return the table produced by CacheUtils.assemble for the cached wrapper, or null when not cached
+  */
+ public static synchronized Table getTableFromCache(String dbName, String tableName) {
+   String key = CacheUtils.buildKey(dbName, tableName);
+   TableWrapper cached = tableCache.get(key);
+   return cached == null ? null : CacheUtils.assemble(cached);
+ }
+
+ /**
+  * Caches a copy of the given table. The copy gets normalized database, table and
+  * partition-key names. When the table has a storage descriptor, the descriptor is
+  * registered in the shared SD cache and only its hash, location and parameters are
+  * kept on the table wrapper.
+  *
+  * @param dbName database name used for the cache key
+  * @param tblName table name used for the cache key
+  * @param tbl table to cache; it is not modified
+  */
+ public static synchronized void addTableToCache(String dbName, String tblName, Table tbl) {
+   Table cachedTable = tbl.deepCopy();
+   cachedTable.setDbName(HiveStringUtils.normalizeIdentifier(dbName));
+   cachedTable.setTableName(HiveStringUtils.normalizeIdentifier(tblName));
+   for (FieldSchema partitionKey : cachedTable.getPartitionKeys()) {
+     partitionKey.setName(HiveStringUtils.normalizeIdentifier(partitionKey.getName()));
+   }
+
+   StorageDescriptor sd = tbl.getSd();
+   TableWrapper wrapper;
+   if (sd == null) {
+     wrapper = new TableWrapper(cachedTable, null, null, null);
+   } else {
+     byte[] sdHash = HBaseUtils.hashStorageDescriptor(sd, md);
+     increSd(sd, sdHash);
+     // The descriptor lives in sdCache; the cached table only carries what is table specific.
+     cachedTable.setSd(null);
+     wrapper = new TableWrapper(cachedTable, sdHash, sd.getLocation(), sd.getParameters());
+   }
+   tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper);
+ }
+
+ /**
+  * Removes a table from the cache and releases its reference on the shared storage
+  * descriptor, if it had one. Removing a table that is not cached is a no-op.
+  *
+  * @param dbName database name used to build the cache key
+  * @param tblName table name used to build the cache key
+  */
+ public static synchronized void removeTableFromCache(String dbName, String tblName) {
+   TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName));
+   if (tblWrapper == null) {
+     // Nothing cached under this key (e.g. a repeated removal); dereferencing it used to throw an NPE.
+     return;
+   }
+   byte[] sdHash = tblWrapper.getSdHash();
+   if (sdHash != null) {
+     decrSd(sdHash);
+   }
+ }
+
+ public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
+ removeTableFromCache(dbName, tblName);
+ addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
+ HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+ if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
+ List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
+ for (Partition part : partitions) {
+ removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues());
+ part.setDbName(HiveStringUtils.normalizeIdentifier(newTable.getDbName()));
+ part.setTableName(HiveStringUtils.normalizeIdentifier(newTable.getTableName()));
+ addPartitionToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
+ HiveStringUtils.normalizeIdentifier(newTable.getTableName()), part);
+ }
+ }
+ }
+
+ /**
+  * @return the number of entries currently in the table cache
+  */
+ public static synchronized int getCachedTableCount() {
+ return tableCache.size();
+ }
+
+ /**
+  * Lists the cached tables whose stored database name equals {@code dbName}.
+  *
+  * @param dbName database name to match exactly
+  * @return newly assembled tables of that database; empty when none are cached
+  */
+ public static synchronized List<Table> listCachedTables(String dbName) {
+   List<Table> result = new ArrayList<Table>();
+   for (TableWrapper candidate : tableCache.values()) {
+     if (!candidate.getTable().getDbName().equals(dbName)) {
+       continue;
+     }
+     result.add(CacheUtils.assemble(candidate));
+   }
+   return result;
+ }
+
+ /**
+  * Marks the given columns as having statistics in the cached table's parameters
+  * and writes the updated table back to the cache.
+  *
+  * @param dbName database name of the cached table
+  * @param tableName name of the cached table
+  * @param statsObjs statistics objects whose column names are recorded
+  */
+ public static synchronized void updateTableColumnStatistics(String dbName, String tableName,
+     List<ColumnStatisticsObj> statsObjs) {
+   Table tbl = getTableFromCache(dbName, tableName);
+   // The former "tbl.getSd().getParameters();" statement was dropped: its result was discarded,
+   // and it could only fail for a table cached without a storage descriptor.
+   List<String> colNames = new ArrayList<>();
+   for (ColumnStatisticsObj statsObj : statsObjs) {
+     colNames.add(statsObj.getColName());
+   }
+   StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
+   alterTableInCache(dbName, tableName, tbl);
+ }
+
+ /**
+  * Collects table metadata for cached tables whose database and table names match the
+  * given patterns (as decided by CacheUtils.matches) and whose type is in {@code tableTypes}.
+  *
+  * @param dbNames pattern applied to cached database names
+  * @param tableNames pattern applied to table names
+  * @param tableTypes accepted table types; null accepts every type
+  * @return metadata entries, each carrying the table's "comment" parameter as its comment
+  */
+ public static synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) {
+   List<TableMeta> result = new ArrayList<TableMeta>();
+   for (String dbName : listCachedDatabases()) {
+     if (!CacheUtils.matches(dbName, dbNames)) {
+       continue;
+     }
+     for (Table table : listCachedTables(dbName)) {
+       if (!CacheUtils.matches(table.getTableName(), tableNames)) {
+         continue;
+       }
+       if (tableTypes != null && !tableTypes.contains(table.getTableType())) {
+         continue;
+       }
+       TableMeta meta = new TableMeta(dbName, table.getTableName(), table.getTableType());
+       meta.setComments(table.getParameters().get("comment"));
+       result.add(meta);
+     }
+   }
+   return result;
+ }
+
+ /**
+  * Caches a copy of the given partition. When the partition has a storage descriptor,
+  * the descriptor is registered in the shared SD cache and only its hash, location and
+  * parameters are kept on the partition wrapper.
+  *
+  * @param dbName database name used for the cache key
+  * @param tblName table name used for the cache key
+  * @param part partition to cache; it is not modified
+  */
+ public static synchronized void addPartitionToCache(String dbName, String tblName, Partition part) {
+   Partition cachedPart = part.deepCopy();
+   StorageDescriptor sd = part.getSd();
+   PartitionWrapper wrapper;
+   if (sd == null) {
+     wrapper = new PartitionWrapper(cachedPart, null, null, null);
+   } else {
+     byte[] sdHash = HBaseUtils.hashStorageDescriptor(sd, md);
+     increSd(sd, sdHash);
+     // The descriptor lives in sdCache; drop it from the cached copy.
+     cachedPart.setSd(null);
+     wrapper = new PartitionWrapper(cachedPart, sdHash, sd.getLocation(), sd.getParameters());
+   }
+   String key = CacheUtils.buildKey(dbName, tblName, part.getValues());
+   partitionCache.put(key, wrapper);
+ }
+
+ /**
+  * Looks up a partition by its full cache key.
+  *
+  * @param key partition cache key
+  * @return the partition produced by CacheUtils.assemble for the cached wrapper, or null when not cached
+  */
+ public static synchronized Partition getPartitionFromCache(String key) {
+   PartitionWrapper cached = partitionCache.get(key);
+   return cached == null ? null : CacheUtils.assemble(cached);
+ }
+
+ public static synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
+ return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals));
+ }
+
+ public static synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
+ return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
+ }
+
+ public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List<String> part_vals) {
+ PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
+ if (wrapper.getSdHash()!=null) {
+ decrSd(wrapper.getSdHash());
+ }
+ return wrapper.getPartition();
+ }
+
+ public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
+ List<Partition> partitions = new ArrayList<Partition>();
+ int count = 0;
+ for (PartitionWrapper wrapper : partitionCache.values()) {
+ if (wrapper.getPartition().getDbName().equals(dbName)
+ && wrapper.getPartition().getTableName().equals(tblName)
+ && (max == -1 || count < max)) {
+ partitions.add(CacheUtils.assemble(wrapper));
+ count++;
+ }
+ }
+ return partitions;
+ }
+
+ public static synchronized void alterPartitionInCache(String dbName, String tblName, List<String> partVals, Partition newPart) {
+ removePartitionFromCache(dbName, tblName, partVals);
+ addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()),
+ HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
+ }
+
+ /**
+  * Marks the given columns as having statistics in the cached partition's parameters
+  * and writes the updated partition back to the cache.
+  *
+  * @param dbName database name of the cached partition
+  * @param tableName table name of the cached partition
+  * @param partVals values identifying the partition
+  * @param statsObjs statistics objects whose column names are recorded
+  */
+ public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName,
+     List<String> partVals, List<ColumnStatisticsObj> statsObjs) {
+   Partition part = getPartitionFromCache(dbName, tableName, partVals);
+   // The former "part.getSd().getParameters();" statement was dropped: its result was discarded,
+   // and it could only fail for a partition cached without a storage descriptor.
+   List<String> colNames = new ArrayList<>();
+   for (ColumnStatisticsObj statsObj : statsObjs) {
+     colNames.add(statsObj.getColName());
+   }
+   StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
+   alterPartitionInCache(dbName, tableName, partVals, part);
+ }
+
+ /**
+  * @return the number of entries currently in the partition cache
+  */
+ public static synchronized int getCachedPartitionCount() {
+ return partitionCache.size();
+ }
+
+ /**
+  * Looks up a column statistics object by key.
+  *
+  * @param key key the statistics were stored under
+  * @return the cached object itself (no copy is made), or null when nothing is stored under the key
+  */
+ public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
+ return partitionColStatsCache.get(key);
+ }
+
+ /**
+  * Adds every entry of the given map to the column statistics cache, replacing entries with equal keys.
+  *
+  * @param aggrStatsPerPartition statistics objects to store, by key
+  */
+ public static synchronized void addPartitionColStatsToCache(Map<String, ColumnStatisticsObj> aggrStatsPerPartition) {
+ partitionColStatsCache.putAll(aggrStatsPerPartition);
+ }
+
+
+ /**
+  * Adds one reference to the storage descriptor identified by {@code sdHash}.
+  * On first use a copy of {@code sd}, with location and parameters cleared, is stored
+  * with a reference count of 1.
+  *
+  * @param sd descriptor to register when the hash is not cached yet
+  * @param sdHash hash identifying the descriptor
+  */
+ public static void increSd(StorageDescriptor sd, byte[] sdHash) {
+   ByteArrayWrapper key = new ByteArrayWrapper(sdHash);
+   StorageDescriptorWrapper existing = sdCache.get(key);
+   if (existing != null) {
+     existing.refCount++;
+     return;
+   }
+   StorageDescriptor shared = sd.deepCopy();
+   shared.setLocation(null);
+   shared.setParameters(null);
+   sdCache.put(key, new StorageDescriptorWrapper(shared, 1));
+ }
+
+ /**
+  * Drops one reference from the storage descriptor identified by {@code sdHash} and
+  * evicts the descriptor once its reference count reaches zero.
+  *
+  * @param sdHash hash identifying a descriptor that is currently cached
+  */
+ public static void decrSd(byte[] sdHash) {
+   ByteArrayWrapper key = new ByteArrayWrapper(sdHash);
+   StorageDescriptorWrapper cached = sdCache.get(key);
+   cached.refCount--;
+   if (cached.getRefCount() != 0) {
+     return;
+   }
+   sdCache.remove(key);
+ }
+
+ /**
+  * Returns the shared storage descriptor stored under the given hash.
+  *
+  * @param sdHash hash identifying a descriptor that is currently cached
+  * @return the descriptor held by the cached wrapper
+  */
+ public static StorageDescriptor getSdFromCache(byte[] sdHash) {
+   ByteArrayWrapper key = new ByteArrayWrapper(sdHash);
+   return sdCache.get(key).getSd();
+ }
+
+ /**
+  * Replaces the content of the database cache with the given databases:
+  * every cached entry is removed, then each database is added under its own name.
+  *
+  * @param databases databases the cache should hold afterwards
+  */
+ public static synchronized void refreshDatabases(List<Database> databases) {
+   List<String> staleNames = listCachedDatabases();
+   for (String staleName : staleNames) {
+     removeDatabaseFromCache(staleName);
+   }
+   for (Database fresh : databases) {
+     addDatabaseToCache(fresh.getName(), fresh);
+   }
+ }
+
+ /**
+  * Replaces the cached tables of one database with the given tables:
+  * the database's cached tables are removed, then each new table is added.
+  *
+  * @param dbName database whose tables are refreshed
+  * @param tables tables the cache should hold for that database afterwards
+  */
+ public static synchronized void refreshTables(String dbName, List<Table> tables) {
+   List<Table> staleTables = listCachedTables(dbName);
+   for (Table stale : staleTables) {
+     removeTableFromCache(dbName, stale.getTableName());
+   }
+   for (Table fresh : tables) {
+     addTableToCache(dbName, fresh.getTableName(), fresh);
+   }
+ }
+
+ public static void refreshPartitions(String dbName, String tblName, List<Partition> partitions) {
+ List<String> keysToRemove = new ArrayList<String>();
+ for (Map.Entry<String, PartitionWrapper> entry : partitionCache.entrySet()) {
+ if (entry.getValue().getPartition().getDbName().equals(dbName)
+ && entry.getValue().getPartition().getTableName().equals(tblName)) {
+ keysToRemove.add(entry.getKey());
+ }
+ }
+ for (String key : keysToRemove) {
+ partitionCache.remove(key);
+ }
+ for (Partition part : partitions) {
+ addPartitionToCache(dbName, tblName, part);
+ }
+ }
+
+ // The accessors below hand out the live backing maps (not copies) for use by tests.
+ @VisibleForTesting
+ static Map<String, Database> getDatabaseCache() {
+ return databaseCache;
+ }
+
+ // Live table cache map.
+ @VisibleForTesting
+ static Map<String, TableWrapper> getTableCache() {
+ return tableCache;
+ }
+
+ // Live partition cache map.
+ @VisibleForTesting
+ static Map<String, PartitionWrapper> getPartitionCache() {
+ return partitionCache;
+ }
+
+ // Live storage descriptor cache map.
+ @VisibleForTesting
+ static Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
+ return sdCache;
+ }
+
+ // Live partition column statistics cache map.
+ @VisibleForTesting
+ static Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
+ return partitionColStatsCache;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java
index 8edb50b..e5b8495 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java
@@ -27,13 +27,15 @@ public class AlterPartitionEvent extends ListenerEvent {
private final Partition oldPart;
private final Partition newPart;
private final Table table;
+ private final boolean isTruncateOp;
- public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table,
- boolean status, HMSHandler handler) {
+ public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, boolean isTruncateOp,
+ boolean status, HMSHandler handler) {
super(status, handler);
this.oldPart = oldPart;
this.newPart = newPart;
this.table = table;
+ this.isTruncateOp = isTruncateOp;
}
/**
@@ -58,4 +60,12 @@ public class AlterPartitionEvent extends ListenerEvent {
public Table getTable() {
return table;
}
+
+ /**
+ * Get the truncate flag this event was constructed with.
+ * @return the value passed as isTruncateOp to the constructor
+ */
+ public boolean getIsTruncateOp() {
+ return isTruncateOp;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
index 4d6dce2..22ea513 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java
@@ -26,10 +26,13 @@ public class AlterTableEvent extends ListenerEvent {
private final Table newTable;
private final Table oldTable;
- public AlterTableEvent (Table oldTable, Table newTable, boolean status, HMSHandler handler) {
+ private final boolean isTruncateOp;
+
+ public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, boolean status, HMSHandler handler) {
super (status, handler);
this.oldTable = oldTable;
this.newTable = newTable;
+ this.isTruncateOp = isTruncateOp;
}
/**
@@ -45,4 +48,11 @@ public class AlterTableEvent extends ListenerEvent {
public Table getNewTable() {
return newTable;
}
+
+ /**
+ * @return the truncate flag passed as isTruncateOp to the constructor
+ */
+ public boolean getIsTruncateOp() {
+ return isTruncateOp;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index 7bc0e04..dff1195 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -38,6 +38,7 @@ public class InsertEvent extends ListenerEvent {
private final String db;
private final String table;
private final Map<String, String> keyValues;
+ private final boolean replace;
private final List<String> files;
private List<String> fileChecksums = new ArrayList<String>();
@@ -56,6 +57,9 @@ public class InsertEvent extends ListenerEvent {
super(status, handler);
this.db = db;
this.table = table;
+
+ // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility
+ this.replace = (insertData.isSetReplace() ? insertData.isReplace() : true);
this.files = insertData.getFilesAdded();
GetTableRequest req = new GetTableRequest(db, table);
req.setCapabilities(HiveMetaStoreClient.TEST_VERSION);
@@ -90,6 +94,13 @@ public class InsertEvent extends ListenerEvent {
}
/**
+ * @return The replace flag.
+ */
+ public boolean isReplace() {
+ return replace;
+ }
+
+ /**
* Get list of files created as a result of this DML operation
*
* @return list of new files
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
index 62aeb8c..b741549 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
@@ -21,10 +21,18 @@ package org.apache.hadoop.hive.metastore.events;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Base class for all the events which are defined for metastore.
+ *
+ * This class is not thread-safe and not expected to be called in parallel.
*/
+@NotThreadSafe
public abstract class ListenerEvent {
/**
@@ -33,6 +41,26 @@ public abstract class ListenerEvent {
private final boolean status;
private final HMSHandler handler;
+ /**
+ * Key/value parameters used by listeners to store notification results,
+ * e.g. DbNotificationListener sets a DB_NOTIFICATION_EVENT_ID.
+ *
+ * NotThreadSafe: The parameters map is not expected to be accessed in parallel by Hive, so keep it thread-unsafe
+ * to avoid locking overhead.
+ */
+ private Map<String, String> parameters;
+
+ /** For performance reasons, it is preferable to cache the unmodifiable parameters variable that will be returned by the
+ * {@link #getParameters()} method. It is expected that {@link #putParameter(String, String)} is called fewer times
+ * than {@link #getParameters()}, so performance may be better by using this cache.
+ */
+ private Map<String, String> unmodifiableParameters;
+
+ // Listener parameters aren't expected to have many values. So far only
+ // DbNotificationListener will add a parameter; let's set a low initial capacity for now.
+ // If we find out many parameters are added, then we can adjust or remove this initial capacity.
+ private static final int PARAMETERS_INITIAL_CAPACITY = 1;
+
// Properties passed by the client, to be used in execution hooks.
private EnvironmentContext environmentContext = null;
@@ -40,6 +68,8 @@ public abstract class ListenerEvent {
super();
this.status = status;
this.handler = handler;
+ this.parameters = new HashMap<>(PARAMETERS_INITIAL_CAPACITY);
+ updateUnmodifiableParameters();
}
/**
@@ -49,6 +79,12 @@ public abstract class ListenerEvent {
return status;
}
+ /**
+ * Set the environment context of the event.
+ *
+ * @param environmentContext An EnvironmentContext object that contains environment parameters sent from
+ * the HMS client.
+ */
public void setEnvironmentContext(EnvironmentContext environmentContext) {
this.environmentContext = environmentContext;
}
@@ -66,4 +102,74 @@ public abstract class ListenerEvent {
public HMSHandler getHandler() {
return handler;
}
+
+ /**
+ * Return all parameters of the listener event. The returned map is read-only (an unmodifiable map).
+ * To add a new parameter, use {@link #putParameter(String, String)}.
+ *
+ * @return A map object with all parameters.
+ */
+ public final Map<String, String> getParameters() {
+ return unmodifiableParameters;
+ }
+
+ /**
+ * Put a new parameter to the listener event.
+ *
+ * Overriding an existing parameter is not allowed; an exception is thrown instead, to avoid a
+ * mis-configuration between listeners setting the same parameter.
+ *
+ * @param name Name of the parameter.
+ * @param value Value of the parameter.
+ * @throws IllegalStateException if a parameter already exists.
+ */
+ public void putParameter(String name, String value) {
+ putParameterIfAbsent(name, value);
+ // Refresh the cached read-only map handed out by getParameters().
+ updateUnmodifiableParameters();
+ }
+
+ /**
+ * Put a set of new parameters to the listener event. A null map is ignored.
+ *
+ * Overriding an existing parameter is not allowed; an exception is thrown instead, to avoid a
+ * mis-configuration between listeners setting the same parameter.
+ *
+ * @param parameters A Map object with the set of parameters to add.
+ * @throws IllegalStateException if a parameter already exists.
+ */
+ public void putParameters(final Map<String, String> parameters) {
+   if (parameters == null) {
+     return;
+   }
+   for (Map.Entry<String, String> parameter : parameters.entrySet()) {
+     putParameterIfAbsent(parameter.getKey(), parameter.getValue());
+   }
+   updateUnmodifiableParameters();
+ }
+
+ /**
+ * Put a parameter to the listener event only if no parameter with that name exists yet.
+ *
+ * Overriding an existing parameter is not allowed; an exception is thrown instead, to avoid a
+ * mis-configuration between listeners setting the same parameter.
+ *
+ * @param name Name of the parameter.
+ * @param value Value of the parameter.
+ * @throws IllegalStateException if a parameter already exists.
+ */
+ private void putParameterIfAbsent(String name, String value) {
+   boolean alreadySet = parameters.containsKey(name);
+   if (!alreadySet) {
+     parameters.put(name, value);
+     return;
+   }
+   throw new IllegalStateException("Invalid attempt to overwrite a read-only parameter: " + name);
+ }
+
+ /**
+ * Re-creates the cached unmodifiable wrapper around the parameters map; that wrapper is what
+ * the getParameters() method returns.
+ */
+ private void updateUnmodifiableParameters() {
+ unmodifiableParameters = Collections.unmodifiableMap(parameters);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 1340645..945e99e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -2708,6 +2708,8 @@ public class HBaseStore implements RawStore {
@Override
public List<SQLPrimaryKey> getPrimaryKeys(String db_name, String tbl_name) throws MetaException {
+ db_name = HiveStringUtils.normalizeIdentifier(db_name);
+ tbl_name = HiveStringUtils.normalizeIdentifier(tbl_name);
boolean commit = false;
openTransaction();
try {
@@ -2726,6 +2728,10 @@ public class HBaseStore implements RawStore {
public List<SQLForeignKey> getForeignKeys(String parent_db_name, String parent_tbl_name,
String foreign_db_name, String foreign_tbl_name)
throws MetaException {
+ parent_db_name = parent_db_name!=null?HiveStringUtils.normalizeIdentifier(parent_db_name):null;
+ parent_tbl_name = parent_tbl_name!=null?HiveStringUtils.normalizeIdentifier(parent_tbl_name):null;
+ foreign_db_name = HiveStringUtils.normalizeIdentifier(foreign_db_name);
+ foreign_tbl_name = HiveStringUtils.normalizeIdentifier(foreign_tbl_name);
boolean commit = false;
openTransaction();
try {
@@ -2770,6 +2776,9 @@ public class HBaseStore implements RawStore {
// This is something of pain, since we have to search both primary key and foreign key to see
// which they want to drop.
boolean commit = false;
+ dbName = HiveStringUtils.normalizeIdentifier(dbName);
+ tableName = HiveStringUtils.normalizeIdentifier(tableName);
+ constraintName = HiveStringUtils.normalizeIdentifier(constraintName);
openTransaction();
try {
List<SQLPrimaryKey> pk = getHBase().getPrimaryKey(dbName, tableName);
@@ -2809,6 +2818,12 @@ public class HBaseStore implements RawStore {
@Override
public void addPrimaryKeys(List<SQLPrimaryKey> pks) throws InvalidObjectException, MetaException {
boolean commit = false;
+ for (SQLPrimaryKey pk : pks) {
+ pk.setTable_db(HiveStringUtils.normalizeIdentifier(pk.getTable_db()));
+ pk.setTable_name(HiveStringUtils.normalizeIdentifier(pk.getTable_name()));
+ pk.setColumn_name(HiveStringUtils.normalizeIdentifier(pk.getColumn_name()));
+ pk.setPk_name(HiveStringUtils.normalizeIdentifier(pk.getPk_name()));
+ }
openTransaction();
try {
List<SQLPrimaryKey> currentPk =
@@ -2830,6 +2845,13 @@ public class HBaseStore implements RawStore {
@Override
public void addForeignKeys(List<SQLForeignKey> fks) throws InvalidObjectException, MetaException {
boolean commit = false;
+ for (SQLForeignKey fk : fks) {
+ fk.setPktable_db(HiveStringUtils.normalizeIdentifier(fk.getPktable_db()));
+ fk.setPktable_name(HiveStringUtils.normalizeIdentifier(fk.getPktable_name()));
+ fk.setFktable_db(HiveStringUtils.normalizeIdentifier(fk.getFktable_db()));
+ fk.setFktable_name(HiveStringUtils.normalizeIdentifier(fk.getFktable_name()));
+ fk.setFk_name(HiveStringUtils.normalizeIdentifier(fk.getFk_name()));
+ }
openTransaction();
try {
// Fetch the existing keys (if any) and add in these new ones
@@ -2848,6 +2870,13 @@ public class HBaseStore implements RawStore {
}
 @Override
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException {
+ // TODO: see if it makes sense to implement this here
+ // Not implemented for this store: callers always receive null, never an empty map.
+ return null;
+ }
+
+ @Override
public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) {
// TODO: Auto-generated method stub
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 94087b1..3172f92 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -619,7 +619,7 @@ public class HBaseUtils {
* @param md message descriptor to use to generate the hash
* @return the hash as a byte array
*/
- static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
+ public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
// Note all maps and lists have to be absolutely sorted. Otherwise we'll produce different
// results for hashes based on the OS or JVM being used.
md.reset();
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
index ed6080b..e9ed7e5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
@@ -31,6 +31,8 @@ public abstract class AlterPartitionMessage extends EventMessage {
public abstract String getTable();
+ public abstract boolean getIsTruncateOp();
+
public abstract Map<String,String> getKeyValues();
public abstract Table getTableObj() throws Exception;
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
index 5487123..39a87bc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
@@ -28,6 +28,8 @@ public abstract class AlterTableMessage extends EventMessage {
public abstract String getTable();
+ public abstract boolean getIsTruncateOp();
+
public abstract Table getTableObjBefore() throws Exception;
public abstract Table getTableObjAfter() throws Exception;
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
index a5414d1..8205c25 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.thrift.TException;
import java.io.IOException;
@@ -30,88 +31,10 @@ import java.util.List;
public class EventUtils {
- /**
- * Utility function that constructs a notification filter to match a given db name and/or table name.
- * If dbName == null, fetches all warehouse events.
- * If dnName != null, but tableName == null, fetches all events for the db
- * If dbName != null && tableName != null, fetches all events for the specified table
- * @param dbName
- * @param tableName
- * @return
- */
- public static IMetaStoreClient.NotificationFilter getDbTblNotificationFilter(final String dbName, final String tableName){
- return new IMetaStoreClient.NotificationFilter() {
- @Override
- public boolean accept(NotificationEvent event) {
- if (event == null){
- return false; // get rid of trivial case first, so that we can safely assume non-null
- }
- if (dbName == null){
- return true; // if our dbName is null, we're interested in all wh events
- }
- if (dbName.equalsIgnoreCase(event.getDbName())){
- if ( (tableName == null)
- // if our dbName is equal, but tableName is blank, we're interested in this db-level event
- || (tableName.equalsIgnoreCase(event.getTableName()))
- // table level event that matches us
- ){
- return true;
- }
- }
- return false;
- }
- };
- }
-
- public static IMetaStoreClient.NotificationFilter restrictByMessageFormat(final String messageFormat){
- return new IMetaStoreClient.NotificationFilter() {
- @Override
- public boolean accept(NotificationEvent event) {
- if (event == null){
- return false; // get rid of trivial case first, so that we can safely assume non-null
- }
- if (messageFormat == null){
- return true; // let's say that passing null in will not do any filtering.
- }
- if (messageFormat.equalsIgnoreCase(event.getMessageFormat())){
- return true;
- }
- return false;
- }
- };
- }
-
- public static IMetaStoreClient.NotificationFilter getEventBoundaryFilter(final Long eventFrom, final Long eventTo){
- return new IMetaStoreClient.NotificationFilter() {
- @Override
- public boolean accept(NotificationEvent event) {
- if ( (event == null) || (event.getEventId() < eventFrom) || (event.getEventId() > eventTo)) {
- return false;
- }
- return true;
- }
- };
- }
-
- public static IMetaStoreClient.NotificationFilter andFilter(
- final IMetaStoreClient.NotificationFilter... filters ) {
- return new IMetaStoreClient.NotificationFilter() {
- @Override
- public boolean accept(NotificationEvent event) {
- for (IMetaStoreClient.NotificationFilter filter : filters){
- if (!filter.accept(event)){
- return false;
- }
- }
- return true;
- }
- };
- }
-
public interface NotificationFetcher {
- public int getBatchSize() throws IOException;
- public long getCurrentNotificationEventId() throws IOException;
- public List<NotificationEvent> getNextNotificationEvents(
+ int getBatchSize() throws IOException;
+ long getCurrentNotificationEventId() throws IOException;
+ List<NotificationEvent> getNextNotificationEvents(
long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
}
@@ -177,7 +100,7 @@ public class EventUtils {
public NotificationEventIterator(
NotificationFetcher nfetcher, long eventFrom, int maxEvents,
String dbName, String tableName) throws IOException {
- init(nfetcher, eventFrom, maxEvents, EventUtils.getDbTblNotificationFilter(dbName, tableName));
+ init(nfetcher, eventFrom, maxEvents, new DatabaseAndTableFilter(dbName, tableName));
// using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter
// is an operation that needs to run before delegating to the other ctor, and this messes up chaining
// ctors
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
index 3d16721..6d146e0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -37,6 +37,12 @@ public abstract class InsertMessage extends EventMessage {
public abstract String getTable();
/**
+ * Getter for the replace flag, which distinguishes INSERT INTO from INSERT OVERWRITE.
+ * @return Replace flag representing INSERT INTO or INSERT OVERWRITE (boolean).
+ */
+ public abstract boolean isReplace();
+
+ /**
* Get the map of partition keyvalues. Will be null if this insert is to a table and not a
* partition.
* @return Map of partition keyvalues, or null.
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index aa770f2..1bd52a8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -149,9 +149,10 @@ public abstract class MessageFactory {
* and some are not yet supported.
* @param before The table before the alter
* @param after The table after the alter
+ * @param isTruncateOp Flag to denote truncate table
* @return
*/
- public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after);
+ public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp);
/**
* Factory method for DropTableMessage.
@@ -175,10 +176,11 @@ public abstract class MessageFactory {
* @param table The table in which the partition is being altered
* @param before The partition before it was altered
* @param after The partition after it was altered
+ * @param isTruncateOp Flag to denote truncate partition
* @return a new AlterPartitionMessage
*/
public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
- Partition after);
+ Partition after, boolean isTruncateOp);
/**
* Factory method for DropPartitionMessage.
@@ -231,9 +233,10 @@ public abstract class MessageFactory {
* @param table Name of the table the insert occurred in
* @param partVals Partition values for the partition that the insert occurred in, may be null if
* the insert was done into a non-partitioned table
+ * @param replace Flag to represent if INSERT OVERWRITE or INSERT INTO
* @param files Iterator of file created
* @return instance of InsertMessage
*/
public abstract InsertMessage buildInsertMessage(String db, String table,
- Map<String, String> partVals, Iterator<String> files);
+ Map<String, String> partVals, boolean replace, Iterator<String> files);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java
index b10b8a8..4fd7f8c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java
@@ -22,10 +22,13 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.collect.Lists;
+import org.codehaus.jackson.annotate.JsonProperty;
public class PartitionFiles {
+ @JsonProperty
private String partitionName;
+ @JsonProperty
private List<String> files;
public PartitionFiles(String partitionName, Iterator<String> files) {
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java
new file mode 100644
index 0000000..d6429f6
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+public class AndFilter implements IMetaStoreClient.NotificationFilter {
+ final IMetaStoreClient.NotificationFilter[] filters;
+
+ public AndFilter(final IMetaStoreClient.NotificationFilter... filters) {
+ this.filters = filters;
+ }
+
+ @Override
+ public boolean accept(final NotificationEvent event) {
+ for (IMetaStoreClient.NotificationFilter filter : filters) {
+ if (!filter.accept(event)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java
new file mode 100644
index 0000000..5294063
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+public abstract class BasicFilter implements NotificationFilter {
+ @Override
+ public boolean accept(final NotificationEvent event) {
+ if (event == null) {
+ return false; // get rid of trivial case first, so that we can safely assume non-null
+ }
+ return shouldAccept(event);
+ }
+
+ abstract boolean shouldAccept(final NotificationEvent event);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
new file mode 100644
index 0000000..4a7ca6d
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+/**
+ * Notification filter that matches events for a given db name and/or table name.
+ * If dbName == null, accepts all warehouse events.
+ * If dbName != null, but tableName == null, accepts all events for the db
+ * If dbName != null && tableName != null, accepts all events for the specified table
+ */
+public class DatabaseAndTableFilter extends BasicFilter {
+ private final String databaseName, tableName;
+
+ public DatabaseAndTableFilter(final String databaseName, final String tableName) {
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ }
+
+ @Override
+ boolean shouldAccept(final NotificationEvent event) {
+ if (databaseName == null) {
+ return true; // if our dbName is null, we're interested in all wh events
+ }
+ if (databaseName.equalsIgnoreCase(event.getDbName())) {
+ if ((tableName == null)
+ // if our dbName is equal, but tableName is blank, we're interested in this db-level event
+ || (tableName.equalsIgnoreCase(event.getTableName()))
+ // table level event that matches us
+ ) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java
new file mode 100644
index 0000000..137b4ce
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+public class EventBoundaryFilter extends BasicFilter {
+ private final long eventFrom, eventTo;
+
+ public EventBoundaryFilter(final long eventFrom, final long eventTo) {
+ this.eventFrom = eventFrom;
+ this.eventTo = eventTo;
+ }
+
+ @Override
+ boolean shouldAccept(final NotificationEvent event) {
+ return eventFrom <= event.getEventId() && event.getEventId() <= eventTo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java
new file mode 100644
index 0000000..4e91ee6
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+public class MessageFormatFilter extends BasicFilter {
+ private final String format;
+
+ public MessageFormatFilter(String format) {
+ this.format = format;
+ }
+
+ @Override
+ boolean shouldAccept(final NotificationEvent event) {
+ if (format == null) {
+ return true; // let's say that passing null in will not do any filtering.
+ }
+ return format.equalsIgnoreCase(event.getMessageFormat());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
index dd1bf3c..bd7776c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
@@ -37,6 +37,9 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
String server, servicePrincipal, db, table, tableObjJson;
@JsonProperty
+ String isTruncateOp;
+
+ @JsonProperty
Long timestamp;
@JsonProperty
@@ -52,11 +55,12 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
}
public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj,
- Partition partitionObjBefore, Partition partitionObjAfter, Long timestamp) {
+ Partition partitionObjBefore, Partition partitionObjAfter, boolean isTruncateOp, Long timestamp) {
this.server = server;
this.servicePrincipal = servicePrincipal;
this.db = tableObj.getDbName();
this.table = tableObj.getTableName();
+ this.isTruncateOp = Boolean.toString(isTruncateOp);
this.timestamp = timestamp;
this.keyValues = JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore);
try {
@@ -95,6 +99,9 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
}
@Override
+ public boolean getIsTruncateOp() { return Boolean.parseBoolean(isTruncateOp); }
+
+ @Override
public Map<String, String> getKeyValues() {
return keyValues;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
index 792015e..58eb1a7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
@@ -32,6 +32,9 @@ public class JSONAlterTableMessage extends AlterTableMessage {
String server, servicePrincipal, db, table, tableObjBeforeJson, tableObjAfterJson;
@JsonProperty
+ String isTruncateOp;
+
+ @JsonProperty
Long timestamp;
/**
@@ -41,11 +44,12 @@ public class JSONAlterTableMessage extends AlterTableMessage {
}
public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObjBefore, Table tableObjAfter,
- Long timestamp) {
+ boolean isTruncateOp, Long timestamp) {
this.server = server;
this.servicePrincipal = servicePrincipal;
this.db = tableObjBefore.getDbName();
this.table = tableObjBefore.getTableName();
+ this.isTruncateOp = Boolean.toString(isTruncateOp);
this.timestamp = timestamp;
try {
this.tableObjBeforeJson = JSONMessageFactory.createTableObjJson(tableObjBefore);
@@ -82,6 +86,9 @@ public class JSONAlterTableMessage extends AlterTableMessage {
}
@Override
+ public boolean getIsTruncateOp() { return Boolean.parseBoolean(isTruncateOp); }
+
+ @Override
public Table getTableObjBefore() throws Exception {
return (Table) JSONMessageFactory.getTObj(tableObjBeforeJson,Table.class);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index e1316a4..c059d47 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -40,6 +40,9 @@ public class JSONInsertMessage extends InsertMessage {
Long timestamp;
@JsonProperty
+ String replace;
+
+ @JsonProperty
List<String> files;
@JsonProperty
@@ -52,12 +55,13 @@ public class JSONInsertMessage extends InsertMessage {
}
public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
- Map<String, String> partKeyVals, Iterator<String> fileIter, Long timestamp) {
+ Map<String, String> partKeyVals, boolean replace, Iterator<String> fileIter, Long timestamp) {
this.server = server;
this.servicePrincipal = servicePrincipal;
this.db = db;
this.table = table;
this.timestamp = timestamp;
+ this.replace = Boolean.toString(replace);
this.partKeyVals = partKeyVals;
this.files = Lists.newArrayList(fileIter);
checkValid();
@@ -99,6 +103,9 @@ public class JSONInsertMessage extends InsertMessage {
}
@Override
+ public boolean isReplace() { return Boolean.parseBoolean(replace); }
+
+ @Override
public String toString() {
try {
return JSONMessageDeserializer.mapper.writeValueAsString(this);
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
index 41732c7..40ef5fb 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
/**
* MessageDeserializer implementation, for deserializing from JSON strings.
@@ -46,6 +47,9 @@ public class JSONMessageDeserializer extends MessageDeserializer {
static {
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, false);
+ mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, false);
+ mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 3406afb..04a4041 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -28,6 +28,10 @@ import javax.annotation.Nullable;
import com.google.common.collect.Iterables;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.Index;
@@ -104,8 +108,8 @@ public class JSONMessageFactory extends MessageFactory {
}
@Override
- public AlterTableMessage buildAlterTableMessage(Table before, Table after) {
- return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, now());
+ public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp) {
+ return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, isTruncateOp, now());
}
@Override
@@ -123,8 +127,8 @@ public class JSONMessageFactory extends MessageFactory {
@Override
public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before,
- Partition after) {
- return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, before, after,
+ Partition after, boolean isTruncateOp) {
+ return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, before, after, isTruncateOp,
now());
}
@@ -161,10 +165,9 @@ public class JSONMessageFactory extends MessageFactory {
}
@Override
- public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals,
+ public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, boolean replace,
Iterator<String> fileIter) {
- return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals,
- fileIter, now());
+ return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, replace, fileIter, now());
}
private long now() {
@@ -298,5 +301,4 @@ public class JSONMessageFactory extends MessageFactory {
};
return getTObjs(Iterables.transform(jsonArrayIterator, textExtractor), objClass);
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java b/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
index 63be7b7..10fcbea 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
@@ -284,7 +284,7 @@ public class ExpressionTree {
//can only support "=" and "!=" for now, because our JDO lib is buggy when
// using objects from map.get()
private static final Set<Operator> TABLE_FILTER_OPS = Sets.newHashSet(
- Operator.EQUALS, Operator.NOTEQUALS, Operator.NOTEQUALS2);
+ Operator.EQUALS, Operator.NOTEQUALS, Operator.NOTEQUALS2, Operator.LIKE);
private void generateJDOFilterOverTables(Map<String, Object> params,
FilterBuilder filterBuilder) throws MetaException {
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d378d06..970038d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -52,6 +52,7 @@ import javax.sql.DataSource;
import java.io.IOException;
import java.io.PrintWriter;
+import java.nio.ByteBuffer;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -147,6 +148,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
static private DataSource connPool;
+ private static DataSource connPoolMutex;
static private boolean doRetryOnConnPool = false;
private enum OpertaionType {
@@ -203,8 +205,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
private int deadlockCnt;
private long deadlockRetryInterval;
protected HiveConf conf;
- protected DatabaseProduct dbProduct;
- private SQLGenerator sqlGenerator;
+ private static DatabaseProduct dbProduct;
+ private static SQLGenerator sqlGenerator;
// (End user) Transaction timeout, in milliseconds.
private long timeout;
@@ -223,7 +225,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
*/
private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
private static final String hostname = ServerUtils.hostname();
- private static volatile boolean dumpConfig = true;
// Private methods should never catch SQLException and then throw MetaException. The public
// methods depend on SQLException coming back so they can detect and handle deadlocks. Private
@@ -247,20 +248,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
checkQFileTestHack();
- Connection dbConn = null;
- // Set up the JDBC connection pool
- try {
- setupJdbcConnectionPool(conf);
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- determineDatabaseProduct(dbConn);
- sqlGenerator = new SQLGenerator(dbProduct, conf);
- } catch (SQLException e) {
- String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage();
- LOG.error(msg);
- throw new RuntimeException(e);
- }
- finally {
- closeDbConn(dbConn);
+ synchronized (TxnHandler.class) {
+ if (connPool == null) {
+ //only do this once per JVM; useful for support
+ LOG.info(HiveConfUtil.dumpConfig(conf).toString());
+
+ Connection dbConn = null;
+ // Set up the JDBC connection pool
+ try {
+ int maxPoolSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_MAX_CONNECTIONS);
+ long getConnectionTimeoutMs = 30000;
+ connPool = setupJdbcConnectionPool(conf, maxPoolSize, getConnectionTimeoutMs);
+ /*The mutex pool should ideally be somewhat larger since some operations require 1
+ connection from each pool and we want to avoid taking a connection from the primary pool
+ and then blocking because the mutex pool is empty. There is only 1 thread in any HMS trying
+ to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock. The CheckLock operation gets a
+ connection from connPool first, then connPoolMutex. All others go in the opposite
+ order (not very elegant...). So the number of connection requests for connPoolMutex cannot
+ exceed (size of connPool + MUTEX_KEY.values().length - 1).*/
+ connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length, getConnectionTimeoutMs);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ determineDatabaseProduct(dbConn);
+ sqlGenerator = new SQLGenerator(dbProduct, conf);
+ } catch (SQLException e) {
+ String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage();
+ LOG.error(msg);
+ throw new RuntimeException(e);
+ } finally {
+ closeDbConn(dbConn);
+ }
+ }
}
timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -270,11 +287,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
deadlockRetryInterval = retryInterval / 10;
maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS);
- if(dumpConfig) {
- LOG.info(HiveConfUtil.dumpConfig(conf).toString());
- //only do this once per JVM; useful for support
- dumpConfig = false;
- }
}
@Override
@RetrySemantics.ReadOnly
@@ -367,7 +379,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
/**
* This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
-\ */
+ */
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "select ntxn_next - 1 from NEXT_TXN_ID";
@@ -383,23 +395,27 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
"initialized, null record found in next_txn_id");
}
close(rs);
- Set<Long> openList = new HashSet<Long>();
+ List<Long> openList = new ArrayList<Long>();
//need the WHERE clause below to ensure consistent results with READ_COMMITTED
- s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm;
+ s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm + " order by txn_id";
LOG.debug("Going to execute query<" + s + ">");
rs = stmt.executeQuery(s);
long minOpenTxn = Long.MAX_VALUE;
+ BitSet abortedBits = new BitSet();
while (rs.next()) {
long txnId = rs.getLong(1);
openList.add(txnId);
char c = rs.getString(2).charAt(0);
if(c == TXN_OPEN) {
minOpenTxn = Math.min(minOpenTxn, txnId);
+ } else if (c == TXN_ABORTED) {
+ abortedBits.set(openList.size() - 1);
}
}
LOG.debug("Going to rollback");
dbConn.rollback();
- GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
+ GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer);
if(minOpenTxn < Long.MAX_VALUE) {
otr.setMin_open_txn(minOpenTxn);
}
@@ -844,7 +860,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
/**
* As much as possible (i.e. in absence of retries) we want both operations to be done on the same
* connection (but separate transactions). This avoid some flakiness in BONECP where if you
- * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one
+ * perform an operation on 1 connection and immediately get another from the pool, the 2nd one
* doesn't see results of the first.
*
* Retry-by-caller note: If the call to lock is from a transaction, then in the worst case
@@ -983,6 +999,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
case SELECT:
updateTxnComponents = false;
break;
+ case NO_TXN:
+ /*this constant is a bit of a misnomer since we now always have a txn context. It
+ just means the operation is such that we don't care what tables/partitions it
+ affected as it doesn't trigger a compaction or conflict detection. A better name
+ would be NON_TRANSACTIONAL.*/
+ updateTxnComponents = false;
+ break;
default:
//since we have an open transaction, only 4 values above are expected
throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
@@ -1934,7 +1957,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
- protected Connection getDbConn(int isolationLevel) throws SQLException {
+ Connection getDbConn(int isolationLevel) throws SQLException {
+ return getDbConn(isolationLevel, connPool);
+ }
+ private Connection getDbConn(int isolationLevel, DataSource connPool) throws SQLException {
int rc = doRetryOnConnPool ? 10 : 1;
Connection dbConn = null;
while (true) {
@@ -2457,14 +2483,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
response.setLockid(extLockId);
LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
- Savepoint save = dbConn.setSavepoint();//todo: get rid of this
+ Savepoint save = dbConn.setSavepoint();
StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
"hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
"hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
Set<String> strings = new HashSet<String>(locksBeingChecked.size());
- //This the set of entities that the statement represnted by extLockId wants to update
+ //This is the set of entities that the statement represented by extLockId wants to update
List<LockInfo> writeSet = new ArrayList<>();
for (LockInfo info : locksBeingChecked) {
@@ -3131,9 +3157,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
- private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
- if (connPool != null) return;
-
+ private static synchronized DataSource setupJdbcConnectionPool(HiveConf conf, int maxPoolSize, long getConnectionTimeoutMs) throws SQLException {
String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY);
String user = getMetastoreJdbcUser(conf);
String passwd = getMetastoreJdbcPasswd(conf);
@@ -3143,33 +3167,40 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if ("bonecp".equals(connectionPooler)) {
BoneCPConfig config = new BoneCPConfig();
config.setJdbcUrl(driverUrl);
- //if we are waiting for connection for 60s, something is really wrong
+ //if we are waiting for connection for a long time, something is really wrong
//better raise an error than hang forever
- config.setConnectionTimeoutInMs(60000);
- config.setMaxConnectionsPerPartition(10);
+ //see DefaultConnectionStrategy.getConnectionInternal()
+ config.setConnectionTimeoutInMs(getConnectionTimeoutMs);
+ config.setMaxConnectionsPerPartition(maxPoolSize);
config.setPartitionCount(1);
config.setUser(user);
config.setPassword(passwd);
- connPool = new BoneCPDataSource(config);
doRetryOnConnPool = true; // Enable retries to work around BONECP bug.
+ return new BoneCPDataSource(config);
} else if ("dbcp".equals(connectionPooler)) {
- ObjectPool objectPool = new GenericObjectPool();
+ GenericObjectPool objectPool = new GenericObjectPool();
+ //https://commons.apache.org/proper/commons-pool/api-1.6/org/apache/commons/pool/impl/GenericObjectPool.html#setMaxActive(int)
+ objectPool.setMaxActive(maxPoolSize);
+ objectPool.setMaxWait(getConnectionTimeoutMs);
ConnectionFactory connFactory = new DriverManagerConnectionFactory(driverUrl, user, passwd);
// This doesn't get used, but it's still necessary, see
// http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup
PoolableConnectionFactory poolConnFactory =
new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true);
- connPool = new PoolingDataSource(objectPool);
+ return new PoolingDataSource(objectPool);
} else if ("hikaricp".equals(connectionPooler)) {
HikariConfig config = new HikariConfig();
+ config.setMaximumPoolSize(maxPoolSize);
config.setJdbcUrl(driverUrl);
config.setUsername(user);
config.setPassword(passwd);
+ //https://github.com/brettwooldridge/HikariCP
+ config.setConnectionTimeout(getConnectionTimeoutMs);
- connPool = new HikariDataSource(config);
+ return new HikariDataSource(config);
} else if ("none".equals(connectionPooler)) {
LOG.info("Choosing not to pool JDBC connections");
- connPool = new NoPoolConnectionPool(conf);
+ return new NoPoolConnectionPool(conf);
} else {
throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler);
}
@@ -3427,7 +3458,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
String sqlStmt = sqlGenerator.addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0");
lockInternal();
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
stmt = dbConn.createStatement();
if(LOG.isDebugEnabled()) {
LOG.debug("About to execute SQL: " + sqlStmt);
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 517eec3..6e0070b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -32,9 +32,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
public class TxnUtils {
private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
@@ -50,8 +50,13 @@ public class TxnUtils {
* @return a valid txn list.
*/
public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+ /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
+ * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which
+ * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should
+ * include the latest committed set.*/
long highWater = txns.getTxn_high_water_mark();
- Set<Long> open = txns.getOpen_txns();
+ List<Long> open = txns.getOpen_txns();
+ BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
int i = 0;
for(long txn: open) {
@@ -59,10 +64,10 @@ public class TxnUtils {
exceptions[i++] = txn;
}
if(txns.isSetMin_open_txn()) {
- return new ValidReadTxnList(exceptions, highWater, txns.getMin_open_txn());
+ return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn());
}
else {
- return new ValidReadTxnList(exceptions, highWater);
+ return new ValidReadTxnList(exceptions, abortedBits, highWater);
}
}
@@ -93,7 +98,9 @@ public class TxnUtils {
exceptions = Arrays.copyOf(exceptions, i);
}
highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
- return new ValidCompactorTxnList(exceptions, highWater);
+ BitSet bitSet = new BitSet(exceptions.length);
+ bitSet.set(0, bitSet.length()); // for ValidCompactorTxnList, everything in exceptions are aborted
+ return new ValidCompactorTxnList(exceptions, bitSet, highWater);
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/model/package.jdo
----------------------------------------------------------------------
diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo
index 844bc46..67e2c20 100644
--- a/metastore/src/model/package.jdo
+++ b/metastore/src/model/package.jdo
@@ -63,10 +63,10 @@
<class name="MFieldSchema" embedded-only="true" table="TYPE_FIELDS" detachable="true">
<field name="name">
- <column name="FNAME" length="128" jdbc-type="VARCHAR"/>
+ <column name="FNAME" length="767" jdbc-type="VARCHAR"/>
</field>
<field name="type" >
- <column name="FTYPE" length="4000" jdbc-type="VARCHAR" allows-null="false"/>
+ <column name="FTYPE" jdbc-type="CLOB" allows-null="false"/>
</field>
<field name="comment" >
<column name="FCOMMENT" length="4000" jdbc-type="VARCHAR" allows-null="true"/>
@@ -118,7 +118,7 @@
<column name="DB_ID"/>
</index>
<field name="tableName">
- <column name="TBL_NAME" length="128" jdbc-type="VARCHAR"/>
+ <column name="TBL_NAME" length="256" jdbc-type="VARCHAR"/>
</field>
<field name="database">
<column name="DB_ID"/>
@@ -170,7 +170,7 @@
<column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/>
</key>
<value>
- <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/>
+ <column name="PARAM_VALUE" jdbc-type="CLOB"/>
</value>
</field>
<field name="viewOriginalText" default-fetch-group="false">
@@ -251,14 +251,14 @@
<column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/>
</key>
<value>
- <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/>
+ <column name="PARAM_VALUE" jdbc-type="CLOB"/>
</value>
</field>
</class>
<class name="MOrder" embedded-only="true" table="SORT_ORDER" detachable="true">
<field name="col">
- <column name="COL_NAME" length="128" jdbc-type="VARCHAR"/>
+ <column name="COL_NAME" length="767" jdbc-type="VARCHAR"/>
</field>
<field name="order">
<column name="ORDER" jdbc-type="INTEGER" allows-null="false"/>
@@ -280,10 +280,10 @@
<element>
<embedded>
<field name="name">
- <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/>
+ <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/>
</field>
<field name="type">
- <column name="TYPE_NAME" length="4000" jdbc-type="VARCHAR" allows-null="false"/>
+ <column name="TYPE_NAME" jdbc-type="CLOB" allows-null="false"/>
</field>
<field name="comment">
<column name="COMMENT" length="256" jdbc-type="VARCHAR" allows-null="true"/>
@@ -349,7 +349,7 @@
<element>
<embedded>
<field name="col">
- <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/>
+ <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/>
</field>
<field name="order">
<column name="ORDER" jdbc-type="INTEGER" allows-null="false"/>
@@ -366,7 +366,7 @@
<column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/>
</key>
<value>
- <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/>
+ <column name="PARAM_VALUE" jdbc-type="CLOB"/>
</value>
</field>
<field name="skewedColNames" table="SKEWED_COL_NAMES">
@@ -725,7 +725,7 @@
<column name="TBL_ID" />
</field>
<field name="columnName">
- <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/>
+ <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/>
</field>
<field name="privilege">
<column name="TBL_COL_PRIV" length="128" jdbc-type="VARCHAR"/>
@@ -770,7 +770,7 @@
<column name="PART_ID" />
</field>
<field name="columnName">
- <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/>
+ <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/>
</field>
<field name="privilege">
<column name="PART_COL_PRIV" length="128" jdbc-type="VARCHAR"/>
@@ -803,7 +803,7 @@
<column name="DB_NAME" length="128" jdbc-type="VARCHAR"/>
</field>
<field name="tblName">
- <column name="TBL_NAME" length="128" jdbc-type="VARCHAR"/>
+ <column name="TBL_NAME" length="256" jdbc-type="VARCHAR"/>
</field>
<field name="partName">
<column name="PARTITION_NAME" length="767" jdbc-type="VARCHAR"/>
@@ -850,13 +850,13 @@
<column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
</field>
<field name="tableName">
- <column name="TABLE_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+ <column name="TABLE_NAME" length="256" jdbc-type="VARCHAR" allows-null="false"/>
</field>
<field name="table">
<column name="TBL_ID"/>
</field>
<field name="colName">
- <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+ <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR" allows-null="false"/>
</field>
<field name="colType">
<column name="COLUMN_TYPE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
@@ -911,7 +911,7 @@
<column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
</field>
<field name="tableName">
- <column name="TABLE_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+ <column name="TABLE_NAME" length="256" jdbc-type="VARCHAR" allows-null="false"/>
</field>
<field name="partitionName">
<column name="PARTITION_NAME" length="767" jdbc-type="VARCHAR" allows-null="false"/>
@@ -920,7 +920,7 @@
<column name="PART_ID"/>
</field>
<field name="colName">
- <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+ <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR" allows-null="false"/>
</field>
<field name="colType">
<column name="COLUMN_TYPE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
@@ -1050,7 +1050,7 @@
<column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/>
</field>
<field name="tableName">
- <column name="TBL_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/>
+ <column name="TBL_NAME" length="256" jdbc-type="VARCHAR" allows-null="true"/>
</field>
<field name="message">
<column name="MESSAGE" jdbc-type="LONGVARCHAR"/>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 64da9b4..7760bc7 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -873,6 +874,13 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
}
@Override
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) {
}