You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/01/04 01:39:27 UTC

[impala] 01/02: IMPALA-8984: Fix race condition in creating Kudu table

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 229317af44d11bda20c8fd9de7f91abb3e2920db
Author: skyyws <sk...@163.com>
AuthorDate: Mon Sep 30 10:18:32 2019 +0800

    IMPALA-8984: Fix race condition in creating Kudu table
    
    This patch fixes the race condition when using 'CREATE IF NOT EXISTS'
    to create the same managed kudu table in parallel. Note that it won't
    happen if Kudu-HMS integration is enabled. The bug would cause the
    table to be deleted in Kudu while still remaining in HMS.
    
    The solution is to add a check for HMS table existence before creating
    it in HMS and after obtaining 'metastoreDdlLock_'. If the HMS table was
    created by another concurrent thread, just return
    'Table already exists'. That way we don't fail to create the HMS table
    and won't roll back the creation of the Kudu table.
    
    Tests:
      * Add custom cluster test test_concurrent_kudu_create.py
      * Ran all front-end tests
    
    Change-Id: I1a4047bcdaa6b346765b96e8c36bb747a2b0091d
    Reviewed-on: http://gerrit.cloudera.org:8080/14319
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 10 +++-
 .../impala/service/KuduCatalogOpExecutor.java      | 47 ++++++++-------
 .../custom_cluster/test_concurrent_kudu_create.py  | 66 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 22 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f177e65..14b67df 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2245,7 +2245,15 @@ public class CatalogOpExecutor {
       synchronized (metastoreDdlLock_) {
         if (createHMSTable) {
           try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-            msClient.getHiveClient().createTable(newTable);
+            boolean tableInMetastore =
+                msClient.getHiveClient().tableExists(newTable.getDbName(),
+                                                     newTable.getTableName());
+            if (!tableInMetastore) {
+              msClient.getHiveClient().createTable(newTable);
+            } else {
+              addSummary(response, "Table already exists.");
+              return false;
+            }
           }
         }
         // Add the table to the catalog cache
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index eef1e5f..0508334 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -66,6 +66,8 @@ import com.google.common.collect.Sets;
 public class KuduCatalogOpExecutor {
   public static final Logger LOG = Logger.getLogger(KuduCatalogOpExecutor.class);
 
+  private static final Object kuduDdlLock_ = new Object();
+
   /**
    * Create a table in Kudu with a schema equivalent to the schema stored in 'msTbl'.
    * Throws an exception if 'msTbl' represents an external table or if the table couldn't
@@ -84,28 +86,31 @@ public class KuduCatalogOpExecutor {
     }
     KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
     try {
-      // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
-      // (see KUDU-1710).
-      boolean tableExists = kudu.tableExists(kuduTableName);
-      if (tableExists && params.if_not_exists) return;
+      // Acquire lock to protect table existence check and table creation, see IMPALA-8984
+      synchronized (kuduDdlLock_) {
+        // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
+        // (see KUDU-1710).
+        boolean tableExists = kudu.tableExists(kuduTableName);
+        if (tableExists && params.if_not_exists) return;
 
-      // if table is managed or external with external.purge.table = true in
-      // tblproperties we should create the Kudu table if it does not exist
-      if (tableExists) {
-        throw new ImpalaRuntimeException(String.format(
-            "Table '%s' already exists in Kudu.", kuduTableName));
-      }
-      Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
-      Schema schema = createTableSchema(params);
-      CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
-      org.apache.kudu.client.KuduTable table =
-          kudu.createTable(kuduTableName, schema, tableOpts);
-      // Populate table ID from Kudu table if Kudu's integration with the Hive
-      // Metastore is enabled.
-      if (KuduTable.isHMSIntegrationEnabled(masterHosts)) {
-        String tableId = table.getTableId();
-        Preconditions.checkNotNull(tableId);
-        msTbl.getParameters().put(KuduTable.KEY_TABLE_ID, tableId);
+        // if table is managed or external with external.purge.table = true in
+        // tblproperties we should create the Kudu table if it does not exist
+        if (tableExists) {
+          throw new ImpalaRuntimeException(String.format(
+              "Table '%s' already exists in Kudu.", kuduTableName));
+        }
+        Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
+        Schema schema = createTableSchema(params);
+        CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
+        org.apache.kudu.client.KuduTable table =
+            kudu.createTable(kuduTableName, schema, tableOpts);
+        // Populate table ID from Kudu table if Kudu's integration with the Hive
+        // Metastore is enabled.
+        if (KuduTable.isHMSIntegrationEnabled(masterHosts)) {
+          String tableId = table.getTableId();
+          Preconditions.checkNotNull(tableId);
+          msTbl.getParameters().put(KuduTable.KEY_TABLE_ID, tableId);
+        }
       }
     } catch (Exception e) {
       throw new ImpalaRuntimeException(String.format("Error creating Kudu table '%s'",
diff --git a/tests/custom_cluster/test_concurrent_kudu_create.py b/tests/custom_cluster/test_concurrent_kudu_create.py
new file mode 100644
index 0000000..900fffa
--- /dev/null
+++ b/tests/custom_cluster/test_concurrent_kudu_create.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import threading
+import time
+
+from multiprocessing.pool import ThreadPool
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+TBL_NAME = "test_concurrent_kudu_create"
+
+
+class TestConcurrentKuduCreate(CustomClusterTestSuite):
+  """Test concurrent create kudu managed table"""
+
+  @pytest.mark.execute_serially
+  def test_concurrent_create_kudu_table(self, unique_database):
+    table_name = unique_database + "." + TBL_NAME
+    test_self = self
+
+    class ThreadLocalClient(threading.local):
+      def __init__(self):
+        self.client = test_self.create_impala_client()
+
+    tls = ThreadLocalClient()
+
+    def run_create_table_if_not_exists():
+      self.execute_query_expect_success(
+        tls.client, "create table if not exists %s "
+                    "(id int, primary key(id)) stored as kudu" % table_name)
+      tls.client.close()
+
+    # Drop table before run test if exists
+    self.execute_query("drop table if exists %s" % table_name)
+    NUM_ITERS = 20
+    for i in xrange(NUM_ITERS):
+      # Run several commands at specific time intervals to reproduce this bug
+      pool = ThreadPool(processes=3)
+      r1 = pool.apply_async(run_create_table_if_not_exists)
+      r2 = pool.apply_async(run_create_table_if_not_exists)
+      # Sleep so that the race conflict happens in different places
+      time.sleep(1)
+      r3 = pool.apply_async(run_create_table_if_not_exists)
+      r1.get()
+      r2.get()
+      r3.get()
+      pool.terminate()
+      # If IMPALA-8984 is hit, this query fails because the table was deleted in Kudu
+      self.execute_query_expect_success(tls.client, "select * from %s" % table_name)
+      self.execute_query("drop table if exists %s" % table_name)