Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:24 UTC

[15/50] [abbrv] hive git commit: HIVE-19416 : merge master into branch (Sergey Shelukhin) 0719

http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 0000000,33f24fb..080cc52
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@@ -1,0 -1,504 +1,509 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hive.metastore.txn;
+ 
+ import com.google.common.annotations.VisibleForTesting;
++
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.classification.InterfaceStability;
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.hive.common.ValidTxnList;
+ import org.apache.hadoop.hive.common.ValidWriteIdList;
+ import org.apache.hadoop.hive.common.classification.RetrySemantics;
+ import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+ 
+ import java.sql.SQLException;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ 
+ /**
+  * A handler to answer transaction related calls that come into the metastore
+  * server.
+  */
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ public interface TxnStore extends Configurable {
+ 
+   enum MUTEX_KEY {
+     Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
+     WriteSetCleaner, CompactionScheduler, WriteIdAllocator, MaterializationRebuild
+   }
+   // Compactor states (Should really be enum)
+   String INITIATED_RESPONSE = "initiated";
+   String WORKING_RESPONSE = "working";
+   String CLEANING_RESPONSE = "ready for cleaning";
+   String FAILED_RESPONSE = "failed";
+   String SUCCEEDED_RESPONSE = "succeeded";
+   String ATTEMPTED_RESPONSE = "attempted";
+ 
+   int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000;
+ 
+   /**
+    * Get information about open transactions.  This gives extensive information about the
+    * transactions rather than just the list of transactions.  This should be used when the need
+    * is to see information about the transactions (e.g. show transactions).
+    * @return information about open transactions
+    * @throws MetaException
+    */
+   @RetrySemantics.ReadOnly
+   GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
+ 
+   /**
+    * Get list of valid transactions.  This gives just the list of transactions that are open.
+    * @return list of open transactions, as well as a high water mark.
+    * @throws MetaException
+    */
+   @RetrySemantics.ReadOnly
+   GetOpenTxnsResponse getOpenTxns() throws MetaException;
+ 
+   /**
+    * Get the count for open transactions.
+    * @throws MetaException
+    */
+   @RetrySemantics.ReadOnly
+   void countOpenTxns() throws MetaException;
+ 
+   /**
+    * Open a set of transactions
+    * @param rqst request to open transactions
+    * @return information on opened transactions
+    * @throws MetaException
+    */
+   @RetrySemantics.Idempotent
+   OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
+ 
+   @RetrySemantics.Idempotent
+   long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException;
+ 
+   /**
+    * Abort (rollback) a transaction.
+    * @param rqst info on transaction to abort
+    * @throws NoSuchTxnException
+    * @throws MetaException
+    */
+   @RetrySemantics.Idempotent
+   void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException;
+ 
+   /**
+    * Abort (rollback) a list of transactions in one request.
+    * @param rqst info on transactions to abort
+    * @throws NoSuchTxnException
+    * @throws MetaException
+    */
+   @RetrySemantics.Idempotent
+   void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException;
+ 
+   /**
+    * Commit a transaction
+    * @param rqst info on transaction to commit
+    * @throws NoSuchTxnException
+    * @throws TxnAbortedException
+    * @throws MetaException
+    */
+   @RetrySemantics.Idempotent
+   void commitTxn(CommitTxnRequest rqst)
+     throws NoSuchTxnException, TxnAbortedException,  MetaException;
+ 
+   /**
+    * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
+    * @param rqst info on table/partitions and writeid snapshot to replicate.
+    * @throws MetaException in case of failure
+    */
+   @RetrySemantics.Idempotent
+   void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException;
+ 
+   /**
+    * Get invalidation info for the materialization. Currently, the materialization information
+    * only contains information about whether there were update/delete operations on the source
+    * tables used by the materialization since it was created.
+    * @param cm creation metadata for the materialization
+    * @param validTxnList valid transaction list for snapshot taken for current query
+    * @throws MetaException
+    */
+   @RetrySemantics.Idempotent
+   Materialization getMaterializationInvalidationInfo(
+       final CreationMetadata cm, final String validTxnList)
+           throws MetaException;
+ 
++  @RetrySemantics.ReadOnly
++  long getTxnIdForWriteId(String dbName, String tblName, long writeId)
++      throws MetaException;
++
+   LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId)
+       throws MetaException;
+ 
+   boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId)
+       throws MetaException;
+ 
+   long cleanupMaterializationRebuildLocks(ValidTxnList validTxnList, long timeout)
+       throws MetaException;
+ 
+   /**
+    * Gets the list of valid write ids for the given tables with respect to the current txn
+    * @param rqst info on transaction and list of table names associated with given transaction
+    * @throws NoSuchTxnException
+    * @throws MetaException
+    */
+   @RetrySemantics.ReadOnly
+   GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
+           throws NoSuchTxnException,  MetaException;
+ 
+   /**
+    * Allocate a write ID for the given table and associate it with a transaction
+    * @param rqst info on transaction and table to allocate write id
+    * @throws NoSuchTxnException
+    * @throws TxnAbortedException
+    * @throws MetaException
+    */
+   AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
+     throws NoSuchTxnException, TxnAbortedException, MetaException;
+ 
+   /**
+    * Called on conversion of existing table to full acid.  Sets initial write ID to a high
+    * enough value so that we can assign unique ROW__IDs to data in existing files.
+    */
+   void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) throws MetaException;
+ 
+   /**
+    * Obtain a lock.
+    * @param rqst information on the lock to obtain.  If the requester is part of a transaction
+    *             the txn information must be included in the lock request.
+    * @return info on the lock, including whether it was obtained.
+    * @throws NoSuchTxnException
+    * @throws TxnAbortedException
+    * @throws MetaException
+    */
+   @RetrySemantics.CannotRetry
+   LockResponse lock(LockRequest rqst)
+     throws NoSuchTxnException, TxnAbortedException, MetaException;
+ 
+   /**
+    * Check whether a lock has been obtained.  This is used after {@link #lock} returned a wait
+    * state.
+    * @param rqst info on the lock to check
+    * @return info on the state of the lock
+    * @throws NoSuchTxnException
+    * @throws NoSuchLockException
+    * @throws TxnAbortedException
+    * @throws MetaException
+    */
+   @RetrySemantics.SafeToRetry
+   LockResponse checkLock(CheckLockRequest rqst)
+     throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+ 
+   /**
+    * Unlock a lock.  It is not legal to call this if the caller is part of a txn.  In that case
+    * the txn should be committed or aborted instead.  (Note someday this will change since
+    * multi-statement transactions will allow unlocking in the transaction.)
+    * @param rqst lock to unlock
+    * @throws NoSuchLockException
+    * @throws TxnOpenException
+    * @throws MetaException
+    */
+   @RetrySemantics.Idempotent
+   void unlock(UnlockRequest rqst)
+     throws NoSuchLockException, TxnOpenException, MetaException;
+ 
+   /**
+    * Get information on current locks.
+    * @param rqst lock information to retrieve
+    * @return lock information.
+    * @throws MetaException
+    */
+   @RetrySemantics.ReadOnly
+   ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException;
+ 
+   /**
+    * Send a heartbeat for a lock or a transaction
+    * @param ids lock and/or txn id to heartbeat
+    * @throws NoSuchTxnException
+    * @throws NoSuchLockException
+    * @throws TxnAbortedException
+    * @throws MetaException
+    */
+   @RetrySemantics.SafeToRetry
+   void heartbeat(HeartbeatRequest ids)
+     throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, MetaException;
+ 
+   /**
+    * Heartbeat a group of transactions together
+    * @param rqst set of transactions to heartbeat
+    * @return info on txns that were heartbeated
+    * @throws MetaException
+    */
+   @RetrySemantics.SafeToRetry
+   HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
+     throws MetaException;
+ 
+   /**
+    * Submit a compaction request into the queue.  This is called when a user manually requests a
+    * compaction.
+    * @param rqst information on what to compact
+    * @return id of the compaction that has been started or existing id if this resource is already scheduled
+    * @throws MetaException
+    */
+   @RetrySemantics.Idempotent
+   CompactionResponse compact(CompactionRequest rqst) throws MetaException;
+ 
+   /**
+    * Show list of current compactions.
+    * @param rqst info on which compactions to show
+    * @return compaction information
+    * @throws MetaException
+    */
+   @RetrySemantics.ReadOnly
+   ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException;
+ 
+   /**
+    * Add information on a set of dynamic partitions that participated in a transaction.
+    * @param rqst dynamic partition info.
+    * @throws NoSuchTxnException
+    * @throws TxnAbortedException
+    * @throws MetaException
+    */
+   @RetrySemantics.SafeToRetry
+   void addDynamicPartitions(AddDynamicPartitions rqst)
+       throws NoSuchTxnException,  TxnAbortedException, MetaException;
+ 
+   /**
+    * Clean up corresponding records in metastore tables.
+    * @param type Hive object type
+    * @param db database object
+    * @param table table object
+    * @param partitionIterator partition iterator
+    * @throws MetaException
+    */
+   @RetrySemantics.Idempotent
+   void cleanupRecords(HiveObjectType type, Database db, Table table,
+                              Iterator<Partition> partitionIterator) throws MetaException;
+ 
+   @RetrySemantics.Idempotent
+   void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName,
+       String newCatName, String newDbName, String newTabName, String newPartName)
+       throws MetaException;
+ 
+   /**
+    * Timeout transactions and/or locks.  This should only be called by the compactor.
+    */
+   @RetrySemantics.Idempotent
+   void performTimeOuts();
+ 
+   /**
+    * This will look through the completed_txn_components table and look for partitions or tables
+    * that may be ready for compaction.  Also, look through txns and txn_components tables for
+    * aborted transactions that we should add to the list.
+    * @param maxAborted Maximum number of aborted queries to allow before marking this as a
+    *                   potential compaction.
+    * @return list of CompactionInfo structs.  These will not have id, type,
+    * or runAs set since these are only potential compactions not actual ones.
+    */
+   @RetrySemantics.ReadOnly
+   Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
+ 
+   /**
+    * Sets the user to run as.  This is for the case
+    * where the request was generated by the user and so the worker must set this value later.
+    * @param cq_id id of this entry in the queue
+    * @param user user to run the jobs as
+    */
+   @RetrySemantics.Idempotent
+   void setRunAs(long cq_id, String user) throws MetaException;
+ 
+   /**
+    * This will grab the next compaction request off of
+    * the queue, and assign it to the worker.
+    * @param workerId id of the worker calling this, will be recorded in the db
+    * @return an info element for this compaction request, or null if there is no work to do now.
+    */
+   @RetrySemantics.ReadOnly
+   CompactionInfo findNextToCompact(String workerId) throws MetaException;
+ 
+   /**
+    * This will mark an entry in the queue as compacted
+    * and put it in the ready to clean state.
+    * @param info info on the compaction entry to mark as compacted.
+    */
+   @RetrySemantics.SafeToRetry
+   void markCompacted(CompactionInfo info) throws MetaException;
+ 
+   /**
+    * Find entries in the queue that are ready to
+    * be cleaned.
+    * @return information on the entry in the queue.
+    */
+   @RetrySemantics.ReadOnly
+   List<CompactionInfo> findReadyToClean() throws MetaException;
+ 
+   /**
+    * This will remove an entry from the queue after
+    * it has been compacted.
+    * 
+    * @param info info on the compaction entry to remove
+    */
+   @RetrySemantics.CannotRetry
+   void markCleaned(CompactionInfo info) throws MetaException;
+ 
+   /**
+    * Mark a compaction entry as failed.  This will move it to the compaction history queue with a
+    * failed status.  It will NOT clean up aborted transactions in the table/partition associated
+    * with this compaction.
+    * @param info information on the compaction that failed.
+    * @throws MetaException
+    */
+   @RetrySemantics.CannotRetry
+   void markFailed(CompactionInfo info) throws MetaException;
+ 
+   /**
+    * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommitted_txnid as found by
+    * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+    */
+   @RetrySemantics.SafeToRetry
+   void cleanTxnToWriteIdTable() throws MetaException;
+ 
+   /**
+    * Clean up aborted transactions from txns that have no components in txn_components.  The reason such
+    * txns exist can be that no work was done in this txn (e.g. Streaming opened a TransactionBatch and
+    * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+    */
+   @RetrySemantics.SafeToRetry
+   void cleanEmptyAbortedTxns() throws MetaException;
+ 
+   /**
+    * This will take all entries assigned to workers
+    * on a host and return them to the INITIATED state.  The initiator should use this at start up to
+    * clean entries from any workers that were in the middle of compacting when the metastore
+    * shut down.  It does not reset entries from worker threads on other hosts as those may still
+    * be working.
+    * @param hostname Name of this host.  It is assumed this prefixes the thread's worker id,
+    *                 so that a LIKE 'hostname%' pattern will match the worker id.
+    */
+   @RetrySemantics.Idempotent
+   void revokeFromLocalWorkers(String hostname) throws MetaException;
+ 
+   /**
+    * This call will return all compaction queue
+    * entries that are assigned to a worker but past the timeout back to the INITIATED state.
+    * This should be called by the initiator on start up and occasionally when running to clean up
+    * after dead threads.  At start up {@link #revokeFromLocalWorkers(String)} should be called
+    * first.
+    * @param timeout number of milliseconds since start time that should elapse before a worker is
+    *                declared dead.
+    */
+   @RetrySemantics.Idempotent
+   void revokeTimedoutWorkers(long timeout) throws MetaException;
+ 
+   /**
+    * Queries metastore DB directly to find columns in the table which have statistics information.
+    * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+    * table level stats are examined.
+    * @throws MetaException
+    */
+   @RetrySemantics.ReadOnly
+   List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
+ 
+   /**
+    * Record the highest write id that the {@code ci} compaction job will pay attention to.
+    */
+   @RetrySemantics.Idempotent
+   void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException;
+ 
+   /**
+    * For any given compactable entity (partition, table if not partitioned) the history of compactions
+    * may look like "sssfffaaasffss", for example.  The idea is to retain the tail (most recent) of the
+    * history such that a configurable number of each type of state is present.  Any other entries
+    * can be purged.  This scheme has the advantage of always retaining the last failure/success even if
+    * it's not recent.
+    * @throws MetaException
+    */
+   @RetrySemantics.SafeToRetry
+   void purgeCompactionHistory() throws MetaException;
+ 
+   /**
+    * WriteSet tracking is used to ensure proper transaction isolation.  This method deletes the 
+    * transaction metadata once it becomes unnecessary.  
+    */
+   @RetrySemantics.SafeToRetry
+   void performWriteSetGC();
+ 
+   /**
+    * Determine if there are enough consecutive failures compacting a table or partition that no
+    * new automatic compactions should be scheduled.  User initiated compactions do not do this
+    * check.
+    * @param ci  Table or partition to check.
+    * @return true if it is ok to compact, false if there have been too many failures.
+    * @throws MetaException
+    */
+   @RetrySemantics.ReadOnly
+   boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
+ 
+   @VisibleForTesting
+   int numLocksInLockTable() throws SQLException, MetaException;
+ 
+   @VisibleForTesting
+   long setTimeout(long milliseconds);
+ 
+   @RetrySemantics.Idempotent
+   MutexAPI getMutexAPI();
+ 
+   /**
+    * This is primarily designed to provide coarse grained mutex support to operations running
+    * inside the Metastore (of which there could be several instances).  The initial goal is to 
+    * ensure that various sub-processes of the Compactor don't step on each other.
+    * 
+    * In the RDBMS world each {@code LockHandle} uses a java.sql.Connection, so use it sparingly.
+    */
+   interface MutexAPI {
+     /**
+      * The {@code key} is the name of the lock. This will acquire an exclusive lock or block.  It returns
+      * a handle which must be used to release the lock.  Each invocation returns a new handle.
+      */
+     LockHandle acquireLock(String key) throws MetaException;
+ 
+     /**
+      * Same as {@link #acquireLock(String)} but takes an already existing handle as input.  This 
+      * will associate the lock on {@code key} with the same handle.  All locks associated with
+      * the same handle will be released together.
+      * @param handle not NULL
+      */
+     void acquireLock(String key, LockHandle handle) throws MetaException;
+     interface LockHandle {
+       /**
+        * Releases all locks associated with this handle.
+        */
+       void releaseLocks();
+     }
+   }
+ 
+   /**
+    * Once a {@link java.util.concurrent.ThreadPoolExecutor} Worker submits a job to the cluster,
+    * it calls this to update the metadata.
+    * @param id {@link CompactionInfo#id}
+    */
+   @RetrySemantics.Idempotent
+   void setHadoopJobId(String hadoopJobId, long id);
+ 
+   /**
+    * Add the ACID write event information to writeNotificationLog table.
+    * @param acidWriteEvent
+    */
+   @RetrySemantics.Idempotent
+   void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException;
+ }

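As a reading aid for the MutexAPI contract above (acquireLock hands back a LockHandle, and all
locks associated with a handle are released together), here is a minimal sketch of how a
metastore-side housekeeping task might use it. This is illustrative only and not part of the
commit; it assumes a Hadoop Configuration named conf is already in scope and obtains the
TxnStore via TxnUtils.getTxnStore.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class CleanerMutexExample {
      // Hypothetical helper: run a task under the Cleaner mutex so that only one
      // metastore instance executes it at a time.
      static void runExclusively(Configuration conf, Runnable task) throws MetaException {
        TxnStore txnStore = TxnUtils.getTxnStore(conf);
        TxnStore.MutexAPI mutex = txnStore.getMutexAPI();
        // Blocks until an exclusive lock named after MUTEX_KEY.Cleaner is acquired.
        TxnStore.MutexAPI.LockHandle handle = mutex.acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
        try {
          task.run();
        } finally {
          // Releases every lock associated with this handle.
          handle.releaseLocks();
        }
      }
    }
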
http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 0000000,fa291d5..aac5811
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@@ -1,0 -1,471 +1,481 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p/>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p/>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hive.metastore.txn;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+ import org.apache.hadoop.hive.common.ValidReadTxnList;
+ import org.apache.hadoop.hive.common.ValidTxnList;
+ import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+ import org.apache.hadoop.hive.common.ValidWriteIdList;
+ import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 -import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 -import org.apache.hadoop.hive.metastore.api.Table;
 -import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
++import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.util.Collections;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.BitSet;
+ import java.util.List;
+ import java.util.Map;
+ 
+ public class TxnUtils {
+   private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
+ 
++  // Transactional stats states
++  static final public char STAT_OPEN = 'o';
++  static final public char STAT_INVALID = 'i';
++  static final public char STAT_COMMITTED = 'c';
++  static final public char STAT_OBSOLETE = 's';
++
+   /**
+    * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
+    * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that the caller intends to
+    * read the files, and thus treats both open and aborted transactions as invalid.
+    * @param txns txn list from the metastore
+    * @param currentTxn Current transaction that the user has open.  If this is greater than 0 it
+    *                   will be removed from the exceptions list so that the user sees his own
+    *                   transaction as valid.
+    * @return a valid txn list.
+    */
+   public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+     /*
+      * The highWaterMark should be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
+      * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which
+      * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should
+      * include the latest committed set.
+      */
+     long highWaterMark = (currentTxn > 0) ? Math.min(currentTxn, txns.getTxn_high_water_mark())
+                                           : txns.getTxn_high_water_mark();
+ 
+     // Open txns are already sorted in ascending order. This list may or may not include HWM
+     // but it is guaranteed that the list won't have a txn > HWM. But, if we overwrite the HWM with currentTxn
+     // then we need to truncate the exceptions list accordingly.
+     List<Long> openTxns = txns.getOpen_txns();
+ 
+     // We care only about open/aborted txns below currentTxn, and hence the size of the exceptions
+     // list should be determined accordingly. currentTxn will be missing from the openTxns list only in
+     // rare cases, e.g. the txn was aborted by AcidHouseKeeperService and the compactor has already
+     // cleaned up the aborted txn. In such cases binarySearch returns a negative value for sizeToHwm
+     // (encoding the would-be position of currentTxn), so we just negate it to get the size.
+     int sizeToHwm = (currentTxn > 0) ? Collections.binarySearch(openTxns, currentTxn) : openTxns.size();
+     sizeToHwm = (sizeToHwm < 0) ? (-sizeToHwm) : sizeToHwm;
+     long[] exceptions = new long[sizeToHwm];
+     BitSet inAbortedBits = BitSet.valueOf(txns.getAbortedBits());
+     BitSet outAbortedBits = new BitSet();
+     long minOpenTxnId = Long.MAX_VALUE;
+     int i = 0;
+     for (long txn : openTxns) {
+       // For snapshot isolation, we don't care about txns greater than current txn and so stop here.
+       // Also, we need not include current txn to exceptions list.
+       if ((currentTxn > 0) && (txn >= currentTxn)) {
+         break;
+       }
+       if (inAbortedBits.get(i)) {
+         outAbortedBits.set(i);
+       } else if (minOpenTxnId == Long.MAX_VALUE) {
+         minOpenTxnId = txn;
+       }
+       exceptions[i++] = txn;
+     }
+     return new ValidReadTxnList(exceptions, outAbortedBits, highWaterMark, minOpenTxnId);
+   }
+ 
+   /**
+    * Transform a {@link org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse} to a
+    * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}.  This assumes that the caller intends to
+    * read the files, and thus treats both open and aborted transactions as invalid.
+    * @param currentTxnId current txn ID for which we get the valid write ids list
+    * @param validIds valid write ids list from the metastore
+    * @return a valid write IDs list for the whole transaction.
+    */
+   public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId,
+                                                               List<TableValidWriteIds> validIds) {
+     ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId);
+     for (TableValidWriteIds tableWriteIds : validIds) {
+       validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds));
+     }
+     return validTxnWriteIdList;
+   }
+ 
+   /**
+    * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
+    * {@link org.apache.hadoop.hive.common.ValidReaderWriteIdList}.  This assumes that the caller intends to
+    * read the files, and thus treats both open and aborted write ids as invalid.
+    * @param tableWriteIds valid write ids for the given table from the metastore
+    * @return a valid write IDs list for the input table
+    */
+   public static ValidReaderWriteIdList createValidReaderWriteIdList(TableValidWriteIds tableWriteIds) {
+     String fullTableName = tableWriteIds.getFullTableName();
+     long highWater = tableWriteIds.getWriteIdHighWaterMark();
+     List<Long> invalids = tableWriteIds.getInvalidWriteIds();
+     BitSet abortedBits = BitSet.valueOf(tableWriteIds.getAbortedBits());
+     long[] exceptions = new long[invalids.size()];
+     int i = 0;
+     for (long writeId : invalids) {
+       exceptions[i++] = writeId;
+     }
+     if (tableWriteIds.isSetMinOpenWriteId()) {
+       return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater,
+                                         tableWriteIds.getMinOpenWriteId());
+     } else {
+       return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater);
+     }
+   }
+ 
+   /**
+    * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
+    * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}.  This assumes that the caller intends to
+    * compact the files, and thus treats only open transactions/write ids as invalid.  Additionally any
+    * writeId &gt; highestOpenWriteId is also invalid.  This is to avoid creating something like
+    * delta_17_120 where writeId 80, for example, is still open.
+    * @param tableValidWriteIds table write id list from the metastore
+    * @return a valid write id list.
+    */
+   public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValidWriteIds tableValidWriteIds) {
+     String fullTableName = tableValidWriteIds.getFullTableName();
+     long highWater = tableValidWriteIds.getWriteIdHighWaterMark();
+     long minOpenWriteId = Long.MAX_VALUE;
+     List<Long> invalids = tableValidWriteIds.getInvalidWriteIds();
+     BitSet abortedBits = BitSet.valueOf(tableValidWriteIds.getAbortedBits());
+     long[] exceptions = new long[invalids.size()];
+     int i = 0;
+     for (long writeId : invalids) {
+       if (abortedBits.get(i)) {
+         // Only need aborted since we don't consider anything above minOpenWriteId
+         exceptions[i++] = writeId;
+       } else {
+         minOpenWriteId = Math.min(minOpenWriteId, writeId);
+       }
+     }
+     if(i < exceptions.length) {
+       exceptions = Arrays.copyOf(exceptions, i);
+     }
+     highWater = minOpenWriteId == Long.MAX_VALUE ? highWater : minOpenWriteId - 1;
+     BitSet bitSet = new BitSet(exceptions.length);
+     bitSet.set(0, exceptions.length); // for ValidCompactorWriteIdList, everything in exceptions are aborted
+     if (minOpenWriteId == Long.MAX_VALUE) {
+       return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater);
+     } else {
+       return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater, minOpenWriteId);
+     }
+   }
+ 
+   public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) {
+     // This is based on the existing valid write ID list that was built for a select query;
+     // therefore we assume all the aborted txns, etc. were already accounted for.
+     // All we do is adjust the high watermark to only include contiguous txns.
+     Long minOpenWriteId = ids.getMinOpenWriteId();
+     if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
+       return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
+     }
+     return ids;
+   }
+ 
+   /**
+    * Get an instance of the TxnStore that is appropriate for this store
+    * @param conf configuration
+    * @return txn store
+    */
+   public static TxnStore getTxnStore(Configuration conf) {
+     String className = MetastoreConf.getVar(conf, ConfVars.TXN_STORE_IMPL);
+     try {
+       TxnStore handler = JavaUtils.getClass(className, TxnStore.class).newInstance();
+       handler.setConf(conf);
+       return handler;
+     } catch (Exception e) {
+       LOG.error("Unable to instantiate raw store directly in fastpath mode", e);
+       throw new RuntimeException(e);
+     }
+   }
+ 
+   /**
+    * Note, users are responsible for using the correct TxnManager. We do not look at
+    * SessionState.get().getTxnMgr().supportsAcid() here
+    * Should produce the same result as
+    * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)}.
+    * @return true if table is a transactional table, false otherwise
+    */
+   public static boolean isTransactionalTable(Table table) {
+     if (table == null) {
+       return false;
+     }
+     Map<String, String> parameters = table.getParameters();
+     String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
++    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
++  }
++
++  public static boolean isTransactionalTable(Map<String, String> parameters) {
++    if (parameters == null) {
++      return false;
++    }
++    String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+   }
+ 
+   /**
+    * Should produce the same result as
+    * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)}.
+    */
+   public static boolean isAcidTable(Table table) {
+     return TxnUtils.isTransactionalTable(table) &&
+       TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY.equals(table.getParameters()
+       .get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES));
+   }
+ 
+   /**
+    * Should produce the result as <dbName>.<tableName>.
+    */
+   public static String getFullTableName(String dbName, String tableName) {
+     return dbName.toLowerCase() + "." + tableName.toLowerCase();
+   }
+ 
+   public static String[] getDbTableName(String fullTableName) {
+     return fullTableName.split("\\.");
+   }
+ 
+ 
+ 
+   /**
+    * Build a query (or multiple queries if a single query would be too long) with one or more
+    * 'IN' or 'NOT IN' clauses from the given input parameters.  Splitting into multiple queries
+    * is only done for the 'IN' case; for 'NOT IN' clauses, multiple queries would change the
+    * semantics of the intended query.
+    * E.g., let's assume that the input "inList" parameter has [5, 6] and that the
+    * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause.
+    * Then having two delete statements changes the semantics of the intended SQL statement:
+    * the sequence 'delete from T where a not in (5)' and 'delete from T where a not in (6)'
+    * is not equal to 'delete from T where a not in (5, 6)'.
+    *
+    * Note that this method currently supports only a single column for
+    * IN/NOT IN clauses and that it only covers the OR-based composite 'IN' clause and the
+    * AND-based composite 'NOT IN' clause.
+    * For example, for the 'IN' clause case, the method will build a query with OR,
+    * e.g., "id in (1,2,3) OR id in (4,5,6)".
+    * For the 'NOT IN' case, the NOT IN list is broken into multiple 'NOT IN' clauses connected by AND.
+    *
+    * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
+    * clauses in a query".
+    *
+    * @param queries   OUT: Array of query strings
+    * @param prefix    IN:  Part of the query that comes before IN list
+    * @param suffix    IN:  Part of the query that comes after IN list
+    * @param inList    IN:  the list with IN list values
+    * @param inColumn  IN:  single column name of IN list operator
+    * @param addParens IN:  add a pair of parentheses outside the IN lists
+    *                       e.g. "(id in (1,2,3) OR id in (4,5,6))"
+    * @param notIn     IN:  is this for building a 'NOT IN' composite clause?
+    * @return          OUT: a list of the count of IN list values that are in each of the corresponding queries
+    */
+   public static List<Integer> buildQueryWithINClause(Configuration conf,
+                                             List<String> queries,
+                                             StringBuilder prefix,
+                                             StringBuilder suffix,
+                                             List<Long> inList,
+                                             String inColumn,
+                                             boolean addParens,
+                                             boolean notIn) {
+     List<String> inListStrings = new ArrayList<>(inList.size());
+     for (Long aLong : inList) {
+       inListStrings.add(aLong.toString());
+     }
+     return buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
+         inListStrings, inColumn, addParens, notIn);
+ 
+   }
+   /**
+    * Build a query (or multiple queries if a single query would be too long) with one or more
+    * 'IN' or 'NOT IN' clauses from the given input parameters.  Splitting into multiple queries
+    * is only done for the 'IN' case; for 'NOT IN' clauses, multiple queries would change the
+    * semantics of the intended query.
+    * E.g., let's assume that the input "inList" parameter has [5, 6] and that the
+    * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause.
+    * Then having two delete statements changes the semantics of the intended SQL statement:
+    * the sequence 'delete from T where a not in (5)' and 'delete from T where a not in (6)'
+    * is not equal to 'delete from T where a not in (5, 6)'.
+    *
+    * Note that this method currently supports only a single column for
+    * IN/NOT IN clauses and that it only covers the OR-based composite 'IN' clause and the
+    * AND-based composite 'NOT IN' clause.
+    * For example, for the 'IN' clause case, the method will build a query with OR,
+    * e.g., "id in (1,2,3) OR id in (4,5,6)".
+    * For the 'NOT IN' case, the NOT IN list is broken into multiple 'NOT IN' clauses connected by AND.
+    *
+    * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
+    * clauses in a query".
+    *
+    * @param queries   OUT: Array of query strings
+    * @param prefix    IN:  Part of the query that comes before IN list
+    * @param suffix    IN:  Part of the query that comes after IN list
+    * @param inList    IN:  the list with IN list values
+    * @param inColumn  IN:  single column name of IN list operator
+    * @param addParens IN:  add a pair of parentheses outside the IN lists
+    *                       e.g. "(id in (1,2,3) OR id in (4,5,6))"
+    * @param notIn     IN:  is this for building a 'NOT IN' composite clause?
+    * @return          OUT: a list of the count of IN list values that are in each of the corresponding queries
+    */
+   public static List<Integer> buildQueryWithINClauseStrings(Configuration conf, List<String> queries, StringBuilder prefix,
+       StringBuilder suffix, List<String> inList, String inColumn, boolean addParens, boolean notIn) {
+     // Get configuration parameters
+     int maxQueryLength = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH);
+     int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
+ 
+     // Check parameter set validity as a public method.
+     if (inList == null || inList.size() == 0 || maxQueryLength <= 0 || batchSize <= 0) {
+       throw new IllegalArgumentException("The IN list is empty!");
+     }
+ 
+     // Define constants and local variables.
+     int inListSize = inList.size();
+     StringBuilder buf = new StringBuilder();
+ 
+     int cursor4InListArray = 0,  // cursor for the "inList" array.
+         cursor4InClauseElements = 0,  // cursor for an element list per an 'IN'/'NOT IN'-clause.
+         cursor4queryOfInClauses = 0;  // cursor for in-clause lists per a query.
+     boolean nextItemNeeded = true;
+     boolean newInclausePrefixJustAppended = false;
+     StringBuilder nextValue = new StringBuilder("");
+     StringBuilder newInclausePrefix =
+       new StringBuilder(notIn ? " and " + inColumn + " not in (":
+                               " or " + inColumn + " in (");
+     List<Integer> ret = new ArrayList<>();
+     int currentCount = 0;
+ 
+     // Loop over the given inList elements.
+     while( cursor4InListArray < inListSize || !nextItemNeeded) {
+       if (cursor4queryOfInClauses == 0) {
+         // Append prefix
+         buf.append(prefix);
+         if (addParens) {
+           buf.append("(");
+         }
+         buf.append(inColumn);
+ 
+         if (notIn) {
+           buf.append(" not in (");
+         } else {
+           buf.append(" in (");
+         }
+         cursor4queryOfInClauses++;
+         newInclausePrefixJustAppended = false;
+       }
+ 
+       // Get the next "inList" value element if needed.
+       if (nextItemNeeded) {
+         nextValue.setLength(0);
+         nextValue.append(String.valueOf(inList.get(cursor4InListArray++)));
+         nextItemNeeded = false;
+       }
+ 
+       // Compute the size of a query when the 'nextValue' is added to the current query.
+       int querySize = querySizeExpected(buf.length(), nextValue.length(), suffix.length(), addParens);
+ 
+       if (querySize > maxQueryLength * 1024) {
+         // Check an edge case where the DIRECT_SQL_MAX_QUERY_LENGTH does not allow one 'IN' clause with single value.
+         if (cursor4queryOfInClauses == 1 && cursor4InClauseElements == 0) {
+           throw new IllegalArgumentException("The current " + ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH.getVarname() + " is set too small to have one IN clause with single value!");
+         }
+ 
+         // Check an edge case to throw an Exception if we cannot build a single query for 'NOT IN' clause cases as mentioned in the method comments.
+         if (notIn) {
+           throw new IllegalArgumentException("The NOT IN list has too many elements for the current " + ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH.getVarname() + "!");
+         }
+ 
+         // Wrap up the current query string since we can not add another "inList" element value.
+         if (newInclausePrefixJustAppended) {
+           buf.delete(buf.length()-newInclausePrefix.length(), buf.length());
+         }
+ 
+         buf.setCharAt(buf.length() - 1, ')'); // replace the "commar" to finish a 'IN' clause string.
+ 
+         if (addParens) {
+           buf.append(")");
+         }
+ 
+         buf.append(suffix);
+         queries.add(buf.toString());
+         ret.add(currentCount);
+ 
+         // Prepare a new query string.
+         buf.setLength(0);
+         currentCount = 0;
+         cursor4queryOfInClauses = cursor4InClauseElements = 0;
+         querySize = 0;
+         newInclausePrefixJustAppended = false;
+         continue;
+       } else if (cursor4InClauseElements >= batchSize-1 && cursor4InClauseElements != 0) {
+         // Finish the current 'IN'/'NOT IN' clause and start a new clause.
+         buf.setCharAt(buf.length() - 1, ')'); // replace the "commar".
+         buf.append(newInclausePrefix.toString());
+ 
+         newInclausePrefixJustAppended = true;
+ 
+         // increment cursor for per-query IN-clause list
+         cursor4queryOfInClauses++;
+         cursor4InClauseElements = 0;
+       } else {
+         buf.append(nextValue.toString()).append(",");
+         currentCount++;
+         nextItemNeeded = true;
+         newInclausePrefixJustAppended = false;
+         // increment cursor for elements per 'IN'/'NOT IN' clause.
+         cursor4InClauseElements++;
+       }
+     }
+ 
+     // Finish the last query.
+     if (newInclausePrefixJustAppended) {
+       buf.delete(buf.length()-newInclausePrefix.length(), buf.length());
+     }
+     buf.setCharAt(buf.length() - 1, ')'); // replace the comma.
+     if (addParens) {
+       buf.append(")");
+     }
+     buf.append(suffix);
+     queries.add(buf.toString());
+     ret.add(currentCount);
+     return ret;
+   }
+ 
+   /**
+    * Compute and return the size of a query statement with the given parameters as input variables.
+    *
+    * @param sizeSoFar     size of the current contents of the buf
+    * @param sizeNextItem      size of the next 'IN' clause element value.
+    * @param suffixSize    size of the suffix for a query statement
+    * @param addParens     Do we add an additional parenthesis?
+    */
+   private static int querySizeExpected(int sizeSoFar,
+                                        int sizeNextItem,
+                                        int suffixSize,
+                                        boolean addParens) {
+ 
+     int size = sizeSoFar + sizeNextItem + suffixSize;
+ 
+     if (addParens) {
+        size++;
+     }
+ 
+     return size;
+   }
+ }
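
As a usage illustration for buildQueryWithINClause above, the sketch below builds one or more
delete statements over an id list, letting the method split the list into multiple IN clauses or
multiple queries as DIRECT_SQL_MAX_QUERY_LENGTH and DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE require.
It is not part of the commit; the table and column names and the id values are made up for the
example, and conf is assumed to be a metastore Configuration already in scope.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class InClauseExample {
      static List<String> buildDeletes(Configuration conf) {
        List<String> queries = new ArrayList<>();
        StringBuilder prefix = new StringBuilder("delete from SOME_TABLE where ");
        StringBuilder suffix = new StringBuilder();
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        // counts.get(i) is how many of the ids ended up in queries.get(i); useful when
        // tracking how many rows each generated statement is expected to touch.
        List<Integer> counts = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
            ids, "some_id", false, false);
        return queries;  // e.g. ["delete from SOME_TABLE where some_id in (1,2,3,4,5)"]
      }
    }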