Posted to commits@impala.apache.org by st...@apache.org on 2022/10/04 12:52:23 UTC

[impala] 02/02: IMPALA-8592: Add support for insert events for 'LOAD DATA' statements from Impala

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cf9c443ddca7403d2a92b2a01a374749a72e5f8f
Author: Yu-Wen Lai <yu...@cloudera.com>
AuthorDate: Wed Sep 21 22:35:33 2022 -0700

    IMPALA-8592: Add support for insert events for 'LOAD DATA' statements
    from Impala
    
    In this patch, we use TUpdateCatalogRequest to refresh metadata after
    'LOAD DATA' instead of TResetMetadataRequest so that we can reuse the
    code for 'INSERT' statements. It fires an insert event just as we
    do for 'INSERT' statements.
    
    We also fix the inconsistent indentation in event_processor_utils.py.
    
    Testing:
    - Run existing test_load.py
    - Added test_load_data_from_impala() in test_event_processing.py
    
    Change-Id: I7f1b470f40e0aaf891c9f3f327af393b2f9c74bc
    Reviewed-on: http://gerrit.cloudera.org:8080/19052
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
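
A minimal sketch of the new behavior (using the EventProcessorUtils
helpers added by this patch; the hive_client handle and execute_query
call are assumed to come from the Impala test suite, and the staging
path and table names are hypothetical):

    from tests.util.event_processor_utils import EventProcessorUtils

    # Record the latest metastore notification id before the load.
    last_event_id = EventProcessorUtils.get_current_notification_id(hive_client)
    execute_query("load data inpath '/tmp/staging/tinytable_parquet' "
                  "into table test_db.tbl_nopart")
    # The LOAD DATA statement should now fire exactly one INSERT event.
    events = EventProcessorUtils.get_next_notification(hive_client, last_event_id)
    assert len(events) == 1 and events[0].eventType == "INSERT"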
---
 be/src/service/client-request-state.cc             |  67 +++++++------
 be/src/service/client-request-state.h              |   3 +
 common/thrift/Frontend.thrift                      |   6 ++
 .../org/apache/impala/common/FileSystemUtil.java   |  12 +++
 .../java/org/apache/impala/service/Frontend.java   |  24 +++--
 tests/metadata/test_event_processing.py            |  56 +++++++++++
 tests/util/event_processor_utils.py                | 110 ++++++++++++---------
 7 files changed, 194 insertions(+), 84 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 00d6a1908..1c8fa9e6e 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -797,34 +797,40 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
   request_result_set_.reset(new vector<TResultRow>);
   request_result_set_->push_back(response.load_summary);
 
-  // Now refresh the table metadata.
-  TCatalogOpRequest reset_req;
-  reset_req.__set_sync_ddl(exec_request_->query_options.sync_ddl);
-  reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
-  reset_req.__set_reset_metadata_params(TResetMetadataRequest());
-  reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
-  reset_req.reset_metadata_params.header.__set_want_minimal_response(
-      FLAGS_use_local_catalog);
-  reset_req.reset_metadata_params.__set_is_refresh(true);
-  reset_req.reset_metadata_params.__set_table_name(
-      exec_request_->load_data_request.table_name);
-  if (exec_request_->load_data_request.__isset.partition_spec) {
-    reset_req.reset_metadata_params.__set_partition_spec(
-        exec_request_->load_data_request.partition_spec);
-  }
-  reset_req.reset_metadata_params.__set_sync_ddl(
-      exec_request_->query_options.sync_ddl);
-  catalog_op_executor_.reset(
-      new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
+  // We use TUpdateCatalogRequest to refresh the table metadata so that it will
+  // fire an insert event just like an insert statement.
+  TUpdatedPartition updatedPartition;
+  updatedPartition.files.insert(updatedPartition.files.end(),
+    response.loaded_files.begin(), response.loaded_files.end());
+  TUpdateCatalogRequest catalog_update;
+  // The partition_name is an empty string for unpartitioned tables.
+  catalog_update.updated_partitions[response.partition_name] = updatedPartition;
+
+  catalog_update.__set_sync_ddl(exec_request_->query_options.sync_ddl);
+  catalog_update.__set_header(GetCatalogServiceRequestHeader());
+  catalog_update.target_table = exec_request_->load_data_request.table_name.table_name;
+  catalog_update.db_name = exec_request_->load_data_request.table_name.db_name;
+  catalog_update.is_overwrite = exec_request_->load_data_request.overwrite;
+
+  const TNetworkAddress& address =
+      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
+  CatalogServiceConnection client(
+      ExecEnv::GetInstance()->catalogd_client_cache(), address, &status);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+  }
 
-  status = catalog_op_executor_->Exec(reset_req);
+  TUpdateCatalogResponse resp;
+  status = client.DoRpc(
+      &CatalogServiceClientWrapper::UpdateCatalog, catalog_update, &resp);
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
   }
 
   status = parent_server_->ProcessCatalogUpdateResult(
-      *catalog_op_executor_->update_catalog_result(),
+      resp.result,
       exec_request_->query_options.sync_ddl);
   {
     lock_guard<mutex> l(lock_);
@@ -1429,13 +1435,7 @@ Status ClientRequestState::UpdateCatalog() {
     const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
     TUpdateCatalogRequest catalog_update;
     catalog_update.__set_sync_ddl(exec_request_->query_options.sync_ddl);
-    catalog_update.__set_header(TCatalogServiceRequestHeader());
-    catalog_update.header.__set_requesting_user(effective_user());
-    catalog_update.header.__set_client_ip(session()->network_address.hostname);
-    catalog_update.header.__set_want_minimal_response(FLAGS_use_local_catalog);
-    catalog_update.header.__set_redacted_sql_stmt(
-        query_ctx_.client_request.__isset.redacted_stmt ?
-            query_ctx_.client_request.redacted_stmt : query_ctx_.client_request.stmt);
+    catalog_update.__set_header(GetCatalogServiceRequestHeader());
     DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state();
     if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update)) {
       VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
@@ -2042,4 +2042,15 @@ string ClientRequestState::RetryStateToString(RetryState state) {
           {ClientRequestState::RetryState::RETRIED, "RETRIED"}};
   return retry_state_strings.at(state);
 }
+
+TCatalogServiceRequestHeader ClientRequestState::GetCatalogServiceRequestHeader() {
+  TCatalogServiceRequestHeader header = TCatalogServiceRequestHeader();
+  header.__set_requesting_user(effective_user());
+  header.__set_client_ip(session()->network_address.hostname);
+  header.__set_want_minimal_response(FLAGS_use_local_catalog);
+  header.__set_redacted_sql_stmt(
+      query_ctx_.client_request.__isset.redacted_stmt ?
+          query_ctx_.client_request.redacted_stmt : query_ctx_.client_request.stmt);
+  return header;
+}
 }
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 29aa9848f..69430df16 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -809,6 +809,9 @@ class ClientRequestState {
   /// Logs audit and column lineage events. Expects that Wait() has already finished.
   /// Grabs lock_ for polling the query_status(). Hence do not call it under lock_.
   void LogQueryEvents();
+
+  /// Helper function to build the common TCatalogServiceRequestHeader.
+  TCatalogServiceRequestHeader GetCatalogServiceRequestHeader();
 };
 
 }
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 371eec1c5..23ffd69ba 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -379,6 +379,12 @@ struct TLoadDataResp {
   // A result row that contains information on the result of the LOAD operation. This
   // includes details like the number of files moved as part of the request.
   1: required Data.TResultRow load_summary
+
+  // The loaded file paths
+  2: required list<string> loaded_files
+
+  // The loaded partition name. This is needed to issue TUpdateCatalogRequest;
+  // it is an empty string for unpartitioned tables.
+  3: string partition_name = ""
 }
 
 enum TCatalogOpType {
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 1c2b75175..899cc71c0 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -198,6 +198,15 @@ public class FileSystemUtil {
    */
   public static int relocateAllVisibleFiles(Path sourceDir, Path destDir)
       throws IOException {
+    return relocateAllVisibleFiles(sourceDir, destDir, null);
+  }
+
+  /**
+   * This method does the same as #relocateAllVisibleFiles(Path, Path), but it also
+   * records the destination path of each moved file in #loadedFiles.
+   */
+  public static int relocateAllVisibleFiles(Path sourceDir, Path destDir,
+        List<Path> loadedFiles) throws IOException {
     FileSystem destFs = destDir.getFileSystem(CONF);
     FileSystem sourceFs = sourceDir.getFileSystem(CONF);
     Preconditions.checkState(destFs.isDirectory(destDir));
@@ -226,6 +235,9 @@ public class FileSystemUtil {
             appendToBaseFileName(destFile.getName(), uuid.toString()));
       }
       FileSystemUtil.relocateFile(fStatus.getPath(), destFile, false);
+      if (loadedFiles != null) {
+        loadedFiles.add(destFile);
+      }
       ++numFilesMoved;
     }
     return numFilesMoved;
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 79fdf0aac..eeb5b59ea 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -97,6 +97,7 @@ import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeDataSource;
 import org.apache.impala.catalog.FeDataSourceTable;
 import org.apache.impala.catalog.FeDb;
+import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeIcebergTable;
@@ -880,10 +881,13 @@ public class Frontend {
     // Get the destination for the load. If the load is targeting a partition,
     // this is the partition location. Otherwise this is the table location.
     String destPathString = null;
+    String partitionName = null;
     FeCatalog catalog = getCatalog();
     if (request.isSetPartition_spec()) {
-      destPathString = catalog.getHdfsPartition(tableName.getDb(),
-          tableName.getTbl(), request.getPartition_spec()).getLocation();
+      FeFsPartition partition = catalog.getHdfsPartition(tableName.getDb(),
+          tableName.getTbl(), request.getPartition_spec());
+      destPathString = partition.getLocation();
+      partitionName = partition.getPartitionName();
     } else {
       destPathString = catalog.getTable(tableName.getDb(), tableName.getTbl())
           .getMetaStoreTable().getSd().getLocation();
@@ -898,12 +902,12 @@ public class Frontend {
     // file move.
     Path tmpDestPath = FileSystemUtil.makeTmpSubdirectory(destPath);
 
-    int filesLoaded = 0;
+    int numFilesLoaded = 0;
     if (sourceFs.isDirectory(sourcePath)) {
-      filesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath, tmpDestPath);
+      numFilesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath, tmpDestPath);
     } else {
       FileSystemUtil.relocateFile(sourcePath, tmpDestPath, true);
-      filesLoaded = 1;
+      numFilesLoaded = 1;
     }
 
     // If this is an OVERWRITE, delete all files in the destination.
@@ -911,17 +915,23 @@ public class Frontend {
       FileSystemUtil.deleteAllVisibleFiles(destPath);
     }
 
+    List<Path> filesLoaded = new ArrayList<>();
     // Move the files from the temporary location to the final destination.
-    FileSystemUtil.relocateAllVisibleFiles(tmpDestPath, destPath);
+    FileSystemUtil.relocateAllVisibleFiles(tmpDestPath, destPath, filesLoaded);
     // Cleanup the tmp directory.
     destFs.delete(tmpDestPath, true);
     TLoadDataResp response = new TLoadDataResp();
     TColumnValue col = new TColumnValue();
     String loadMsg = String.format(
         "Loaded %d file(s). Total files in destination location: %d",
-        filesLoaded, FileSystemUtil.getTotalNumVisibleFiles(destPath));
+        numFilesLoaded, FileSystemUtil.getTotalNumVisibleFiles(destPath));
     col.setString_val(loadMsg);
     response.setLoad_summary(new TResultRow(Lists.newArrayList(col)));
+    response.setLoaded_files(filesLoaded.stream().map(Path::toString)
+        .collect(Collectors.toList()));
+    if (partitionName != null && !partitionName.isEmpty()) {
+      response.setPartition_name(partitionName);
+    }
     return response;
   }
 
diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py
index dfe98ea3d..b8d5553ca 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -14,6 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from subprocess import check_call
+import pytest
+
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2
 from tests.util.event_processor_utils import EventProcessorUtils
@@ -375,6 +378,59 @@ class TestEventProcessing(ImpalaTestSuite):
     assert ('2019') not in self.get_impala_partition_info(test_tbl, 'year')
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
+  @pytest.mark.execute_serially
+  def test_load_data_from_impala(self, unique_database):
+    tbl_nopart = "tbl_nopart"
+    tbl_part = "tbl_part"
+    staging_dir = "/tmp/{0}".format(unique_database)
+    check_call(["hdfs", "dfs", "-mkdir", staging_dir])
+    try:
+      self.execute_query(
+        "drop table if exists {0}.{1} purge".format(unique_database, tbl_nopart))
+      self.execute_query(
+        "drop table if exists {0}.{1} purge".format(unique_database, tbl_part))
+
+      self.execute_query(
+        "create table {0}.{1} like functional_parquet.tinytable stored as parquet"
+        .format(unique_database, tbl_nopart))
+      self.execute_query(
+        "create table {0}.{1} like functional_parquet.alltypessmall stored as \
+        parquet".format(unique_database, tbl_part))
+      EventProcessorUtils.wait_for_event_processing(self)
+
+      check_call([
+        "hdfs", "dfs", "-cp", "/test-warehouse/tinytable_parquet", staging_dir])
+      last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
+      self.execute_query("load data inpath '{0}/tinytable_parquet' \
+        into table {1}.{2}".format(staging_dir, unique_database, tbl_nopart))
+      # Check if there is an insert event fired after load data statement.
+      events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
+      assert len(events) == 1
+      last_event = events[0]
+      assert last_event.dbName == unique_database
+      assert last_event.tableName == tbl_nopart
+      assert last_event.eventType == "INSERT"
+
+      check_call(["hdfs", "dfs", "-cp", "/test-warehouse/alltypessmall_parquet",
+        staging_dir])
+      self.execute_query(
+        "alter table {0}.{1} add partition (year=2009,month=1)".format(
+          unique_database, tbl_part))
+      last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
+      self.execute_query(
+        "load data inpath '{0}/alltypessmall_parquet/year=2009/month=1' \
+        into table {1}.{2} partition (year=2009,month=1)".format(
+          staging_dir, unique_database, tbl_part))
+      # Check if there is an insert event fired after load data statement.
+      events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
+      assert len(events) == 1
+      last_event = events[0]
+      assert last_event.dbName == unique_database
+      assert last_event.tableName == tbl_part
+      assert last_event.eventType == "INSERT"
+    finally:
+      check_call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", staging_dir])
+
   def __get_transactional_tblproperties(self, is_transactional):
     """
     Util method to generate the tblproperties for transactional tables
diff --git a/tests/util/event_processor_utils.py b/tests/util/event_processor_utils.py
index 81a44c499..261440cdc 100644
--- a/tests/util/event_processor_utils.py
+++ b/tests/util/event_processor_utils.py
@@ -23,68 +23,72 @@ import logging
 import requests
 import time
 import json
+from hive_metastore.ttypes import NotificationEventRequest
+
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
 LOG = logging.getLogger('event_processor_utils')
 LOG.setLevel(level=logging.DEBUG)
 
+
 class EventProcessorUtils(object):
 
   DEFAULT_CATALOG_URL = "http://localhost:25020"
 
   @staticmethod
   def wait_for_event_processing(test_suite, timeout=10):
-      """Waits till the event processor has synced to the latest event id from metastore
-         or the timeout value in seconds whichever is earlier"""
-      if EventProcessorUtils.get_event_processor_status() == "DISABLED":
-        return
-      success = False
-      assert timeout > 0
-      assert test_suite.hive_client is not None
-      current_event_id = EventProcessorUtils.get_current_notification_id(
-        test_suite.hive_client)
-      LOG.info("Waiting until events processor syncs to event id:" + str(current_event_id))
-      end_time = time.time() + timeout
-      while time.time() < end_time:
-        last_synced_id = EventProcessorUtils.get_last_synced_event_id()
-        if last_synced_id >= current_event_id:
-          LOG.debug(
-            "Metric last-synced-event-id has reached the desired value:" + str(
-              last_synced_id))
-          success = True
-          break
-        time.sleep(0.1)
-      if not success:
-        raise Exception(
-          "Event processor did not sync till last known event id {0} \
-          within {1} seconds".format(current_event_id, timeout))
-      if isinstance(test_suite, CustomClusterTestSuite):
-        impala_cluster = test_suite.cluster
-      else:
-        impala_cluster = ImpalaCluster.get_e2e_test_cluster()
-      # Wait until the impalad catalog versions agree with the catalogd's version.
-      catalogd_version = impala_cluster.catalogd.service.get_catalog_version()
-      for impalad in impala_cluster.impalads:
-        impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version,
-          allow_greater=True)
-      return success
+    """Waits till the event processor has synced to the latest event id from metastore
+    or the timeout value in seconds whichever is earlier"""
+    if EventProcessorUtils.get_event_processor_status() == "DISABLED":
+      return
+    success = False
+    assert timeout > 0
+    assert test_suite.hive_client is not None
+    current_event_id = EventProcessorUtils.get_current_notification_id(
+      test_suite.hive_client)
+    LOG.info("Waiting until events processor syncs to event id:" + str(
+      current_event_id))
+    end_time = time.time() + timeout
+    while time.time() < end_time:
+      last_synced_id = EventProcessorUtils.get_last_synced_event_id()
+      if last_synced_id >= current_event_id:
+        LOG.debug(
+          "Metric last-synced-event-id has reached the desired value:" + str(
+            last_synced_id))
+        success = True
+        break
+      time.sleep(0.1)
+    if not success:
+      raise Exception(
+        "Event processor did not sync till last known event id {0} \
+        within {1} seconds".format(current_event_id, timeout))
+    if isinstance(test_suite, CustomClusterTestSuite):
+      impala_cluster = test_suite.cluster
+    else:
+      impala_cluster = ImpalaCluster.get_e2e_test_cluster()
+    # Wait until the impalad catalog versions agree with the catalogd's version.
+    catalogd_version = impala_cluster.catalogd.service.get_catalog_version()
+    for impalad in impala_cluster.impalads:
+      impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version,
+        allow_greater=True)
+    return success
 
   @staticmethod
   def get_event_processor_metrics():
-     """Scrapes the catalog's /events webpage and return a dictionary with the event
-     processor metrics."""
-     response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL)
-     assert response.status_code == requests.codes.ok
-     varz_json = json.loads(response.text)
-     metrics = varz_json["event_processor_metrics"].strip().splitlines()
+    """Scrapes the catalog's /events webpage and return a dictionary with the event
+    processor metrics."""
+    response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL)
+    assert response.status_code == requests.codes.ok
+    varz_json = json.loads(response.text)
+    metrics = varz_json["event_processor_metrics"].strip().splitlines()
 
-     # Helper to strip a pair of elements
-     def strip_pair(p):
-       return (p[0].strip(), p[1].strip())
+    # Helper to strip a pair of elements
+    def strip_pair(p):
+      return (p[0].strip(), p[1].strip())
 
-     pairs = [strip_pair(kv.split(':')) for kv in metrics if kv]
-     return dict(pairs)
+    pairs = [strip_pair(kv.split(':')) for kv in metrics if kv]
+    return dict(pairs)
 
   @staticmethod
   def get_int_metric(metric_key, default_val=None):
@@ -104,10 +108,10 @@ class EventProcessorUtils(object):
 
   @staticmethod
   def get_num_skipped_events():
-      """Returns number of skipped events from metrics"""
-      metrics = EventProcessorUtils.get_event_processor_metrics()
-      assert "events-skipped" in metrics.keys()
-      return int(metrics['events-skipped'])
+    """Returns number of skipped events from metrics"""
+    metrics = EventProcessorUtils.get_event_processor_metrics()
+    assert "events-skipped" in metrics.keys()
+    return int(metrics['events-skipped'])
 
   @staticmethod
   def get_event_processor_status():
@@ -123,3 +127,11 @@ class EventProcessorUtils(object):
     """Returns the current notification from metastore"""
     assert hive_client is not None
     return int(hive_client.get_current_notificationEventId().eventId)
+
+  @staticmethod
+  def get_next_notification(hive_client, last_event_id):
+    """Returns notification events from metastore"""
+    assert hive_client is not None
+    assert last_event_id > 0
+    notification_event_request = NotificationEventRequest(lastEvent=last_event_id)
+    return hive_client.get_next_notification(notification_event_request).events
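
A typical call pattern for this helper, sketched under the assumption
that hive_client is the metastore client exposed by the Impala test
suite, brackets a statement with the two notification helpers and then
inspects the returned event fields:

    last_id = EventProcessorUtils.get_current_notification_id(hive_client)
    # ... run a statement expected to produce metastore events ...
    for event in EventProcessorUtils.get_next_notification(hive_client, last_id):
        # Each event carries the fields asserted in test_load_data_from_impala().
        print(event.eventId, event.dbName, event.tableName, event.eventType)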