Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/08 21:48:25 UTC

[impala] branch master updated: IMPALA-8637: Implement transaction handling and locking for ACID queries

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ca8ca3  IMPALA-8637: Implement transaction handling and locking for ACID queries
8ca8ca3 is described below

commit 8ca8ca3aad504d30521546ed0ef49e3a8c1b4038
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Jul 31 19:31:01 2019 +0200

    IMPALA-8637: Implement transaction handling and locking for ACID queries
    
    Adds a background thread to the Frontend that periodically heartbeats
    the open transactions and locks. Most of the logic is implemented in
    the newly added TransactionKeepalive class.
    
    TransactionKeepalive keeps track of the creation time of the
    transactions and locks, and only heartbeats them if they are older
    than the sleep interval. This way we don't heartbeat short-running
    queries unnecessarily.
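    
    A minimal sketch of this age check (illustrative only; SimpleKeepalive
    and its members are made-up names, not the actual TransactionKeepalive
    API added by this patch):
    
        import java.util.HashMap;
        import java.util.Map;
    
        class SimpleKeepalive {
          private static final long SLEEP_INTERVAL_SECONDS = 60;
          // Creation time (in nanoseconds) per transaction/lock id.
          private final Map<Long, Long> creationNanos = new HashMap<>();
    
          synchronized void add(long txnOrLockId) {
            creationNanos.put(txnOrLockId, System.nanoTime());
          }
    
          synchronized void remove(long txnOrLockId) {
            creationNanos.remove(txnOrLockId);
          }
    
          // Called by a daemon thread once per sleep interval; only entries
          // older than the interval are heartbeated, so short-running
          // queries never get heartbeated at all.
          synchronized void heartbeatOldEntries() {
            long now = System.nanoTime();
            for (Map.Entry<Long, Long> e : creationNanos.entrySet()) {
              long ageSeconds = (now - e.getValue()) / 1_000_000_000L;
              if (ageSeconds > SLEEP_INTERVAL_SECONDS) {
                System.out.println("would heartbeat id " + e.getKey());
              }
            }
          }
        }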
    
    Testing:
    Added an exhaustive test that checks heartbeating. It creates a
    long-running transaction with sleep(). While the transaction is
    executing, the test periodically checks whether there is an OPEN
    transaction that has sent a heartbeat to HMS. If it finds one, the
    test succeeds.
    
    TODOs:
     * add metrics
    
    Change-Id: Iaa37899b24aa114be642bf8772b4e0f882865cfa
    Reviewed-on: http://gerrit.cloudera.org:8080/13968
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc             |   4 +
 be/src/service/frontend.cc                         |   5 +
 be/src/service/frontend.h                          |   4 +
 .../org/apache/impala/compat/MetastoreShim.java    |  18 ++
 .../org/apache/impala/compat/MetastoreShim.java    |  87 +++++++-
 .../java/org/apache/impala/service/Frontend.java   | 114 ++++++++--
 .../org/apache/impala/service/JniFrontend.java     |  10 +-
 .../impala/service/TransactionKeepalive.java       | 247 +++++++++++++++++++++
 tests/query_test/test_acid.py                      |  67 ++++++
 9 files changed, 537 insertions(+), 19 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 2ba7b9e..64f63a8 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1126,6 +1126,10 @@ Status ClientRequestState::UpdateCatalog() {
       }
       if (InTransaction()) {
         // UpdateCatalog() succeeded and already committed the transaction for us.
+        int64_t txn_id = GetTransactionId();
+        if (!frontend_->UnregisterTransaction(txn_id).ok()) {
+          LOG(ERROR) << Substitute("Failed to unregister transaction $0", txn_id);
+        }
         ClearTransactionState();
         query_events_->MarkEvent("Transaction committed");
       }
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index f9f8f9e..baf1089 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -111,6 +111,7 @@ Frontend::Frontend() {
     {"buildTestDescriptorTable", "([B)[B", &build_test_descriptor_table_id_},
     {"callQueryCompleteHooks", "([B)V", &call_query_complete_hooks_id_},
     {"abortTransaction", "(J)V", &abort_txn_},
+    {"unregisterTransaction", "(J)V", &unregister_txn_},
   };
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -270,6 +271,10 @@ Status Frontend::AbortTransaction(int64_t transaction_id) {
   return JniUtil::CallJniMethod(fe_, abort_txn_, transaction_id);
 }
 
+Status Frontend::UnregisterTransaction(int64_t transaction_id) {
+  return JniUtil::CallJniMethod(fe_, unregister_txn_, transaction_id);
+}
+
 bool Frontend::IsAuthorizationError(const Status& status) {
   return !status.ok() && status.GetDetail().find("AuthorizationException") == 0;
 }
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index 2483f50..d703406 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -170,6 +170,9 @@ class Frontend {
   /// Aborts transaction with the given transaction id.
   Status AbortTransaction(int64_t transaction_id);
 
+  /// Unregisters an already committed transaction.
+  Status UnregisterTransaction(int64_t transaction_id);
+
   /// Returns true if the error returned by the FE was due to an AuthorizationException.
   static bool IsAuthorizationError(const Status& status);
 
@@ -222,6 +225,7 @@ class Frontend {
   jmethodID show_create_function_id_; // JniFrontend.showCreateFunction
   jmethodID call_query_complete_hooks_id_; // JniFrontend.callQueryCompleteHooks
   jmethodID abort_txn_; // JniFrontend.abortTransaction()
+  jmethodID unregister_txn_; // JniFrontend.unregisterTransaction()
 
   // Only used for testing.
   jmethodID build_test_descriptor_table_id_; // JniFrontend.buildTestDescriptorTable()
diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index 2ea270a..93437e9 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -328,6 +329,23 @@ public class MetastoreShim {
   /**
    * Hive-3 only function
    */
+  public static boolean heartbeat(IMetaStoreClient client,
+      long txnId, long lockId) throws TransactionException {
+    throw new UnsupportedOperationException("heartbeat is not supported.");
+  }
+
+  /**
+   * Hive-3 only function
+   */
+  public static long acquireLock(IMetaStoreClient client, long txnId,
+      List<LockComponent> lockComponents, int lockRetries, int retryWaitSeconds)
+      throws TransactionException {
+    throw new UnsupportedOperationException("acquireLock is not supported.");
+  }
+
+  /**
+   * Hive-3 only function
+   */
   public static long allocateTableWriteId(IMetaStoreClient client, long txnId,
       String dbName, String tableName) throws TransactionException {
     throw new UnsupportedOperationException("allocateTableWriteId is not supported.");
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 371e0d9..954327a 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -26,9 +26,9 @@ import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
@@ -38,13 +38,21 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
@@ -68,8 +76,10 @@ import org.apache.impala.compat.HiveMetadataFormatUtils;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.Frontend;
 import org.apache.impala.service.MetadataOp;
+import org.apache.impala.service.TransactionKeepalive;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TResultSet;
+import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
@@ -79,6 +89,7 @@ import com.google.common.base.Preconditions;
  * between major versions of Hive. This implements the shimmed methods for Hive 3.
  */
 public class MetastoreShim {
+  public static final Logger LOG = Logger.getLogger(MetastoreShim.class);
   private static final String EXTWRITE = "EXTWRITE";
   private static final String EXTREAD = "EXTREAD";
   private static final String HIVEBUCKET2 = "HIVEBUCKET2";
@@ -494,6 +505,80 @@ public class MetastoreShim {
   }
 
   /**
+   * Heartbeats a transaction and/or lock to keep them alive.
+   * @param client is the HMS client to be used.
+   * @param txnId is the transaction id.
+   * @param lockId is the lock id.
+   * @return True on success, false if the transaction or lock does not
+   * exist anymore.
+   * @throws TransactionException in case of any other failure.
+   */
+  public static boolean heartbeat(IMetaStoreClient client,
+      long txnId, long lockId) throws TransactionException {
+    String errorMsg = "Caught exception during heartbeating transaction " +
+        String.valueOf(txnId) + " lock " + String.valueOf(lockId);
+    LOG.info("Sending heartbeat");
+    try {
+      client.heartbeat(txnId, lockId);
+    } catch (NoSuchLockException e) {
+      LOG.info(errorMsg, e);
+      return false;
+    } catch (NoSuchTxnException e) {
+      LOG.info(errorMsg, e);
+      return false;
+    } catch (TxnAbortedException e) {
+      LOG.info(errorMsg, e);
+      return false;
+    } catch (TException e) {
+      throw new TransactionException(e.getMessage());
+    }
+    return true;
+  }
+
+  /**
+   * Creates a lock for the given lock components and waits until it is
+   * acquired (or the retries are exhausted). Returns the id of the acquired lock.
+   * @param client is the HMS client to be used.
+   * @param txnId is the id of the transaction the lock belongs to.
+   * @param lockComponents the lock components to include in this lock.
+   * @param lockRetries the number of retries to acquire the lock.
+   * @param retryWaitSeconds wait interval between retries.
+   * @return the lock id
+   * @throws TransactionException in case of failure
+   */
+  public static long acquireLock(IMetaStoreClient client, long txnId,
+      List<LockComponent> lockComponents, int lockRetries, int retryWaitSeconds)
+          throws TransactionException {
+    LockRequestBuilder lockRequestBuilder = new LockRequestBuilder();
+    lockRequestBuilder.setUser("Impala");
+    lockRequestBuilder.setTransactionId(txnId);
+    for (LockComponent lockComponent : lockComponents) {
+      lockRequestBuilder.addLockComponent(lockComponent);
+    }
+    LockRequest lockRequest = lockRequestBuilder.build();
+    try {
+      LockResponse lockResponse = client.lock(lockRequest);
+      long lockId = lockResponse.getLockid();
+      int retries = 0;
+      while (lockResponse.getState() == LockState.WAITING && retries < lockRetries) {
+        try {
+          Thread.sleep(retryWaitSeconds * 1000);
+          ++retries;
+          lockResponse = client.checkLock(lockId);
+        } catch (InterruptedException e) {
+          // Since the wait time and the number of retries are configurable, it
+          // wouldn't add much value to make acquireLock() interruptible, so we
+          // just swallow the exception here.
+        }
+      }
+      if (lockResponse.getState() == LockState.ACQUIRED) return lockId;
+      throw new TransactionException("Failed to acquire lock for transaction " +
+          String.valueOf(txnId));
+    } catch (TException e) {
+      throw new TransactionException(e.getMessage());
+    }
+  }
+
+  /**
    * Allocates a write id for the given table.
    * @param client is the HMS client to be used.
    * @param txnId is the transaction id.
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index f282246..72c8764 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -20,6 +20,7 @@ package org.apache.impala.service;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -36,6 +37,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.impala.analysis.AlterDbStmt;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
@@ -183,6 +188,12 @@ public class Frontend {
   private static final int INCONSISTENT_METADATA_NUM_RETRIES =
       BackendConfig.INSTANCE.getLocalCatalogMaxFetchRetries();
 
+  // Number of retries to acquire an HMS ACID lock.
+  private static final int LOCK_RETRIES = 10;
+
+  // Time interval between retries of acquiring an HMS ACID lock
+  private static final int LOCK_RETRY_WAIT_SECONDS = 3;
+
   /**
    * Plan-time context that allows capturing various artifacts created
    * during the process.
@@ -257,6 +268,8 @@ public class Frontend {
   // Stores metastore clients for direct accesses to HMS.
   private final MetaStoreClientPool metaStoreClientPool_;
 
+  private final TransactionKeepalive transactionKeepalive_;
+
   public Frontend(AuthorizationFactory authzFactory) throws ImpalaException {
     this(authzFactory, FeCatalogManager.createFromBackendConfig());
   }
@@ -290,6 +303,11 @@ public class Frontend {
         BackendConfig.INSTANCE);
     queryHookManager_ = QueryEventHookManager.createFromConfig(BackendConfig.INSTANCE);
     metaStoreClientPool_ = new MetaStoreClientPool(1, 0);
+    if (MetastoreShim.getMajorVersion() > 2) {
+      transactionKeepalive_ = new TransactionKeepalive(metaStoreClientPool_);
+    } else {
+      transactionKeepalive_ = null;
+    }
   }
 
   public FeCatalog getCatalog() { return catalogManager_.getOrCreateCatalog(); }
@@ -1292,11 +1310,15 @@ public class Frontend {
         if (AcidUtils.isTransactionalTable(
             targetTable.getMetaStoreTable().getParameters())) {
           // TODO (IMPALA-8788): should load table write ids in transaction context.
-          long txnId = openTransaction();
+          long txnId = openTransaction(queryCtx);
           queryCtx.setTransaction_id(txnId);
-          timeline.markEvent("Transaction opened");
+          timeline.markEvent("Transaction opened (" + String.valueOf(txnId) + ")");
           Long writeId = allocateWriteIdIfNeeded(queryCtx, targetTable);
-          if (writeId != null) insertStmt.setWriteId(writeId);
+          if (writeId != null) {
+            Collection<FeTable> tables = stmtTableCache.tables.values();
+            createLockForInsert(txnId, tables, targetTable, insertStmt.isOverwrite());
+            insertStmt.setWriteId(writeId);
+          }
         }
       } else if (analysisResult.isLoadDataStmt()) {
         result.stmt_type = TStmtType.LOAD;
@@ -1644,26 +1666,42 @@ public class Frontend {
   }
 
   /**
-   * Opens a new transaction.
+   * Opens a new transaction and registers it to the keepalive object.
+   * @param queryCtx context of the query that requires the transaction.
    * @return the transaction id.
    * @throws TransactionException
    */
-  private long openTransaction() throws TransactionException {
-    IMetaStoreClient hmsClient = metaStoreClientPool_.getClient().getHiveClient();
-    return MetastoreShim.openTransaction(hmsClient, "Impala");
+  private long openTransaction(TQueryCtx queryCtx) throws TransactionException {
+    try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
+      IMetaStoreClient hmsClient = client.getHiveClient();
+      long transactionId = MetastoreShim.openTransaction(hmsClient, "Impala");
+      transactionKeepalive_.addTransaction(transactionId, queryCtx);
+      return transactionId;
+    }
   }
 
   /**
    * Aborts a transaction.
-   * @param transactionId is the id of the transaction to abort.
+   * @param txnId is the id of the transaction to abort.
    * @throws TransactionException
    * TODO: maybe we should make it async.
    */
-  public void abortTransaction(long transactionId) throws TransactionException {
-    Long txnId = transactionId;
-    LOG.error("Aborting transaction: " + txnId.toString());
-    IMetaStoreClient hmsClient = metaStoreClientPool_.getClient().getHiveClient();
-    MetastoreShim.abortTransaction(hmsClient, transactionId);
+  public void abortTransaction(long txnId) throws TransactionException {
+    LOG.error("Aborting transaction: " + Long.toString(txnId));
+    try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
+      IMetaStoreClient hmsClient = client.getHiveClient();
+      transactionKeepalive_.deleteTransaction(txnId);
+      MetastoreShim.abortTransaction(hmsClient, txnId);
+    }
+  }
+
+  /**
+   * Unregisters an already committed transaction.
+   * @param txnId is the id of the transaction to clear.
+   */
+  public void unregisterTransaction(long txnId) {
+    LOG.info("Unregistering already committed transaction: " + Long.toString(txnId));
+    transactionKeepalive_.deleteTransaction(txnId);
   }
 
   /**
@@ -1681,10 +1719,52 @@ public class Frontend {
     if (!AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
       return null;
     }
+    try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
+      IMetaStoreClient hmsClient = client.getHiveClient();
+      long txnId = queryCtx.getTransaction_id();
+      return MetastoreShim.allocateTableWriteId(hmsClient, txnId, table.getDb().getName(),
+          table.getName());
+    }
+  }
 
-    IMetaStoreClient hmsClient = this.metaStoreClientPool_.getClient().getHiveClient();
-    long txnId = queryCtx.getTransaction_id();
-    return MetastoreShim.allocateTableWriteId(hmsClient, txnId, table.getDb().getName(),
-        table.getName());
+  /**
+   * Creates Lock object for the insert statement.
+   * @param txnId the transaction id to be used.
+   * @param tables the tables in the query.
+   * @param targetTable the target table of INSERT. Must be transactional.
+   * @param isOverwrite true if the INSERT statement is an INSERT OVERWRITE.
+   * @throws TransactionException
+   */
+  private void createLockForInsert(Long txnId, Collection<FeTable> tables,
+      FeTable targetTable, boolean isOverwrite) throws TransactionException {
+    Preconditions.checkState(
+        AcidUtils.isTransactionalTable(targetTable.getMetaStoreTable().getParameters()));
+    List<LockComponent> lockComponents = new ArrayList<>(tables.size());
+    for (FeTable table : tables) {
+      if (!AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
+        continue;
+      }
+      LockComponent lockComponent = new LockComponent();
+      lockComponent.setDbname(table.getDb().getName());
+      lockComponent.setTablename(table.getName());
+      lockComponent.setLevel(LockLevel.TABLE);
+      if (table == targetTable) {
+        if (isOverwrite) {
+          lockComponent.setType(LockType.EXCLUSIVE);
+        } else {
+          lockComponent.setType(LockType.SHARED_READ);
+        }
+        lockComponent.setOperationType(DataOperationType.INSERT);
+      } else {
+        lockComponent.setType(LockType.SHARED_READ);
+        lockComponent.setOperationType(DataOperationType.SELECT);
+      }
+      lockComponents.add(lockComponent);
+    }
+    try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
+      IMetaStoreClient hmsClient = client.getHiveClient();
+      MetastoreShim.acquireLock(hmsClient, txnId, lockComponents, LOCK_RETRIES,
+          LOCK_RETRY_WAIT_SECONDS);
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index c73ca06..f550956 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -639,7 +639,7 @@ public class JniFrontend {
 
   /**
    * Aborts a transaction.
-   * @param transactionId the id of the transaction to commit.
+   * @param transactionId the id of the transaction to abort.
    * @throws TransactionException
    */
   public void abortTransaction(long transactionId) throws TransactionException {
@@ -647,6 +647,14 @@ public class JniFrontend {
   }
 
   /**
+   * Unregister an already committed transaction.
+   * @param transactionId the id of the transaction to clear.
+   */
+  public void unregisterTransaction(long transactionId) {
+    this.frontend_.unregisterTransaction(transactionId);
+  }
+
+  /**
    * Returns an error string describing configuration issue with the groups mapping
    * provider implementation.
    */
diff --git a/fe/src/main/java/org/apache/impala/service/TransactionKeepalive.java b/fe/src/main/java/org/apache/impala/service/TransactionKeepalive.java
new file mode 100644
index 0000000..21ccbce
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/service/TransactionKeepalive.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.service;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.impala.catalog.MetaStoreClientPool;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.common.TransactionException;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.thrift.TQueryCtx;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An object of this class creates a daemon thread that periodically heartbeats the
+ * registered transactions and locks to HMS to keep them alive.
+ * TODO(IMPALA-8788) once we start opening a transaction for every query we should
+ * re-think our wait policy to spread out RPCs in time.
+ */
+public class TransactionKeepalive {
+  public static final Logger LOG = Logger.getLogger(TransactionKeepalive.class);
+
+  // TODO: should be calculated from hive.txn.timeout or metastore.txn.timeout
+  private static final long SLEEP_INTERVAL_SECONDS = 60;
+  private static final long MILLION = 1000000L;
+  private static final long BILLION = 1000000000L;
+
+  private final Thread daemonThread_;
+
+  private final MetaStoreClientPool metaStoreClientPool_;
+
+  public static class HeartbeatContext {
+    public TQueryCtx queryCtx;
+    public long creationTime;
+
+    public HeartbeatContext(TQueryCtx queryCtx, long creationTime) {
+      this.queryCtx = queryCtx;
+      this.creationTime = creationTime;
+    }
+  }
+
+  // Map of transactions
+  private Map<Long, HeartbeatContext> transactions_ = new HashMap<>();
+
+  // Map of locks.
+  private Map<Long, HeartbeatContext> locks_ = new HashMap<>();
+
+  private class DaemonThread implements Runnable {
+    /**
+     * Background thread does the periodic heartbeating.
+     */
+    @Override
+    public void run() {
+      Random rand = new Random();
+      try {
+        // Sleep for a random interval to get the different coordinators out of
+        // sync with each other. This should spread out the load on HMS.
+        Thread.sleep(rand.nextInt((int)(SLEEP_INTERVAL_SECONDS * 1000)));
+      } catch (Throwable e) {
+        LOG.error("Unexpected exception thrown", e);
+      }
+      while (true) {
+        try {
+          // Copy the transactions and locks to keep the critical section short.
+          Map<Long, HeartbeatContext> copyOfTransactions;
+          Map<Long, HeartbeatContext> copyOfLocks;
+          synchronized (TransactionKeepalive.this) {
+            copyOfTransactions = transactions_.entrySet().stream().collect(
+                Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+            copyOfLocks = locks_.entrySet().stream().collect(
+                Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+          }
+          long durationOfHeartbeatingMillis = 0;
+          if (!copyOfTransactions.isEmpty() || !copyOfLocks.isEmpty()) {
+            LOG.info("There are " + String.valueOf(copyOfTransactions.size()) +
+                " open transactions and " + String.valueOf(copyOfLocks) +
+                " independent locks in TransactionKeepalive. Start heartbeating them.");
+            long startHeartbeating = System.nanoTime();
+            sendHeartbeatsFor(copyOfTransactions, copyOfLocks);
+            durationOfHeartbeatingMillis =
+                (System.nanoTime() - startHeartbeating) / MILLION;
+            LOG.info("Heartbeating the transactions and locks took " +
+                durationOfHeartbeatingMillis + " milliseconds.");
+          }
+          long sleepMillis = SLEEP_INTERVAL_SECONDS * 1000 - durationOfHeartbeatingMillis;
+          if (sleepMillis > 0) {
+            long randomness = rand.nextInt((int)(sleepMillis / 10));
+            Thread.sleep(sleepMillis + randomness);
+          }
+        } catch (Throwable e) {
+          LOG.error("Unexpected exception thrown", e);
+        }
+      }
+    }
+
+    /**
+     * Sends heartbeats for transactions and locks that are old enough, i.e. older than
+     * the sleep interval.
+     * TODO: we could be smarter here and also take metastore.txn.timeout into
+     * consideration.
+     */
+    private void sendHeartbeatsFor(Map<Long, HeartbeatContext> transactions,
+        Map<Long, HeartbeatContext> locks) {
+      try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
+        IMetaStoreClient hmsClient = client.getHiveClient();
+        for (Map.Entry<Long, HeartbeatContext> entry : transactions.entrySet()) {
+          HeartbeatContext ctx = entry.getValue();
+          // Only heartbeat old transactions
+          if (oldEnough(ctx)) {
+            Long transactionId = entry.getKey();
+            sendHeartbeat(hmsClient, transactionId, 0L, ctx);
+          }
+        }
+        for (Map.Entry<Long, HeartbeatContext> entry : locks.entrySet()) {
+          HeartbeatContext ctx = entry.getValue();
+          // Only heartbeat old locks
+          if (oldEnough(ctx)) {
+            Long lockId = entry.getKey();
+            sendHeartbeat(hmsClient, 0L, lockId, ctx);
+          }
+        }
+      }
+    }
+
+    /**
+     * Determines whether a transaction or lock is old enough for heartbeating.
+     * @param heartbeatContext context information about creation time.
+     * @return True if we should heartbeat this entry.
+     */
+    private boolean oldEnough(HeartbeatContext heartbeatContext) {
+      Long ageInSeconds = (System.nanoTime() - heartbeatContext.creationTime) / BILLION;
+      return ageInSeconds > SLEEP_INTERVAL_SECONDS;
+    }
+
+    /**
+     * Sends a single heartbeat for 'transactionId' or 'lockId'.
+     */
+    private void sendHeartbeat(IMetaStoreClient hmsClient, long transactionId,
+        long lockId, HeartbeatContext context) {
+      // One of the values must be zero, but only one.
+      Preconditions.checkState(transactionId == 0 || lockId == 0);
+      Preconditions.checkState(transactionId != 0 || lockId != 0);
+      try {
+        if (!MetastoreShim.heartbeat(hmsClient, transactionId, lockId)) {
+          // Transaction or lock doesn't exist anymore, let's remove them.
+          if (transactionId != 0) {
+            LOG.warn("Transaction " + String.valueOf(transactionId) + " of query " +
+                context.queryCtx.query_id.toString() + " doesn't exist anymore. Stop " +
+                "heartbeating it.");
+            TransactionKeepalive.this.deleteTransaction(transactionId);
+          }
+          if (lockId != 0) {
+            LOG.warn("Lock " + String.valueOf(lockId) + " of query " +
+                context.queryCtx.query_id.toString() + " doesn't exist anymore. Stop " +
+                "heartbeating it.");
+            TransactionKeepalive.this.deleteLock(lockId);
+          }
+        }
+      } catch (TransactionException e) {
+        LOG.warn("Caught exception during heartbeating transaction " +
+            String.valueOf(transactionId) + " lock " + String.valueOf(lockId) +
+            " for query " + context.queryCtx.query_id.toString(), e);
+      }
+    }
+  }
+
+  /**
+   * Creates TransactionKeepalive object and starts the background thread.
+   */
+  public TransactionKeepalive(MetaStoreClientPool metaStoreClientPool) {
+    Preconditions.checkNotNull(metaStoreClientPool);
+    metaStoreClientPool_ = metaStoreClientPool;
+    daemonThread_ = new Thread(new DaemonThread());
+    daemonThread_.setDaemon(true);
+    daemonThread_.setName("Transaction keepalive thread");
+    daemonThread_.start();
+  }
+
+  /**
+   * Add transaction to heartbeat. Associated locks shouldn't be added.
+   */
+  synchronized public void addTransaction(Long transactionId, TQueryCtx queryCtx) {
+    Preconditions.checkNotNull(transactionId);
+    Preconditions.checkNotNull(queryCtx);
+    Preconditions.checkState(!transactions_.containsKey(transactionId));
+    HeartbeatContext ctx = new HeartbeatContext(queryCtx, System.nanoTime());
+    transactions_.put(transactionId, ctx);
+  }
+
+  /**
+   * Add lock to heartbeat. This should be a lock without a transaction context.
+   */
+  synchronized public void addLock(Long lockId, TQueryCtx queryCtx) {
+    Preconditions.checkNotNull(lockId);
+    Preconditions.checkNotNull(queryCtx);
+    Preconditions.checkState(!locks_.containsKey(lockId));
+    HeartbeatContext ctx = new HeartbeatContext(queryCtx, System.nanoTime());
+    locks_.put(lockId, ctx);
+  }
+
+  /**
+   * Stop heartbeating transaction.
+   */
+  synchronized public void deleteTransaction(Long transactionId) {
+    Preconditions.checkNotNull(transactionId);
+    if (transactions_.remove(transactionId) == null) {
+      LOG.info("Transaction id " + transactionId + " was already removed from " +
+          "TransactionKeepalive object or never existed.");
+    }
+  }
+
+  /**
+   * Stop heartbeating lock.
+   */
+  synchronized public void deleteLock(Long lockId) {
+    Preconditions.checkNotNull(lockId);
+    if (locks_.remove(lockId) == null) {
+      LOG.info("Lock id " + lockId + " was already removed from " +
+          "TransactionKeepalive object or never existed.");
+    }
+  }
+}
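
For orientation, a rough caller-side sketch of how the class above is wired up.
It mirrors the Frontend changes earlier in this patch; constructing a standalone
MetaStoreClientPool like this outside the Frontend is an assumption made for the
example, not something the patch itself does:

    import org.apache.impala.catalog.MetaStoreClientPool;
    import org.apache.impala.service.TransactionKeepalive;
    import org.apache.impala.thrift.TQueryCtx;

    public class KeepaliveUsageSketch {
      public static void main(String[] args) throws Exception {
        // The Frontend creates a pool of one client and hands it to the
        // keepalive object, whose constructor starts the daemon thread.
        MetaStoreClientPool pool = new MetaStoreClientPool(1, 0);
        TransactionKeepalive keepalive = new TransactionKeepalive(pool);

        long txnId = 42L;  // would come from MetastoreShim.openTransaction()
        keepalive.addTransaction(txnId, new TQueryCtx());

        // ... the query runs; the daemon thread heartbeats txnId only if it
        // stays open longer than the sleep interval ...

        // On commit (unregisterTransaction) or abort, stop heartbeating.
        keepalive.deleteTransaction(txnId);
      }
    }
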
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index 208315d..6ac72dd 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -16,6 +16,10 @@
 # under the License.
 
 # Functional tests for ACID integration with Hive.
+
+import pytest
+import time
+
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import (SkipIfHive2, SkipIfCatalogV2, SkipIfS3, SkipIfABFS,
                                SkipIfADLS, SkipIfIsilon, SkipIfLocal)
@@ -84,3 +88,66 @@ class TestAcid(ImpalaTestSuite):
 #  TRUNCATE, once HIVE-20137 is implemented.
 #  INSERT OVERWRITE with empty result set, once HIVE-21750 is fixed.
 #  Negative test for LOAD DATA INPATH and all other SQL that we don't support.
+
+  @SkipIfHive2.acid
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  @pytest.mark.execute_serially
+  def test_acid_heartbeats(self, vector, unique_database):
+    """Tests heartbeating of transactions. Creates a long-running query via
+    some jitting and in the meanwhile it periodically checks whether there is
+    a transaction that has sent a heartbeat since its start.
+    """
+    if self.exploration_strategy() != 'exhaustive': pytest.skip()
+    last_open_txn_start_time = self._latest_open_transaction()
+    dummy_tbl = "{}.{}".format(unique_database, "dummy")
+    self.execute_query("create table {} (i int) tblproperties"
+                       "('transactional'='true',"
+                       "'transactional_properties'='insert_only')".format(dummy_tbl))
+    try:
+      handle = self.execute_query_async(
+          "insert into {} values (sleep(200000))".format(dummy_tbl))
+      MAX_ATTEMPTS = 10
+      attempt = 0
+      success = False
+      while attempt < MAX_ATTEMPTS:
+        if self._any_open_heartbeated_transaction_since(last_open_txn_start_time):
+          success = True
+          break
+        attempt += 1
+        time.sleep(20)
+      assert success
+    finally:
+      self.client.cancel(handle)
+
+  def _latest_open_transaction(self):
+    max_start = 0
+    for txn in self._get_impala_transactions():
+      start = txn['start_time']
+      if start > max_start:
+        max_start = start
+    return max_start
+
+  def _any_open_heartbeated_transaction_since(self, since_start_time):
+    for txn in self._get_impala_transactions():
+      if txn['state'] == 'OPEN':
+        start = txn['start_time']
+        if start > since_start_time and start != txn['last_heartbeat']:
+          return True
+    return False
+
+  def _get_impala_transactions(self):
+    transactions = self.run_stmt_in_hive("SHOW TRANSACTIONS")
+    for transaction_line in transactions.split('\n')[2:-1]:
+      transaction_columns = transaction_line.split(',')
+      txn_dict = dict()
+      txn_dict['state'] = transaction_columns[1]
+      txn_dict['start_time'] = int(transaction_columns[2])
+      txn_dict['last_heartbeat'] = int(transaction_columns[3])
+      txn_dict['user'] = transaction_columns[4]
+      if txn_dict['user'] != 'Impala':
+        continue
+      yield txn_dict