You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ma...@apache.org on 2018/12/18 08:18:09 UTC
[2/3] hive git commit: HIVE-18661 : CachedStore: Use metastore
notification log events to update cache. (Mahesh Kumar Behera,
reviewed by Daniel Dai)
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index cb899d7..9e4f3c2 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1135,6 +1135,7 @@ struct CreationMetadata {
struct NotificationEventRequest {
1: required i64 lastEvent,
2: optional i32 maxEvents,
+ 3: optional list<string> eventTypeSkipList,
}
struct NotificationEvent {
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 0ea46f8..617c7bc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -274,7 +274,7 @@ public class HiveAlterHandler implements AlterHandler {
part.setDbName(newDbName);
part.setTableName(newTblName);
ColumnStatistics colStats = updateOrGetPartitionColumnStats(msdb, catName, dbname, name,
- part.getValues(), part.getSd().getCols(), oldt, part, null);
+ part.getValues(), part.getSd().getCols(), oldt, part, null, null);
if (colStats != null) {
columnStatsNeedUpdated.put(part, colStats);
}
@@ -312,7 +312,7 @@ public class HiveAlterHandler implements AlterHandler {
}
} else {
alterTableUpdateTableColumnStats(
- msdb, oldt, newt, environmentContext, writeIdList);
+ msdb, oldt, newt, environmentContext, writeIdList, conf, null);
}
} else {
// operations other than table rename
@@ -332,7 +332,7 @@ public class HiveAlterHandler implements AlterHandler {
List<FieldSchema> oldCols = part.getSd().getCols();
part.getSd().setCols(newt.getSd().getCols());
ColumnStatistics colStats = updateOrGetPartitionColumnStats(msdb, catName, dbname, name,
- part.getValues(), oldCols, oldt, part, null);
+ part.getValues(), oldCols, oldt, part, null, null);
assert(colStats == null);
if (cascade) {
msdb.alterPartition(
@@ -349,11 +349,11 @@ public class HiveAlterHandler implements AlterHandler {
} else {
LOG.warn("Alter table not cascaded to partitions.");
alterTableUpdateTableColumnStats(
- msdb, oldt, newt, environmentContext, writeIdList);
+ msdb, oldt, newt, environmentContext, writeIdList, conf, null);
}
} else {
alterTableUpdateTableColumnStats(
- msdb, oldt, newt, environmentContext, writeIdList);
+ msdb, oldt, newt, environmentContext, writeIdList, conf, null);
}
}
@@ -481,7 +481,7 @@ public class HiveAlterHandler implements AlterHandler {
// PartitionView does not have SD. We do not need update its column stats
if (oldPart.getSd() != null) {
updateOrGetPartitionColumnStats(msdb, catName, dbname, name, new_part.getValues(),
- oldPart.getSd().getCols(), tbl, new_part, null);
+ oldPart.getSd().getCols(), tbl, new_part, null, null);
}
msdb.alterPartition(
catName, dbname, name, new_part.getValues(), new_part, validWriteIds);
@@ -620,7 +620,7 @@ public class HiveAlterHandler implements AlterHandler {
String newPartName = Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues());
ColumnStatistics cs = updateOrGetPartitionColumnStats(msdb, catName, dbname, name, oldPart.getValues(),
- oldPart.getSd().getCols(), tbl, new_part, null);
+ oldPart.getSd().getCols(), tbl, new_part, null, null);
msdb.alterPartition(catName, dbname, name, part_vals, new_part, validWriteIds);
if (cs != null) {
cs.getStatsDesc().setPartName(newPartName);
@@ -727,7 +727,7 @@ public class HiveAlterHandler implements AlterHandler {
// PartitionView does not have SD and we do not need to update its column stats
if (oldTmpPart.getSd() != null) {
updateOrGetPartitionColumnStats(msdb, catName, dbname, name, oldTmpPart.getValues(),
- oldTmpPart.getSd().getCols(), tbl, tmpPart, null);
+ oldTmpPart.getSd().getCols(), tbl, tmpPart, null, null);
}
}
@@ -799,8 +799,8 @@ public class HiveAlterHandler implements AlterHandler {
}
@VisibleForTesting
- void alterTableUpdateTableColumnStats(RawStore msdb, Table oldTable, Table newTable,
- EnvironmentContext ec, String validWriteIds)
+ public static List<ColumnStatisticsObj> alterTableUpdateTableColumnStats(RawStore msdb, Table oldTable, Table newTable,
+ EnvironmentContext ec, String validWriteIds, Configuration conf, List<String> deletedCols)
throws MetaException, InvalidObjectException {
String catName = normalizeIdentifier(oldTable.isSetCatName() ? oldTable.getCatName() :
getDefaultCatalog(conf));
@@ -808,11 +808,13 @@ public class HiveAlterHandler implements AlterHandler {
String tableName = normalizeIdentifier(oldTable.getTableName());
String newDbName = newTable.getDbName().toLowerCase();
String newTableName = normalizeIdentifier(newTable.getTableName());
+ List<ColumnStatisticsObj> newStatsObjs = new ArrayList<>();
+ // If this is not called from the cached store (deletedCols is null), also update the table in the store.
+ boolean doAlterTable = deletedCols == null;
try {
List<FieldSchema> oldCols = oldTable.getSd().getCols();
List<FieldSchema> newCols = newTable.getSd().getCols();
- List<ColumnStatisticsObj> newStatsObjs = new ArrayList<>();
ColumnStatistics colStats = null;
boolean updateColumnStats = !newDbName.equals(dbName) || !newTableName.equals(tableName)
|| !MetaStoreServerUtils.columnsIncludedByNameType(oldCols, newCols);
@@ -834,7 +836,10 @@ public class HiveAlterHandler implements AlterHandler {
} else {
List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
if (statsObjs != null) {
- List<String> deletedCols = new ArrayList<>();
+ // deletedCols is an out parameter when the caller supplies it; otherwise use a local list.
+ if (deletedCols == null) {
+ deletedCols = new ArrayList<>();
+ }
for (ColumnStatisticsObj statsObj : statsObjs) {
boolean found = false;
for (FieldSchema newCol : newCols) {
@@ -847,28 +852,36 @@ public class HiveAlterHandler implements AlterHandler {
if (found) {
if (!newDbName.equals(dbName) || !newTableName.equals(tableName)) {
- msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName());
+ if (doAlterTable) {
+ msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName());
+ }
newStatsObjs.add(statsObj);
deletedCols.add(statsObj.getColName());
}
} else {
- msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName());
+ if (doAlterTable) {
+ msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName());
+ }
deletedCols.add(statsObj.getColName());
}
}
- StatsSetupConst.removeColumnStatsState(newTable.getParameters(), deletedCols);
+ if (doAlterTable) {
+ StatsSetupConst.removeColumnStatsState(newTable.getParameters(), deletedCols);
+ }
}
}
}
- // Change to new table and append stats for the new table
- msdb.alterTable(catName, dbName, tableName, newTable, validWriteIds);
- if (updateColumnStats && !newStatsObjs.isEmpty()) {
- ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
- statsDesc.setDbName(newDbName);
- statsDesc.setTableName(newTableName);
- colStats.setStatsObj(newStatsObjs);
- msdb.updateTableColumnStatistics(colStats, validWriteIds, newTable.getWriteId());
+ if (doAlterTable) {
+ // Change to new table and append stats for the new table
+ msdb.alterTable(catName, dbName, tableName, newTable, validWriteIds);
+ if (updateColumnStats && !newStatsObjs.isEmpty()) {
+ ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
+ statsDesc.setDbName(newDbName);
+ statsDesc.setTableName(newTableName);
+ colStats.setStatsObj(newStatsObjs);
+ msdb.updateTableColumnStatistics(colStats, validWriteIds, newTable.getWriteId());
+ }
}
} catch (NoSuchObjectException nsoe) {
LOG.debug("Could not find db entry." + nsoe);
@@ -876,13 +889,15 @@ public class HiveAlterHandler implements AlterHandler {
//should not happen since the input were verified before passed in
throw new InvalidObjectException("Invalid inputs to update table column stats: " + e);
}
+ return newStatsObjs;
}
- private ColumnStatistics updateOrGetPartitionColumnStats(
+ public static ColumnStatistics updateOrGetPartitionColumnStats(
RawStore msdb, String catName, String dbname, String tblname, List<String> partVals,
- List<FieldSchema> oldCols, Table table, Partition part, List<FieldSchema> newCols)
+ List<FieldSchema> oldCols, Table table, Partition part, List<FieldSchema> newCols, List<String> deletedCols)
throws MetaException, InvalidObjectException {
ColumnStatistics newPartsColStats = null;
+ boolean updateColumnStats = true;
try {
// if newCols are not specified, use default ones.
if (newCols == null) {
@@ -906,10 +921,17 @@ public class HiveAlterHandler implements AlterHandler {
List<ColumnStatistics> partsColStats = msdb.getPartitionColumnStatistics(catName, dbname, tblname,
oldPartNames, oldColNames);
assert (partsColStats.size() <= 1);
+
+ // deletedCols is an out parameter when the caller supplies it; otherwise use a local list.
+ if (deletedCols == null) {
+ deletedCols = new ArrayList<>();
+ } else {
+ // in case deletedCols is provided by caller, stats will be updated by caller.
+ updateColumnStats = false;
+ }
for (ColumnStatistics partColStats : partsColStats) { //actually only at most one loop
List<ColumnStatisticsObj> newStatsObjs = new ArrayList<>();
List<ColumnStatisticsObj> statsObjs = partColStats.getStatsObj();
- List<String> deletedCols = new ArrayList<>();
for (ColumnStatisticsObj statsObj : statsObjs) {
boolean found =false;
for (FieldSchema newCol : newCols) {
@@ -921,17 +943,25 @@ public class HiveAlterHandler implements AlterHandler {
}
if (found) {
if (rename) {
- msdb.deletePartitionColumnStatistics(catName, dbname, tblname, partColStats.getStatsDesc().getPartName(),
- partVals, statsObj.getColName());
+ if (updateColumnStats) {
+ msdb.deletePartitionColumnStatistics(catName, dbname, tblname,
+ partColStats.getStatsDesc().getPartName(), partVals, statsObj.getColName());
+ } else {
+ deletedCols.add(statsObj.getColName());
+ }
newStatsObjs.add(statsObj);
}
} else {
- msdb.deletePartitionColumnStatistics(catName, dbname, tblname, partColStats.getStatsDesc().getPartName(),
- partVals, statsObj.getColName());
+ if (updateColumnStats) {
+ msdb.deletePartitionColumnStatistics(catName, dbname, tblname, partColStats.getStatsDesc().getPartName(),
+ partVals, statsObj.getColName());
+ }
deletedCols.add(statsObj.getColName());
}
}
- StatsSetupConst.removeColumnStatsState(part.getParameters(), deletedCols);
+ if (updateColumnStats) {
+ StatsSetupConst.removeColumnStatsState(part.getParameters(), deletedCols);
+ }
if (!newStatsObjs.isEmpty()) {
partColStats.setStatsObj(newStatsObjs);
newPartsColStats = partColStats;
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 598847d..0a1b96d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -114,6 +114,7 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
import org.apache.hadoop.hive.metastore.events.PreAddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.PreAlterCatalogEvent;
import org.apache.hadoop.hive.metastore.events.PreAlterDatabaseEvent;
@@ -140,6 +141,9 @@ import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.PreReadISchemaEvent;
import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
import org.apache.hadoop.hive.metastore.events.PreReadhSchemaVersionEvent;
+import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.metrics.JvmPauseMonitor;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
@@ -568,6 +572,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
listeners.add(new HMSMetricsListener(conf));
}
+ boolean canCachedStoreCanUseEvent = false;
+ for (MetaStoreEventListener listener : transactionalListeners) {
+ if (listener.doesAddEventsToNotificationLogTable()) {
+ canCachedStoreCanUseEvent = true;
+ break;
+ }
+ }
+ if (conf.getBoolean(ConfVars.METASTORE_CACHE_CAN_USE_EVENT.getVarname(), false) &&
+ !canCachedStoreCanUseEvent) {
+ throw new MetaException("CahcedStore can not use events for invalidation as there is no " +
+ " TransactionalMetaStoreEventListener to add events to notification table");
+ }
+
endFunctionListeners = MetaStoreServerUtils.getMetaStoreListeners(
MetaStoreEndFunctionListener.class, conf, MetastoreConf.getVar(conf, ConfVars.END_FUNCTION_LISTENERS));
@@ -5786,14 +5803,33 @@ public class HiveMetaStore extends ThriftHiveMetastore {
colStats.getStatsDesc().getCatName(), colStats.getStatsDesc().getDbName(),
colStats.getStatsDesc().getTableName()));
- boolean ret = false;
+ Map<String, String> parameters = null;
+ getMS().openTransaction();
+ boolean committed = false;
try {
- ret = getMS().updateTableColumnStatistics(colStats, validWriteIds, writeId) != null;
+ parameters = getMS().updateTableColumnStatistics(colStats, validWriteIds, writeId);
+ if (parameters != null) {
+ if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.UPDATE_TABLE_COLUMN_STAT,
+ new UpdateTableColumnStatEvent(colStats, parameters, validWriteIds, writeId, this));
+ }
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.UPDATE_TABLE_COLUMN_STAT,
+ new UpdateTableColumnStatEvent(colStats, parameters, validWriteIds, writeId, this));
+ }
+ }
+ committed = getMS().commitTransaction();
} finally {
- endFunction("write_column_statistics", ret != false, null,
+ if (!committed) {
+ getMS().rollbackTransaction();
+ }
+ endFunction("write_column_statistics", parameters != null, null,
colStats.getStatsDesc().getTableName());
}
- return ret;
+
+ return parameters != null;
}
private void normalizeColStatsInput(ColumnStatistics colStats) throws MetaException {
@@ -5826,16 +5862,37 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean ret = false;
+ Map<String, String> parameters;
+ List<String> partVals;
+ boolean committed = false;
+ getMS().openTransaction();
try {
if (tbl == null) {
tbl = getTable(catName, dbName, tableName);
}
- List<String> partVals = getPartValsFromName(tbl, csd.getPartName());
- return getMS().updatePartitionColumnStatistics(
- colStats, partVals, validWriteIds, writeId) != null;
+ partVals = getPartValsFromName(tbl, csd.getPartName());
+ parameters = getMS().updatePartitionColumnStatistics(colStats, partVals, validWriteIds, writeId);
+ if (parameters != null) {
+ if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.UPDATE_PARTITION_COLUMN_STAT,
+ new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, validWriteIds, writeId, this));
+ }
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.UPDATE_PARTITION_COLUMN_STAT,
+ new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, validWriteIds, writeId, this));
+ }
+ }
+ committed = getMS().commitTransaction();
} finally {
+ if (!committed) {
+ getMS().rollbackTransaction();
+ }
endFunction("write_partition_column_statistics", ret != false, null, tableName);
}
+
+ return parameters != null;
}
@Override
@@ -5889,6 +5946,20 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ret = getMS().deletePartitionColumnStatistics(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
convertedPartName, partVals, colName);
+ if (ret) {
+ if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.DELETE_PARTITION_COLUMN_STAT,
+ new DeletePartitionColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
+ convertedPartName, partVals, colName, this));
+ }
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.DELETE_PARTITION_COLUMN_STAT,
+ new DeletePartitionColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
+ convertedPartName, partVals, colName, this));
+ }
+ }
committed = getMS().commitTransaction();
} finally {
if (!committed) {
@@ -5926,6 +5997,20 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
ret = getMS().deleteTableColumnStatistics(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName, colName);
+ if (ret) {
+ if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.DELETE_TABLE_COLUMN_STAT,
+ new DeleteTableColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
+ tableName, colName, this));
+ }
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.DELETE_TABLE_COLUMN_STAT,
+ new DeleteTableColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
+ tableName, colName, this));
+ }
+ }
committed = getMS().commitTransaction();
} finally {
if (!committed) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
index de226bf..0d6d10f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
@@ -56,6 +56,10 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent;
import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
import java.sql.Connection;
@@ -294,6 +298,49 @@ public abstract class MetaStoreEventListener implements Configurable {
throws MetaException {
}
+ /**
+ * Callback for a table column statistics update event.
+ * The default implementation is a no-op; listeners override it to react to the event.
+ * @param updateTableColumnStatEvent event to be processed
+ * @throws MetaException declared for overriding implementations; this default never throws
+ */
+ public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumnStatEvent)
+ throws MetaException {
+ }
+
+ /**
+ * Callback for a table column statistics delete event.
+ * The default implementation is a no-op; listeners override it to react to the event.
+ * @param deleteTableColumnStatEvent event to be processed
+ * @throws MetaException declared for overriding implementations; this default never throws
+ */
+ public void onDeleteTableColumnStat(DeleteTableColumnStatEvent deleteTableColumnStatEvent)
+ throws MetaException {
+ }
+
+ /**
+ * Callback for a partition column statistics update event.
+ * The default implementation is a no-op; listeners override it to react to the event.
+ * @param updatePartColStatEvent event to be processed
+ * @throws MetaException declared for overriding implementations; this default never throws
+ */
+ public void onUpdatePartitionColumnStat(UpdatePartitionColumnStatEvent updatePartColStatEvent)
+ throws MetaException {
+ }
+
+ /**
+ * Callback for a partition column statistics delete event.
+ * The default implementation is a no-op; listeners override it to react to the event.
+ * @param deletePartColStatEvent event to be processed
+ * @throws MetaException declared for overriding implementations; this default never throws
+ */
+ public void onDeletePartitionColumnStat(DeletePartitionColumnStatEvent deletePartColStatEvent)
+ throws MetaException {
+ }
+
+ /**
+ * Reports whether this listener adds event info to the notification log table.
+ * The default answer is {@code false}; a listener that does write to that table
+ * is expected to override this and return {@code true}.
+ * @return {@code false} in this base implementation
+ */
+ public boolean doesAddEventsToNotificationLogTable() {
+ return false;
+ }
+
@Override
public Configuration getConf() {
return this.conf;
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
index c296f57..dd82c4b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
@@ -55,6 +55,10 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent;
import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
import java.sql.Connection;
import java.util.List;
@@ -224,6 +228,14 @@ public class MetaStoreListenerNotifier {
(listener, event) -> listener.onAllocWriteId((AllocWriteIdEvent) event, null, null))
.put(EventType.ACID_WRITE,
(listener, event) -> listener.onAcidWrite((AcidWriteEvent) event, null, null))
+ .put(EventType.UPDATE_TABLE_COLUMN_STAT,
+ (listener, event) -> listener.onUpdateTableColumnStat((UpdateTableColumnStatEvent) event))
+ .put(EventType.DELETE_TABLE_COLUMN_STAT,
+ (listener, event) -> listener.onDeleteTableColumnStat((DeleteTableColumnStatEvent) event))
+ .put(EventType.UPDATE_PARTITION_COLUMN_STAT,
+ (listener, event) -> listener.onUpdatePartitionColumnStat((UpdatePartitionColumnStatEvent) event))
+ .put(EventType.DELETE_PARTITION_COLUMN_STAT,
+ (listener, event) -> listener.onDeletePartitionColumnStat((DeletePartitionColumnStatEvent) event))
.build()
);
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 0324a19..d43c0c1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -4236,7 +4236,7 @@ public class ObjectStore implements RawStore, Configurable {
* Verifies that the stats JSON string is unchanged for alter table (txn stats).
* @return Error message with the details of the change, or null if the value has not changed.
*/
- private static String verifyStatsChangeCtx(Map<String, String> oldP, Map<String, String> newP,
+ public static String verifyStatsChangeCtx(Map<String, String> oldP, Map<String, String> newP,
long writeId, String validWriteIds, boolean isColStatsChange) {
if (validWriteIds != null && writeId > 0) return null; // We have txn context.
String oldVal = oldP == null ? null : oldP.get(StatsSetupConst.COLUMN_STATS_ACCURATE);
@@ -9859,13 +9859,25 @@ public class ObjectStore implements RawStore, Configurable {
try {
openTransaction();
long lastEvent = rqst.getLastEvent();
- query = pm.newQuery(MNotificationLog.class, "eventId > lastEvent");
- query.declareParameters("java.lang.Long lastEvent");
+ List<Object> parameterVals = new ArrayList<>();
+ parameterVals.add(lastEvent);
+ StringBuilder filterBuilder = new StringBuilder("eventId > para" + parameterVals.size());
+ StringBuilder parameterBuilder = new StringBuilder("java.lang.Long para" + parameterVals.size());
+ if (rqst.isSetEventTypeSkipList()) {
+ for (String eventType : rqst.getEventTypeSkipList()) {
+ parameterVals.add(eventType);
+ parameterBuilder.append(", java.lang.String para" + parameterVals.size());
+ filterBuilder.append(" && eventType != para" + parameterVals.size());
+ }
+ }
+ query = pm.newQuery(MNotificationLog.class, filterBuilder.toString());
+ query.declareParameters(parameterBuilder.toString());
query.setOrdering("eventId ascending");
int maxEventResponse = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_MAX_EVENT_RESPONSE);
int maxEvents = (rqst.getMaxEvents() < maxEventResponse && rqst.getMaxEvents() > 0) ? rqst.getMaxEvents() : maxEventResponse;
query.setRange(0, maxEvents);
- Collection<MNotificationLog> events = (Collection) query.execute(lastEvent);
+ Collection<MNotificationLog> events =
+ (Collection) query.executeWithArray(parameterVals.toArray(new Object[parameterVals.size()]));
commited = commitTransaction();
if (events == null) {
return result;
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index e4ef46f..bb504b0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.DatabaseName;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.TableName;
@@ -49,13 +50,29 @@ import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.HiveAlterHandler;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType;
import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
+import org.apache.hadoop.hive.metastore.messaging.DeleteTableColumnStatMessage;
+import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage;
+import org.apache.hadoop.hive.metastore.messaging.DeletePartitionColumnStatMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
@@ -94,9 +111,11 @@ public class CachedStore implements RawStore, Configurable {
private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm();
private RawStore rawStore = null;
private Configuration conf;
- private boolean areTxnStatsSupported;
+ private static boolean areTxnStatsSupported;
private PartitionExpressionProxy expressionProxy = null;
private static final SharedCache sharedCache = new SharedCache();
+ private static boolean canUseEvents = false;
+ private static long lastEventId;
static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
@@ -119,7 +138,38 @@ public class CachedStore implements RawStore, Configurable {
initSharedCache(conf);
}
+ /**
+ * Runs {@code updateUsingNotificationEvents} starting from the current {@code lastEventId}
+ * and stores the id it returns back into {@code lastEventId}. The elapsed time and the
+ * difference between the new and the previous event id are logged whether or not the
+ * update succeeds.
+ *
+ * @param rawStore store passed through to {@code updateUsingNotificationEvents}
+ * @throws RuntimeException if the cache has not been prewarmed yet, or if the update fails
+ * (the original exception is attached as the cause)
+ */
+ synchronized private static void triggerUpdateUsingEvent(RawStore rawStore) {
+ if (!isCachePrewarmed.get()) {
+ LOG.error("cache update should be done only after prewarm");
+ throw new RuntimeException("cache update should be done only after prewarm");
+ }
+ long startTime = System.nanoTime();
+ long preEventId = lastEventId;
+ try {
+ lastEventId = updateUsingNotificationEvents(rawStore, lastEventId);
+ } catch (Exception e) {
+ LOG.error(" cache update failed for start event id " + lastEventId + " with error ", e);
+ // Keep the original message but chain the cause so callers do not lose the stack trace.
+ throw new RuntimeException(e.getMessage(), e);
+ } finally {
+ long endTime = System.nanoTime();
+ LOG.info("Time taken in updateUsingNotificationEvents for num events : " + (lastEventId - preEventId) + " = " +
+ (endTime - startTime) / 1000000 + "ms");
+ }
+ }
+
+ /**
+ * Records the store's current notification event id in {@code lastEventId} and then
+ * prewarms the cache from {@code rawStore}.
+ *
+ * @param rawStore store to read the current event id from and to prewarm with
+ */
+ private static synchronized void triggerPreWarm(RawStore rawStore) {
+ // The event id is captured before prewarming; presumably so that events logged while
+ // the prewarm runs are still picked up by a later event-based update -- TODO confirm.
+ long currentEventId = rawStore.getCurrentNotificationEventId().getEventId();
+ lastEventId = currentEventId;
+ prewarm(rawStore);
+ }
+
private void setConfInternal(Configuration conf) {
+ if (MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT)) {
+ canUseEvents = true;
+ } else {
+ canUseEvents = false;
+ }
+ LOG.info("canUseEvents is set to " + canUseEvents + " in cached Store");
+
String rawStoreClassName =
MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
if (rawStore == null) {
@@ -151,6 +201,201 @@ public class CachedStore implements RawStore, Configurable {
}
@VisibleForTesting
+ public static SharedCache getSharedCache() { // accessor for the single static SharedCache instance of this class
+ return sharedCache;
+ }
+
+ /**
+  * Refreshes the cached column statistics of one partition.
+  * Statistics of columns reported as deleted are removed from the cache; if the handler returns
+  * statistics, they are written into the cache for this partition.
+  *
+  * @return the statistics returned by the alter handler, or {@code null} when it returned none
+  */
+ static private ColumnStatistics updateStatsForPart(RawStore rawStore, Table before, String catalogName,
+     String dbName, String tableName, Partition part) throws Exception {
+   List<String> droppedColumns = new ArrayList<>();
+   ColumnStatistics refreshedStats = HiveAlterHandler.updateOrGetPartitionColumnStats(rawStore, catalogName, dbName,
+       tableName, part.getValues(), part.getSd().getCols(), before, part, null, droppedColumns);
+
+   // Evict statistics of columns the handler reported as deleted.
+   for (String droppedColumn : droppedColumns) {
+     sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, part.getValues(), droppedColumn);
+   }
+
+   if (refreshedStats == null) {
+     return null;
+   }
+   sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, part.getValues(),
+       refreshedStats.getStatsObj());
+   return refreshedStats;
+ }
+
+ /**
+  * Refreshes cached column statistics after a table has been altered.
+  * For a table with partition keys, every cached partition is refreshed through
+  * {@link #updateStatsForPart}. Table level statistics are then recomputed by the alter handler
+  * and statistics of deleted columns are removed from the cache.
+  *
+  * @param before table definition before the alter
+  * @param after table definition after the alter
+  */
+ static private void updateStatsForTable(RawStore rawStore, Table before, Table after, String catalogName,
+     String dbName, String tableName) throws Exception {
+   ColumnStatistics colStats = null;
+   List<String> deletedCols = new ArrayList<>();
+   if (before.isSetPartitionKeys()) {
+     List<Partition> parts = sharedCache.listCachedPartitions(catalogName, dbName, tableName, -1);
+     for (Partition part : parts) {
+       colStats = updateStatsForPart(rawStore, before, catalogName, dbName, tableName, part);
+     }
+   }
+
+   List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, before,
+       after, null, null, rawStore.getConf(), deletedCols);
+   // NOTE(review): this guard tests colStats, which is the result for the LAST partition visited above and
+   // is always null for a table without partition keys, while the value being cached is statisticsObjs.
+   // It looks like the check was meant to be on statisticsObjs - confirm before changing behavior.
+   if (colStats != null) {
+     sharedCache.updateTableColStatsInCache(catalogName, dbName, tableName, statisticsObjs);
+   }
+   for (String column : deletedCols) {
+     sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, column);
+   }
+ }
+
+ /**
+  * Fetches the notification events that follow {@code lastEventId} from the raw store and applies
+  * them, in order, to the shared cache. Event types that do not change cached metadata are
+  * excluded through the request's skip list.
+  *
+  * @param rawStore store used to read the events and any data needed to refresh statistics
+  * @param lastEventId id of the last event already applied to the cache
+  * @return id of the last event processed, or {@code lastEventId} unchanged if no events were returned
+  * @throws Exception if reading, deserializing or applying an event fails; a RuntimeException is thrown
+  *         when an event id is not strictly greater than the one processed before it
+  */
+ @VisibleForTesting
+ public static long updateUsingNotificationEvents(RawStore rawStore, long lastEventId) throws Exception {
+   LOG.debug("updating cache using notification events starting from event id " + lastEventId);
+   NotificationEventRequest rqst = new NotificationEventRequest(lastEventId);
+
+   //Add the events which are not related to metadata update
+   rqst.addToEventTypeSkipList(MessageBuilder.INSERT_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.OPEN_TXN_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.COMMIT_TXN_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ABORT_TXN_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ALLOC_WRITE_ID_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ACID_WRITE_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.CREATE_FUNCTION_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.DROP_FUNCTION_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ADD_PRIMARYKEY_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ADD_FOREIGNKEY_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ADD_UNIQUECONSTRAINT_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ADD_NOTNULLCONSTRAINT_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.DROP_CONSTRAINT_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.CREATE_ISCHEMA_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ALTER_ISCHEMA_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.DROP_ISCHEMA_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ADD_SCHEMA_VERSION_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.ALTER_SCHEMA_VERSION_EVENT);
+   rqst.addToEventTypeSkipList(MessageBuilder.DROP_SCHEMA_VERSION_EVENT);
+
+   NotificationEventResponse resp;
+   Deadline.startTimer("getNextNotification");
+   try {
+     resp = rawStore.getNextNotification(rqst);
+   } finally {
+     // Stop the timer even when the fetch throws, so a stale start time is not left behind.
+     Deadline.stopTimer();
+   }
+
+   if (resp == null || resp.getEvents() == null) {
+     LOG.debug("no events to process");
+     return lastEventId;
+   }
+
+   List<NotificationEvent> eventList = resp.getEvents();
+   LOG.debug("num events to process " + eventList.size());
+
+   for (NotificationEvent event : eventList) {
+     long eventId = event.getEventId();
+     // Event ids must be strictly increasing; anything else means the log was read inconsistently.
+     if (eventId <= lastEventId) {
+       LOG.error("Event id is not valid " + lastEventId + " : " + eventId);
+       throw new RuntimeException(" event id is not valid " + lastEventId + " : " + eventId);
+     }
+     lastEventId = eventId;
+     String message = event.getMessage();
+     LOG.debug("Event to process " + event);
+     MessageDeserializer deserializer = MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
+     String catalogName = event.getCatName() == null ? "" : event.getCatName().toLowerCase();
+     String dbName = event.getDbName() == null ? "" : event.getDbName().toLowerCase();
+     String tableName = event.getTableName() == null ? "" : event.getTableName().toLowerCase();
+     // The event id has already been advanced above, so a skipped event is not fetched again.
+     if (!shouldCacheTable(catalogName, dbName, tableName)) {
+       continue;
+     }
+     switch (event.getEventType()) {
+       case MessageBuilder.ADD_PARTITION_EVENT:
+         AddPartitionMessage addPartMessage = deserializer.getAddPartitionMessage(message);
+         sharedCache.addPartitionsToCache(catalogName,
+             dbName, tableName, addPartMessage.getPartitionObjs());
+         break;
+       case MessageBuilder.ALTER_PARTITION_EVENT:
+         AlterPartitionMessage alterPartitionMessage = deserializer.getAlterPartitionMessage(message);
+         sharedCache.alterPartitionInCache(catalogName, dbName, tableName,
+             alterPartitionMessage.getPtnObjBefore().getValues(), alterPartitionMessage.getPtnObjAfter());
+         //TODO : Use the stat object stored in the alter table message to update the stats in cache.
+         if (updateStatsForPart(rawStore, alterPartitionMessage.getTableObj(),
+             catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter()) != null) {
+           CacheUpdateMasterWork.updateTableAggregatePartitionColStats(rawStore, catalogName, dbName, tableName);
+         }
+         break;
+       case MessageBuilder.DROP_PARTITION_EVENT:
+         DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message);
+         for (Map<String, String> partMap : dropPartitionMessage.getPartitions()) {
+           sharedCache.removePartitionFromCache(catalogName, dbName, tableName, new ArrayList<>(partMap.values()));
+         }
+         break;
+       case MessageBuilder.CREATE_TABLE_EVENT:
+         CreateTableMessage createTableMessage = deserializer.getCreateTableMessage(message);
+         sharedCache.addTableToCache(catalogName, dbName,
+             tableName, createTableMessage.getTableObj());
+         break;
+       case MessageBuilder.ALTER_TABLE_EVENT:
+         AlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(message);
+         sharedCache.alterTableInCache(catalogName, dbName, tableName, alterTableMessage.getTableObjAfter());
+         //TODO : Use the stat object stored in the alter table message to update the stats in cache.
+         updateStatsForTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
+             catalogName, dbName, tableName);
+         break;
+       case MessageBuilder.DROP_TABLE_EVENT:
+         DropTableMessage dropTableMessage = deserializer.getDropTableMessage(message);
+         int batchSize = MetastoreConf.getIntVar(rawStore.getConf(), ConfVars.BATCH_RETRIEVE_OBJECTS_MAX);
+         String tableDnsPath = null;
+         // Test the location string itself: constructing a Path from a null or empty string throws,
+         // and a freshly constructed Path is never null, so checking the Path object guards nothing.
+         String tableLocation = dropTableMessage.getTableObj().getSd().getLocation();
+         if (tableLocation != null && !tableLocation.isEmpty()) {
+           tableDnsPath = new Warehouse(rawStore.getConf()).getDnsPath(new Path(tableLocation)).toString();
+         }
+
+         // NOTE(review): the values of this map are presumably partition locations, yet they are passed
+         // where the other callers pass partition values. The loop also only ends when the raw store
+         // returns nothing, and nothing in the loop changes what the raw store returns - confirm whether
+         // this loop is needed at all given removeTableFromCache below.
+         while (true) {
+           Map<String, String> partitionLocations = rawStore.getPartitionLocations(catalogName, dbName, tableName,
+               tableDnsPath, batchSize);
+           if (partitionLocations == null || partitionLocations.isEmpty()) {
+             break;
+           }
+           sharedCache.removePartitionFromCache(catalogName, dbName, tableName,
+               new ArrayList<>(partitionLocations.values()));
+         }
+         sharedCache.removeTableFromCache(catalogName, dbName, tableName);
+         break;
+       case MessageBuilder.CREATE_DATABASE_EVENT:
+         CreateDatabaseMessage createDatabaseMessage = deserializer.getCreateDatabaseMessage(message);
+         sharedCache.addDatabaseToCache(createDatabaseMessage.getDatabaseObject());
+         break;
+       case MessageBuilder.ALTER_DATABASE_EVENT:
+         AlterDatabaseMessage alterDatabaseMessage = deserializer.getAlterDatabaseMessage(message);
+         sharedCache.alterDatabaseInCache(catalogName, dbName, alterDatabaseMessage.getDbObjAfter());
+         break;
+       case MessageBuilder.DROP_DATABASE_EVENT:
+         sharedCache.removeDatabaseFromCache(catalogName, dbName);
+         break;
+       case MessageBuilder.CREATE_CATALOG_EVENT:
+       case MessageBuilder.DROP_CATALOG_EVENT:
+       case MessageBuilder.ALTER_CATALOG_EVENT:
+         // TODO : Need to add cache invalidation for catalog events
+         LOG.error("catalog Events are not supported for cache invalidation : " + event.getEventType());
+         break;
+       case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT:
+         UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message);
+         updateTableColumnsStatsInternal(rawStore.getConf(), msg.getColumnStatistics(), msg.getParameters(),
+             msg.getValidWriteIds(), msg.getWriteId());
+         break;
+       case MessageBuilder.DELETE_TBL_COL_STAT_EVENT:
+         DeleteTableColumnStatMessage msgDel = deserializer.getDeleteTableColumnStatMessage(message);
+         sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, msgDel.getColName());
+         break;
+       case MessageBuilder.UPDATE_PART_COL_STAT_EVENT:
+         UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message);
+         sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getPartVals(),
+             msgPartUpdate.getColumnStatistics().getStatsObj());
+         break;
+       case MessageBuilder.DELETE_PART_COL_STAT_EVENT:
+         DeletePartitionColumnStatMessage msgPart = deserializer.getDeletePartitionColumnStatMessage(message);
+         sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName,
+             msgPart.getPartValues(), msgPart.getColName());
+         break;
+       default:
+         LOG.error("Event is not supported for cache invalidation : " + event.getEventType());
+     }
+   }
+   return lastEventId;
+ }
+
+ @VisibleForTesting
/**
* This initializes the caches in SharedCache by getting the objects from Metastore DB via
* ObjectStore and populating the respective caches
@@ -161,6 +406,7 @@ public class CachedStore implements RawStore, Configurable {
}
long startTime = System.nanoTime();
LOG.info("Prewarming CachedStore");
+ long sleepTime = 100;
while (!isCachePrewarmed.get()) {
// Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
Deadline.registerIfNot(1000000);
@@ -176,6 +422,12 @@ public class CachedStore implements RawStore, Configurable {
sharedCache.populateCatalogsInCache(catalogs);
} catch (MetaException | NoSuchObjectException e) {
LOG.warn("Failed to populate catalogs in cache, going to try again", e);
+ try {
+ Thread.sleep(sleepTime);
+ sleepTime = sleepTime * 2;
+ } catch (InterruptedException timerEx) {
+ LOG.info("sleep interrupted", timerEx.getMessage());
+ }
// try again
continue;
}
@@ -407,8 +659,7 @@ public class CachedStore implements RawStore, Configurable {
}
if (runOnlyOnce) {
// Some tests control the execution of the background update thread
- cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
- TimeUnit.MILLISECONDS);
+ cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, TimeUnit.MILLISECONDS);
}
}
@@ -439,6 +690,7 @@ public class CachedStore implements RawStore, Configurable {
private boolean shouldRunPrewarm = true;
private final RawStore rawStore;
+
CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) {
this.shouldRunPrewarm = shouldRunPrewarm;
String rawStoreClassName =
@@ -456,11 +708,19 @@ public class CachedStore implements RawStore, Configurable {
@Override
public void run() {
if (!shouldRunPrewarm) {
- // TODO: prewarm and update can probably be merged.
- update();
+ if (canUseEvents) {
+ try {
+ triggerUpdateUsingEvent(rawStore);
+ } catch (Exception e) {
+ LOG.error("failed to update cache using events ", e);
+ }
+ } else {
+ // TODO: prewarm and update can probably be merged.
+ update();
+ }
} else {
try {
- prewarm(rawStore);
+ triggerPreWarm(rawStore);
} catch (Exception e) {
LOG.error("Prewarm failure", e);
return;
@@ -619,7 +879,7 @@ public class CachedStore implements RawStore, Configurable {
// Update cached aggregate stats for all partitions of a table and for all
// but default partition
- private void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName,
+ private static void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName,
String tblName) {
try {
Table table = rawStore.getTable(catName, dbName, tblName);
@@ -675,7 +935,22 @@ public class CachedStore implements RawStore, Configurable {
@Override
public boolean commitTransaction() {
- return rawStore.commitTransaction();
+ if (!rawStore.commitTransaction()) {
+ return false;
+ }
+
+ // With event based updates the shared cache is never modified directly by raw store operations, to avoid
+ // inconsistency. Example: metastore B adds a partition and metastore A later drops it. Metastore A would apply
+ // its own drop to the cache first and only afterwards see B's add-partition notification. Without a tombstone
+ // entry recording that the drop happened after the add, the cache would replay the add and show the partition
+ // again until the drop-partition notification is eventually processed.
+ // So the cache is left untouched during the raw store operation and is refreshed from notification events once
+ // the commit has succeeded, which makes the events of the current transaction visible in the cache and gives
+ // strong consistency when there is only one metastore.
+ if (canUseEvents) {
+ triggerUpdateUsingEvent(rawStore);
+ }
+ return true;
}
@Override
@@ -691,19 +966,26 @@ public class CachedStore implements RawStore, Configurable {
@Override
public void createCatalog(Catalog cat) throws MetaException {
rawStore.createCatalog(cat);
- sharedCache.addCatalogToCache(cat);
+ // With event based cache update the catalog cache is not maintained, so only add to it in the non-event mode.
+ if (!canUseEvents) {
+ sharedCache.addCatalogToCache(cat);
+ }
}
@Override
public void alterCatalog(String catName, Catalog cat) throws MetaException,
InvalidOperationException {
rawStore.alterCatalog(catName, cat);
- sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat);
+ // in case of event based cache update, cache will not be updated for catalog.
+ if (!canUseEvents) {
+ sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat);
+ }
}
@Override
public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException {
- if (!sharedCache.isCatalogCachePrewarmed()) {
+ // in case of event based cache update, cache will not be updated for catalog.
+ if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) {
return rawStore.getCatalog(catalogName);
}
Catalog cat = sharedCache.getCatalogFromCache(normalizeIdentifier(catalogName));
@@ -715,7 +997,8 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getCatalogs() throws MetaException {
- if (!sharedCache.isCatalogCachePrewarmed()) {
+ // in case of event based cache update, cache will not be updated for catalog.
+ if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) {
return rawStore.getCatalogs();
}
return sharedCache.listCachedCatalogs();
@@ -724,19 +1007,29 @@ public class CachedStore implements RawStore, Configurable {
@Override
public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException {
rawStore.dropCatalog(catalogName);
- catalogName = catalogName.toLowerCase();
- sharedCache.removeCatalogFromCache(catalogName);
+
+ // With event based cache update the catalog cache is not maintained, so only remove from it in the non-event mode.
+ if (!canUseEvents) {
+ catalogName = catalogName.toLowerCase();
+ sharedCache.removeCatalogFromCache(catalogName);
+ }
}
@Override
public void createDatabase(Database db) throws InvalidObjectException, MetaException {
rawStore.createDatabase(db);
- sharedCache.addDatabaseToCache(db);
+ // in case of event based cache update, cache will be updated during commit.
+ if (!canUseEvents) {
+ sharedCache.addDatabaseToCache(db);
+ }
}
@Override
public Database getDatabase(String catName, String dbName) throws NoSuchObjectException {
- if (!sharedCache.isDatabaseCachePrewarmed()) {
+ // in case of event based cache update, cache will be updated during commit. So within active transaction, read
+ // directly from rawStore to avoid reading stale data as the data updated during same transaction will not be
+ // updated in the cache.
+ if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getDatabase(catName, dbName);
}
dbName = dbName.toLowerCase();
@@ -751,7 +1044,8 @@ public class CachedStore implements RawStore, Configurable {
@Override
public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException {
boolean succ = rawStore.dropDatabase(catName, dbName);
- if (succ) {
+ if (succ && !canUseEvents) {
+ // in case of event based cache update, cache will be updated during commit.
sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName),
StringUtils.normalizeIdentifier(dbName));
}
@@ -762,7 +1056,8 @@ public class CachedStore implements RawStore, Configurable {
public boolean alterDatabase(String catName, String dbName, Database db)
throws NoSuchObjectException, MetaException {
boolean succ = rawStore.alterDatabase(catName, dbName, db);
- if (succ) {
+ if (succ && !canUseEvents) {
+ // in case of event based cache update, cache will be updated during commit.
sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(catName),
StringUtils.normalizeIdentifier(dbName), db);
}
@@ -771,7 +1066,7 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getDatabases(String catName, String pattern) throws MetaException {
- if (!sharedCache.isDatabaseCachePrewarmed()) {
+ if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getDatabases(catName, pattern);
}
return sharedCache.listCachedDatabases(catName, pattern);
@@ -779,7 +1074,7 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getAllDatabases(String catName) throws MetaException {
- if (!sharedCache.isDatabaseCachePrewarmed()) {
+ if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getAllDatabases(catName);
}
return sharedCache.listCachedDatabases(catName);
@@ -821,6 +1116,10 @@ public class CachedStore implements RawStore, Configurable {
@Override
public void createTable(Table tbl) throws InvalidObjectException, MetaException {
rawStore.createTable(tbl);
+ // in case of event based cache update, cache will be updated during commit.
+ if (canUseEvents) {
+ return;
+ }
String catName = normalizeIdentifier(tbl.getCatName());
String dbName = normalizeIdentifier(tbl.getDbName());
String tblName = normalizeIdentifier(tbl.getTableName());
@@ -835,7 +1134,8 @@ public class CachedStore implements RawStore, Configurable {
public boolean dropTable(String catName, String dbName, String tblName)
throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.dropTable(catName, dbName, tblName);
- if (succ) {
+ // in case of event based cache update, cache will be updated during commit.
+ if (succ && !canUseEvents) {
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
@@ -857,7 +1157,7 @@ public class CachedStore implements RawStore, Configurable {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getTable(catName, dbName, tblName, validWriteIds);
}
Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -898,7 +1198,8 @@ public class CachedStore implements RawStore, Configurable {
@Override
public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartition(part);
- if (succ) {
+ // in case of event based cache update, cache will be updated during commit.
+ if (succ && !canUseEvents) {
String dbName = normalizeIdentifier(part.getDbName());
String tblName = normalizeIdentifier(part.getTableName());
String catName = part.isSetCatName() ? normalizeIdentifier(part.getCatName()) : DEFAULT_CATALOG_NAME;
@@ -914,7 +1215,8 @@ public class CachedStore implements RawStore, Configurable {
public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartitions(catName, dbName, tblName, parts);
- if (succ) {
+ // in case of event based cache update, cache will be updated during commit.
+ if (succ && !canUseEvents) {
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
@@ -930,7 +1232,8 @@ public class CachedStore implements RawStore, Configurable {
public boolean addPartitions(String catName, String dbName, String tblName, PartitionSpecProxy partitionSpec,
boolean ifNotExists) throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartitions(catName, dbName, tblName, partitionSpec, ifNotExists);
- if (succ) {
+ // in case of event based cache update, cache will be updated during commit.
+ if (succ && !canUseEvents) {
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
@@ -959,7 +1262,7 @@ public class CachedStore implements RawStore, Configurable {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getPartition(
catName, dbName, tblName, part_vals, validWriteIds);
}
@@ -990,7 +1293,7 @@ public class CachedStore implements RawStore, Configurable {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals);
}
Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1005,7 +1308,8 @@ public class CachedStore implements RawStore, Configurable {
public boolean dropPartition(String catName, String dbName, String tblName, List<String> part_vals)
throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.dropPartition(catName, dbName, tblName, part_vals);
- if (succ) {
+ // in case of event based cache update, cache will be updated during commit.
+ if (succ && !canUseEvents) {
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
@@ -1021,6 +1325,10 @@ public class CachedStore implements RawStore, Configurable {
public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames)
throws MetaException, NoSuchObjectException {
rawStore.dropPartitions(catName, dbName, tblName, partNames);
+ // in case of event based cache update, cache will be updated during commit.
+ if (canUseEvents) {
+ return;
+ }
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1040,7 +1348,7 @@ public class CachedStore implements RawStore, Configurable {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getPartitions(catName, dbName, tblName, max);
}
Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1062,6 +1370,10 @@ public class CachedStore implements RawStore, Configurable {
public Table alterTable(String catName, String dbName, String tblName, Table newTable,
String validWriteIds) throws InvalidObjectException, MetaException {
newTable = rawStore.alterTable(catName, dbName, tblName, newTable, validWriteIds);
+ // in case of event based cache update, cache will be updated during commit.
+ if (canUseEvents) {
+ return newTable;
+ }
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
@@ -1096,7 +1408,8 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getTables(String catName, String dbName, String pattern) throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() ||
+ (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getTables(catName, dbName, pattern);
}
return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
@@ -1106,7 +1419,8 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getTables(String catName, String dbName, String pattern, TableType tableType)
throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()
+ || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getTables(catName, dbName, pattern, tableType);
}
return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
@@ -1123,7 +1437,8 @@ public class CachedStore implements RawStore, Configurable {
public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames,
List<String> tableTypes) throws MetaException {
// TODO Check if all required tables are allowed, if so, get it from cache
- if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() ||
+ (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes);
}
return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName),
@@ -1134,6 +1449,9 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<Table> getTableObjectsByName(String catName, String dbName, List<String> tblNames)
throws MetaException, UnknownDBException {
+ if (canUseEvents && rawStore.isActiveTransaction()) {
+ return rawStore.getTableObjectsByName(catName, dbName, tblNames);
+ }
dbName = normalizeIdentifier(dbName);
catName = normalizeIdentifier(catName);
boolean missSomeInCache = false;
@@ -1168,7 +1486,8 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getAllTables(String catName, String dbName) throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() ||
+ (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getAllTables(catName, dbName);
}
return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
@@ -1188,7 +1507,7 @@ public class CachedStore implements RawStore, Configurable {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.listPartitionNames(catName, dbName, tblName, max_parts);
}
Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1218,6 +1537,10 @@ public class CachedStore implements RawStore, Configurable {
List<String> partVals, Partition newPart, String validWriteIds)
throws InvalidObjectException, MetaException {
newPart = rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, validWriteIds);
+ // in case of event based cache update, cache will be updated during commit.
+ if (canUseEvents) {
+ return newPart;
+ }
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
@@ -1235,6 +1558,10 @@ public class CachedStore implements RawStore, Configurable {
throws InvalidObjectException, MetaException {
newParts = rawStore.alterPartitions(
catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds);
+ // in case of event based cache update, cache will be updated during commit.
+ if (canUseEvents) {
+ return newParts;
+ }
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
@@ -1286,7 +1613,7 @@ public class CachedStore implements RawStore, Configurable {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts, result);
}
List<String> partNames = new LinkedList<>();
@@ -1317,7 +1644,7 @@ public class CachedStore implements RawStore, Configurable {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr);
}
String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
@@ -1332,7 +1659,8 @@ public class CachedStore implements RawStore, Configurable {
return partNames.size();
}
- private static List<String> partNameToVals(String name) {
+ @VisibleForTesting
+ public static List<String> partNameToVals(String name) {
if (name == null) {
return null;
}
@@ -1350,7 +1678,7 @@ public class CachedStore implements RawStore, Configurable {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames);
}
Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1537,7 +1865,7 @@ public class CachedStore implements RawStore, Configurable {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName, groupNames);
}
Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1562,7 +1890,7 @@ public class CachedStore implements RawStore, Configurable {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName, groupNames);
}
Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1591,7 +1919,7 @@ public class CachedStore implements RawStore, Configurable {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.listPartitionNamesPs(catName, dbName, tblName, partSpecs, maxParts);
}
Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1620,7 +1948,7 @@ public class CachedStore implements RawStore, Configurable {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.listPartitionsPsWithAuth(catName, dbName, tblName, partSpecs, maxParts, userName, groupNames);
}
Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1702,29 +2030,58 @@ public class CachedStore implements RawStore, Configurable {
return colStat;
}
+ private static void updateTableColumnsStatsInternal(Configuration conf, ColumnStatistics colStats,
+ Map<String, String> newParams, String validWriteIds,
+ long writeId) throws MetaException {
+ String catName = colStats.getStatsDesc().isSetCatName() ?
+ normalizeIdentifier(colStats.getStatsDesc().getCatName()) :
+ getDefaultCatalog(conf);
+ String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
+ String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
+ if (!shouldCacheTable(catName, dbName, tblName)) {
+ return;
+ }
+ Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
+ return;
+ }
+
+ boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters());
+ if (isTxn && validWriteIds != null) {
+ if (!areTxnStatsSupported) {
+ StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
+ } else {
+ String errorMsg = ObjectStore.verifyStatsChangeCtx(
+ table.getParameters(), newParams, writeId, validWriteIds, true);
+ if (errorMsg != null) {
+ throw new MetaException(errorMsg);
+ }
+ if (!ObjectStore.isCurrentStatsValidForTheQuery(conf, newParams, table.getWriteId(),
+ validWriteIds, true)) {
+ // Make sure we set the flag to invalid regardless of the current value.
+ StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
+ LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table "
+ + table.getDbName() + "." + table.getTableName());
+ }
+ }
+ }
+
+ table.setWriteId(writeId);
+ table.setParameters(newParams);
+ sharedCache.alterTableInCache(catName, dbName, tblName, table);
+ sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj());
+ }
+
@Override
public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats,
String validWriteIds, long writeId)
throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
Map<String, String> newParams = rawStore.updateTableColumnStatistics(
colStats, validWriteIds, writeId);
- if (newParams != null) {
- String catName = colStats.getStatsDesc().isSetCatName() ?
- normalizeIdentifier(colStats.getStatsDesc().getCatName()) :
- getDefaultCatalog(conf);
- String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
- String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
- if (!shouldCacheTable(catName, dbName, tblName)) {
- return newParams;
- }
- Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
- if (table == null) {
- // The table is not yet loaded in cache
- return newParams;
- }
- table.setParameters(newParams);
- sharedCache.alterTableInCache(catName, dbName, tblName, table);
- sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj());
+ // in case of event based cache update, cache will be updated during commit.
+ if (newParams != null && !canUseEvents) {
+ updateTableColumnsStatsInternal(conf, colStats, newParams, null, writeId);
}
return newParams;
}
@@ -1765,7 +2122,8 @@ public class CachedStore implements RawStore, Configurable {
String colName)
throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.deleteTableColumnStatistics(catName, dbName, tblName, colName);
- if (succ) {
+ // in case of event based cache update, cache is updated during commit txn
+ if (succ && !canUseEvents) {
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
@@ -1783,7 +2141,8 @@ public class CachedStore implements RawStore, Configurable {
throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
Map<String, String> newParams = rawStore.updatePartitionColumnStatistics(
colStats, partVals, validWriteIds, writeId);
- if (newParams != null) {
+ // in case of event based cache update, cache is updated during commit txn
+ if (newParams != null && !canUseEvents) {
String catName = colStats.getStatsDesc().isSetCatName() ?
normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME;
String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
@@ -1822,7 +2181,8 @@ public class CachedStore implements RawStore, Configurable {
throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
boolean succ =
rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName);
- if (succ) {
+ // in case of event based cache update, cache is updated during commit txn.
+ if (succ && !canUseEvents) {
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
@@ -1851,8 +2211,9 @@ public class CachedStore implements RawStore, Configurable {
tblName = StringUtils.normalizeIdentifier(tblName);
// TODO: we currently cannot do transactional checks for stats here
// (incl. due to lack of sync w.r.t. the below rawStore call).
- if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null) {
- rawStore.get_aggr_stats_for(
+ // TODO: need to calculate aggregate stats locally in the cached store
+ if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null || canUseEvents) {
+ return rawStore.get_aggr_stats_for(
catName, dbName, tblName, partNames, colNames, writeIdList);
}
Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -2246,6 +2607,10 @@ public class CachedStore implements RawStore, Configurable {
// TODO constraintCache
List<String> constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys,
foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+ // in case of event based cache update, cache is updated during commit.
+ if (canUseEvents) {
+ return constraintNames;
+ }
String dbName = normalizeIdentifier(tbl.getDbName());
String tblName = normalizeIdentifier(tbl.getTableName());
String catName = tbl.isSetCatName() ? normalizeIdentifier(tbl.getCatName()) :
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index c24e716..ce9e383 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -210,7 +210,7 @@ public class SharedCache {
}
}
- boolean cachePartitions(List<Partition> parts, SharedCache sharedCache) {
+ boolean cachePartitions(Iterable<Partition> parts, SharedCache sharedCache) {
try {
tableLock.writeLock().lock();
for (Partition part : parts) {
@@ -292,6 +292,9 @@ public class SharedCache {
tableLock.writeLock().lock();
PartitionWrapper wrapper =
partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal));
+ if (wrapper == null) {
+ return null;
+ }
isPartitionCacheDirty.set(true);
part = CacheUtils.assemble(wrapper, sharedCache);
if (wrapper.getSdHash() != null) {
@@ -1171,6 +1174,10 @@ public class SharedCache {
}
TableWrapper tblWrapper =
tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName));
+ if (tblWrapper == null) {
+ // In case of a retry, the entry is already gone; ignore the second attempt.
+ return;
+ }
byte[] sdHash = tblWrapper.getSdHash();
if (sdHash != null) {
decrSd(sdHash);
@@ -1408,7 +1415,7 @@ public class SharedCache {
}
public void addPartitionsToCache(String catName, String dbName, String tblName,
- List<Partition> parts) {
+ Iterable<Partition> parts) {
try {
cacheLock.readLock().lock();
TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java
new file mode 100644
index 0000000..d64b57d
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+
+import java.util.List;
+
+/**
+ * DeletePartitionColumnStatEvent
+ * Event fired when the column statistics of a partition are deleted.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class DeletePartitionColumnStatEvent extends ListenerEvent {
+ private String catName, dbName, tableName, colName, partName;
+
+ private List<String> partVals;
+
+ /**
+ * @param catName catalog name
+ * @param dbName database name
+ * @param tableName table name
+ * @param partName partition name
+ * @param partVals partition values
+ * @param colName column name
+ * @param handler handler that is firing the event
+ */
+ public DeletePartitionColumnStatEvent(String catName, String dbName, String tableName, String partName,
+ List<String> partVals, String colName, IHMSHandler handler) {
+ super(true, handler);
+ this.catName = catName;
+ this.dbName = dbName;
+ this.tableName = tableName;
+ this.colName = colName;
+ this.partName = partName;
+ this.partVals = partVals;
+ }
+
+ public String getCatName() {
+ return catName;
+ }
+
+ public String getDBName() {
+ return dbName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getColName() {
+ return colName;
+ }
+
+ public String getPartName() {
+ return partName;
+ }
+
+ public List<String> getPartVals() {
+ return partVals;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java
new file mode 100644
index 0000000..7638744
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+
+/**
+ * DeleteTableColumnStatEvent
+ * Event fired when the column statistics of a table are deleted.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class DeleteTableColumnStatEvent extends ListenerEvent {
+ private String catName, dbName, tableName, colName;
+
+ /**
+ * @param catName catalog name
+ * @param dbName database name
+ * @param tableName table name
+ * @param colName column name
+ * @param handler handler that is firing the event
+ */
+ public DeleteTableColumnStatEvent(String catName, String dbName, String tableName, String colName, IHMSHandler handler) {
+ super(true, handler);
+ this.catName = catName;
+ this.dbName = dbName;
+ this.tableName = tableName;
+ this.colName = colName;
+ }
+
+ public String getCatName() {
+ return catName;
+ }
+
+ public String getDBName() {
+ return dbName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getColName() {
+ return colName;
+ }
+}