You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 20:43:04 UTC
[14/51] [partial] hive git commit: Revert "HIVE-14671 : merge master
into hive-14535 (Wei Zheng)"
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 64bc1a2..915bce3 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -58,13 +58,9 @@ import java.util.regex.Pattern;
import javax.jdo.JDOException;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
@@ -82,7 +78,6 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.io.HdfsUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
@@ -119,7 +114,6 @@ import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent;
import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
-import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.model.MTableWrite;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -165,6 +159,10 @@ import com.facebook.fb303.fb_status;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -172,8 +170,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
*/
public class HiveMetaStore extends ThriftHiveMetastore {
public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStore.class);
- public static final String PARTITION_NUMBER_EXCEED_LIMIT_MSG =
- "Number of partitions scanned (=%d) on table '%s' exceeds limit (=%d). This is controlled on the metastore server by %s.";
// boolean that tells if the HiveMetaStore (remote) server is being used.
// Can be used to determine if the calls to metastore api (HMSHandler) are being made with
@@ -238,6 +234,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
public static class HMSHandler extends FacebookBase implements IHMSHandler, ThreadLocalRawStore {
public static final Logger LOG = HiveMetaStore.LOG;
+ private String rawStoreClassName;
private final HiveConf hiveConf; // stores datastore (jpox) properties,
// right now they come from jpox.properties
@@ -419,6 +416,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
@Override
public void init() throws MetaException {
+ rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
initListeners = MetaStoreUtils.getMetaStoreListeners(
MetaStoreInitListener.class, hiveConf,
hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
@@ -516,7 +514,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
fileMetadataManager = new FileMetadataManager((ThreadLocalRawStore)this, hiveConf);
}
- private static String addPrefix(String s) {
+ private String addPrefix(String s) {
return threadLocalId.get() + ": " + s;
}
@@ -591,14 +589,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
@InterfaceStability.Evolving
@Override
public RawStore getMS() throws MetaException {
- Configuration conf = getConf();
- return getMSForConf(conf);
- }
-
- public static RawStore getMSForConf(Configuration conf) throws MetaException {
RawStore ms = threadLocalMS.get();
if (ms == null) {
- ms = newRawStoreForConf(conf);
+ ms = newRawStore();
ms.verifySchema();
threadLocalMS.set(ms);
ms = threadLocalMS.get();
@@ -615,23 +608,24 @@ public class HiveMetaStore extends ThriftHiveMetastore {
return txn;
}
- private static RawStore newRawStoreForConf(Configuration conf) throws MetaException {
- HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
- String rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
- LOG.info(addPrefix("Opening raw store with implementation class:" + rawStoreClassName));
+ private RawStore newRawStore() throws MetaException {
+ LOG.info(addPrefix("Opening raw store with implementation class:"
+ + rawStoreClassName));
+ Configuration conf = getConf();
+
if (hiveConf.getBoolVar(ConfVars.METASTORE_FASTPATH)) {
LOG.info("Fastpath, skipping raw store proxy");
try {
- RawStore rs =
- ((Class<? extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName))
- .newInstance();
- rs.setConf(hiveConf);
+ RawStore rs = ((Class<? extends RawStore>) MetaStoreUtils.getClass(
+ rawStoreClassName)).newInstance();
+ rs.setConf(conf);
return rs;
} catch (Exception e) {
LOG.error("Unable to instantiate raw store directly in fastpath mode", e);
throw new RuntimeException(e);
}
}
+
return RawStoreProxy.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get());
}
@@ -880,11 +874,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Path dbPath = new Path(db.getLocationUri());
boolean success = false;
boolean madeDir = false;
- Map<String, String> transactionalListenersResponses = Collections.emptyMap();
try {
firePreEvent(new PreCreateDatabaseEvent(db, this));
if (!wh.isDir(dbPath)) {
- if (!wh.mkdirs(dbPath)) {
+ if (!wh.mkdirs(dbPath, true)) {
throw new MetaException("Unable to create database path " + dbPath +
", failed to create database " + db.getName());
}
@@ -893,12 +886,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.openTransaction();
ms.createDatabase(db);
-
- if (!transactionalListeners.isEmpty()) {
- transactionalListenersResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.CREATE_DATABASE,
- new CreateDatabaseEvent(db, true, this));
+ if (transactionalListeners.size() > 0) {
+ CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onCreateDatabase(cde);
+ }
}
success = ms.commitTransaction();
@@ -909,13 +901,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
wh.deleteDir(dbPath, true);
}
}
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.CREATE_DATABASE,
- new CreateDatabaseEvent(db, success, this),
- null,
- transactionalListenersResponses);
+ for (MetaStoreEventListener listener : listeners) {
+ listener.onCreateDatabase(new CreateDatabaseEvent(db, success, this));
}
}
}
@@ -1030,7 +1017,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Database db = null;
List<Path> tablePaths = new ArrayList<Path>();
List<Path> partitionPaths = new ArrayList<Path>();
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
db = ms.getDatabase(name);
@@ -1113,13 +1099,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
if (ms.dropDatabase(name)) {
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.DROP_DATABASE,
- new DropDatabaseEvent(db, true, this));
+ if (transactionalListeners.size() > 0) {
+ DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onDropDatabase(dde);
+ }
}
-
success = ms.commitTransaction();
}
} finally {
@@ -1141,13 +1126,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
// it is not a terrible thing even if the data is not deleted
}
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.DROP_DATABASE,
- new DropDatabaseEvent(db, success, this),
- null,
- transactionalListenerResponses);
+ for (MetaStoreEventListener listener : listeners) {
+ listener.onDropDatabase(new DropDatabaseEvent(db, success, this));
}
}
}
@@ -1373,7 +1353,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
Path tblPath = null;
boolean success = false, madeDir = false;
try {
@@ -1395,7 +1374,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
if (tbl.getSd().getLocation() == null
|| tbl.getSd().getLocation().isEmpty()) {
- tblPath = wh.getDefaultTablePath(
+ tblPath = wh.getTablePath(
ms.getDatabase(tbl.getDbName()), tbl.getTableName());
} else {
if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
@@ -1409,7 +1388,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (tblPath != null) {
if (!wh.isDir(tblPath)) {
- if (!wh.mkdirs(tblPath)) {
+ if (!wh.mkdirs(tblPath, true)) {
throw new MetaException(tblPath
+ " is not a directory or unable to create one");
}
@@ -1434,12 +1413,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys);
}
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.CREATE_TABLE,
- new CreateTableEvent(tbl, true, this),
- envContext);
+ if (transactionalListeners.size() > 0) {
+ CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this);
+ createTableEvent.setEnvironmentContext(envContext);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onCreateTable(createTableEvent);
+ }
}
success = ms.commitTransaction();
@@ -1450,13 +1429,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
wh.deleteDir(tblPath, true);
}
}
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.CREATE_TABLE,
- new CreateTableEvent(tbl, success, this),
- envContext,
- transactionalListenerResponses);
+ for (MetaStoreEventListener listener : listeners) {
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(tbl, success, this);
+ createTableEvent.setEnvironmentContext(envContext);
+ listener.onCreateTable(createTableEvent);
}
}
}
@@ -1621,7 +1598,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
List<Path> partPaths = null;
Table tbl = null;
boolean ifPurge = false;
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
// drop any partitions
@@ -1666,6 +1642,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
+ checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge, deleteData && !isExternal);
// Drop the partitions and get a list of locations which need to be deleted
partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
tbl.getPartitionKeys(), deleteData && !isExternal);
@@ -1674,12 +1651,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
"Unable to drop index table " + tableName + " for index " + indexName);
} else {
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.DROP_TABLE,
- new DropTableEvent(tbl, deleteData, true, this),
- envContext);
+ if (transactionalListeners.size() > 0) {
+ DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this);
+ dropTableEvent.setEnvironmentContext(envContext);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onDropTable(dropTableEvent);
+ }
}
success = ms.commitTransaction();
}
@@ -1694,19 +1671,56 @@ public class HiveMetaStore extends ThriftHiveMetastore {
deleteTableData(tblPath, ifPurge);
// ok even if the data is not deleted
}
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.DROP_TABLE,
- new DropTableEvent(tbl, deleteData, success, this),
- envContext,
- transactionalListenerResponses);
+ for (MetaStoreEventListener listener : listeners) {
+ DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this);
+ dropTableEvent.setEnvironmentContext(envContext);
+ listener.onDropTable(dropTableEvent);
}
}
return success;
}
/**
+ * Will throw MetaException if combination of trash policy/purge can't be satisfied
+ * @param pathToData path to data which may potentially be moved to trash
+ * @param objectName db.table, or db.table.part
+ * @param ifPurge if PURGE options is specified
+ */
+ private void checkTrashPurgeCombination(Path pathToData, String objectName, boolean ifPurge,
+ boolean deleteData) throws MetaException {
+ // There is no need to check TrashPurgeCombination in following cases since Purge/Trash
+ // is not applicable:
+ // a) deleteData is false -- drop an external table
+ // b) pathToData is null -- a view
+ // c) ifPurge is true -- force delete without Trash
+ if (!deleteData || pathToData == null || ifPurge) {
+ return;
+ }
+
+ boolean trashEnabled = false;
+ try {
+ trashEnabled = 0 < hiveConf.getFloat("fs.trash.interval", -1);
+ } catch(NumberFormatException ex) {
+ // nothing to do
+ }
+
+ if (trashEnabled) {
+ try {
+ HadoopShims.HdfsEncryptionShim shim =
+ ShimLoader.getHadoopShims().createHdfsEncryptionShim(FileSystem.get(hiveConf), hiveConf);
+ if (shim.isPathEncrypted(pathToData)) {
+ throw new MetaException("Unable to drop " + objectName + " because it is in an encryption zone" +
+ " and trash is enabled. Use PURGE option to skip trash.");
+ }
+ } catch (IOException ex) {
+ MetaException e = new MetaException(ex.getMessage());
+ e.initCause(ex);
+ throw e;
+ }
+ }
+ }
+
+ /**
* Deletes the data in a table's location, if it fails logs an error
*
* @param tablePath
@@ -1869,151 +1883,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
- private void updateStatsForTruncate(Map<String,String> props, EnvironmentContext environmentContext) {
- if (null == props) {
- return;
- }
- for (String stat : StatsSetupConst.supportedStats) {
- String statVal = props.get(stat);
- if (statVal != null) {
- //In the case of truncate table, we set the stats to be 0.
- props.put(stat, "0");
- }
- }
- //first set basic stats to true
- StatsSetupConst.setBasicStatsState(props, StatsSetupConst.TRUE);
- environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
- //then invalidate column stats
- StatsSetupConst.clearColumnStatsState(props);
- return;
- }
-
- private void alterPartitionForTruncate(final RawStore ms,
- final String dbName,
- final String tableName,
- final Table table,
- final Partition partition) throws Exception {
- EnvironmentContext environmentContext = new EnvironmentContext();
- updateStatsForTruncate(partition.getParameters(), environmentContext);
-
- if (!transactionalListeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.ALTER_PARTITION,
- new AlterPartitionEvent(partition, partition, table, true, true, this));
- }
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ALTER_PARTITION,
- new AlterPartitionEvent(partition, partition, table, true, true, this));
- }
-
- alterHandler.alterPartition(ms, wh, dbName, tableName, null, partition, environmentContext, this);
- }
-
- private void alterTableStatsForTruncate(final RawStore ms,
- final String dbName,
- final String tableName,
- final Table table,
- final List<String> partNames) throws Exception {
- if (partNames == null) {
- if (0 != table.getPartitionKeysSize()) {
- for (Partition partition : ms.getPartitions(dbName, tableName, Integer.MAX_VALUE)) {
- alterPartitionForTruncate(ms, dbName, tableName, table, partition);
- }
- } else {
- EnvironmentContext environmentContext = new EnvironmentContext();
- updateStatsForTruncate(table.getParameters(), environmentContext);
-
- if (!transactionalListeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.ALTER_TABLE,
- new AlterTableEvent(table, table, true, true, this));
- }
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ALTER_TABLE,
- new AlterTableEvent(table, table, true, true, this));
- }
-
- alterHandler.alterTable(ms, wh, dbName, tableName, table, environmentContext, this);
- }
- } else {
- for (Partition partition : ms.getPartitionsByNames(dbName, tableName, partNames)) {
- alterPartitionForTruncate(ms, dbName, tableName, table, partition);
- }
- }
- return;
- }
-
- private List<Path> getLocationsForTruncate(final RawStore ms,
- final String dbName,
- final String tableName,
- final Table table,
- final List<String> partNames) throws Exception {
- List<Path> locations = new ArrayList<Path>();
- if (partNames == null) {
- if (0 != table.getPartitionKeysSize()) {
- for (Partition partition : ms.getPartitions(dbName, tableName, Integer.MAX_VALUE)) {
- locations.add(new Path(partition.getSd().getLocation()));
- }
- } else {
- locations.add(new Path(table.getSd().getLocation()));
- }
- } else {
- for (Partition partition : ms.getPartitionsByNames(dbName, tableName, partNames)) {
- locations.add(new Path(partition.getSd().getLocation()));
- }
- }
- return locations;
- }
-
- @Override
- public void truncate_table(final String dbName, final String tableName, List<String> partNames)
- throws NoSuchObjectException, MetaException {
- try {
- Table tbl = get_table_core(dbName, tableName);
- boolean isAutopurge = (tbl.isSetParameters() && "true".equalsIgnoreCase(tbl.getParameters().get("auto.purge")));
-
- // This is not transactional
- for (Path location : getLocationsForTruncate(getMS(), dbName, tableName, tbl, partNames)) {
- FileSystem fs = location.getFileSystem(getHiveConf());
- HadoopShims.HdfsEncryptionShim shim
- = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, getHiveConf());
- if (!shim.isPathEncrypted(location)) {
- HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getHiveConf(), fs, location);
- FileStatus targetStatus = fs.getFileStatus(location);
- String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
- wh.deleteDir(location, true, isAutopurge);
- fs.mkdirs(location);
- HdfsUtils.setFullFileStatus(getHiveConf(), status, targetGroup, fs, location, false);
- } else {
- FileStatus[] statuses = fs.listStatus(location, FileUtils.HIDDEN_FILES_PATH_FILTER);
- if (statuses == null || statuses.length == 0) {
- continue;
- }
- for (final FileStatus status : statuses) {
- wh.deleteDir(status.getPath(), true, isAutopurge);
- }
- }
- }
-
- // Alter the table/partition stats and also notify truncate table event
- alterTableStatsForTruncate(getMS(), dbName, tableName, tbl, partNames);
- } catch (IOException e) {
- throw new MetaException(e.getMessage());
- } catch (Exception e) {
- if (e instanceof MetaException) {
- throw (MetaException) e;
- } else if (e instanceof NoSuchObjectException) {
- throw (NoSuchObjectException) e;
- } else {
- throw newMetaException(e);
- }
- }
- }
-
/**
* Is this an external table?
*
@@ -2266,7 +2135,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean success = false, madeDir = false;
Path partLocation = null;
Table tbl = null;
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
part.setDbName(dbName);
@@ -2305,7 +2173,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
if (!wh.isDir(partLocation)) {
- if (!wh.mkdirs(partLocation)) {
+ if (!wh.mkdirs(partLocation, true)) {
throw new MetaException(partLocation
+ " is not a directory or unable to create one");
}
@@ -2323,12 +2191,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
if (ms.addPartition(part)) {
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, part, true, this),
- envContext);
+ if (transactionalListeners.size() > 0) {
+ AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, part, true, this);
+ addPartitionEvent.setEnvironmentContext(envContext);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onAddPartition(addPartitionEvent);
+ }
}
success = ms.commitTransaction();
@@ -2341,12 +2209,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, part, success, this),
- envContext,
- transactionalListenerResponses);
+ for (MetaStoreEventListener listener : listeners) {
+ AddPartitionEvent addPartitionEvent =
+ new AddPartitionEvent(tbl, part, success, this);
+ addPartitionEvent.setEnvironmentContext(envContext);
+ listener.onAddPartition(addPartitionEvent);
}
}
return part;
@@ -2491,10 +2358,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
final Map<PartValEqWrapper, Boolean> addedPartitions =
Collections.synchronizedMap(new HashMap<PartValEqWrapper, Boolean>());
final List<Partition> newParts = new ArrayList<Partition>();
- final List<Partition> existingParts = new ArrayList<Partition>();
+ final List<Partition> existingParts = new ArrayList<Partition>();;
Table tbl = null;
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
-
try {
ms.openTransaction();
tbl = ms.getTable(dbName, tblName);
@@ -2580,13 +2445,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
success = false;
// Notification is generated for newly created partitions only. The subset of partitions
// that already exist (existingParts), will not generate notifications.
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, newParts, true, this));
- }
-
+ fireMetaStoreAddPartitionEventTransactional(tbl, newParts, null, true);
success = ms.commitTransaction();
} finally {
if (!success) {
@@ -2597,26 +2456,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
}
}
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, parts, false, this));
- }
+ fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
} else {
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, newParts, true, this),
- null,
- transactionalListenerResponses);
-
- if (!existingParts.isEmpty()) {
- // The request has succeeded but we failed to add these partitions.
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, existingParts, false, this));
- }
+ fireMetaStoreAddPartitionEvent(tbl, newParts, null, true);
+ if (existingParts != null) {
+ // The request has succeeded but we failed to add these partitions.
+ fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
}
}
}
@@ -2703,7 +2548,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy
.getPartitionIterator();
Table tbl = null;
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
tbl = ms.getTable(dbName, tblName);
@@ -2777,14 +2621,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists);
//setting success to false to make sure that if the listener fails, rollback happens.
success = false;
-
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, partitionSpecProxy, true, this));
- }
-
+ fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true);
success = ms.commitTransaction();
return addedPartitions.size();
} finally {
@@ -2797,14 +2634,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
}
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, partitionSpecProxy, true, this),
- null,
- transactionalListenerResponses);
- }
+ fireMetaStoreAddPartitionEvent(tbl, partitionSpecProxy, null, true);
}
}
@@ -2856,7 +2686,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// mkdirs() because if the file system is read-only, mkdirs will
// throw an exception even if the directory already exists.
if (!wh.isDir(partLocation)) {
- if (!wh.mkdirs(partLocation)) {
+ if (!wh.mkdirs(partLocation, true)) {
throw new MetaException(partLocation
+ " is not a directory or unable to create one");
}
@@ -2909,7 +2739,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
boolean success = false;
Table tbl = null;
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
tbl = ms.getTable(part.getDbName(), part.getTableName());
@@ -2934,16 +2763,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Setting success to false to make sure that if the listener fails, rollback happens.
success = false;
-
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, Arrays.asList(part), true, this),
- envContext);
-
- }
-
+ fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, true);
// we proceed only if we'd actually succeeded anyway, otherwise,
// we'd have thrown an exception
success = ms.commitTransaction();
@@ -2951,19 +2771,64 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!success) {
ms.rollbackTransaction();
}
+ fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success);
+ }
+ return part;
+ }
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(tbl, Arrays.asList(part), success, this),
- envContext,
- transactionalListenerResponses);
+ private void fireMetaStoreAddPartitionEvent(final Table tbl,
+ final List<Partition> parts, final EnvironmentContext envContext, boolean success)
+ throws MetaException {
+ if (tbl != null && parts != null && !parts.isEmpty()) {
+ AddPartitionEvent addPartitionEvent =
+ new AddPartitionEvent(tbl, parts, success, this);
+ addPartitionEvent.setEnvironmentContext(envContext);
+ for (MetaStoreEventListener listener : listeners) {
+ listener.onAddPartition(addPartitionEvent);
+ }
+ }
+ }
+ private void fireMetaStoreAddPartitionEvent(final Table tbl,
+ final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
+ throws MetaException {
+ if (tbl != null && partitionSpec != null) {
+ AddPartitionEvent addPartitionEvent =
+ new AddPartitionEvent(tbl, partitionSpec, success, this);
+ addPartitionEvent.setEnvironmentContext(envContext);
+ for (MetaStoreEventListener listener : listeners) {
+ listener.onAddPartition(addPartitionEvent);
}
}
- return part;
}
+ private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
+ final List<Partition> parts, final EnvironmentContext envContext, boolean success)
+ throws MetaException {
+ if (tbl != null && parts != null && !parts.isEmpty()) {
+ AddPartitionEvent addPartitionEvent =
+ new AddPartitionEvent(tbl, parts, success, this);
+ addPartitionEvent.setEnvironmentContext(envContext);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onAddPartition(addPartitionEvent);
+ }
+ }
+ }
+
+ private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
+ final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
+ throws MetaException {
+ if (tbl != null && partitionSpec != null) {
+ AddPartitionEvent addPartitionEvent =
+ new AddPartitionEvent(tbl, partitionSpec, success, this);
+ addPartitionEvent.setEnvironmentContext(envContext);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onAddPartition(addPartitionEvent);
+ }
+ }
+ }
+
+
@Override
public Partition add_partition(final Partition part)
throws InvalidObjectException, AlreadyExistsException, MetaException {
@@ -3046,11 +2911,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Path destPath = new Path(destinationTable.getSd().getLocation(),
Warehouse.makePartName(partitionKeysPresent, partValsPresent));
List<Partition> destPartitions = new ArrayList<Partition>();
-
- Map<String, String> transactionalListenerResponsesForAddPartition = Collections.emptyMap();
- List<Map<String, String>> transactionalListenerResponsesForDropPartition =
- Lists.newArrayListWithCapacity(partitionsToExchange.size());
-
try {
for (Partition partition: partitionsToExchange) {
Partition destPartition = new Partition(partition);
@@ -3066,7 +2926,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
Path destParentPath = destPath.getParent();
if (!wh.isDir(destParentPath)) {
- if (!wh.mkdirs(destParentPath)) {
+ if (!wh.mkdirs(destParentPath, true)) {
throw new MetaException("Unable to create path " + destParentPath);
}
}
@@ -3078,22 +2938,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Setting success to false to make sure that if the listener fails, rollback happens.
success = false;
-
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponsesForAddPartition =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.ADD_PARTITION,
- new AddPartitionEvent(destinationTable, destPartitions, true, this));
-
- for (Partition partition : partitionsToExchange) {
- DropPartitionEvent dropPartitionEvent =
- new DropPartitionEvent(sourceTable, partition, true, true, this);
- transactionalListenerResponsesForDropPartition.add(
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.DROP_PARTITION,
- dropPartitionEvent));
- }
- }
+ fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
+ destinationTable, destPartitions, transactionalListeners, true);
success = ms.commitTransaction();
return destPartitions;
@@ -3103,31 +2949,34 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (pathCreated) {
wh.renameDir(destPath, sourcePath);
}
+
+ fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
+ destinationTable, destPartitions, listeners, success);
}
+ }
+ }
- if (!listeners.isEmpty()) {
- AddPartitionEvent addPartitionEvent = new AddPartitionEvent(destinationTable, destPartitions, success, this);
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ADD_PARTITION,
- addPartitionEvent,
- null,
- transactionalListenerResponsesForAddPartition);
+ private void fireMetaStoreExchangePartitionEvent(Table sourceTable,
+ List<Partition> partitionsToExchange, Table destinationTable,
+ List<Partition> destPartitions,
+ List<MetaStoreEventListener> eventListeners,
+ boolean status) throws MetaException {
+ if (sourceTable != null && destinationTable != null
+ && !CollectionUtils.isEmpty(partitionsToExchange)
+ && !CollectionUtils.isEmpty(destPartitions)) {
+ if (eventListeners.size() > 0) {
+ AddPartitionEvent addPartitionEvent =
+ new AddPartitionEvent(destinationTable, destPartitions, status, this);
+ for (MetaStoreEventListener eventListener : eventListeners) {
+ eventListener.onAddPartition(addPartitionEvent);
+ }
- i = 0;
for (Partition partition : partitionsToExchange) {
DropPartitionEvent dropPartitionEvent =
- new DropPartitionEvent(sourceTable, partition, success, true, this);
- Map<String, String> parameters =
- (transactionalListenerResponsesForDropPartition.size() > i)
- ? transactionalListenerResponsesForDropPartition.get(i)
- : null;
-
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.DROP_PARTITION,
- dropPartitionEvent,
- null,
- parameters);
- i++;
+ new DropPartitionEvent(sourceTable, partition, true, status, this);
+ for (MetaStoreEventListener eventListener : eventListeners) {
+ eventListener.onDropPartition(dropPartitionEvent);
+ }
}
}
}
@@ -3145,7 +2994,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Path archiveParentDir = null;
boolean mustPurge = false;
boolean isExternalTbl = false;
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
@@ -3164,23 +3012,27 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (isArchived) {
archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
verifyIsWritablePath(archiveParentDir);
+ checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals,
+ mustPurge, deleteData && !isExternalTbl);
}
if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
partPath = new Path(part.getSd().getLocation());
verifyIsWritablePath(partPath);
+ checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." + part_vals,
+ mustPurge, deleteData && !isExternalTbl);
}
if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
throw new MetaException("Unable to drop partition");
} else {
- if (!transactionalListeners.isEmpty()) {
-
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.DROP_PARTITION,
- new DropPartitionEvent(tbl, part, true, deleteData, this),
- envContext);
+ if (transactionalListeners.size() > 0) {
+ DropPartitionEvent dropPartitionEvent =
+ new DropPartitionEvent(tbl, part, true, deleteData, this);
+ dropPartitionEvent.setEnvironmentContext(envContext);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onDropPartition(dropPartitionEvent);
+ }
}
success = ms.commitTransaction();
}
@@ -3208,12 +3060,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// ok even if the data is not deleted
}
}
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.DROP_PARTITION,
- new DropPartitionEvent(tbl, part, success, deleteData, this),
- envContext,
- transactionalListenerResponses);
+ for (MetaStoreEventListener listener : listeners) {
+ DropPartitionEvent dropPartitionEvent =
+ new DropPartitionEvent(tbl, part, success, deleteData, this);
+ dropPartitionEvent.setEnvironmentContext(envContext);
+ listener.onDropPartition(dropPartitionEvent);
}
}
return true;
@@ -3275,8 +3126,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
List<Partition> parts = null;
boolean mustPurge = false;
boolean isExternalTbl = false;
- List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList();
-
try {
// We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
// Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes.
@@ -3346,23 +3195,28 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (MetaStoreUtils.isArchived(part)) {
Path archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
verifyIsWritablePath(archiveParentDir);
+ checkTrashPurgeCombination(archiveParentDir, dbName + "." + tblName + "." +
+ part.getValues(), mustPurge, deleteData && !isExternalTbl);
archToDelete.add(archiveParentDir);
}
if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
Path partPath = new Path(part.getSd().getLocation());
verifyIsWritablePath(partPath);
+ checkTrashPurgeCombination(partPath, dbName + "." + tblName + "." + part.getValues(),
+ mustPurge, deleteData && !isExternalTbl);
dirsToDelete.add(new PathAndPartValSize(partPath, part.getValues().size()));
}
}
ms.dropPartitions(dbName, tblName, partNames);
- if (parts != null && !transactionalListeners.isEmpty()) {
+ if (parts != null && transactionalListeners.size() > 0) {
for (Partition part : parts) {
- transactionalListenerResponses.add(
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.DROP_PARTITION,
- new DropPartitionEvent(tbl, part, true, deleteData, this),
- envContext));
+ DropPartitionEvent dropPartitionEvent =
+ new DropPartitionEvent(tbl, part, true, deleteData, this);
+ dropPartitionEvent.setEnvironmentContext(envContext);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onDropPartition(dropPartitionEvent);
+ }
}
}
@@ -3396,19 +3250,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
if (parts != null) {
- int i = 0;
- if (parts != null && !listeners.isEmpty()) {
- for (Partition part : parts) {
- Map<String, String> parameters =
- (!transactionalListenerResponses.isEmpty()) ? transactionalListenerResponses.get(i) : null;
-
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.DROP_PARTITION,
- new DropPartitionEvent(tbl, part, success, deleteData, this),
- envContext,
- parameters);
-
- i++;
+ for (Partition part : parts) {
+ for (MetaStoreEventListener listener : listeners) {
+ DropPartitionEvent dropPartitionEvent =
+ new DropPartitionEvent(tbl, part, success, deleteData, this);
+ dropPartitionEvent.setEnvironmentContext(envContext);
+ listener.onDropPartition(dropPartitionEvent);
}
}
}
@@ -3588,8 +3435,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
int partitionRequest = (maxToFetch < 0) ? numPartitions : maxToFetch;
if (partitionRequest > partitionLimit) {
String configName = ConfVars.METASTORE_LIMIT_PARTITION_REQUEST.varname;
- throw new MetaException(String.format(PARTITION_NUMBER_EXCEED_LIMIT_MSG, partitionRequest,
- tblName, partitionLimit, configName));
+ throw new MetaException(String.format("Number of partitions scanned (=%d) on table '%s' exceeds limit" +
+ " (=%d). This is controlled on the metastore server by %s.", partitionRequest, tblName, partitionLimit, configName));
}
}
}
@@ -3831,15 +3678,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Only fetch the table if we actually have a listener
Table table = null;
- if (!listeners.isEmpty()) {
+ for (MetaStoreEventListener listener : listeners) {
if (table == null) {
table = getMS().getTable(db_name, tbl_name);
}
-
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ALTER_PARTITION,
- new AlterPartitionEvent(oldPart, new_part, table, false, true, this),
- envContext);
+ AlterPartitionEvent alterPartitionEvent =
+ new AlterPartitionEvent(oldPart, new_part, table, true, this);
+ alterPartitionEvent.setEnvironmentContext(envContext);
+ listener.onAlterPartition(alterPartitionEvent);
}
} catch (InvalidObjectException e) {
ex = e;
@@ -3903,15 +3749,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
else {
throw new InvalidOperationException("failed to alterpartitions");
}
-
- if (table == null) {
- table = getMS().getTable(db_name, tbl_name);
- }
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ALTER_PARTITION,
- new AlterPartitionEvent(oldTmpPart, tmpPart, table, false, true, this));
+ for (MetaStoreEventListener listener : listeners) {
+ if (table == null) {
+ table = getMS().getTable(db_name, tbl_name);
+ }
+ AlterPartitionEvent alterPartitionEvent =
+ new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this);
+ listener.onAlterPartition(alterPartitionEvent);
}
}
} catch (InvalidObjectException e) {
@@ -3948,17 +3792,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Exception ex = null;
Index oldIndex = null;
RawStore ms = getMS();
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
oldIndex = get_index_by_name(dbname, base_table_name, index_name);
firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this));
ms.alterIndex(dbname, base_table_name, index_name, newIndex);
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.ALTER_INDEX,
- new AlterIndexEvent(oldIndex, newIndex, true, this));
+ if (transactionalListeners.size() > 0) {
+ AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onAlterIndex(alterIndexEvent);
+ }
}
success = ms.commitTransaction();
@@ -3980,13 +3823,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
endFunction("alter_index", success, ex, base_table_name);
-
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ALTER_INDEX,
- new AlterIndexEvent(oldIndex, newIndex, success, this),
- null,
- transactionalListenerResponses);
+ for (MetaStoreEventListener listener : listeners) {
+ AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, success, this);
+ listener.onAlterIndex(alterIndexEvent);
}
}
}
@@ -4054,11 +3893,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
alterHandler.alterTable(getMS(), wh, dbname, name, newTable,
envContext, this);
success = true;
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.ALTER_TABLE,
- new AlterTableEvent(oldt, newTable, false, true, this),
- envContext);
+ for (MetaStoreEventListener listener : listeners) {
+ AlterTableEvent alterTableEvent =
+ new AlterTableEvent(oldt, newTable, success, this);
+ alterTableEvent.setEnvironmentContext(envContext);
+ listener.onAlterTable(alterTableEvent);
}
} catch (NoSuchObjectException e) {
// thrown when the table to be altered does not exist
@@ -4626,7 +4465,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean success = false, indexTableCreated = false;
String[] qualified =
MetaStoreUtils.getQualifiedName(index.getDbName(), index.getIndexTableName());
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
firePreEvent(new PreAddIndexEvent(index, this));
@@ -4664,11 +4502,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
index.setCreateTime((int) time);
index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
if (ms.addIndex(index)) {
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.CREATE_INDEX,
- new AddIndexEvent(index, true, this));
+ if (transactionalListeners.size() > 0) {
+ AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onAddIndex(addIndexEvent);
+ }
}
}
@@ -4685,12 +4523,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.rollbackTransaction();
}
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.CREATE_INDEX,
- new AddIndexEvent(index, success, this),
- null,
- transactionalListenerResponses);
+ for (MetaStoreEventListener listener : listeners) {
+ AddIndexEvent addIndexEvent = new AddIndexEvent(index, success, this);
+ listener.onAddIndex(addIndexEvent);
}
}
}
@@ -4728,7 +4563,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Index index = null;
Path tblPath = null;
List<Path> partPaths = null;
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
// drop the underlying index table
@@ -4761,11 +4595,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.DROP_INDEX,
- new DropIndexEvent(index, true, this));
+ if (transactionalListeners.size() > 0) {
+ DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onDropIndex(dropIndexEvent);
+ }
}
success = ms.commitTransaction();
@@ -4778,12 +4612,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// ok even if the data is not deleted
}
// Skip the event listeners if the index is NULL
- if (index != null && !listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.DROP_INDEX,
- new DropIndexEvent(index, success, this),
- null,
- transactionalListenerResponses);
+ if (index != null) {
+ for (MetaStoreEventListener listener : listeners) {
+ DropIndexEvent dropIndexEvent = new DropIndexEvent(index, success, this);
+ listener.onDropIndex(dropIndexEvent);
+ }
}
}
return success;
@@ -6219,7 +6052,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
validateFunctionInfo(func);
boolean success = false;
RawStore ms = getMS();
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
Database db = ms.getDatabase(func.getDbName());
@@ -6236,11 +6068,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
long time = System.currentTimeMillis() / 1000;
func.setCreateTime((int) time);
ms.createFunction(func);
- if (!transactionalListeners.isEmpty()) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.CREATE_FUNCTION,
- new CreateFunctionEvent(func, true, this));
+ if (transactionalListeners.size() > 0) {
+ CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, true, this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onCreateFunction(createFunctionEvent);
+ }
}
success = ms.commitTransaction();
@@ -6249,12 +6081,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.rollbackTransaction();
}
- if (!listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.CREATE_FUNCTION,
- new CreateFunctionEvent(func, success, this),
- null,
- transactionalListenerResponses);
+ if (listeners.size() > 0) {
+ CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, success, this);
+ for (MetaStoreEventListener listener : listeners) {
+ listener.onCreateFunction(createFunctionEvent);
+ }
}
}
}
@@ -6266,7 +6097,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean success = false;
Function func = null;
RawStore ms = getMS();
- Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
func = ms.getFunction(dbName, funcName);
@@ -6276,10 +6106,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.dropFunction(dbName, funcName);
if (transactionalListeners.size() > 0) {
- transactionalListenerResponses =
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.DROP_FUNCTION,
- new DropFunctionEvent(func, true, this));
+ DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, true, this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onDropFunction(dropFunctionEvent);
+ }
}
success = ms.commitTransaction();
@@ -6289,11 +6119,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
if (listeners.size() > 0) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.DROP_FUNCTION,
- new DropFunctionEvent(func, success, this),
- null,
- transactionalListenerResponses);
+ DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, success, this);
+ for (MetaStoreEventListener listener : listeners) {
+ listener.onDropFunction(dropFunctionEvent);
+ }
}
}
}
@@ -6645,13 +6474,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
InsertEvent event =
new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst
.getData().getInsertData(), rqst.isSuccessful(), this);
+ for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+ transactionalListener.onInsert(event);
+ }
- /*
- * The transactional listener response will be set already on the event, so there is not need
- * to pass the response to the non-transactional listener.
- */
- MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.INSERT, event);
- MetaStoreListenerNotifier.notifyEvent(listeners, EventType.INSERT, event);
+ for (MetaStoreEventListener listener : listeners) {
+ listener.onInsert(event);
+ }
return new FireEventResponse();
@@ -7405,9 +7234,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ServerMode.METASTORE);
saslServer.setSecretManager(delegationTokenManager.getSecretManager());
transFactory = saslServer.createTransportFactory(
- MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
+ MetaStoreUtils.getMetaStoreSaslProperties(conf));
processor = saslServer.wrapProcessor(
new ThriftHiveMetastore.Processor<IHMSHandler>(handler));
+ serverSocket = HiveAuthUtils.getServerSocket(null, port);
LOG.info("Starting DB backed MetaStore Server in Secure Mode");
} else {
@@ -7426,27 +7256,25 @@ public class HiveMetaStore extends ThriftHiveMetastore {
processor = new TSetIpAddressProcessor<IHMSHandler>(handler);
LOG.info("Starting DB backed MetaStore Server");
}
- }
-
- if (!useSSL) {
- serverSocket = HiveAuthUtils.getServerSocket(null, port);
- } else {
- String keyStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH).trim();
- if (keyStorePath.isEmpty()) {
- throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH.varname
- + " Not configured for SSL connection");
- }
- String keyStorePassword = ShimLoader.getHadoopShims().getPassword(conf,
- HiveConf.ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname);
// enable SSL support for HMS
List<String> sslVersionBlacklist = new ArrayList<String>();
for (String sslVersion : conf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
sslVersionBlacklist.add(sslVersion);
}
-
- serverSocket = HiveAuthUtils.getServerSSLSocket(null, port, keyStorePath,
- keyStorePassword, sslVersionBlacklist);
+ if (!useSSL) {
+ serverSocket = HiveAuthUtils.getServerSocket(null, port);
+ } else {
+ String keyStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH).trim();
+ if (keyStorePath.isEmpty()) {
+ throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH.varname
+ + " Not configured for SSL connection");
+ }
+ String keyStorePassword = ShimLoader.getHadoopShims().getPassword(conf,
+ HiveConf.ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname);
+ serverSocket = HiveAuthUtils.getServerSSLSocket(null, port, keyStorePath,
+ keyStorePassword, sslVersionBlacklist);
+ }
}
if (tcpKeepAlive) {
@@ -7508,7 +7336,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
HMSHandler.LOG.info("Options.maxWorkerThreads = "
+ maxWorkerThreads);
HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive);
- HMSHandler.LOG.info("Enable SSL = " + useSSL);
if (startLock != null) {
signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing);
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 4912a31..b0b009a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -395,29 +395,6 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
LOG.info("Trying to connect to metastore with URI " + store);
try {
- if (useSSL) {
- try {
- String trustStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH).trim();
- if (trustStorePath.isEmpty()) {
- throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH.varname
- + " Not configured for SSL connection");
- }
- String trustStorePassword = ShimLoader.getHadoopShims().getPassword(conf,
- HiveConf.ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PASSWORD.varname);
-
- // Create an SSL socket and connect
- transport = HiveAuthUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout, trustStorePath, trustStorePassword );
- LOG.info("Opened an SSL connection to metastore, current connections: " + connCount.incrementAndGet());
- } catch(IOException e) {
- throw new IllegalArgumentException(e);
- } catch(TTransportException e) {
- tte = e;
- throw new MetaException(e.toString());
- }
- } else {
- transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
- }
-
if (useSasl) {
// Wrap thrift connection with SASL for secure connection.
try {
@@ -432,24 +409,48 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
String tokenSig = conf.getVar(ConfVars.METASTORE_TOKEN_SIGNATURE);
// tokenSig could be null
tokenStrForm = Utils.getTokenStrForm(tokenSig);
+ transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
if(tokenStrForm != null) {
// authenticate using delegation tokens via the "DIGEST" mechanism
transport = authBridge.createClientTransport(null, store.getHost(),
"DIGEST", tokenStrForm, transport,
- MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
+ MetaStoreUtils.getMetaStoreSaslProperties(conf));
} else {
String principalConfig =
conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
transport = authBridge.createClientTransport(
principalConfig, store.getHost(), "KERBEROS", null,
- transport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
+ transport, MetaStoreUtils.getMetaStoreSaslProperties(conf));
}
} catch (IOException ioe) {
LOG.error("Couldn't create client transport", ioe);
throw new MetaException(ioe.toString());
}
} else {
+ if (useSSL) {
+ try {
+ String trustStorePath = conf.getVar(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH).trim();
+ if (trustStorePath.isEmpty()) {
+ throw new IllegalArgumentException(ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PATH.varname
+ + " Not configured for SSL connection");
+ }
+ String trustStorePassword = ShimLoader.getHadoopShims().getPassword(conf,
+ HiveConf.ConfVars.HIVE_METASTORE_SSL_TRUSTSTORE_PASSWORD.varname);
+
+ // Create an SSL socket and connect
+ transport = HiveAuthUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout, trustStorePath, trustStorePassword );
+ LOG.info("Opened an SSL connection to metastore, current connections: " + connCount.incrementAndGet());
+ } catch(IOException e) {
+ throw new IllegalArgumentException(e);
+ } catch(TTransportException e) {
+ tte = e;
+ throw new MetaException(e.toString());
+ }
+ } else {
+ transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
+ }
+
if (useFramedTransport) {
transport = new TFramedTransport(transport);
}
@@ -1096,23 +1097,6 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
}
/**
- * Truncate the table/partitions in the DEFAULT database.
- * @param dbName
- * The db to which the table to be truncate belongs to
- * @param tableName
- * The table to truncate
- * @param partNames
- * List of partitions to truncate. NULL will truncate the whole table/all partitions
- * @throws MetaException
- * @throws TException
- * Could not truncate table properly.
- */
- @Override
- public void truncateTable(String dbName, String tableName, List<String> partNames) throws MetaException, TException {
- client.truncate_table(dbName, tableName, partNames);
- }
-
- /**
* @param type
* @return true if the type is dropped
* @throws MetaException
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java
index b7d7b50..df698c8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreFsImpl.java
@@ -25,23 +25,35 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hive.metastore.api.MetaException;
public class HiveMetaStoreFsImpl implements MetaStoreFS {
public static final Logger LOG = LoggerFactory
- .getLogger("hive.metastore.hivemetastoreFsimpl");
+ .getLogger("hive.metastore.hivemetastoressimpl");
@Override
public boolean deleteDir(FileSystem fs, Path f, boolean recursive,
boolean ifPurge, Configuration conf) throws MetaException {
+ LOG.debug("deleting " + f);
+
try {
- FileUtils.moveToTrash(fs, f, conf, ifPurge);
+ if (ifPurge) {
+ LOG.info("Not moving "+ f +" to trash");
+ } else if (Trash.moveToAppropriateTrash(fs, f, conf)) {
+ LOG.info("Moved to trash: " + f);
+ return true;
+ }
+
+ if (fs.delete(f, true)) {
+ LOG.debug("Deleted the directory " + f);
+ return true;
+ }
+
if (fs.exists(f)) {
throw new MetaException("Unable to delete directory: " + f);
}
- return true;
} catch (FileNotFoundException e) {
return true; // ok even if there is not data
} catch (Exception e) {
@@ -49,4 +61,5 @@ public class HiveMetaStoreFsImpl implements MetaStoreFS {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 82db281..d567258 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -305,20 +305,6 @@ public interface IMetaStoreClient {
void dropTable(String dbname, String tableName)
throws MetaException, TException, NoSuchObjectException;
- /**
- * Truncate the table/partitions in the DEFAULT database.
- * @param dbName
- * The db to which the table to be truncate belongs to
- * @param tableName
- * The table to truncate
- * @param partNames
- * List of partitions to truncate. NULL will truncate the whole table/all partitions
- * @throws MetaException
- * @throws TException
- * Could not truncate table properly.
- */
- void truncateTable(String dbName, String tableName, List<String> partNames) throws MetaException, TException;
-
boolean tableExists(String databaseName, String tableName) throws MetaException,
TException, UnknownDBException;