Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:24 UTC
[15/50] [abbrv] hive git commit: HIVE-19416 : merge master into branch (Sergey Shelukhin) 0719
http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 0000000,33f24fb..080cc52
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@@ -1,0 -1,504 +1,509 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hive.metastore.txn;
+
+ import com.google.common.annotations.VisibleForTesting;
++
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.classification.InterfaceStability;
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.hive.common.ValidTxnList;
+ import org.apache.hadoop.hive.common.ValidWriteIdList;
+ import org.apache.hadoop.hive.common.classification.RetrySemantics;
+ import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+
+ import java.sql.SQLException;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ /**
+ * A handler to answer transaction related calls that come into the metastore
+ * server.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ public interface TxnStore extends Configurable {
+
+ enum MUTEX_KEY {
+ Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
+ WriteSetCleaner, CompactionScheduler, WriteIdAllocator, MaterializationRebuild
+ }
+ // Compactor states (should really be an enum)
+ String INITIATED_RESPONSE = "initiated";
+ String WORKING_RESPONSE = "working";
+ String CLEANING_RESPONSE = "ready for cleaning";
+ String FAILED_RESPONSE = "failed";
+ String SUCCEEDED_RESPONSE = "succeeded";
+ String ATTEMPTED_RESPONSE = "attempted";
+
+ int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000;
+
+ /**
+ * Get information about open transactions. This gives extensive information about each
+ * transaction rather than just the list of transaction ids. Use this when details about
+ * the transactions are needed (e.g. SHOW TRANSACTIONS).
+ * @return information about open transactions
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
+
+ /**
+ * Get list of valid transactions. This gives just the list of transactions that are open.
+ * @return list of open transactions, as well as a high water mark.
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ GetOpenTxnsResponse getOpenTxns() throws MetaException;
+
+ /**
+ * Get the count for open transactions.
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ void countOpenTxns() throws MetaException;
+
+ /**
+ * Open a set of transactions
+ * @param rqst request to open transactions
+ * @return information on opened transactions
+ * @throws MetaException
+ */
+ @RetrySemantics.Idempotent
+ OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
+
+ @RetrySemantics.Idempotent
+ long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException;
+
+ /**
+ * Abort (rollback) a transaction.
+ * @param rqst info on transaction to abort
+ * @throws NoSuchTxnException
+ * @throws MetaException
+ */
+ @RetrySemantics.Idempotent
+ void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException;
+
+ /**
+ * Abort (rollback) a list of transactions in one request.
+ * @param rqst info on transactions to abort
+ * @throws NoSuchTxnException
+ * @throws MetaException
+ */
+ @RetrySemantics.Idempotent
+ void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException;
+
+ /**
+ * Commit a transaction
+ * @param rqst info on transaction to commit
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ @RetrySemantics.Idempotent
+ void commitTxn(CommitTxnRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
+ * @param rqst info on table/partitions and writeid snapshot to replicate.
+ * @throws MetaException in case of failure
+ */
+ @RetrySemantics.Idempotent
+ void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException;
+
+ /**
+ * Get invalidation info for the materialization. Currently, the materialization information
+ * only contains information about whether there was update/delete operations on the source
+ * tables used by the materialization since it was created.
+ * @param cm creation metadata for the materialization
+ * @param validTxnList valid transaction list for snapshot taken for current query
+ * @throws MetaException
+ */
+ @RetrySemantics.Idempotent
+ Materialization getMaterializationInvalidationInfo(
+ final CreationMetadata cm, final String validTxnList)
+ throws MetaException;
+
++ @RetrySemantics.ReadOnly
++ long getTxnIdForWriteId(String dbName, String tblName, long writeId)
++ throws MetaException;
++
+ LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId)
+ throws MetaException;
+
+ boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId)
+ throws MetaException;
+
+ long cleanupMaterializationRebuildLocks(ValidTxnList validTxnList, long timeout)
+ throws MetaException;
+
+ /**
+ * Gets the list of valid write ids for the given tables with respect to the current txn.
+ * @param rqst info on transaction and list of table names associated with given transaction
+ * @throws NoSuchTxnException
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
+ throws NoSuchTxnException, MetaException;
+
+ /**
+ * Allocate a write ID for the given table and associate it with a transaction
+ * @param rqst info on transaction and table to allocate write id
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Called on conversion of an existing table to full acid. Sets the initial write ID to a high
+ * enough value so that we can assign unique ROW__IDs to data in existing files.
+ */
+ void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) throws MetaException;
+
+ /**
+ * Obtain a lock.
+ * @param rqst information on the lock to obtain. If the requester is part of a transaction
+ * the txn information must be included in the lock request.
+ * @return info on the lock, including whether it was obtained.
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ @RetrySemantics.CannotRetry
+ LockResponse lock(LockRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait
+ * state.
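+ * <p>A hypothetical polling sketch (a real caller would sleep between checks):
+ * <pre>{@code
+ * LockResponse res = txnStore.lock(rqst);
+ * while (res.getState() == LockState.WAITING) {
+ *   res = txnStore.checkLock(new CheckLockRequest(res.getLockid()));
+ * }
+ * }</pre>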
+ * @param rqst info on the lock to check
+ * @return info on the state of the lock
+ * @throws NoSuchTxnException
+ * @throws NoSuchLockException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ @RetrySemantics.SafeToRetry
+ LockResponse checkLock(CheckLockRequest rqst)
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+
+ /**
+ * Unlock a lock. It is not legal to call this if the caller is part of a txn. In that case
+ * the txn should be committed or aborted instead. (Note someday this will change since
+ * multi-statement transactions will allow unlocking in the transaction.)
+ * @param rqst lock to unlock
+ * @throws NoSuchLockException
+ * @throws TxnOpenException
+ * @throws MetaException
+ */
+ @RetrySemantics.Idempotent
+ void unlock(UnlockRequest rqst)
+ throws NoSuchLockException, TxnOpenException, MetaException;
+
+ /**
+ * Get information on current locks.
+ * @param rqst lock information to retrieve
+ * @return lock information.
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException;
+
+ /**
+ * Send a heartbeat for a lock or a transaction
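+ * <p>A minimal sketch, assuming a {@code txnStore} instance and already-known ids:
+ * <pre>{@code
+ * HeartbeatRequest ids = new HeartbeatRequest();
+ * ids.setTxnid(txnId);   // and/or ids.setLockid(lockId)
+ * txnStore.heartbeat(ids);
+ * }</pre>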
+ * @param ids lock and/or txn id to heartbeat
+ * @throws NoSuchTxnException
+ * @throws NoSuchLockException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ @RetrySemantics.SafeToRetry
+ void heartbeat(HeartbeatRequest ids)
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+
+ /**
+ * Heartbeat a group of transactions together
+ * @param rqst set of transactions to heartbeat
+ * @return info on txns that were heartbeated
+ * @throws MetaException
+ */
+ @RetrySemantics.SafeToRetry
+ HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
+ throws MetaException;
+
+ /**
+ * Submit a compaction request into the queue. This is called when a user manually requests a
+ * compaction.
+ * @param rqst information on what to compact
+ * @return id of the compaction that has been started, or the existing id if this resource is already scheduled
+ * @throws MetaException
+ */
+ @RetrySemantics.Idempotent
+ CompactionResponse compact(CompactionRequest rqst) throws MetaException;
+
+ /**
+ * Show list of current compactions.
+ * @param rqst info on which compactions to show
+ * @return compaction information
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException;
+
+ /**
+ * Add information on a set of dynamic partitions that participated in a transaction.
+ * @param rqst dynamic partition info.
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ @RetrySemantics.SafeToRetry
+ void addDynamicPartitions(AddDynamicPartitions rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+ /**
+ * Clean up corresponding records in metastore tables.
+ * @param type Hive object type
+ * @param db database object
+ * @param table table object
+ * @param partitionIterator partition iterator
+ * @throws MetaException
+ */
+ @RetrySemantics.Idempotent
+ void cleanupRecords(HiveObjectType type, Database db, Table table,
+ Iterator<Partition> partitionIterator) throws MetaException;
+
+ @RetrySemantics.Idempotent
+ void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName,
+ String newCatName, String newDbName, String newTabName, String newPartName)
+ throws MetaException;
+
+ /**
+ * Timeout transactions and/or locks. This should only be called by the compactor.
+ */
+ @RetrySemantics.Idempotent
+ void performTimeOuts();
+
+ /**
+ * This will look through the completed_txn_components table and look for partitions or tables
+ * that may be ready for compaction. Also, look through txns and txn_components tables for
+ * aborted transactions that we should add to the list.
+ * @param maxAborted Maximum number of aborted transactions to allow before marking this as a
+ * potential compaction.
+ * @return list of CompactionInfo structs. These will not have id, type,
+ * or runAs set since these are only potential compactions not actual ones.
+ */
+ @RetrySemantics.ReadOnly
+ Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
+
+ /**
+ * Sets the user to run as. This is for the case
+ * where the request was generated by the user and so the worker must set this value later.
+ * @param cq_id id of this entry in the queue
+ * @param user user to run the jobs as
+ */
+ @RetrySemantics.Idempotent
+ void setRunAs(long cq_id, String user) throws MetaException;
+
+ /**
+ * This will grab the next compaction request off of
+ * the queue, and assign it to the worker.
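+ * <p>A hypothetical worker-side sketch of the compaction life cycle:
+ * <pre>{@code
+ * CompactionInfo ci = txnStore.findNextToCompact(workerId);
+ * if (ci != null) {
+ *   // ... run the compaction job ...
+ *   txnStore.markCompacted(ci);
+ *   // later the Cleaner finds it via findReadyToClean() and calls markCleaned(ci)
+ * }
+ * }</pre>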
+ * @param workerId id of the worker calling this, will be recorded in the db
+ * @return an info element for this compaction request, or null if there is no work to do now.
+ */
+ @RetrySemantics.ReadOnly
+ CompactionInfo findNextToCompact(String workerId) throws MetaException;
+
+ /**
+ * This will mark an entry in the queue as compacted
+ * and put it in the ready to clean state.
+ * @param info info on the compaction entry to mark as compacted.
+ */
+ @RetrySemantics.SafeToRetry
+ void markCompacted(CompactionInfo info) throws MetaException;
+
+ /**
+ * Find entries in the queue that are ready to
+ * be cleaned.
+ * @return information on the entry in the queue.
+ */
+ @RetrySemantics.ReadOnly
+ List<CompactionInfo> findReadyToClean() throws MetaException;
+
+ /**
+ * This will remove an entry from the queue after
+ * it has been compacted.
+ *
+ * @param info info on the compaction entry to remove
+ */
+ @RetrySemantics.CannotRetry
+ void markCleaned(CompactionInfo info) throws MetaException;
+
+ /**
+ * Mark a compaction entry as failed. This will move it to the compaction history queue with a
+ * failed status. It will NOT clean up aborted transactions in the table/partition associated
+ * with this compaction.
+ * @param info information on the compaction that failed.
+ * @throws MetaException
+ */
+ @RetrySemantics.CannotRetry
+ void markFailed(CompactionInfo info) throws MetaException;
+
+ /**
+ * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommitted_txnid as found by
+ * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+ */
+ @RetrySemantics.SafeToRetry
+ void cleanTxnToWriteIdTable() throws MetaException;
+
+ /**
+ * Clean up aborted transactions from txns that have no components in txn_components. The reason
+ * such txns exist can be that no work was done in this txn (e.g. Streaming opened a
+ * TransactionBatch and abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+ */
+ @RetrySemantics.SafeToRetry
+ void cleanEmptyAbortedTxns() throws MetaException;
+
+ /**
+ * This will take all entries assigned to workers
+ * on a host and return them to the INITIATED state. The initiator should use this at start up
+ * to clean entries from any workers that were in the middle of compacting when the metastore
+ * shut down. It does not reset entries from worker threads on other hosts as those may still
+ * be working.
+ * @param hostname Name of this host. It is assumed this prefixes the thread's worker id,
+ * so that a LIKE 'hostname%' pattern will match the worker id.
+ */
+ @RetrySemantics.Idempotent
+ void revokeFromLocalWorkers(String hostname) throws MetaException;
+
+ /**
+ * This call will return all compaction queue
+ * entries that are assigned to a worker but have exceeded the timeout back to the INITIATED state.
+ * This should be called by the initiator on start up and occasionally when running to clean up
+ * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called
+ * first.
+ * @param timeout number of milliseconds since start time that should elapse before a worker is
+ * declared dead.
+ */
+ @RetrySemantics.Idempotent
+ void revokeTimedoutWorkers(long timeout) throws MetaException;
+
+ /**
+ * Queries metastore DB directly to find columns in the table which have statistics information.
+ * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+ * table level stats are examined.
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
+
+ /**
+ * Record the highest write id that the {@code ci} compaction job will pay attention to.
+ */
+ @RetrySemantics.Idempotent
+ void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException;
+
+ /**
+ * For any given compactable entity (partition, or table if not partitioned) the history of
+ * compactions may look like "sssfffaaasffss", for example (s = succeeded, f = failed,
+ * a = attempted). The idea is to retain the tail (most recent part) of the history such that a
+ * configurable number of each type of state is present. Any other entries can be purged. This
+ * scheme has the advantage of always retaining the last failure/success even if it's not recent.
+ * @throws MetaException
+ */
+ @RetrySemantics.SafeToRetry
+ void purgeCompactionHistory() throws MetaException;
+
+ /**
+ * WriteSet tracking is used to ensure proper transaction isolation. This method deletes the
+ * transaction metadata once it becomes unnecessary.
+ */
+ @RetrySemantics.SafeToRetry
+ void performWriteSetGC();
+
+ /**
+ * Determine if there are enough consecutive failures compacting a table or partition that no
+ * new automatic compactions should be scheduled. User initiated compactions do not do this
+ * check.
+ * @param ci Table or partition to check.
+ * @return true if it is ok to compact, false if there have been too many failures.
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
+
+ @VisibleForTesting
+ int numLocksInLockTable() throws SQLException, MetaException;
+
+ @VisibleForTesting
+ long setTimeout(long milliseconds);
+
+ @RetrySemantics.Idempotent
+ MutexAPI getMutexAPI();
+
+ /**
+ * This is primarily designed to provide coarse grained mutex support to operations running
+ * inside the Metastore (of which there could be several instances). The initial goal is to
+ * ensure that various sub-processes of the Compactor don't step on each other.
+ *
+ * In the RDBMS world each {@code LockHandle} uses a java.sql.Connection so use it sparingly.
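+ *
+ * <p>A hypothetical usage sketch, assuming a {@code txnStore} instance:
+ * <pre>{@code
+ * TxnStore.MutexAPI.LockHandle handle =
+ *     txnStore.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+ * try {
+ *   // critical section: only one metastore instance runs this at a time
+ * } finally {
+ *   handle.releaseLocks();
+ * }
+ * }</pre>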
+ */
+ interface MutexAPI {
+ /**
+ * The {@code key} is the name of the lock. This will acquire an exclusive lock or block. It
+ * returns a handle which must be used to release the lock. Each invocation returns a new handle.
+ */
+ LockHandle acquireLock(String key) throws MetaException;
+
+ /**
+ * Same as {@link #acquireLock(String)} but takes an already existing handle as input. This
+ * will associate the lock on {@code key} with the same handle. All locks associated with
+ * the same handle will be released together.
+ * @param handle not NULL
+ */
+ void acquireLock(String key, LockHandle handle) throws MetaException;
+ interface LockHandle {
+ /**
+ * Releases all locks associated with this handle.
+ */
+ void releaseLocks();
+ }
+ }
+
+ /**
+ * Once a {@link java.util.concurrent.ThreadPoolExecutor} Worker submits a job to the cluster,
+ * it calls this to update the metadata.
+ * @param id {@link CompactionInfo#id}
+ */
+ @RetrySemantics.Idempotent
+ void setHadoopJobId(String hadoopJobId, long id);
+
+ /**
+ * Add the ACID write event information to the writeNotificationLog table.
+ * @param acidWriteEvent the ACID write event to record
+ */
+ @RetrySemantics.Idempotent
+ void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException;
+ }
http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 0000000,fa291d5..aac5811
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@@ -1,0 -1,471 +1,481 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hive.metastore.txn;
+
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+ import org.apache.hadoop.hive.common.ValidReadTxnList;
+ import org.apache.hadoop.hive.common.ValidTxnList;
+ import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+ import org.apache.hadoop.hive.common.ValidWriteIdList;
+ import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
++import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.util.Collections;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.BitSet;
+ import java.util.List;
+ import java.util.Map;
+
+ public class TxnUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
+
++ // Transactional stats states
++ static final public char STAT_OPEN = 'o';
++ static final public char STAT_INVALID = 'i';
++ static final public char STAT_COMMITTED = 'c';
++ static final public char STAT_OBSOLETE = 's';
++
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
+ * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
+ * read the files, and thus treats both open and aborted transactions as invalid.
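+ *
+ * <p>A typical pairing, sketched:
+ * <pre>{@code
+ * GetOpenTxnsResponse txns = txnStore.getOpenTxns();
+ * ValidTxnList validTxns = TxnUtils.createValidReadTxnList(txns, currentTxnId);
+ * }</pre>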
+ * @param txns txn list from the metastore
+ * @param currentTxn Current transaction that the user has open. If this is greater than 0 it
+ * will be removed from the exceptions list so that the user sees his own
+ * transaction as valid.
+ * @return a valid txn list.
+ */
+ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+ /*
+ * The highWaterMark should be min(currentTxn, txns.getTxn_high_water_mark()) assuming
+ * currentTxn > 0; otherwise, if currentTxn=7 and txn 8 commits before 7 does, then 7 would see
+ * the result of 8, which doesn't make sense for Snapshot Isolation. Of course for Read
+ * Committed, the list should include the latest committed set.
+ */
+ long highWaterMark = (currentTxn > 0) ? Math.min(currentTxn, txns.getTxn_high_water_mark())
+ : txns.getTxn_high_water_mark();
+
+ // Open txns are already sorted in ascending order. This list may or may not include the HWM
+ // but it is guaranteed that the list won't have a txn > HWM. But, if we overwrite the HWM with
+ // currentTxn, then we need to truncate the exceptions list accordingly.
+ List<Long> openTxns = txns.getOpen_txns();
+
+ // We care only about open/aborted txns below currentTxn, and the size of the exceptions list
+ // is determined accordingly. currentTxn will be missing from the openTxns list only in a rare
+ // case, e.g. when the txn was aborted by AcidHouseKeeperService and the compactor has already
+ // cleaned up the aborted txn. In such cases binarySearch returns a negative value encoding
+ // currentTxn's insertion position, so we just negate it to get the size.
+ int sizeToHwm = (currentTxn > 0) ? Collections.binarySearch(openTxns, currentTxn) : openTxns.size();
+ sizeToHwm = (sizeToHwm < 0) ? (-sizeToHwm) : sizeToHwm;
+ long[] exceptions = new long[sizeToHwm];
+ BitSet inAbortedBits = BitSet.valueOf(txns.getAbortedBits());
+ BitSet outAbortedBits = new BitSet();
+ long minOpenTxnId = Long.MAX_VALUE;
+ int i = 0;
+ for (long txn : openTxns) {
+ // For snapshot isolation, we don't care about txns greater than current txn and so stop here.
+ // Also, we need not include current txn to exceptions list.
+ if ((currentTxn > 0) && (txn >= currentTxn)) {
+ break;
+ }
+ if (inAbortedBits.get(i)) {
+ outAbortedBits.set(i);
+ } else if (minOpenTxnId == Long.MAX_VALUE) {
+ minOpenTxnId = txn;
+ }
+ exceptions[i++] = txn;
+ }
+ return new ValidReadTxnList(exceptions, outAbortedBits, highWaterMark, minOpenTxnId);
+ }
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse} to a
+ * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}. This assumes that the caller intends to
+ * read the files, and thus treats both open and aborted transactions as invalid.
+ * @param currentTxnId current txn ID for which we get the valid write ids list
+ * @param validIds valid write ids list from the metastore
+ * @return a valid write IDs list for the whole transaction.
+ */
+ public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId,
+ List<TableValidWriteIds> validIds) {
+ ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId);
+ for (TableValidWriteIds tableWriteIds : validIds) {
+ validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds));
+ }
+ return validTxnWriteIdList;
+ }
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
+ * {@link org.apache.hadoop.hive.common.ValidReaderWriteIdList}. This assumes that the caller intends to
+ * read the files, and thus treats both open and aborted write ids as invalid.
+ * @param tableWriteIds valid write ids for the given table from the metastore
+ * @return a valid write IDs list for the input table
+ */
+ public static ValidReaderWriteIdList createValidReaderWriteIdList(TableValidWriteIds tableWriteIds) {
+ String fullTableName = tableWriteIds.getFullTableName();
+ long highWater = tableWriteIds.getWriteIdHighWaterMark();
+ List<Long> invalids = tableWriteIds.getInvalidWriteIds();
+ BitSet abortedBits = BitSet.valueOf(tableWriteIds.getAbortedBits());
+ long[] exceptions = new long[invalids.size()];
+ int i = 0;
+ for (long writeId : invalids) {
+ exceptions[i++] = writeId;
+ }
+ if (tableWriteIds.isSetMinOpenWriteId()) {
+ return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater,
+ tableWriteIds.getMinOpenWriteId());
+ } else {
+ return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater);
+ }
+ }
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
+ * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}. This assumes that the caller intends to
+ * compact the files, and thus treats only open transactions/write ids as invalid. Additionally,
+ * any writeId at or above the minimum open writeId is also invalid. This is to avoid creating
+ * something like delta_17_120 where writeId 80, for example, is still open.
+ * @param tableValidWriteIds table write id list from the metastore
+ * @return a valid write id list.
+ */
+ public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValidWriteIds tableValidWriteIds) {
+ String fullTableName = tableValidWriteIds.getFullTableName();
+ long highWater = tableValidWriteIds.getWriteIdHighWaterMark();
+ long minOpenWriteId = Long.MAX_VALUE;
+ List<Long> invalids = tableValidWriteIds.getInvalidWriteIds();
+ BitSet abortedBits = BitSet.valueOf(tableValidWriteIds.getAbortedBits());
+ long[] exceptions = new long[invalids.size()];
+ int i = 0;
+ for (long writeId : invalids) {
+ if (abortedBits.get(i)) {
+ // Only need aborted since we don't consider anything above minOpenWriteId
+ exceptions[i++] = writeId;
+ } else {
+ minOpenWriteId = Math.min(minOpenWriteId, writeId);
+ }
+ }
+ if(i < exceptions.length) {
+ exceptions = Arrays.copyOf(exceptions, i);
+ }
+ highWater = minOpenWriteId == Long.MAX_VALUE ? highWater : minOpenWriteId - 1;
+ BitSet bitSet = new BitSet(exceptions.length);
+ bitSet.set(0, exceptions.length); // for ValidCompactorWriteIdList, everything in exceptions are aborted
+ if (minOpenWriteId == Long.MAX_VALUE) {
+ return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater);
+ } else {
+ return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater, minOpenWriteId);
+ }
+ }
+
+ public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) {
+ // This is based on the existing valid write ID list that was built for a select query;
+ // therefore we assume all the aborted txns, etc. were already accounted for.
+ // All we do is adjust the high watermark to only include contiguous txns.
+ Long minOpenWriteId = ids.getMinOpenWriteId();
+ if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
+ return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
+ }
+ return ids;
+ }
+
+ /**
+ * Get an instance of the TxnStore that is appropriate for this store
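+ * <p>For example: {@code TxnStore txnStore = TxnUtils.getTxnStore(conf);}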
+ * @param conf configuration
+ * @return txn store
+ */
+ public static TxnStore getTxnStore(Configuration conf) {
+ String className = MetastoreConf.getVar(conf, ConfVars.TXN_STORE_IMPL);
+ try {
+ TxnStore handler = JavaUtils.getClass(className, TxnStore.class).newInstance();
+ handler.setConf(conf);
+ return handler;
+ } catch (Exception e) {
+ LOG.error("Unable to instantiate raw store directly in fastpath mode", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Note, users are responsible for using the correct TxnManager. We do not look at
+ * SessionState.get().getTxnMgr().supportsAcid() here.
+ * Should produce the same result as
+ * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)}.
+ * @return true if table is a transactional table, false otherwise
+ */
+ public static boolean isTransactionalTable(Table table) {
+ if (table == null) {
+ return false;
+ }
+ Map<String, String> parameters = table.getParameters();
+ String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
++ return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
++ }
++
++ public static boolean isTransactionalTable(Map<String, String> parameters) {
++ if (parameters == null) {
++ return false;
++ }
++ String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+ }
+
+ /**
+ * Should produce the same result as
+ * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)}.
+ */
+ public static boolean isAcidTable(Table table) {
+ return TxnUtils.isTransactionalTable(table) &&
+ TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY.equals(table.getParameters()
+ .get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES));
+ }
+
+ /**
+ * Should produce the result as {@code <dbName>.<tableName>}.
+ */
+ public static String getFullTableName(String dbName, String tableName) {
+ return dbName.toLowerCase() + "." + tableName.toLowerCase();
+ }
+
+ public static String[] getDbTableName(String fullTableName) {
+ return fullTableName.split("\\.");
+ }
+
+ /**
+ * Build a query (or queries) with one or multiple 'IN' or 'NOT IN' clauses from the given
+ * input parameters. Multiple queries are built only if a single query would be too big, and
+ * only for the 'IN' case: splitting a 'NOT IN' clause across multiple queries would change
+ * the semantics of the intended query. E.g., let's assume the input "inList" parameter has
+ * [5, 6] and the _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value
+ * in a 'NOT IN' clause. Then the sequence 'delete from T where a not in (5)' and
+ * 'delete from T where a not in (6)' is not equal to 'delete from T where a not in (5, 6)'.
+ *
+ * Note that this method currently supports only a single column for
+ * IN/NOT IN clauses, and it covers only the OR-based composite 'IN' clause and the
+ * AND-based composite 'NOT IN' clause.
+ * For example, for the 'IN' clause case, the method will build a query with OR.
+ * E.g., "id in (1,2,3) OR id in (4,5,6)".
+ * For the 'NOT IN' case, the NOT IN list is broken into multiple 'NOT IN' clauses connected by AND.
+ *
+ * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
+ * clauses in a query".
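+ *
+ * <p>A hypothetical usage sketch (the table and column names are illustrative):
+ * <pre>{@code
+ * List<String> queries = new ArrayList<>();
+ * StringBuilder prefix = new StringBuilder("delete from TXN_COMPONENTS where ");
+ * StringBuilder suffix = new StringBuilder();
+ * List<Long> txnIds = Arrays.asList(1L, 2L, 3L);
+ * List<Integer> counts = TxnUtils.buildQueryWithINClause(
+ *     conf, queries, prefix, suffix, txnIds, "tc_txnid", false, false);
+ * // queries.get(i) contains counts.get(i) of the IN-list values
+ * }</pre>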
+ *
+ * @param queries OUT: Array of query strings
+ * @param prefix IN: Part of the query that comes before IN list
+ * @param suffix IN: Part of the query that comes after IN list
+ * @param inList IN: the list with IN list values
+ * @param inColumn IN: single column name of IN list operator
+ * @param addParens IN: add a pair of parentheses outside the IN lists
+ * e.g. "(id in (1,2,3) OR id in (4,5,6))"
+ * @param notIn IN: is this for building a 'NOT IN' composite clause?
+ * @return OUT: a list of the count of IN list values that are in each of the corresponding queries
+ */
+ public static List<Integer> buildQueryWithINClause(Configuration conf,
+ List<String> queries,
+ StringBuilder prefix,
+ StringBuilder suffix,
+ List<Long> inList,
+ String inColumn,
+ boolean addParens,
+ boolean notIn) {
+ List<String> inListStrings = new ArrayList<>(inList.size());
+ for (Long aLong : inList) {
+ inListStrings.add(aLong.toString());
+ }
+ return buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
+ inListStrings, inColumn, addParens, notIn);
+
+ }
+ /**
+ * Build a query (or queries) with one or multiple 'IN' or 'NOT IN' clauses from the given
+ * input parameters. Multiple queries are built only if a single query would be too big, and
+ * only for the 'IN' case: splitting a 'NOT IN' clause across multiple queries would change
+ * the semantics of the intended query. E.g., let's assume the input "inList" parameter has
+ * [5, 6] and the _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value
+ * in a 'NOT IN' clause. Then the sequence 'delete from T where a not in (5)' and
+ * 'delete from T where a not in (6)' is not equal to 'delete from T where a not in (5, 6)'.
+ *
+ * Note that this method currently supports only a single column for
+ * IN/NOT IN clauses, and it covers only the OR-based composite 'IN' clause and the
+ * AND-based composite 'NOT IN' clause.
+ * For example, for the 'IN' clause case, the method will build a query with OR.
+ * E.g., "id in (1,2,3) OR id in (4,5,6)".
+ * For the 'NOT IN' case, the NOT IN list is broken into multiple 'NOT IN' clauses connected by AND.
+ *
+ * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
+ * clauses in a query".
+ *
+ * @param queries OUT: Array of query strings
+ * @param prefix IN: Part of the query that comes before IN list
+ * @param suffix IN: Part of the query that comes after IN list
+ * @param inList IN: the list with IN list values
+ * @param inColumn IN: single column name of IN list operator
+ * @param addParens IN: add a pair of parentheses outside the IN lists
+ * e.g. "(id in (1,2,3) OR id in (4,5,6))"
+ * @param notIn IN: is this for building a 'NOT IN' composite clause?
+ * @return OUT: a list of the count of IN list values that are in each of the corresponding queries
+ */
+ public static List<Integer> buildQueryWithINClauseStrings(Configuration conf, List<String> queries, StringBuilder prefix,
+ StringBuilder suffix, List<String> inList, String inColumn, boolean addParens, boolean notIn) {
+ // Get configuration parameters
+ int maxQueryLength = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH);
+ int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
+
+ // Validate the input parameters (this is a public method).
+ if (inList == null || inList.size() == 0 || maxQueryLength <= 0 || batchSize <= 0) {
+ throw new IllegalArgumentException("The IN list is empty!");
+ }
+
+ // Define constants and local variables.
+ int inListSize = inList.size();
+ StringBuilder buf = new StringBuilder();
+
+ int cursor4InListArray = 0, // cursor for the "inList" array.
+ cursor4InClauseElements = 0, // cursor for the elements within the current 'IN'/'NOT IN' clause.
+ cursor4queryOfInClauses = 0; // cursor for the 'IN'/'NOT IN' clauses within the current query.
+ boolean nextItemNeeded = true;
+ boolean newInclausePrefixJustAppended = false;
+ StringBuilder nextValue = new StringBuilder("");
+ StringBuilder newInclausePrefix =
+ new StringBuilder(notIn ? " and " + inColumn + " not in (":
+ " or " + inColumn + " in (");
+ List<Integer> ret = new ArrayList<>();
+ int currentCount = 0;
+
+ // Loop over the given inList elements.
+ while (cursor4InListArray < inListSize || !nextItemNeeded) {
+ if (cursor4queryOfInClauses == 0) {
+ // Append prefix
+ buf.append(prefix);
+ if (addParens) {
+ buf.append("(");
+ }
+ buf.append(inColumn);
+
+ if (notIn) {
+ buf.append(" not in (");
+ } else {
+ buf.append(" in (");
+ }
+ cursor4queryOfInClauses++;
+ newInclausePrefixJustAppended = false;
+ }
+
+ // Get the next "inList" value element if needed.
+ if (nextItemNeeded) {
+ nextValue.setLength(0);
+ nextValue.append(String.valueOf(inList.get(cursor4InListArray++)));
+ nextItemNeeded = false;
+ }
+
+ // Compute the size of a query when the 'nextValue' is added to the current query.
+ int querySize = querySizeExpected(buf.length(), nextValue.length(), suffix.length(), addParens);
+
+ if (querySize > maxQueryLength * 1024) {
+ // Check an edge case where DIRECT_SQL_MAX_QUERY_LENGTH does not allow even one 'IN' clause with a single value.
+ if (cursor4queryOfInClauses == 1 && cursor4InClauseElements == 0) {
+ throw new IllegalArgumentException("The current " + ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH.getVarname() + " is set too small to have one IN clause with single value!");
+ }
+
+ // Check an edge case: throw an exception if we cannot build a single query for the 'NOT IN' case, as explained in the method comments.
+ if (notIn) {
+ throw new IllegalArgumentException("The NOT IN list has too many elements for the current " + ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH.getVarname() + "!");
+ }
+
+ // Wrap up the current query string since we can not add another "inList" element value.
+ if (newInclausePrefixJustAppended) {
+ buf.delete(buf.length()-newInclausePrefix.length(), buf.length());
+ }
+
+ buf.setCharAt(buf.length() - 1, ')'); // replace the trailing comma to finish an 'IN' clause string.
+
+ if (addParens) {
+ buf.append(")");
+ }
+
+ buf.append(suffix);
+ queries.add(buf.toString());
+ ret.add(currentCount);
+
+ // Prepare a new query string.
+ buf.setLength(0);
+ currentCount = 0;
+ cursor4queryOfInClauses = cursor4InClauseElements = 0;
+ querySize = 0;
+ newInclausePrefixJustAppended = false;
+ continue;
+ } else if (cursor4InClauseElements >= batchSize-1 && cursor4InClauseElements != 0) {
+ // Finish the current 'IN'/'NOT IN' clause and start a new clause.
+ buf.setCharAt(buf.length() - 1, ')'); // replace the trailing comma.
+ buf.append(newInclausePrefix.toString());
+
+ newInclausePrefixJustAppended = true;
+
+ // increment cursor for per-query IN-clause list
+ cursor4queryOfInClauses++;
+ cursor4InClauseElements = 0;
+ } else {
+ buf.append(nextValue.toString()).append(",");
+ currentCount++;
+ nextItemNeeded = true;
+ newInclausePrefixJustAppended = false;
+ // increment cursor for elements per 'IN'/'NOT IN' clause.
+ cursor4InClauseElements++;
+ }
+ }
+
+ // Finish the last query.
+ if (newInclausePrefixJustAppended) {
+ buf.delete(buf.length()-newInclausePrefix.length(), buf.length());
+ }
+ buf.setCharAt(buf.length() - 1, ')'); // replace the trailing comma.
+ if (addParens) {
+ buf.append(")");
+ }
+ buf.append(suffix);
+ queries.add(buf.toString());
+ ret.add(currentCount);
+ return ret;
+ }
+
+ /**
+ * Compute and return the size of a query statement with the given parameters as input variables.
+ *
+ * @param sizeSoFar size of the current contents of the buf
+ * @param sizeNextItem size of the next 'IN' clause element value.
+ * @param suffixSize size of the suffix for a query statement
+ * @param addParens Do we add an additional parenthesis?
+ */
+ private static int querySizeExpected(int sizeSoFar,
+ int sizeNextItem,
+ int suffixSize,
+ boolean addParens) {
+
+ int size = sizeSoFar + sizeNextItem + suffixSize;
+
+ if (addParens) {
+ size++;
+ }
+
+ return size;
+ }
+ }