You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by vi...@apache.org on 2020/04/09 04:59:26 UTC
[impala] 02/02: IMPALA-8632: Add support for self-event detection
for insert events
This is an automated email from the ASF dual-hosted git repository.
vihangk1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit e0ed7d321c1dca85a0f1482842f8db6db517c909
Author: xiaomeng <xi...@cloudera.com>
AuthorDate: Thu Feb 20 16:32:48 2020 -0800
IMPALA-8632: Add support for self-event detection for insert events
In case of INSERT_EVENTS, if Impala inserts into a table, the event causes a
refresh of the underlying table/partition. This can be unnecessary
when there is only one Impala cluster in the system.
We can detect a self-event in such cases because the HMS API used to fire a
listener event returns the event id. This id is used by the EventProcessor
to ignore the event when it is fetched later in the next polling cycle.
Testing:
Added testInsertFromImpala() to MetastoreEventsProcessorTest.java to test
self-event detection for insert events when inserting into a table and a partition.
Change-Id: I7873fbb2c159343690f93b9d120f6b425b983dcf
Reviewed-on: http://gerrit.cloudera.org:8080/15648
Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/common/global-flags.cc | 6 +
be/src/util/backend-gflag-util.cc | 2 +
common/thrift/BackendGflags.thrift | 2 +
.../org/apache/impala/compat/MetastoreShim.java | 69 ++++++++++++
.../org/apache/impala/compat/MetastoreShim.java | 72 ++++++++++++
.../impala/catalog/CatalogServiceCatalog.java | 64 ++++++-----
fe/src/main/java/org/apache/impala/catalog/Db.java | 4 +-
.../org/apache/impala/catalog/HdfsPartition.java | 25 +++--
.../main/java/org/apache/impala/catalog/Table.java | 21 ++--
.../impala/catalog/events/InFlightEvents.java | 91 +++++++++++----
.../impala/catalog/events/MetastoreEvents.java | 34 ++++--
.../impala/catalog/events/SelfEventContext.java | 24 ++--
.../org/apache/impala/service/BackendConfig.java | 2 +
.../apache/impala/service/CatalogOpExecutor.java | 67 +++++------
.../java/org/apache/impala/util/MetaStoreUtil.java | 57 +++-------
.../events/MetastoreEventsProcessorTest.java | 124 ++++++++++++++++++++-
tests/custom_cluster/test_event_processing.py | 11 +-
17 files changed, 504 insertions(+), 171 deletions(-)
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 79084ff..69e856e 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -266,6 +266,12 @@ DEFINE_int32(hms_event_polling_interval_s, 0,
"feature and not recommended to be deployed on production systems until it is "
"made generally available.");
+DEFINE_bool(enable_insert_events, true,
+ "Enables insert events in the events processor. When this configuration is set to "
+ "true Impala will generate INSERT event types which when received by other Impala "
+ "clusters can be used to automatically refresh the tables or partitions. Event "
+ "processing must be turned on for this flag to have any effect.");
+
DEFINE_string(blacklisted_dbs, "sys,information_schema",
"Comma separated list for blacklisted databases. Configure which databases to be "
"skipped for loading (in startup and global INVALIDATE METADATA). Users can't access,"
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index f3e3755..90122d2 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -67,6 +67,7 @@ DECLARE_int64(exchg_node_buffer_size_bytes);
DECLARE_int32(kudu_mutation_buffer_size);
DECLARE_int32(kudu_error_buffer_size);
DECLARE_int32(hms_event_polling_interval_s);
+DECLARE_bool(enable_insert_events);
DECLARE_string(authorization_factory_class);
DECLARE_bool(unlock_mt_dop);
DECLARE_bool(mt_dop_auto_fallback);
@@ -155,6 +156,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size);
cfg.__set_hms_event_polling_interval_s(FLAGS_hms_event_polling_interval_s);
+ cfg.__set_enable_insert_events(FLAGS_enable_insert_events);
cfg.__set_impala_build_version(::GetDaemonBuildVersion());
cfg.__set_authorization_factory_class(FLAGS_authorization_factory_class);
cfg.__set_unlock_mt_dop(FLAGS_unlock_mt_dop);
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 90fad1a..c9b7d47 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -155,4 +155,6 @@ struct TBackendGflags {
65: required bool use_customized_user_groups_mapper_for_ranger
66: required bool enable_column_masking
+
+ 67: required bool enable_insert_events
}
diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index 87e8ecf..39cd4ff 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -23,8 +23,14 @@ import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
@@ -51,6 +57,9 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
@@ -62,6 +71,9 @@ import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
import org.apache.hive.service.rpc.thrift.TGetTablesReq;
import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
@@ -71,6 +83,8 @@ import org.apache.impala.service.MetadataOp;
import org.apache.impala.thrift.TMetadataOpRequest;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.util.AcidUtils.TblTransaction;
+import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
+import org.apache.log4j.Logger;
import org.apache.thrift.TException;
/**
@@ -78,6 +92,7 @@ import org.apache.thrift.TException;
* between major versions of Hive. This implements the shimmed methods for Hive 2.
*/
public class MetastoreShim {
+ private static final Logger LOG = Logger.getLogger(MetastoreShim.class);
public static TblTransaction createTblTransaction(
IMetaStoreClient client, Table tbl, long txnId) {
@@ -484,4 +499,58 @@ public class MetastoreShim {
throws MetaException {
return new Path(db.getLocationUri(), tbl.getTableName().toLowerCase()).toString();
}
+
+ /**
+ * Fires insert events asynchronously. A dedicated single-thread executor runs
+ * fireInsertEventHelper() and is shut down once the task has completed, whether it
+ * completed normally or exceptionally.
+ * In case of any exception, we just log the failure of firing insert events.
+ *
+ * @param msClient Metastore client used to fire the events; it is closed by the
+ * background task once the events have been fired
+ * @param insertEventInfos information needed to fire each insert event
+ * @param dbName database name passed on to fireInsertEventHelper()
+ * @param tableName table name passed on to fireInsertEventHelper()
+ * @return always an empty list, since the Hive-2 helper does not return event ids
+ */
+ public static List<Long> fireInsertEvents(MetaStoreClient msClient,
+ List<InsertEventInfo> insertEventInfos, String dbName, String tableName) {
+ ExecutorService fireInsertEventThread = Executors.newSingleThreadExecutor();
+ // The task must run on the executor created above: it is the one shut down below,
+ // so its worker thread is released as soon as the task is done.
+ CompletableFuture.runAsync(() -> {
+ try {
+ fireInsertEventHelper(msClient.getHiveClient(), insertEventInfos, dbName, tableName);
+ } catch (Exception e) {
+ LOG.error("Failed to fire insert event. Some tables might not be"
+ + " refreshed on other impala clusters.", e);
+ } finally {
+ msClient.close();
+ }
+ }, fireInsertEventThread).whenComplete((ignored, error) ->
+ fireInsertEventThread.shutdown());
+ return Collections.emptyList();
+ }
+
+ /**
+ * Writes insert events to the HMS notification log. In Hive-2 a separate insert
+ * event is fired for every entry of insertEventInfos, i.e. for a partitioned table
+ * one event per existing partition touched by the insert.
+ * @param msClient Metastore client used to fire the events
+ * @param insertEventInfos A list of objects encapsulating the information needed
+ * to fire each insert event
+ * @param dbName database name set on every request
+ * @param tableName table name set on every request
+ */
+ @VisibleForTesting
+ public static void fireInsertEventHelper(IMetaStoreClient msClient,
+ List<InsertEventInfo> insertEventInfos, String dbName, String tableName)
+ throws TException {
+ Preconditions.checkNotNull(msClient);
+ Preconditions.checkNotNull(dbName);
+ Preconditions.checkNotNull(tableName);
+ for (InsertEventInfo eventInfo : insertEventInfos) {
+ Preconditions.checkNotNull(eventInfo.getNewFiles());
+ LOG.debug("Firing an insert event for " + tableName);
+ // payload describing the files written by this insert
+ InsertEventRequestData insertPayload = new InsertEventRequestData();
+ insertPayload.setFilesAdded(new ArrayList<>(eventInfo.getNewFiles()));
+ insertPayload.setReplace(eventInfo.isOverwrite());
+ FireEventRequestData requestData = new FireEventRequestData();
+ requestData.setInsertData(insertPayload);
+ FireEventRequest request = new FireEventRequest(true, requestData);
+ request.setDbName(dbName);
+ request.setTableName(tableName);
+ // partition values are only set when the insert event carries them
+ List<String> partVals = eventInfo.getPartVals();
+ if (partVals != null) request.setPartitionVals(partVals);
+ msClient.fireListenerEvent(request);
+ }
+ }
}
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 43f174c..5981b2c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -32,6 +32,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -52,6 +53,10 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -83,6 +88,9 @@ import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
import org.apache.hive.service.rpc.thrift.TGetTablesReq;
import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
@@ -94,6 +102,7 @@ import org.apache.impala.thrift.TMetadataOpRequest;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.AcidUtils.TblTransaction;
+import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
@@ -984,4 +993,67 @@ public class MetastoreShim {
return wh.getDefaultTablePath(db, tbl.getTableName().toLowerCase(), isExternal)
.toString();
}
+
+ /**
+ * Fires the insert events for a table and its partitions and closes the given
+ * metastore client afterwards.
+ * Any Exception raised while firing the events is only logged; an empty list is
+ * returned in that case.
+ */
+ public static List<Long> fireInsertEvents(MetaStoreClient msClient,
+ List<InsertEventInfo> insertEventInfos, String dbName, String tableName) {
+ List<Long> firedEventIds = Collections.emptyList();
+ try {
+ firedEventIds = fireInsertEventHelper(
+ msClient.getHiveClient(), insertEventInfos, dbName, tableName);
+ } catch (Exception e) {
+ LOG.error("Failed to fire insert event. Some tables might not be"
+ + " refreshed on other impala clusters.", e);
+ } finally {
+ msClient.close();
+ }
+ return firedEventIds;
+ }
+
+ /**
+ * Fires an insert event to HMS notification log. In Hive-3 for partitioned table,
+ * all partition insert events will be fired by a bulk API.
+ *
+ * @param msClient Metastore client,
+ * @param insertEventInfos A list of insert event encapsulating the information needed
+ * to fire insert
+ * @param dbName database name set on the request
+ * @param tableName table name set on the request
+ * @return a list of eventIds for the insert events, or an empty list (never null) if
+ * the response does not contain any event ids
+ */
+ @VisibleForTesting
+ public static List<Long> fireInsertEventHelper(IMetaStoreClient msClient,
+ List<InsertEventInfo> insertEventInfos, String dbName, String tableName)
+ throws TException {
+ Preconditions.checkNotNull(msClient);
+ Preconditions.checkNotNull(dbName);
+ Preconditions.checkNotNull(tableName);
+ LOG.debug(String.format(
+ "Firing %s insert event for %s", insertEventInfos.size(), tableName));
+ FireEventRequestData data = new FireEventRequestData();
+ FireEventRequest rqst = new FireEventRequest(true, data);
+ rqst.setDbName(dbName);
+ rqst.setTableName(tableName);
+ List<InsertEventRequestData> insertDatas = new ArrayList<>();
+ for (InsertEventInfo info : insertEventInfos) {
+ InsertEventRequestData insertData = new InsertEventRequestData();
+ Preconditions.checkNotNull(info.getNewFiles());
+ insertData.setFilesAdded(new ArrayList<>(info.getNewFiles()));
+ insertData.setReplace(info.isOverwrite());
+ if (info.getPartVals() != null) insertData.setPartitionVal(info.getPartVals());
+ insertDatas.add(insertData);
+ }
+ // A single insert is sent through the non-bulk fields of the request; anything
+ // else goes through the bulk list.
+ if (insertDatas.size() == 1) {
+ if (insertEventInfos.get(0).getPartVals() != null) {
+ rqst.setPartitionVals(insertEventInfos.get(0).getPartVals());
+ }
+ data.setInsertData(insertDatas.get(0));
+ } else {
+ data.setInsertDatas(insertDatas);
+ }
+ FireEventResponse response = msClient.fireListenerEvent(rqst);
+ List<Long> eventIds = response.getEventIds();
+ if (eventIds == null) {
+ // Never hand null to the caller: the wrapper also returns an empty list when
+ // firing fails, so callers only have to deal with one "no ids" representation.
+ LOG.warn("FireEventResponse does not contain event ids for table " + dbName + "."
+ + tableName + ". The insert events cannot be detected as self-events.");
+ return Collections.emptyList();
+ }
+ return eventIds;
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index fb7ef4d..87465b6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -817,29 +817,36 @@ public class CatalogServiceCatalog extends Catalog {
* Evaluates if the information from an event (serviceId and versionNumber) matches to
* the catalog object. If there is match, the in-flight version for that object is
* removed and method returns true. If it does not match, returns false
+
* @param ctx self context which provides all the information needed to
* evaluate if this is a self-event or not
* @return true if given event information evaluates to a self-event, false otherwise
*/
- public boolean evaluateSelfEvent(SelfEventContext ctx)
+ public boolean evaluateSelfEvent(boolean isInsertEvent, SelfEventContext ctx)
throws CatalogException {
Preconditions.checkState(isEventProcessingActive(),
"Event processing should be enabled when calling this method");
- long versionNumber = ctx.getVersionNumberFromEvent();
+ long versionNumber =
+ isInsertEvent ? ctx.getIdFromEvent() : ctx.getVersionNumberFromEvent();
String serviceIdFromEvent = ctx.getServiceIdFromEvent();
- LOG.debug("Input arguments for self-event evaluation: {} {}",versionNumber,
- serviceIdFromEvent);
- // no version info or service id in the event
- if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
- LOG.info("Not a self-event since the given version is {} and service id is {}",
- versionNumber, serviceIdFromEvent);
- return false;
- }
- // if the service id from event doesn't match with our service id this is not a
- // self-event
- if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
- LOG.info("Not a self-event because service id of this catalog {} does not match "
- + "with one in event {}.", getCatalogServiceId(), serviceIdFromEvent);
+
+ if (!isInsertEvent) {
+ // no version info or service id in the event
+ if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
+ LOG.info("Not a self-event since the given version is {} and service id is {}",
+ versionNumber, serviceIdFromEvent);
+ return false;
+ }
+ // if the service id from event doesn't match with our service id this is not a
+ // self-event, so we must return here instead of falling through to the lookup
+ if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
+ LOG.info("Not a self-event because service id of this catalog {} does not match "
+ + "with one in event {}.", getCatalogServiceId(), serviceIdFromEvent);
+ return false;
+ }
+ } else if (versionNumber == -1) {
+ // if insert event, we only compare eventId
+ LOG.info("Not a self-event because eventId is {}", versionNumber);
return false;
}
Db db = getDb(ctx.getDbName());
@@ -881,10 +888,11 @@ public class CatalogServiceCatalog extends Catalog {
List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues();
// if the partitionKeyValues is null, we look for tbl's in-flight events
if (partitionKeyValues == null) {
- boolean removed = tbl.removeFromVersionsForInflightEvents(versionNumber);
+ boolean removed =
+ tbl.removeFromVersionsForInflightEvents(isInsertEvent, versionNumber);
if (!removed) {
- LOG.info("Could not find version {} in in-flight event list of table {}",
- versionNumber, tbl.getFullName());
+ LOG.info("Could not find {} {} in in-flight event list of table {}",
+ isInsertEvent ? "eventId" : "version", versionNumber, tbl.getFullName());
}
return removed;
}
@@ -893,8 +901,9 @@ public class CatalogServiceCatalog extends Catalog {
for (List<TPartitionKeyValue> partitionKeyValue : partitionKeyValues) {
HdfsPartition hdfsPartition =
((HdfsTable) tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue);
- if (hdfsPartition == null || !hdfsPartition
- .removeFromVersionsForInflightEvents(versionNumber)) {
+ if (hdfsPartition == null
+ || !hdfsPartition.removeFromVersionsForInflightEvents(
+ isInsertEvent, versionNumber)) {
// even if this is an error condition we should not bail out early since we
// should clean up the self-event state on the rest of the partitions
String partName = HdfsTable.constructPartitionName(partitionKeyValue);
@@ -919,15 +928,18 @@ public class CatalogServiceCatalog extends Catalog {
/**
* Adds a given version number from the catalog table's list of versions for in-flight
* events. Applicable only when external event processing is enabled.
- *
+ * @param isInsertEvent if false add versionNumber for DDL Event, otherwise add eventId
+ * for Insert Event.
* @param tbl Catalog table
- * @param versionNumber version number to be added
+ * @param versionNumber when isInsertEvent is true, it is eventId to add
+ * when isInsertEvent is false, it is version number to add
*/
- public void addVersionsForInflightEvents(Table tbl, long versionNumber) {
+ public void addVersionsForInflightEvents(
+ boolean isInsertEvent, Table tbl, long versionNumber) {
if (!isEventProcessingActive()) return;
- tbl.addToVersionsForInflightEvents(versionNumber);
- LOG.info("Added catalog version {} in table's {} in-flight events",
- versionNumber, tbl.getFullName());
+ tbl.addToVersionsForInflightEvents(isInsertEvent, versionNumber);
+ LOG.info("Added {} {} in table's {} in-flight events",
+ isInsertEvent ? "eventId" : "catalog version", versionNumber, tbl.getFullName());
}
/**
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 330227c..8a465eb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -511,7 +511,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
"removeFromVersionsForInflightEvents called without getting the db lock for "
+ getName() + " database.");
- return inFlightEvents_.remove(versionNumber);
+ return inFlightEvents_.remove(false, versionNumber);
}
/**
@@ -525,7 +525,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
"addToVersionsForInFlightEvents called without getting the db lock for "
+ getName() + " database.");
- if (!inFlightEvents_.add(versionNumber)) {
+ if (!inFlightEvents_.add(false, versionNumber)) {
LOG.warn(String.format("Could not add version %s to the list of in-flight "
+ "events. This could cause unnecessary database %s invalidation when the "
+ "event is processed", versionNumber, getName()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 2069756..e7a5bb8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -852,25 +852,33 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
/**
* Removes a given version from the in-flight events
- * @param versionNumber version number to remove
+ * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
+ * Insert events. If false, remove version number from list of versions for in-flight
+ * DDL events.
+ * @param versionNumber when isInsertEvent is true, it's eventId to remove
+ * when isInsertEvent is false, it's version number to remove
* @return true if the versionNumber was removed, false if it didn't exist
*/
- public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+ public boolean removeFromVersionsForInflightEvents(
+ boolean isInsertEvent, long versionNumber) {
Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
"removeFromVersionsForInflightEvents called without holding the table lock on "
+ "partition " + getPartitionName() + " of table " + table_.getFullName());
- return inFlightEvents_.remove(versionNumber);
+ return inFlightEvents_.remove(isInsertEvent, versionNumber);
}
/**
* Adds a version number to the in-flight events of this partition
- * @param versionNumber version number to add
+ * @param isInsertEvent if true, add eventId to list of eventIds for in-flight Insert
+ * events if false, add version number to list of versions for in-flight DDL events
+ * @param versionNumber when isInsertEvent is true, it's eventId to add
+ * when isInsertEvent is false, it's version number to add
*/
- public void addToVersionsForInflightEvents(long versionNumber) {
+ public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) {
Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
"addToVersionsForInflightEvents called without holding the table lock on "
+ "partition " + getPartitionName() + " of table " + table_.getFullName());
- if (!inFlightEvents_.add(versionNumber)) {
+ if (!inFlightEvents_.add(isInsertEvent, versionNumber)) {
LOG.warn(String.format("Could not add %s version to the partition %s of table %s. "
+ "This could cause unnecessary refresh of the partition when the event is"
+ "received by the Events processor.", versionNumber, getPartitionName(),
@@ -886,13 +894,14 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
*/
private void addInflightVersionsFromParameters() {
Preconditions.checkNotNull(hmsParameters_);
- Preconditions.checkState(inFlightEvents_.size() == 0);
+ Preconditions.checkState(inFlightEvents_.size(false) == 0);
// we should not check for table lock being held here since there are certain code
// paths which call this method without holding the table lock (eg. getOrLoadTable())
if (!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
return;
}
- inFlightEvents_.add(Long.parseLong(
+ inFlightEvents_.add(false,
+ Long.parseLong(
hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 0c6ac31..8f0f5ad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -63,7 +63,6 @@ import org.apache.log4j.Logger;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
@@ -801,14 +800,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
/**
* Removes a given version from the collection of version numbers for in-flight events
- * @param versionNumber version number to remove from the collection
+ * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
+ * Insert events. If false, remove versionNumber from list of versions for in-flight DDL
+ * events
+ * @param versionNumber when isInsertEvent is true, it's eventId to remove
+ * when isInsertEvent is false, it's version number to remove
* @return true if version was successfully removed, false if didn't exist
*/
- public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+ public boolean removeFromVersionsForInflightEvents(
+ boolean isInsertEvent, long versionNumber) {
Preconditions.checkState(tableLock_.isHeldByCurrentThread(),
"removeFromVersionsForInFlightEvents called without taking the table lock on "
+ getFullName());
- return inFlightEvents.remove(versionNumber);
+ return inFlightEvents.remove(isInsertEvent, versionNumber);
}
/**
@@ -816,16 +820,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
* collection is already at the max size defined by
* <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and
* does not add it
- * @param versionNumber version number to add
+ * @param isInsertEvent if true, add eventId to list of eventIds for in-flight Insert
+ * events. If false, add versionNumber to list of versions for in-flight DDL events
+ * @param versionNumber when isInsertEvent is true, it's eventId to add
+ * when isInsertEvent is false, it's version number to add
* @return True if version number was added, false if the collection is at its max
* capacity
*/
- public void addToVersionsForInflightEvents(long versionNumber) {
+ public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) {
// we generally don't take locks on Incomplete tables since they are atomically
// replaced during load
Preconditions.checkState(
this instanceof IncompleteTable || tableLock_.isHeldByCurrentThread());
- if (!inFlightEvents.add(versionNumber)) {
+ if (!inFlightEvents.add(isInsertEvent, versionNumber)) {
LOG.warn(String.format("Could not add %s version to the table %s. This could "
+ "cause unnecessary refresh of the table when the event is received by the "
+ "Events processor.", versionNumber, getFullName()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
index 1aed6d1..11d38f6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
@@ -34,41 +34,66 @@ public class InFlightEvents {
// maximum number of catalog versions to store for in-flight events for this table
private static final int DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS = 10;
+ // maximum number of eventIds to store for in-flight events for this table
+ private static final int DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS = 100;
+
private static final Logger LOG = LoggerFactory.getLogger(InFlightEvents.class);
- // FIFO list of versions for all the in-flight metastore events in this table
+ // FIFO list of versions for all the in-flight metastore DDL events in this table
// This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_EVENTS size. Anything which
// is attempted to be added to this list when its at maximum capacity is ignored
private final LinkedList<Long> versionsForInflightEvents_ = new LinkedList<>();
+ // FIFO list of eventIds for all the in-flight metastore Insert events in this table
+ // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS size. Anything
+ // which is attempted to be added to this list when its at maximum capacity is ignored
+ private final LinkedList<Long> idsForInflightDmlEvents_ = new LinkedList<>();
+
// maximum number of versions to store
- private final int capacity_;
+ private final int capacity_for_versions_;
+
+ // maximum number of eventIds to store
+ private final int capacity_for_eventIds_;
public InFlightEvents() {
- this.capacity_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS;
+ this.capacity_for_versions_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS;
+ this.capacity_for_eventIds_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS;
}
public InFlightEvents(int capacity) {
Preconditions.checkState(capacity > 0);
- this.capacity_ = capacity;
+ this.capacity_for_versions_ = capacity;
+ this.capacity_for_eventIds_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS;
}
-
/**
* Gets the current list of versions for in-flight events for this table
+ * @param isInsertEvent if true, return list of eventIds for in-flight Insert events
+ * if false, return list of versions for in-flight DDL events
*/
- public List<Long> getAll() {
- return ImmutableList.copyOf(versionsForInflightEvents_);
+ public List<Long> getAll(boolean isInsertEvent) {
+ if (isInsertEvent) {
+ return ImmutableList.copyOf(idsForInflightDmlEvents_);
+ } else {
+ return ImmutableList.copyOf(versionsForInflightEvents_);
+ }
}
/**
* Removes a given version from the collection of version numbers for in-flight
* events.
- *
- * @param versionNumber version number to remove from the collection
+ * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
+ * Insert events. If false, remove version number from list of versions for in-flight
+ * DDL events.
+ * @param versionNumber when isInsertEvent is true, it's eventId to remove
+ * when isInsertEvent is false, it's version number to remove
* @return true if the version was found and successfully removed, false
* otherwise
*/
- public boolean remove(long versionNumber) {
- return versionsForInflightEvents_.remove(versionNumber);
+ public boolean remove(boolean isInsertEvent, long versionNumber) {
+ if (isInsertEvent) {
+ return idsForInflightDmlEvents_.remove(versionNumber);
+ } else {
+ return versionsForInflightEvents_.remove(versionNumber);
+ }
}
/**
@@ -76,23 +101,45 @@ public class InFlightEvents {
* collection is already at the max size defined by
* <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and
* does not add it
- *
- * @param versionNumber version number to add
+ * @param isInsertEvent If true, add eventId to list of eventIds for in-flight Insert
+ * events. If false, add versionNumber to list of versions for in-flight DDL events.
+ * @param versionNumber when isInsertEvent is true, it's eventId to add
+ * when isInsertEvent is false, it's version number to add
* @return True if version number was added, false if the collection is at its max
* capacity
*/
- public boolean add(long versionNumber) {
- if (versionsForInflightEvents_.size() == capacity_) {
- LOG.warn(String.format("Number of versions to be stored is at "
- + " its max capacity %d. Ignoring add request for version number %d.",
- DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
- return false;
+ public boolean add(boolean isInsertEvent, long versionNumber) {
+ if (isInsertEvent) {
+ if (idsForInflightDmlEvents_.size() == capacity_for_eventIds_) {
+ LOG.warn(String.format("Number of Insert events to be stored is at "
+ + "its max capacity %d. Ignoring add request for eventId %d.",
+ capacity_for_eventIds_, versionNumber));
+ return false;
+ }
+ idsForInflightDmlEvents_.add(versionNumber);
+ } else {
+ if (versionsForInflightEvents_.size() == capacity_for_versions_) {
+ LOG.warn(String.format("Number of DDL events to be stored is at "
+ + "its max capacity %d. Ignoring add request for version %d.",
+ capacity_for_versions_, versionNumber));
+ return false;
+ }
+ versionsForInflightEvents_.add(versionNumber);
}
- versionsForInflightEvents_.add(versionNumber);
return true;
}
- public int size() {
- return versionsForInflightEvents_.size();
+ /**
+ * Get the size of in-flight DDL or DML events list
+ * @param isInsertEvent if true, return size of Insert events list
+ * if false, return size of DDL events list
+ * @return size of events list
+ */
+ public int size(boolean isInsertEvent) {
+ if (isInsertEvent) {
+ return idsForInflightDmlEvents_.size();
+ } else {
+ return versionsForInflightEvents_.size();
+ }
}
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 2be56e9..e64578b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -269,7 +269,7 @@ public class MetastoreEvents {
protected final String tblName_;
// eventId of the event. Used instead of calling getter on event_ everytime
- protected final long eventId_;
+ protected long eventId_;
// eventType from the NotificationEvent
protected final MetastoreEventType eventType_;
@@ -419,13 +419,15 @@ public class MetastoreEvents {
* serviceId and version. More details on complete flow of self-event handling
* logic can be read in <code>MetastoreEventsProcessor</code> documentation.
*
+ * @param isInsertEvent if true, check in flight events list of Insert event
+ * if false, check events list of DDL
* @return True if this event is a self-generated event. If the returned value is
* true, this method also clears the version number from the catalog database/table.
* Returns false if the version numbers or service id don't match
*/
- protected boolean isSelfEvent() {
+ protected boolean isSelfEvent(boolean isInsertEvent) {
try {
- if (catalog_.evaluateSelfEvent(getSelfEventContext())) {
+ if (catalog_.evaluateSelfEvent(isInsertEvent, getSelfEventContext())) {
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc();
return true;
}
@@ -434,6 +436,8 @@ public class MetastoreEvents {
}
return false;
}
+
+ protected boolean isSelfEvent() { return isSelfEvent(false); }
}
public static String getStringProperty(
@@ -766,19 +770,25 @@ public class MetastoreEvents {
@Override
public SelfEventContext getSelfEventContext() {
- throw new UnsupportedOperationException("Self-event evaluation is not implemented"
- + " for insert event type");
+ if (insertPartition_ != null) {
+ // create selfEventContext for insert partition event
+ List<TPartitionKeyValue> tPartSpec =
+ getTPartitionSpecFromHmsPartition(msTbl_, insertPartition_);
+ return new SelfEventContext(dbName_, tblName_, Arrays.asList(tPartSpec),
+ insertPartition_.getParameters(), eventId_);
+ } else {
+ // create selfEventContext for insert table event
+ return new SelfEventContext(
+ dbName_, tblName_, null, msTbl_.getParameters(), eventId_);
+ }
}
- /**
- * Currently we do not check for self-events in Inserts. Existing self-events logic
- * cannot be used for insert events since firing insert event does not allow us to
- * modify table parameters in HMS. Hence, we cannot get CatalogServiceIdentifiers in
- * Insert Events.
- * TODO: Handle self-events for insert case.
- */
@Override
public void process() throws MetastoreNotificationException {
+ if (isSelfEvent(true)) {
+ infoLog("Not processing the event as it is a self-event");
+ return;
+ }
// Reload the whole table if it's a transactional table.
if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
insertPartition_ = null;
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
index 83aa69b..10dd4eb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
@@ -32,15 +32,21 @@ import org.apache.impala.thrift.TPartitionKeyValue;
public class SelfEventContext {
private final String dbName_;
private final String tblName_;
+ private final long insertEventId_;
// version number from the event object parameters used for self-event detection
private final long versionNumberFromEvent_;
// service id from the event object parameters used for self-event detection
- private final String serviceIdFromEvent_;
+ private final String serviceidFromEvent_;
private final List<List<TPartitionKeyValue>> partitionKeyValues_;
SelfEventContext(String dbName, String tblName,
Map<String, String> parameters) {
- this(dbName, tblName, null, parameters);
+ this(dbName, tblName, null, parameters, -1);
+ }
+
+ SelfEventContext(String dbName, String tblName,
+ List<List<TPartitionKeyValue>> partitionKeyValues, Map<String, String> parameters) {
+ this(dbName, tblName, partitionKeyValues, parameters, -1);
}
/**
@@ -55,17 +61,17 @@ public class SelfEventContext {
*/
SelfEventContext(String dbName, @Nullable String tblName,
@Nullable List<List<TPartitionKeyValue>> partitionKeyValues,
- Map<String, String> parameters) {
+ Map<String, String> parameters, long eventId) {
Preconditions.checkNotNull(parameters);
this.dbName_ = Preconditions.checkNotNull(dbName);
this.tblName_ = tblName;
this.partitionKeyValues_ = partitionKeyValues;
+ insertEventId_ = eventId;
versionNumberFromEvent_ = Long.parseLong(
MetastoreEvents.getStringProperty(parameters,
MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
- serviceIdFromEvent_ =
- MetastoreEvents.getStringProperty(parameters,
- MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
+ serviceidFromEvent_ = MetastoreEvents.getStringProperty(
+ parameters, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
}
public String getDbName() {
@@ -76,13 +82,13 @@ public class SelfEventContext {
return tblName_;
}
+ public long getIdFromEvent() { return insertEventId_; }
+
public long getVersionNumberFromEvent() {
return versionNumberFromEvent_;
}
- public String getServiceIdFromEvent() {
- return serviceIdFromEvent_;
- }
+ public String getServiceIdFromEvent() { return serviceidFromEvent_; }
public List<List<TPartitionKeyValue>> getPartitionKeyValues() {
return partitionKeyValues_ == null ?
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index c3957b9..c62ac6f 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -126,6 +126,8 @@ public class BackendConfig {
return backendCfg_.hms_event_polling_interval_s;
}
+ public boolean isInsertEventsEnabled() { return backendCfg_.enable_insert_events; }
+
public boolean isOrcScannerEnabled() {
return backendCfg_.enable_orc_scanner;
}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 63a7b81..3ac2a05 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -711,7 +711,7 @@ public class CatalogOpExecutor {
// pre-existing there is no alter table event generated. Hence we should
// only add the versions for in-flight events when we are sure that the
// partition was really added.
- catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
+ catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
addTableToCatalogUpdate(refreshedTable, response.result);
}
reloadMetadata = false;
@@ -851,7 +851,7 @@ public class CatalogOpExecutor {
reloadTableSchema, null, "ALTER TABLE " + params.getAlter_type().name());
// now that HMS alter operation has succeeded, add this version to list of
// inflight events in catalog table if event processing is enabled
- catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
+ catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
addTableToCatalogUpdate(tbl, response.result);
}
} finally {
@@ -3098,7 +3098,7 @@ public class CatalogOpExecutor {
"'%s' and the new table name '%s' may fix the problem." , tableName.toString(),
newTableName.toString()));
}
- catalog_.addVersionsForInflightEvents(result.second, newCatalogVersion);
+ catalog_.addVersionsForInflightEvents(false, result.second, newCatalogVersion);
// TODO(todd): if client is a 'v2' impalad, only send back invalidation
response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
@@ -3678,7 +3678,7 @@ public class CatalogOpExecutor {
// catalog service identifiers
if (catalog_.getCatalogServiceId().equals(serviceId)) {
Preconditions.checkNotNull(version);
- hdfsPartition.addToVersionsForInflightEvents(Long.parseLong(version));
+ hdfsPartition.addToVersionsForInflightEvents(false, Long.parseLong(version));
}
}
@@ -4430,7 +4430,7 @@ public class CatalogOpExecutor {
}
/**
- * Populates insert event data and calls fireInsertEventAysnc() if external event
+ * Populates insert event data and calls fireInsertEvents() if external event
* processing is enabled. This is no-op if event processing is disabled or there are
* no existing partitions affected by this insert.
*
@@ -4440,11 +4440,16 @@ public class CatalogOpExecutor {
*/
private void createInsertEvents(Table table,
List<FeFsPartition> affectedExistingPartitions, boolean isInsertOverwrite) {
- if (!catalog_.isEventProcessingActive() ||
- affectedExistingPartitions.size() == 0) return;
+ boolean isInsertEventsEnabled = BackendConfig.INSTANCE.isInsertEventsEnabled();
+ if (!catalog_.isEventProcessingActive() || !isInsertEventsEnabled
+ || affectedExistingPartitions.size() == 0) {
+ return;
+ }
// List of all insert events that we call HMS fireInsertEvent() on.
List<InsertEventInfo> insertEventInfos = new ArrayList<>();
+ // List of all partitions that we insert into
+ List<HdfsPartition> partitions = new ArrayList<>();
// Map of partition names to file names of all existing partitions touched by the
// insert.
@@ -4471,58 +4476,54 @@ public class CatalogOpExecutor {
// Find the delta of the files added by the insert if it is not an overwrite
// operation. HMS fireListenerEvent() expects an empty list if no new files are
// added or if the operation is an insert overwrite.
+ HdfsPartition hdfsPartition = (HdfsPartition) part;
Set<String> deltaFiles = new HashSet<>();
List<String> partVals = null;
if (!isInsertOverwrite) {
- String partitionName = part.getPartitionName() + "/";
+ String partitionName = hdfsPartition.getPartitionName() + "/";
Set<String> filesPostInsert =
partitionFilesMapPostInsert.get(partitionName);
if (table.getNumClusteringCols() > 0) {
Set<String> filesBeforeInsert =
partitionFilesMapBeforeInsert.get(partitionName);
deltaFiles = Sets.difference(filesBeforeInsert, filesPostInsert);
- partVals = part.getPartitionValuesAsStrings(true);
+ partVals = hdfsPartition.getPartitionValuesAsStrings(true);
} else {
Map.Entry<String, Set<String>> entry =
partitionFilesMapBeforeInsert.entrySet().iterator().next();
deltaFiles = Sets.difference(entry.getValue(), filesPostInsert);
}
LOG.info("{} new files detected for table {} partition {}.",
- filesPostInsert.size(), table.getTableName(), part.getPartitionName());
+ filesPostInsert.size(), table.getTableName(),
+ hdfsPartition.getPartitionName());
}
if (deltaFiles != null || isInsertOverwrite) {
// Collect all the insert events.
- insertEventInfos.add(new InsertEventInfo(table.getDb().getName(),
- table.getName(), partVals, deltaFiles, isInsertOverwrite));
+ insertEventInfos.add(
+ new InsertEventInfo(partVals, deltaFiles, isInsertOverwrite));
+ if (partVals != null) {
+ // insert into partition
+ partitions.add(hdfsPartition);
+ }
} else {
LOG.info("No new files were created, and is not a replace. Skipping "
+ "generating INSERT event.");
}
}
- // Firing insert events by making calls to HMS APIs can be slow for tables with
- // large number of partitions. Hence, we fire the insert events asynchronously.
- fireInsertEventsAsync(insertEventInfos);
- }
-
- /**
- * Helper method to fire insert events asynchronously. This creates a single thread
- * to execute the fireInsertEvent method and shuts down the thread after it has
- * finished. In case of any exception, we just log the failure of firing insert events.
- */
- private void fireInsertEventsAsync(List<InsertEventInfo> insertEventInfos) {
- ExecutorService fireInsertEventThread = Executors.newSingleThreadExecutor();
- CompletableFuture.runAsync(() -> {
- for (InsertEventInfo info : insertEventInfos) {
- try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
- MetaStoreUtil.fireInsertEvent(metaStoreClient.getHiveClient(), info);
- } catch (Exception e) {
- LOG.error("Failed to fire insert event. Some tables might not be"
- + " refreshed on other impala clusters.", e);
+ MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient();
+ List<Long> eventIds = MetastoreShim.fireInsertEvents(metaStoreClient,
+ insertEventInfos, table.getDb().getName(), table.getName());
+ if (!eventIds.isEmpty()) {
+ if (partitions.size() == 0) { // insert into table
+ catalog_.addVersionsForInflightEvents(true, table, eventIds.get(0));
+ } else { // insert into partition
+ for (int par_idx = 0; par_idx < partitions.size(); par_idx++) {
+ partitions.get(par_idx).addToVersionsForInflightEvents(
+ true, eventIds.get(par_idx));
}
}
- }, Executors.newSingleThreadExecutor()).thenRun(() ->
- fireInsertEventThread.shutdown());
+ }
}
/**
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index 7bc463f..81b62f6 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -30,9 +30,6 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.FireEventRequest;
-import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
-import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
@@ -317,13 +314,18 @@ public class MetaStoreUtil {
}
/**
+ * Check if the hms table is a bucketed table or not
+ */
+ public static boolean isBucketedTable(Table msTbl) {
+ Preconditions.checkNotNull(msTbl);
+ return msTbl.getSd().getNumBuckets() > 0;
+ }
+
+ /**
* A helper class that encapsulates all the information needed to fire and insert event
* with HMS.
*/
public static class InsertEventInfo {
- private String dbName;
- private String tableName;
-
// List of partition values corresponding to the partition keys in
// a partitioned table. This is null for non-partitioned table.
private List<String> partVals;
@@ -337,48 +339,15 @@ public class MetaStoreUtil {
// false otherwise.
private boolean isOverwrite;
- public InsertEventInfo(String dbName, String tableName, List<String> partVals,
- Collection<String> newFiles, boolean isOverwrite) {
- this.dbName = dbName;
- this.tableName = tableName;
+ public InsertEventInfo(
+ List<String> partVals, Collection<String> newFiles, boolean isOverwrite) {
this.partVals = partVals;
this.newFiles = newFiles;
this.isOverwrite = isOverwrite;
}
- }
- /**
- * Fires an insert event to HMS notification log. For partitioned table, each
- * existing partition touched by the insert will fire a separate insert event.
- *
- * @param msClient Metastore client,
- * @param info A singe insert event encapsulating the information needed to fire insert
- * event with HMS.
- */
- public static void fireInsertEvent(IMetaStoreClient msClient,
- InsertEventInfo info) throws TException {
- Preconditions.checkNotNull(msClient);
- Preconditions.checkNotNull(info.dbName);
- Preconditions.checkNotNull(info.tableName);
- Preconditions.checkNotNull(info.newFiles);
- LOG.debug("Firing an insert event for {}", info.tableName);
- FireEventRequestData data = new FireEventRequestData();
- InsertEventRequestData insertData = new InsertEventRequestData();
- data.setInsertData(insertData);
- FireEventRequest rqst = new FireEventRequest(true, data);
- rqst.setDbName(info.dbName);
- rqst.setTableName(info.tableName);
- insertData.setFilesAdded(new ArrayList<>(info.newFiles));
- insertData.setReplace(info.isOverwrite);
- if (info.partVals != null) rqst.setPartitionVals(info.partVals);
- msClient.fireListenerEvent(rqst);
- }
-
- /**
- * Check if the hms table is a bucketed table or not
- */
- public static boolean isBucketedTable(Table msTbl) {
- Preconditions.checkNotNull(msTbl);
- return msTbl.getSd().getNumBuckets() > 0;
+ public List<String> getPartVals() { return this.partVals; }
+ public Collection<String> getNewFiles() { return this.newFiles; }
+ public boolean isOverwrite() { return this.isOverwrite; }
}
}
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 41995a8..ba4a0b5 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -90,6 +91,7 @@ import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.service.FeSupport;
import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.TestUtils;
import org.apache.impala.thrift.TAlterDbParams;
import org.apache.impala.thrift.TAlterDbSetOwnerParams;
import org.apache.impala.thrift.TAlterDbType;
@@ -107,6 +109,7 @@ import org.apache.impala.thrift.TAlterTableSetRowFormatParams;
import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
import org.apache.impala.thrift.TAlterTableType;
import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
+import org.apache.impala.thrift.TCatalogServiceRequestHeader;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TColumnType;
import org.apache.impala.thrift.TCreateDbParams;
@@ -134,12 +137,14 @@ import org.apache.impala.thrift.TTableStats;
import org.apache.impala.thrift.TTypeNode;
import org.apache.impala.thrift.TTypeNodeType;
import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.thrift.TUpdateCatalogRequest;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -712,6 +717,67 @@ public class MetastoreEventsProcessorTest {
}
/**
+ * Tests inserts issued from Impala. The INSERT events fired for inserts into a table
+ * or into its partitions should be detected as self-events.
+ */
+ @Test
+ public void testInsertFromImpala() throws ImpalaException {
+ Assume.assumeTrue("Skipping this test because it only works with Hive-3 or greater",
+ TestUtils.getHiveMajorVersion() >= 3);
+ // Test insert into multiple partitions
+ createDatabaseFromImpala(TEST_DB_NAME, null);
+ String tableToInsertPart = "tbl_with_mul_part";
+ createTableFromImpala(TEST_DB_NAME, tableToInsertPart, true);
+ String tableToInsertMulPart = "tbl_to_insert_mul_part";
+ createTableFromImpala(TEST_DB_NAME, tableToInsertMulPart, true);
+ // add first partition (p1=1, p2=100) to both tables
+ TPartitionDef partitionDef = new TPartitionDef();
+ partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1"));
+ partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "100"));
+ alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
+ alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
+ // add second partition (p1=1, p2=200) to both tables
+ partitionDef = new TPartitionDef();
+ partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1"));
+ partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "200"));
+ alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
+ alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
+ eventsProcessor_.processEvents();
+ // Baseline self-event count, taken after the setup above has been processed; it
+ // already includes the self-events of the 4 ADD PARTITION operations.
+ long numberOfSelfEventsBefore =
+ eventsProcessor_.getMetrics()
+ .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
+ .getCount();
+ // insert into a single partition, once per existing partition
+ insertFromImpala(tableToInsertPart, true, "1", "100");
+ insertFromImpala(tableToInsertPart, true, "1", "200");
+ // insert into multiple partitions with a single statement
+ Set<String> created_partitions = new HashSet<String>();
+ String partition1 = "p1=1/p2=100/";
+ String partition2 = "p1=1/p2=200/";
+ created_partitions.add(partition1);
+ created_partitions.add(partition2);
+ insertMulPartFromImpala(tableToInsertMulPart, tableToInsertPart, created_partitions);
+ eventsProcessor_.processEvents();
+
+ // Test insert into table
+ String tableToInsert = "tbl_to_insert";
+ createTableFromImpala(TEST_DB_NAME, tableToInsert, false);
+ insertFromImpala(tableToInsert, false, "", "");
+ eventsProcessor_.processEvents();
+
+ long selfEventsCountAfter =
+ eventsProcessor_.getMetrics()
+ .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
+ .getCount();
+ // Expected 5 new self-events: 2 single-partition inserts, 2 from the multi-partition
+ // insert (one per partition) and 1 from the insert into the unpartitioned table.
+ assertEquals("Unexpected number of self-events generated",
+ numberOfSelfEventsBefore + 5, selfEventsCountAfter);
+ }
+
+ /**
* Test generates a sequence of create_table, insert and drop_table in the event stream
* to make sure when the insert event is processed on a removed table, it doesn't cause
* any issues with the event processing.
@@ -835,9 +901,10 @@ public class MetastoreEventsProcessorTest {
List <String> newFiles = addFilesToDirectory(parentPath, "testFile.",
totalNumberOfFilesToAdd, isOverwrite);
try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
- MetaStoreUtil.fireInsertEvent(metaStoreClient.getHiveClient(),
- new InsertEventInfo(msTbl.getDbName(), msTbl.getTableName(), null,
- newFiles, isOverwrite));
+ List<InsertEventInfo> insertEventInfos = new ArrayList<>();
+ insertEventInfos.add(new InsertEventInfo(null, newFiles, isOverwrite));
+ MetastoreShim.fireInsertEventHelper(metaStoreClient.getHiveClient(),
+ insertEventInfos, msTbl.getDbName(), msTbl.getTableName());
}
}
@@ -2711,6 +2778,57 @@ public class MetastoreEventsProcessorTest {
catalogOpExecutor_.execDdlRequest(req);
}
+ /**
+ * Insert into multiple partitions of tblName1 from Impala by selecting from tblName2
+ */
+ private void insertMulPartFromImpala(String tblName1, String tblName2,
+ Set<String> created_partitions) throws ImpalaException {
+ String insert_mul_part = String.format(
+ "insert into table %s partition(p1, p2) select * from %s", tblName1, tblName2);
+ TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
+ TEST_DB_NAME, tblName1, insert_mul_part, created_partitions);
+ catalogOpExecutor_.updateCatalog(testInsertRequest);
+ }
+
+ /**
+ * Insert into table or partition from Impala
+ * @param tblName name of the target table in TEST_DB_NAME
+ * @param isPartitioned if true, insert into partition (p1=p1val, p2=p2val)
+ * @param p1val, p2val partition key values; only take effect if isPartitioned is true
+ */
+ private void insertFromImpala(String tblName, boolean isPartitioned, String p1val,
+ String p2val) throws ImpalaException {
+ String partition = String.format("partition (p1=%s, p2='%s')", p1val, p2val);
+ String test_insert_tbl = String.format("insert into table %s %s values ('a','aa') ",
+ tblName, isPartitioned ? partition : "");
+ Set<String> created_partitions = new HashSet<String>();
+ String created_part_str =
+ isPartitioned ? String.format("p1=%s/p2=%s/", p1val, p2val) : "";
+ created_partitions.add(created_part_str);
+ TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
+ TEST_DB_NAME, tblName, test_insert_tbl, created_partitions);
+ catalogOpExecutor_.updateCatalog(testInsertRequest);
+ }
+
+ /**
+ * Create DML request to Catalog
+ * @param dBName database name set on the request
+ * @param tableName target table name set on the request
+ * @param redacted_sql_stmt SQL statement text set on the request header
+ * @param created_partitions partition names set as the request's created partitions
+ * @return the populated TUpdateCatalogRequest
+ */
+ private TUpdateCatalogRequest createTestTUpdateCatalogRequest(String dBName,
+ String tableName, String redacted_sql_stmt, Set<String> created_partitions) {
+ TUpdateCatalogRequest tUpdateCatalogRequest = new TUpdateCatalogRequest();
+ tUpdateCatalogRequest.setDb_name(dBName);
+ tUpdateCatalogRequest.setTarget_table(tableName);
+ tUpdateCatalogRequest.setCreated_partitions((created_partitions));
+ tUpdateCatalogRequest.setHeader(new TCatalogServiceRequestHeader());
+ tUpdateCatalogRequest.getHeader().setRedacted_sql_stmt(redacted_sql_stmt);
+ return tUpdateCatalogRequest;
+ }
+
private TColumn getScalarColumn(String colName, TPrimitiveType type) {
TTypeNode tTypeNode = new TTypeNode(TTypeNodeType.SCALAR);
tTypeNode.setScalar_type(new TScalarType(type));
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index f39f55f..cdd1a48 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -21,6 +21,7 @@ import pytest
from tests.common.skip import SkipIfHive2
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import HIVE_MAJOR_VERSION
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.util.hive_utils import HiveDbWrapper
from tests.util.event_processor_utils import EventProcessorUtils
@@ -268,11 +269,6 @@ class TestEventProcessing(CustomClusterTestSuite):
# add partition
"alter table {0}.{1} add if not exists partition (year=1111, month=1)".format(
db_name, tbl2),
- # insert into a existing partition; generates ALTER_PARTITION
- # TODO add support for insert_events (IMPALA-8632)
- # "insert into table {0}.{1} partition (year, month) "
- # "select * from functional.alltypessmall where year=2009 and month=1".format(
- # db_name, tbl2),
# compute stats will generates ALTER_PARTITION
"compute stats {0}.{1}".format(db_name, tbl2),
"alter table {0}.{1} recover partitions".format(db_name, recover_tbl_name)],
@@ -295,6 +291,11 @@ class TestEventProcessing(CustomClusterTestSuite):
"alter table {0}.{1} drop if exists partition (year=2100, month=1)".format(
db_name, tbl_name)]
}
+ if HIVE_MAJOR_VERSION >= 3:
+ # insert into an existing partition; generates INSERT events
+ self_event_test_queries[True].append("insert into table {0}.{1} partition "
+ "(year, month) select * from functional.alltypessmall where year=2009 "
+ "and month=1".format(db_name, tbl2))
return self_event_test_queries
def __get_hive_test_queries(self, db_name, recover_tbl_name):