Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/30 00:26:38 UTC

[impala] branch master updated (5ced916 -> 1090b3b)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 5ced916  IMPALA-8454 (part 2): Initial support for recursive file listing within a partition
     new 15a33d1  IMPALA-7971: Add support for insert events in event processor.
     new 1090b3b  IMPALA-8467: ParquetPlainEncoder::Decode leads to multiple test failures in ASAN builds

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/parquet/parquet-common.h               |  14 +-
 be/src/service/client-request-state.cc             |   1 +
 common/thrift/CatalogService.thrift                |   3 +
 .../impala/catalog/CatalogServiceCatalog.java      |  30 ++++
 .../org/apache/impala/catalog/HdfsPartition.java   |  15 ++
 .../impala/catalog/events/MetastoreEvents.java     | 133 +++++++++++++++++-
 .../catalog/events/MetastoreEventsProcessor.java   |   3 +
 .../apache/impala/service/CatalogOpExecutor.java   | 127 +++++++++++++++--
 .../events/MetastoreEventsProcessorTest.java       | 152 ++++++++++++++++++++-
 fe/src/test/resources/hive-site.xml.py             |   1 +
 tests/custom_cluster/test_event_processing.py      | 140 +++++++++++++++++++
 11 files changed, 605 insertions(+), 14 deletions(-)
 create mode 100644 tests/custom_cluster/test_event_processing.py


[impala] 01/02: IMPALA-7971: Add support for insert events in event processor.

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 15a33d1baaf4723794e52825a5c4082ff7487507
Author: Anurag Mantripragada <an...@cloudera.com>
AuthorDate: Thu Mar 28 20:25:55 2019 -0700

    IMPALA-7971: Add support for insert events in event processor.
    
    This patch adds support for detecting and processing insert events
    triggered by Impala as well as external engines (e.g. Hive).
    
    Inserts from Impala will fire an insert event notification. Using
    this event, the event processor will refresh the affected
    table/partition. Both insert into and insert overwrite are
    supported for tables and partitions.
    
    Known Issues:
    1. Inserts into tables from Hive are ignored by the event processor,
       because these inserts create an ALTER event first, followed by an
       INSERT event. The ALTER event invalidates the table, making the
       refresh a no-op. Inserts into partitions from Hive create an
       INSERT event first, followed by an ALTER event. In this case there
       is an unnecessary table invalidate after the refresh.
    2. The existing self-events logic cannot be used for insert events,
       since firing an insert event does not allow us to modify table
       parameters in HMS. This means we cannot get the
       CatalogServiceIdentifiers in insert events (see the sketch after
       the summary of changes below). Therefore, the event processor also
       refreshes tables for which the insert operation was performed
       through Impala.
    
    Testing:
    1. Added new custom cluster tests that run different insert commands
    from Hive and verify that the new data is available in Impala
    without an invalidate metadata.
    
    2. Added a test in MetastoreEventsProcessorTest for insert events.
    
    Change-Id: I7c48c5ca4bde18d532c582980aebbc25f1bf1c52
    Reviewed-on: http://gerrit.cloudera.org:8080/12889
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
---
 be/src/service/client-request-state.cc             |   1 +
 common/thrift/CatalogService.thrift                |   3 +
 .../impala/catalog/CatalogServiceCatalog.java      |  30 ++++
 .../org/apache/impala/catalog/HdfsPartition.java   |  15 ++
 .../impala/catalog/events/MetastoreEvents.java     | 133 +++++++++++++++++-
 .../catalog/events/MetastoreEventsProcessor.java   |   3 +
 .../apache/impala/service/CatalogOpExecutor.java   | 127 +++++++++++++++--
 .../events/MetastoreEventsProcessorTest.java       | 152 ++++++++++++++++++++-
 fe/src/test/resources/hive-site.xml.py             |   1 +
 tests/custom_cluster/test_event_processing.py      | 140 +++++++++++++++++++
 10 files changed, 592 insertions(+), 13 deletions(-)
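
Known Issue 2 above hinges on where the self-event marker lives: for DDL
operations the catalog can stamp its service identifier into the HMS table
parameters and later compare incoming events against it, but the HMS
fireListenerEvent() API gives the catalog no way to attach such parameters to
an insert event. A minimal sketch of that comparison, using a hypothetical
parameter key and method name (not identifiers from this patch):

    // Sketch only: the parameter key and method name are illustrative.
    private static final String CATALOG_SERVICE_ID_KEY =
        "impala.events.catalogServiceId";

    boolean isSelfEvent(org.apache.hadoop.hive.metastore.api.Table msTbl,
        String localCatalogServiceId) {
      Map<String, String> params = msTbl.getParameters();
      if (params == null) return false;
      // Insert events can never carry this marker, so the check always fails
      // for them and the table is refreshed unconditionally.
      return localCatalogServiceId.equals(params.get(CATALOG_SERVICE_ID_KEY));
    }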

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 5e667fd..d3d3a04 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1079,6 +1079,7 @@ Status ClientRequestState::UpdateCatalog() {
 
       catalog_update.target_table = finalize_params.table_name;
       catalog_update.db_name = finalize_params.table_db;
+      catalog_update.is_overwrite = finalize_params.is_overwrite;
 
       Status cnxn_status;
       const TNetworkAddress& address =
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index cd5b775..f970d17 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -189,6 +189,9 @@ struct TUpdateCatalogRequest {
   // List of partitions that are new and need to be created. May
   // include the root partition (represented by the empty string).
   6: required set<string> created_partitions;
+
+  // True if the update corresponds to an "insert overwrite" operation
+  7: required bool is_overwrite;
 }
 
 // Response from a TUpdateCatalogRequest
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index cd64b09..ea4486e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2107,6 +2107,36 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Refreshes the table if it exists in the catalog. Returns true if reloadTable()
+   * succeeds, false if the table is missing or not yet loaded. Throws
+   * CatalogException if reloadTable() is unsuccessful. Throws
+   * DatabaseNotFoundException if the Db doesn't exist.
+   */
+  public boolean refreshTableIfExists(String dbName, String tblName)
+      throws CatalogException {
+    Table table = getTable(dbName, tblName);
+    if (table == null || table instanceof IncompleteTable) return false;
+    reloadTable(table);
+    return true;
+  }
+
+  /**
+   * Refreshes the partition if it exists in the catalog. Returns true if
+   * reloadPartition() succeeds, false if the table is missing or not yet loaded.
+   * Throws CatalogException if reloadPartition() is unsuccessful. Throws
+   * DatabaseNotFoundException if the Db doesn't exist.
+   */
+  public boolean refreshPartitionIfExists(String dbName, String tblName,
+      Map<String, String> partSpec) throws CatalogException {
+    Table table = getTable(dbName, tblName);
+    if (table == null || table instanceof IncompleteTable) return false;
+    List<TPartitionKeyValue> tPartSpec = new ArrayList<>(partSpec.size());
+    for (Map.Entry<String, String> entry : partSpec.entrySet()) {
+      tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
+    }
+    reloadPartition(table, tPartSpec);
+    return true;
+  }
+
+  /**
    * Adds a new role with the given name and grant groups to the AuthorizationPolicy.
    * If a role with the same name already exists it will be overwritten.
    */
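
A hypothetical caller sketch for the two refresh methods above, assuming a
CatalogServiceCatalog reference and illustrative database/table names; the
partition spec map mirrors the one InsertEvent builds further down:

    Map<String, String> partSpec = new HashMap<>();
    partSpec.put("year", "2019");  // illustrative partition key/value
    try {
      if (!catalog.refreshPartitionIfExists("testdb", "tbl", partSpec)) {
        // Table is absent or not yet loaded in the catalog; nothing to do.
      }
    } catch (DatabaseNotFoundException e) {
      // The database is not in the catalog; callers log this and move on.
    } catch (CatalogException e) {
      // reloadPartition() failed; InsertEvent escalates this so that event
      // processing stops until an invalidate resets its state.
    }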
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index b559d44..5ccb9ed 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -855,6 +856,20 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     return Lists.transform(encodedFileDescriptors_, FileDescriptor.FROM_BYTES);
   }
 
+  /**
+   * Returns a set of fully qualified file names in the partition.
+   */
+  public Set<String> getFileNames() {
+    List<FileDescriptor> fdList = getFileDescriptors();
+    Set<String> fileNames = new HashSet<>(fdList.size());
+    // Fully qualified file names.
+    String location = getLocation();
+    for (FileDescriptor fd : fdList) {
+      fileNames.add(location + Path.SEPARATOR + fd.getFileName());
+    }
+    return fileNames;
+  }
+
   public void setFileDescriptors(List<FileDescriptor> descriptors) {
     // Store an eagerly transformed-and-copied list so that we drop the memory usage
     // of the flatbuffer wrapper.
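
getFileNames() above is what CatalogOpExecutor.createInsertEvents() later uses
to compute the files added by an insert: a snapshot of the partition's files
before the insert, a snapshot after the metadata reload, and a Guava set
difference. A minimal sketch of that arithmetic, with hypothetical snapshot
variables:

    // Snapshot the partition's files before the insert...
    Set<String> filesBeforeInsert = partition.getFileNames();
    // ...then insert and reload, and snapshot again.
    Set<String> filesPostInsert = partition.getFileNames();
    // New files are those present after the insert but not before it. For an
    // insert overwrite the event carries an empty list instead, since HMS
    // expects no file delta in that case.
    Set<String> deltaFiles = Sets.difference(filesPostInsert, filesBeforeInsert);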
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 8deff92..23ba456 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -18,11 +18,13 @@
 package org.apache.impala.catalog.events;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -36,6 +38,7 @@ import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONCreateDatabaseMessage;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
@@ -82,6 +85,7 @@ public class MetastoreEvents {
     ADD_PARTITION("ADD_PARTITION"),
     ALTER_PARTITION("ALTER_PARTITION"),
     DROP_PARTITION("DROP_PARTITION"),
+    INSERT("INSERT"),
     OTHER("OTHER");
 
     private final String eventType_;
@@ -155,6 +159,9 @@ public class MetastoreEvents {
         case ALTER_PARTITION:
          // alter partition events currently trigger an invalidate table
           return new AlterPartitionEvent(catalog_, metrics_, event);
+        case INSERT:
+          // Insert events trigger refresh on a table/partition currently
+          return new InsertEvent(catalog_, metrics_, event);
         default:
          // ignore all the unknown events by creating an IgnoredEvent
           return new IgnoredEvent(catalog_, metrics_, event);
@@ -453,6 +460,9 @@ public class MetastoreEvents {
    * Base class for all the table events
    */
   public static abstract class MetastoreTableEvent extends MetastoreEvent {
+    // tblName from the event
+    protected final String tblName_;
+
     // tbl object from the Notification event, corresponds to the before tableObj in
     // case of alter events
     protected org.apache.hadoop.hive.metastore.api.Table msTbl_;
@@ -460,7 +470,7 @@ public class MetastoreEvents {
     private MetastoreTableEvent(CatalogServiceCatalog catalogServiceCatalog,
         Metrics metrics, NotificationEvent event) {
       super(catalogServiceCatalog, metrics, event);
-      Preconditions.checkNotNull(tblName_);
+      tblName_ = Preconditions.checkNotNull(event.getTableName());
       debugLog("Creating event {} of type {} on table {}", eventId_, eventType_,
           getFullyQualifiedTblName());
     }
@@ -653,6 +663,121 @@ public class MetastoreEvents {
   }
 
   /**
+   *  Metastore event handler for INSERT events. Handles insert events at both table
+   *  and partition scopes.
+   */
+  public static class InsertEvent extends MetastoreTableEvent {
+
+    // Represents the partition for this insert. Null if the table is unpartitioned.
+    private final org.apache.hadoop.hive.metastore.api.Partition insertPartition_;
+
+    /**
+     * Prevents instantiation from outside; use MetastoreEventFactory instead.
+     */
+    @VisibleForTesting
+    InsertEvent(CatalogServiceCatalog catalog, Metrics metrics,
+        NotificationEvent event) throws MetastoreNotificationException {
+      super(catalog, metrics, event);
+      Preconditions.checkArgument(MetastoreEventType.INSERT.equals(eventType_));
+      InsertMessage insertMessage =
+          MetastoreEventsProcessor.getMessageFactory()
+              .getDeserializer().getInsertMessage(event.getMessage());
+      try {
+        msTbl_ = Preconditions.checkNotNull(insertMessage.getTableObj());
+        insertPartition_ = insertMessage.getPtnObj();
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(debugString("Unable to "
+            + "parse insert message"), e);
+      }
+    }
+
+    /**
+     * Currently we do not check for self-events for inserts. The existing self-events
+     * logic cannot be used for insert events, since firing an insert event does not
+     * allow us to modify table parameters in HMS. Hence, we cannot get the
+     * CatalogServiceIdentifiers in insert events.
+     * TODO: Handle self-events for insert case.
+     */
+    @Override
+    public void process() throws MetastoreNotificationException {
+      if (insertPartition_ != null) {
+        processPartitionInserts();
+      } else {
+        processTableInserts();
+      }
+    }
+
+    /**
+     * Process partition inserts
+     */
+    private void processPartitionInserts() throws MetastoreNotificationException {
+      // For partitioned table, refresh the partition only.
+      Preconditions.checkNotNull(insertPartition_);
+      Map<String, String> partSpec = new HashMap<>();
+      List<org.apache.hadoop.hive.metastore.api.FieldSchema> fsList =
+          msTbl_.getPartitionKeys();
+      List<String> partVals = insertPartition_.getValues();
+      Preconditions.checkNotNull(partVals);
+      Preconditions.checkState(fsList.size() == partVals.size());
+      for (int i = 0; i < fsList.size(); i++) {
+        partSpec.put(fsList.get(i).getName(), partVals.get(i));
+      }
+      try {
+        // Ignore event if table or database is not in catalog. Throw exception if
+        // refresh fails.
+        if (!catalog_.refreshPartitionIfExists(dbName_, tblName_, partSpec)) {
+          debugLog("Refresh of table {} partition {} after insert "
+                  + "event failed as the table is not present in the catalog.",
+              getFullyQualifiedTblName(), Joiner.on(",").withKeyValueSeparator("=")
+                  .join(partSpec));
+        } else {
+          infoLog("Table {} partition {} has been refreshed after insert.",
+              getFullyQualifiedTblName(), Joiner.on(",").withKeyValueSeparator("=")
+                  .join(partSpec));
+        }
+      } catch (DatabaseNotFoundException e) {
+        debugLog("Refresh of table {} partition {} for insert "
+                + "event failed as the database is not present in the catalog.",
+            getFullyQualifiedTblName(), Joiner.on(",").withKeyValueSeparator("=")
+                .join(partSpec));
+      } catch (CatalogException e) {
+        throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
+                + "partition on table {} partition {} failed. Event processing cannot "
+                + "continue. Issue and invalidate command to reset the event processor "
+                + "state.", getFullyQualifiedTblName(),
+            Joiner.on(",").withKeyValueSeparator("=").join(partSpec)));
+      }
+    }
+
+    /**
+     *  Process unpartitioned table inserts
+     */
+    private void processTableInserts() throws MetastoreNotificationException {
+      // For non-partitioned tables, refresh the whole table.
+      Preconditions.checkArgument(insertPartition_ == null);
+      try {
+        // Ignore event if table or database is not in the catalog. Throw exception if
+        // refresh fails.
+        if (!catalog_.refreshTableIfExists(dbName_, tblName_)) {
+          debugLog("Automatic refresh table {} failed as the table is not "
+              + "present in the catalog. ", getFullyQualifiedTblName());
+        } else {
+          infoLog("Table {} has been refreshed after insert.",
+              getFullyQualifiedTblName());
+        }
+      } catch (DatabaseNotFoundException e) {
+        debugLog("Automatic refresh of table {} insert failed as the "
+            + "database is not present in the catalog.", getFullyQualifiedTblName());
+      } catch (CatalogException e) {
+        throw new MetastoreNotificationNeedsInvalidateException(
+            debugString("Refresh table {} failed. Event processing "
+                + "cannot continue. Issue an invalidate metadata command to reset "
+                + "the event processor state.", getFullyQualifiedTblName()));
+      }
+    }
+  }
+
+  /**
    * MetastoreEvent for ALTER_TABLE event type
    */
   public static class AlterTableEvent extends TableInvalidatingEvent {
@@ -1116,6 +1241,12 @@ public class MetastoreEvents {
             + "it does not exist anymore", getFullyQualifiedTblName());
       }
     }
+
+    protected static String getStringProperty(
+        Map<String, String> params, String key, String defaultVal) {
+      if (params == null) return defaultVal;
+      return params.getOrDefault(key, defaultVal);
+    }
   }
 
   public static class AddPartitionEvent extends TableInvalidatingEvent {
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 9bd38b3..679344a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -115,6 +115,9 @@ import org.slf4j.LoggerFactory;
  * |             |              |            |            |
  * | DROP EVENT  | Remove       | Remove     | Ignore     |
  * |             |              |            |            |
+ * |             |              |            |            |
+ * | INSERT EVENT| Refresh      | Ignore     | Ignore     |
+ * |             |              |            |            |
  * +-------------+--------------+------------+------------+
  * </pre>
  *
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 4adca56..e8491ed 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.service;
 
+import com.google.common.collect.Iterables;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -44,6 +45,9 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -3555,6 +3559,7 @@ public class CatalogOpExecutor {
     final Timer.Context context
         = table.getMetrics().getTimer(HdfsTable.CATALOG_UPDATE_DURATION_METRIC).time();
     try {
+      // Get new catalog version for table in insert.
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       // Collects the cache directive IDs of any cached table/partitions that were
@@ -3572,6 +3577,9 @@ public class CatalogOpExecutor {
       TableName tblName = new TableName(table.getDb().getName(), table.getName());
       List<String> errorMessages = Lists.newArrayList();
       HashSet<String> partsToLoadMetadata = null;
+      Collection<? extends FeFsPartition> parts =
+          FeCatalogUtils.loadAllPartitions((HdfsTable) table);
+      List<FeFsPartition> affectedExistingPartitions = new ArrayList<>();
       if (table.getNumClusteringCols() > 0) {
         // Set of all partition names targeted by the insert that need to be created
         // in the Metastore (partitions that do not currently exist in the catalog).
@@ -3581,8 +3589,6 @@ public class CatalogOpExecutor {
         HashSet<String> partsToCreate =
             Sets.newHashSet(update.getCreated_partitions());
         partsToLoadMetadata = Sets.newHashSet(partsToCreate);
-        Collection<? extends FeFsPartition> parts =
-            FeCatalogUtils.loadAllPartitions((HdfsTable)table);
         for (FeFsPartition partition: parts) {
           // TODO: In the BE we build partition names without a trailing char. In FE
           // we build partition name with a trailing char. We should make this
@@ -3591,13 +3597,16 @@ public class CatalogOpExecutor {
 
          // Attempt to remove this partition name from partsToCreate. If remove
           // returns true, it indicates the partition already exists.
-          if (partsToCreate.remove(partName) && partition.isMarkedCached()) {
-            // The partition was targeted by the insert and is also a cached. Since
-            // data was written to the partition, a watch needs to be placed on the
-            // cache cache directive so the TableLoadingMgr can perform an async
-            // refresh once all data becomes cached.
-            cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId(
-                partition.getParameters()));
+          if (partsToCreate.remove(partName)) {
+            affectedExistingPartitions.add(partition);
+            if (partition.isMarkedCached()) {
+              // The partition was targeted by the insert and is also cached. Since
+              // data was written to the partition, a watch needs to be placed on the
+              // cache directive so the TableLoadingMgr can perform an async
+              // refresh once all data becomes cached.
+              cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId(
+                  partition.getParameters()));
+            }
           }
           if (partsToCreate.size() == 0) break;
         }
@@ -3683,8 +3692,11 @@ public class CatalogOpExecutor {
             throw new InternalException("Error adding partitions", e);
           }
         }
+      } else {
+        // For a non-partitioned table, only a single partition exists.
+        FeFsPartition singlePart = Iterables.getOnlyElement((List<FeFsPartition>) parts);
+        affectedExistingPartitions.add(singlePart);
       }
-
       // Submit the watch request for the given cache directives.
       if (!cacheDirIds.isEmpty()) {
         catalog_.watchCacheDirs(cacheDirIds, tblName.toThrift());
@@ -3703,6 +3715,9 @@ public class CatalogOpExecutor {
       }
 
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
+      // After loading metadata, fire insert events if external event processing is
+      // enabled.
+      createInsertEvents(table, affectedExistingPartitions, update.is_overwrite);
       addTableToCatalogUpdate(table, response.result);
     } finally {
       context.stop();
@@ -3718,6 +3733,98 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Populates insert event data and calls fireInsertEvent() if external event
+   * processing is enabled. This is a no-op if event processing is disabled or there
+   * are no existing partitions affected by this insert.
+   *
+   * @param affectedExistingPartitions List of existing partitions touched by the insert.
+   * @param isInsertOverwrite indicates if the operation was an insert overwrite. If it
+   *     is not, the set of new files added by this insert is calculated.
+   */
+  private void createInsertEvents(Table table,
+      List<FeFsPartition> affectedExistingPartitions, boolean isInsertOverwrite) {
+    if (!catalog_.isExternalEventProcessingEnabled() ||
+        affectedExistingPartitions.size() == 0) return;
+    // Map of partition ids to file names of all existing partitions touched by the
+    // insert. Populated for overwrites too, so that the loop below also fires events
+    // (with an empty file list) for insert overwrite operations.
+    Map<Long, Set<String>> partitionFilesMap = new HashMap<>();
+    for (FeFsPartition partition : affectedExistingPartitions) {
+      partitionFilesMap.put(partition.getId(),
+          ((HdfsPartition) partition).getFileNames());
+    }
+    // If the table is partitioned, we add all existing partitions touched by this
+    // insert to the insert event. If it is not an insert overwrite operation, we
+    // find the new files added by this insert.
+    Collection<? extends FeFsPartition> partsPostInsert =
+        table.getNumClusteringCols() > 0 ?
+            ((FeFsTable) table).loadPartitions(partitionFilesMap.keySet()) :
+            FeCatalogUtils.loadAllPartitions((HdfsTable) table);
+    for (FeFsPartition part : partsPostInsert) {
+      // Find the delta of the files added by the insert if it is not an overwrite
+      // operation. HMS fireListenerEvent() expects an empty list if no new files are
+      // added or if the operation is an insert overwrite.
+      Set<String> deltaFiles = new HashSet<>();
+      List<String> partVals = null;
+      if (!isInsertOverwrite) {
+        Set<String> filesPostInsert = ((HdfsPartition) part).getFileNames();
+        if (table.getNumClusteringCols() > 0) {
+          Set<String> filesBeforeInsert = partitionFilesMap.get(part.getId());
+          deltaFiles = Sets.difference(filesBeforeInsert, filesPostInsert);
+          partVals = part.getPartitionValuesAsStrings(true);
+        } else {
+          Map.Entry<Long, Set<String>> entry =
+              partitionFilesMap.entrySet().iterator().next();
+          deltaFiles = Sets.difference(entry.getValue(), filesPostInsert);
+        }
+        LOG.info("{} new files detected for table {} partition {}.",
+            filesPostInsert.size(), table.getTableName(), part.getPartitionName());
+      }
+      if (deltaFiles != null || isInsertOverwrite) {
+        fireInsertEvent(table, partVals, deltaFiles, isInsertOverwrite);
+      }
+      else {
+        LOG.info("No new files were created, and is not a replace. Skipping "
+            + "generating INSERT event.");
+      }
+    }
+  }
+
+  /**
+   *  Fires an insert event to the HMS notification log. For a partitioned table,
+   *  each existing partition touched by the insert fires a separate insert event.
+   *
+   * @param newFiles Set of all the 'new' files added by this insert. This is empty
+   * in the case of an insert overwrite.
+   * @param partVals List of partition values corresponding to the partition keys in
+   * a partitioned table. This is null for a non-partitioned table.
+   * @param isOverwrite If true, sets the 'replace' flag in the notification log,
+   * indicating that the operation was an insert overwrite; otherwise the flag is
+   * set to false.
+   */
+  private void fireInsertEvent(Table tbl, List<String> partVals,
+      Set<String> newFiles, boolean isOverwrite) {
+    LOG.debug("Firing an insert event for {}.", tbl.getName());
+    FireEventRequestData data = new FireEventRequestData();
+    InsertEventRequestData insertData = new InsertEventRequestData();
+    data.setInsertData(insertData);
+    FireEventRequest rqst = new FireEventRequest(true, data);
+    rqst.setDbName(tbl.getDb().getName());
+    rqst.setTableName(tbl.getName());
+    insertData.setFilesAdded(new ArrayList<>(newFiles));
+    insertData.setReplace(isOverwrite);
+    rqst.setPartitionVals(partVals);
+
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().fireListenerEvent(rqst);
+    } catch (Exception e) {
+      LOG.error("Failed to fire insert event. Some tables might not be"
+          + " refreshed on other impala clusters.", e);
+    }
+  }
+
+  /**
    * Returns an existing, loaded table from the Catalog. Throws an exception if any
    * of the following are true:
    * - The table does not exist
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 36db63c..7b15057 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -30,17 +30,22 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Preconditions;
-
+import com.google.common.collect.Iterables;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -57,6 +62,8 @@ import org.apache.impala.authorization.NoopAuthorizationFactory.NoopAuthorizatio
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.DatabaseNotFoundException;
+import org.apache.impala.catalog.FeCatalogUtils;
+import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
@@ -65,9 +72,11 @@ import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.events.MetastoreEvents.AlterTableEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.InsertEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.CatalogOpExecutor;
@@ -511,6 +520,145 @@ public class MetastoreEventsProcessorTest {
   }
 
   /**
+   * Test insert events. Test creates a partitioned and a non-partitioned table and
+   * calls insertEvent tests on them.
+   */
+  @Test
+  public void testInsertEvents() throws TException, ImpalaException {
+    // Test insert into partition
+    createDatabase(TEST_DB_NAME, null);
+    String tableToInsertPart = "tbl_to_insert_part";
+    createTable(TEST_DB_NAME, tableToInsertPart, null, true);
+    testInsertEvents(TEST_DB_NAME, tableToInsertPart, true);
+
+    // Test insert into table
+    String tableToInsertNoPart = "tbl_to_insert_no_part";
+    createTable(TEST_DB_NAME, tableToInsertNoPart, null, false);
+    testInsertEvents(TEST_DB_NAME, tableToInsertNoPart, false);
+  }
+
+  /**
+   * Helper to test insert events. Creates a fake InsertEvent notification in the
+   * catalog and processes it. To simulate an insert, we load a file using FS APIs and
+   * verify the new file shows up after table/partition refresh.
+   */
+  public void testInsertEvents(String dbName, String tblName,
+      boolean isPartitionInsert) throws TException,
+      ImpalaException {
+
+    if (isPartitionInsert) {
+      // Add a partition
+      List<List<String>> partVals = new ArrayList<>();
+      partVals.add(new ArrayList<>(Arrays.asList("testPartVal")));
+      addPartitions(dbName, tblName, partVals);
+    }
+    eventsProcessor_.processEvents();
+
+    // To simulate an insert, load a file into the partition location.
+    String parentPathString =
+        "/test-warehouse/" + dbName + ".db/" + tblName;
+    String filePathString = isPartitionInsert ? "/p1=testPartVal/testFile.0" :
+        "/testFile.0";
+    Path parentPath =
+        FileSystemUtil.createFullyQualifiedPath(new Path(parentPathString));
+    FileSystem fs = null;
+    FSDataOutputStream out = null;
+    List<String> newFiles = new ArrayList<>(1);
+    try {
+      fs = parentPath.getFileSystem(FileSystemUtil.getConfiguration());
+      // put a test file into the location.
+      out = fs.create(new Path(parentPathString + filePathString));
+      newFiles.add(filePathString);
+      org.apache.hadoop.hive.metastore.api.Partition partition = null;
+      if (isPartitionInsert) {
+        // Get the partition from the metastore. This should now contain the new file.
+        try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+          partition = metaStoreClient.getHiveClient().getPartition(dbName,
+              tblName, "p1=testPartVal");
+        }
+      }
+      // Simulate a load table
+      Table tbl = catalog_.getOrLoadTable(dbName, tblName);
+      NotificationEvent fakeInsertEvent = createFakeInsertEvent(tbl.getMetaStoreTable(),
+          partition, false, newFiles);
+      InsertEvent insertEvent = new InsertEvent(catalog_,
+          eventsProcessor_.getMetrics(), fakeInsertEvent);
+
+      // Process this event, this should refresh the table.
+      insertEvent.process();
+
+      // Now check if the table is refreshed by checking the files size. A partition
+      // refresh will make the new file show up in the partition. NOTE: This is same
+      // for table and partition inserts as impala treats a non-partitioned table as a
+      // table with a single partition.
+      Table tblAfterInsert = catalog_.getTable(dbName, tblName);
+      Collection<? extends FeFsPartition> partsAfterInsert =
+          FeCatalogUtils.loadAllPartitions((HdfsTable) tblAfterInsert);
+      assertTrue("Partition not found after insert.",
+          partsAfterInsert.size() > 0);
+      FeFsPartition singlePart =
+          Iterables.getOnlyElement((List<FeFsPartition>) partsAfterInsert);
+      Set<String> filesAfterInsertForTable =
+          (((HdfsPartition) singlePart).getFileNames());
+      assertTrue("File count mismatch after insert.",
+          filesAfterInsertForTable.size() == 1);
+
+      // Create another event for overwrite
+      NotificationEvent fakeInsertOverwriteEvent =
+          createFakeInsertEvent(tbl.getMetaStoreTable(),
+              partition, true, newFiles);
+      InsertEvent insertOverwriteEvent = new InsertEvent(catalog_,
+          eventsProcessor_.getMetrics(), fakeInsertOverwriteEvent);
+      insertOverwriteEvent.process();
+
+      // Overwrite is expected to behave similarly.
+      Table tblAfterInsertOverwrite = catalog_.getTable(dbName, tblName);
+      Collection<? extends FeFsPartition> partsAfterInsertOverwrite =
+          FeCatalogUtils.loadAllPartitions((HdfsTable) tblAfterInsertOverwrite);
+      assertTrue("Partition not found after insert.",
+          partsAfterInsertOverwrite.size() > 0);
+      FeFsPartition singlePartAfterInsertOverwrite =
+          Iterables.getOnlyElement((List<FeFsPartition>) partsAfterInsertOverwrite);
+      Set<String> filesAfterInsertOverwriteForTable =
+          (((HdfsPartition) singlePartAfterInsertOverwrite).getFileNames());
+      assertTrue("File count mismatch after insert.",
+          filesAfterInsertOverwriteForTable.size() == 1);
+    } catch (IOException e) {
+      throw new MetastoreNotificationException(e);
+    } finally {
+      if (out != null) {
+        try {
+          out.close();
+        } catch (IOException e) {
+          throw new MetastoreNotificationException(e);
+        } finally {
+          FileSystemUtil.deleteIfExists(parentPath);
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper to create a fake insert notification event.
+   */
+  private NotificationEvent createFakeInsertEvent(
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      org.apache.hadoop.hive.metastore.api.Partition partition,
+      boolean isInsertOverwrite, List<String> newFiles) {
+    NotificationEvent fakeEvent = new NotificationEvent();
+    fakeEvent.setTableName(msTbl.getTableName());
+    fakeEvent.setDbName(msTbl.getDbName());
+    fakeEvent.setEventId(eventIdGenerator.incrementAndGet());
+    fakeEvent.setMessage(MetastoreEventsProcessor.getMessageFactory()
+        .buildInsertMessage(msTbl, partition, isInsertOverwrite, newFiles).toString());
+    fakeEvent.setEventType("INSERT");
+    return fakeEvent;
+  }
+
+  /**
    * Test generates ALTER_TABLE events for various cases (table rename, parameter change,
    * add/remove/change column) and makes sure that the table is updated on the CatalogD
    * side after the ALTER_TABLE event is processed.
diff --git a/fe/src/test/resources/hive-site.xml.py b/fe/src/test/resources/hive-site.xml.py
index 9c0ca7a..18e0011 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -97,6 +97,7 @@ CONFIG.update({
  'hive.metastore.notifications.add.thrift.objects': 'true',
  'hive.metastore.transactional.event.listeners': 'org.apache.hive.hcatalog.listener.DbNotificationListener',
  'hcatalog.message.factory.impl.json': 'org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory',
+ 'hive.metastore.dml.events': 'true',
 })
 
 # Database and JDO-related configs:
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
new file mode 100644
index 0000000..c936807
--- /dev/null
+++ b/tests/custom_cluster/test_event_processing.py
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import json
+import time
+from datetime import datetime
+from tests.common.environ import build_flavor_timeout
+import requests
+from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.util.hive_utils import HiveDbWrapper
+
+
+@SkipIfS3.hive
+@SkipIfABFS.hive
+@SkipIfADLS.hive
+@SkipIfIsilon.hive
+@SkipIfLocal.hive
+class TestEventProcessing(CustomClusterTestSuite):
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--hms_event_polling_interval_s=2"
+  )
+  def test_insert_events(self):
+    """
+    Test for insert event processing. Events are created in Hive and processed in Impala.
+    The following cases are tested:
+    Insert into table --> for partitioned and non-partitioned table
+    Insert overwrite table --> for partitioned and non-partitioned table
+    Insert into partition --> for partitioned table
+    """
+    db_name = 'test_db'
+    with HiveDbWrapper(self, db_name):
+     # Test table with no partitions.
+     TBL_INSERT_NOPART = 'tbl_insert_nopart'
+     last_synced_event_id = self.get_last_synced_event_id()
+     self.run_stmt_in_hive("create table %s.%s (id int, val int)"
+         % (db_name, TBL_INSERT_NOPART))
+     # Test insert into table, this will fire an insert event.
+     self.run_stmt_in_hive("insert into %s.%s values(101, 200)"
+         % (db_name, TBL_INSERT_NOPART))
+     # With MetastoreEventProcessor running, the insert event will be processed.
+     # Query the table from Impala.
+     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+     data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
+     assert data.split('\t') == ['101', '200']
+     # Test insert overwrite. Overwrite the existing value.
+     last_synced_event_id = self.get_last_synced_event_id()
+     self.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
+         % (db_name, TBL_INSERT_NOPART))
+     # Make sure the event has been processed.
+     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+     # Query table from Impala
+     data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
+     assert data.split('\t') == ['101', '201']
+
+     # Test partitioned table.
+     last_synced_event_id = self.get_last_synced_event_id()
+     TBL_INSERT_PART = 'tbl_insert_part'
+     self.run_stmt_in_hive("create table %s.%s (id int, name string) "
+         "partitioned by(day int, month int, year int)" % (db_name, TBL_INSERT_PART))
+     # Insert data into partitions
+     self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
+         "values(101, 'x')" % (db_name, TBL_INSERT_PART))
+     # Make sure the event is processed.
+     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+     # Test if the data is present in Impala
+     data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_PART))
+     assert data.split('\t') == ['101', 'x', '28', '3', '2019']
+
+     # Test inserting into existing partitions.
+     last_synced_event_id = self.get_last_synced_event_id()
+     self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
+         "values(102, 'y')" % (db_name, TBL_INSERT_PART))
+     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+     data = self.execute_scalar("select count(*) from %s.%s where day=28 and month=3 "
+         "and year=2019" % (db_name, TBL_INSERT_PART))
+     assert data.split('\t') == ['2']
+
+     # Test insert overwrite into existing partitions
+     last_synced_event_id = self.get_last_synced_event_id()
+     self.run_stmt_in_hive("insert overwrite table %s.%s partition(day=28, month=03, "
+         "year=2019)" "values(101, 'z')" % (db_name, TBL_INSERT_PART))
+     assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+     data = self.execute_scalar("select * from %s.%s where day=28 and month=3 and"
+         " year=2019 and id=101" % (db_name, TBL_INSERT_PART))
+     assert data.split('\t') == ['101', 'z', '28', '3', '2019']
+
+  def wait_for_insert_event_processing(self, previous_event_id):
+    """ Wait till the event processor has finished processing insert events. This is
+    detected by scrapping the /events webpage for changes in last_synced_event_id.
+    Since two events are created for every insert done through hive, we wait till the
+    event id is incremented by at least two. Returns true if at least two events were
+    processed within 10 sec. False otherwise.
+    """
+    new_event_id = self.get_last_synced_event_id()
+    success = True
+    start_time = datetime.now()
+    while new_event_id - previous_event_id < 2:
+      new_event_id = self.get_last_synced_event_id()
+      # Prevent infinite loop
+      time_delta = (datetime.now() - start_time).total_seconds()
+      if time_delta > 10:
+        success = False
+        break
+    # Wait for catalog update to be propagated.
+    time.sleep(build_flavor_timeout(2, slow_build_timeout=4))
+    return success
+
+  def get_last_synced_event_id(self):
+    """
+    Scrape the /events webpage and return the last_synced_event_id.
+    """
+    response = requests.get("http://localhost:25020/events?json")
+    assert response.status_code == requests.codes.ok
+    varz_json = json.loads(response.text)
+    metrics = varz_json["event_processor_metrics"].strip().split('\n')
+    kv_map = {}
+    for kv in metrics:
+      if len(kv) > 0:
+        pair = kv.split(':')
+        kv_map[pair[0].strip()] = pair[1].strip()
+
+    last_synced_event_id = int(kv_map['last-synced-event-id'])
+    return last_synced_event_id


[impala] 02/02: IMPALA-8467: ParquetPlainEncoder::Decode leads to multiple test failures in ASAN builds

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1090b3b2e4df1caa52dfcb8dfeb9484ee939f5d0
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Mon Apr 29 17:42:19 2019 +0200

    IMPALA-8467: ParquetPlainEncoder::Decode leads to multiple test failures
    in ASAN builds
    
    Fixed the buffer overflow failure.
    
    Testing:
    All BE tests passed with ASAN.
    
    Change-Id: I2ac77f5f135f063bf6b8f7406d5b41535190d5a2
    Reviewed-on: http://gerrit.cloudera.org:8080/13176
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/parquet-common.h | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
index a7184ba..5a90a04 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -319,7 +319,6 @@ inline int DecodeWithConversion(const uint8_t* buffer, const uint8_t* buffer_end
 /// double         | DOUBLE
 /// Decimal4Value  | INT32
 /// Decimal8Value  | INT64
-/// TimestampValue | INT96
 template <typename InternalType, parquet::Type::type PARQUET_TYPE>
 void ParquetPlainEncoder::DecodeNoBoundsCheck(const uint8_t* buffer,
     const uint8_t* buffer_end, int fixed_len_size, InternalType* v) {
@@ -333,6 +332,19 @@ void ParquetPlainEncoder::DecodeNoBoundsCheck(const uint8_t* buffer,
 }
 
 template <>
+inline void ParquetPlainEncoder::
+DecodeNoBoundsCheck<TimestampValue, parquet::Type::INT96>(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, TimestampValue* v) {
+  int byte_size = EncodedByteSize(parquet::Type::INT96, -1);
+  DCHECK_GE(buffer_end - buffer, byte_size);
+
+  /// We copy only 12 bytes from the input buffer but the destination is 16 bytes long
+  /// because of padding. The most significant 4 bytes remain uninitialized as they are
+  /// not used.
+  memcpy(v, buffer, byte_size);
+}
+
+template <>
 inline void ParquetPlainEncoder::DecodeNoBoundsCheck<int64_t, parquet::Type::INT32>(
     const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int64_t* v) {
   DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v);