Posted to commits@hive.apache.org by vi...@apache.org on 2018/07/19 19:55:13 UTC
[14/51] [partial] hive git commit: HIVE-20188 : Split server-specific code outside of standalone metastore-common (Alexander Kolbasov reviewed by Vihang Karajgaonkar)
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
deleted file mode 100644
index 33f24fb..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.common.classification.RetrySemantics;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
-
-import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A handler to answer transaction related calls that come into the metastore
- * server.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface TxnStore extends Configurable {
-
- enum MUTEX_KEY {
- Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
- WriteSetCleaner, CompactionScheduler, WriteIdAllocator, MaterializationRebuild
- }
- // Compactor states (Should really be enum)
- String INITIATED_RESPONSE = "initiated";
- String WORKING_RESPONSE = "working";
- String CLEANING_RESPONSE = "ready for cleaning";
- String FAILED_RESPONSE = "failed";
- String SUCCEEDED_RESPONSE = "succeeded";
- String ATTEMPTED_RESPONSE = "attempted";
-
- int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000;
-
- /**
- * Get information about open transactions. This gives extensive information about the
- * transactions rather than just the list of transactions. This should be used when the need
- * is to see information about the transactions (e.g. show transactions).
- * @return information about open transactions
- * @throws MetaException
- */
- @RetrySemantics.ReadOnly
- GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
-
- /**
- * Get list of valid transactions. This gives just the list of transactions that are open.
- * @return list of open transactions, as well as a high water mark.
- * @throws MetaException
- */
- @RetrySemantics.ReadOnly
- GetOpenTxnsResponse getOpenTxns() throws MetaException;
-
- /**
- * Get the count for open transactions.
- * @throws MetaException
- */
- @RetrySemantics.ReadOnly
- void countOpenTxns() throws MetaException;
-
- /**
- * Open a set of transactions
- * @param rqst request to open transactions
- * @return information on opened transactions
- * @throws MetaException
- */
- @RetrySemantics.Idempotent
- OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
-
- @RetrySemantics.Idempotent
- long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException;
-
- /**
- * Abort (rollback) a transaction.
- * @param rqst info on transaction to abort
- * @throws NoSuchTxnException
- * @throws MetaException
- */
- @RetrySemantics.Idempotent
- void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException;
-
- /**
- * Abort (rollback) a list of transactions in one request.
- * @param rqst info on transactions to abort
- * @throws NoSuchTxnException
- * @throws MetaException
- */
- @RetrySemantics.Idempotent
- void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException;
-
- /**
- * Commit a transaction
- * @param rqst info on transaction to commit
- * @throws NoSuchTxnException
- * @throws TxnAbortedException
- * @throws MetaException
- */
- @RetrySemantics.Idempotent
- void commitTxn(CommitTxnRequest rqst)
- throws NoSuchTxnException, TxnAbortedException, MetaException;
-
- /**
- * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
- * @param rqst info on table/partitions and writeid snapshot to replicate.
- * @throws MetaException in case of failure
- */
- @RetrySemantics.Idempotent
- void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException;
-
- /**
- * Get invalidation info for the materialization. Currently, the materialization information
- * only contains information about whether there were update/delete operations on the source
- * tables used by the materialization since it was created.
- * @param cm creation metadata for the materialization
- * @param validTxnList valid transaction list for snapshot taken for current query
- * @throws MetaException
- */
- @RetrySemantics.Idempotent
- Materialization getMaterializationInvalidationInfo(
- final CreationMetadata cm, final String validTxnList)
- throws MetaException;
-
- LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId)
- throws MetaException;
-
- boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId)
- throws MetaException;
-
- long cleanupMaterializationRebuildLocks(ValidTxnList validTxnList, long timeout)
- throws MetaException;
-
- /**
- * Gets the list of valid write ids for the given table with respect to the current txn
- * @param rqst info on transaction and list of table names associated with given transaction
- * @throws NoSuchTxnException
- * @throws MetaException
- */
- @RetrySemantics.ReadOnly
- GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
- throws NoSuchTxnException, MetaException;
-
- /**
- * Allocate a write ID for the given table and associate it with a transaction
- * @param rqst info on transaction and table to allocate write id
- * @throws NoSuchTxnException
- * @throws TxnAbortedException
- * @throws MetaException
- */
- AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
- throws NoSuchTxnException, TxnAbortedException, MetaException;
-
- /**
- * Called on conversion of existing table to full acid. Sets initial write ID to a high
- * enough value so that we can assign unique ROW__IDs to data in existing files.
- */
- void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) throws MetaException;
-
- /**
- * Obtain a lock.
- * @param rqst information on the lock to obtain. If the requester is part of a transaction
- * the txn information must be included in the lock request.
- * @return info on the lock, including whether it was obtained.
- * @throws NoSuchTxnException
- * @throws TxnAbortedException
- * @throws MetaException
- */
- @RetrySemantics.CannotRetry
- LockResponse lock(LockRequest rqst)
- throws NoSuchTxnException, TxnAbortedException, MetaException;
-
- /**
- * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait
- * state.
- * @param rqst info on the lock to check
- * @return info on the state of the lock
- * @throws NoSuchTxnException
- * @throws NoSuchLockException
- * @throws TxnAbortedException
- * @throws MetaException
- */
- @RetrySemantics.SafeToRetry
- LockResponse checkLock(CheckLockRequest rqst)
- throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
-
- /**
- * Unlock a lock. It is not legal to call this if the caller is part of a txn. In that case
- * the txn should be committed or aborted instead. (Note someday this will change since
- * multi-statement transactions will allow unlocking in the transaction.)
- * @param rqst lock to unlock
- * @throws NoSuchLockException
- * @throws TxnOpenException
- * @throws MetaException
- */
- @RetrySemantics.Idempotent
- void unlock(UnlockRequest rqst)
- throws NoSuchLockException, TxnOpenException, MetaException;
-
- /**
- * Get information on current locks.
- * @param rqst lock information to retrieve
- * @return lock information.
- * @throws MetaException
- */
- @RetrySemantics.ReadOnly
- ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException;
-
- /**
- * Send a heartbeat for a lock or a transaction
- * @param ids lock and/or txn id to heartbeat
- * @throws NoSuchTxnException
- * @throws NoSuchLockException
- * @throws TxnAbortedException
- * @throws MetaException
- */
- @RetrySemantics.SafeToRetry
- void heartbeat(HeartbeatRequest ids)
- throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
-
- /**
- * Heartbeat a group of transactions together
- * @param rqst set of transactions to heartbeat
- * @return info on txns that were heartbeated
- * @throws MetaException
- */
- @RetrySemantics.SafeToRetry
- HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
- throws MetaException;
-
- /**
- * Submit a compaction request into the queue. This is called when a user manually requests a
- * compaction.
- * @param rqst information on what to compact
- * @return id of the compaction that has been started or existing id if this resource is already scheduled
- * @throws MetaException
- */
- @RetrySemantics.Idempotent
- CompactionResponse compact(CompactionRequest rqst) throws MetaException;
-
- /**
- * Show list of current compactions.
- * @param rqst info on which compactions to show
- * @return compaction information
- * @throws MetaException
- */
- @RetrySemantics.ReadOnly
- ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException;
-
- /**
- * Add information on a set of dynamic partitions that participated in a transaction.
- * @param rqst dynamic partition info.
- * @throws NoSuchTxnException
- * @throws TxnAbortedException
- * @throws MetaException
- */
- @RetrySemantics.SafeToRetry
- void addDynamicPartitions(AddDynamicPartitions rqst)
- throws NoSuchTxnException, TxnAbortedException, MetaException;
-
- /**
- * Clean up corresponding records in metastore tables.
- * @param type Hive object type
- * @param db database object
- * @param table table object
- * @param partitionIterator partition iterator
- * @throws MetaException
- */
- @RetrySemantics.Idempotent
- void cleanupRecords(HiveObjectType type, Database db, Table table,
- Iterator<Partition> partitionIterator) throws MetaException;
-
- @RetrySemantics.Idempotent
- void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName,
- String newCatName, String newDbName, String newTabName, String newPartName)
- throws MetaException;
-
- /**
- * Timeout transactions and/or locks. This should only be called by the compactor.
- */
- @RetrySemantics.Idempotent
- void performTimeOuts();
-
- /**
- * This will look through the completed_txn_components table and look for partitions or tables
- * that may be ready for compaction. Also, look through txns and txn_components tables for
- * aborted transactions that we should add to the list.
- * @param maxAborted Maximum number of aborted queries to allow before marking this as a
- * potential compaction.
- * @return list of CompactionInfo structs. These will not have id, type,
- * or runAs set since these are only potential compactions not actual ones.
- */
- @RetrySemantics.ReadOnly
- Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
-
- /**
- * Sets the user to run as. This is for the case
- * where the request was generated by the user and so the worker must set this value later.
- * @param cq_id id of this entry in the queue
- * @param user user to run the jobs as
- */
- @RetrySemantics.Idempotent
- void setRunAs(long cq_id, String user) throws MetaException;
-
- /**
- * This will grab the next compaction request off of
- * the queue, and assign it to the worker.
- * @param workerId id of the worker calling this, will be recorded in the db
- * @return an info element for this compaction request, or null if there is no work to do now.
- */
- @RetrySemantics.ReadOnly
- CompactionInfo findNextToCompact(String workerId) throws MetaException;
-
- /**
- * This will mark an entry in the queue as compacted
- * and put it in the ready to clean state.
- * @param info info on the compaction entry to mark as compacted.
- */
- @RetrySemantics.SafeToRetry
- void markCompacted(CompactionInfo info) throws MetaException;
-
- /**
- * Find entries in the queue that are ready to
- * be cleaned.
- * @return information on the entry in the queue.
- */
- @RetrySemantics.ReadOnly
- List<CompactionInfo> findReadyToClean() throws MetaException;
-
- /**
- * This will remove an entry from the queue after
- * it has been compacted.
- *
- * @param info info on the compaction entry to remove
- */
- @RetrySemantics.CannotRetry
- void markCleaned(CompactionInfo info) throws MetaException;
-
- /**
- * Mark a compaction entry as failed. This will move it to the compaction history queue with a
- * failed status. It will NOT clean up aborted transactions in the table/partition associated
- * with this compaction.
- * @param info information on the compaction that failed.
- * @throws MetaException
- */
- @RetrySemantics.CannotRetry
- void markFailed(CompactionInfo info) throws MetaException;
-
- /**
- * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommitted_txnid as found by
- * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
- */
- @RetrySemantics.SafeToRetry
- void cleanTxnToWriteIdTable() throws MetaException;
-
- /**
- * Clean up aborted transactions from txns that have no components in txn_components. The reason such
- * txns exist can be that no work was done in this txn (e.g. Streaming opened a TransactionBatch and
- * abandoned it without doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
- */
- @RetrySemantics.SafeToRetry
- void cleanEmptyAbortedTxns() throws MetaException;
-
- /**
- * This will take all entries assigned to workers
- * on a host and return them to the INITIATED state. The initiator should use this at start up to
- * clean entries from any workers that were in the middle of compacting when the metastore
- * shut down. It does not reset entries from worker threads on other hosts as those may still
- * be working.
- * @param hostname Name of this host. It is assumed this prefixes the thread's worker id,
- * so that a LIKE 'hostname%' pattern will match the worker id.
- */
- @RetrySemantics.Idempotent
- void revokeFromLocalWorkers(String hostname) throws MetaException;
-
- /**
- * This call will return all compaction queue
- * entries that are assigned to a worker but have exceeded the timeout back to the initiated state.
- * This should be called by the initiator on start up and occasionally when running to clean up
- * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called
- * first.
- * @param timeout number of milliseconds since start time that should elapse before a worker is
- * declared dead.
- */
- @RetrySemantics.Idempotent
- void revokeTimedoutWorkers(long timeout) throws MetaException;
-
- /**
- * Queries metastore DB directly to find columns in the table which have statistics information.
- * If {@code ci} includes partition info then per partition stats info is examined, otherwise
- * table level stats are examined.
- * @throws MetaException
- */
- @RetrySemantics.ReadOnly
- List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
-
- /**
- * Record the highest write id that the {@code ci} compaction job will pay attention to.
- */
- @RetrySemantics.Idempotent
- void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException;
-
- /**
- * For any given compactable entity (partition, table if not partitioned) the history of compactions
- * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
- * history such that a configurable number of each type of state is present. Any other entries
- * can be purged. This scheme has the advantage of always retaining the last failure/success even if
- * it's not recent.
- * @throws MetaException
- */
- @RetrySemantics.SafeToRetry
- void purgeCompactionHistory() throws MetaException;
-
- /**
- * WriteSet tracking is used to ensure proper transaction isolation. This method deletes the
- * transaction metadata once it becomes unnecessary.
- */
- @RetrySemantics.SafeToRetry
- void performWriteSetGC();
-
- /**
- * Determine if there are enough consecutive failures compacting a table or partition that no
- * new automatic compactions should be scheduled. User initiated compactions do not do this
- * check.
- * @param ci Table or partition to check.
- * @return true if it is ok to compact, false if there have been too many failures.
- * @throws MetaException
- */
- @RetrySemantics.ReadOnly
- boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
-
- @VisibleForTesting
- int numLocksInLockTable() throws SQLException, MetaException;
-
- @VisibleForTesting
- long setTimeout(long milliseconds);
-
- @RetrySemantics.Idempotent
- MutexAPI getMutexAPI();
-
- /**
- * This is primarily designed to provide coarse grained mutex support to operations running
- * inside the Metastore (of which there could be several instances). The initial goal is to
- * ensure that various sub-processes of the Compactor don't step on each other.
- *
- * In the RDBMS world each {@code LockHandle} uses a java.sql.Connection, so use it sparingly.
- */
- interface MutexAPI {
- /**
- * The {@code key} is the name of the lock. This will acquire an exclusive lock or block. It returns
- * a handle which must be used to release the lock. Each invocation returns a new handle.
- */
- LockHandle acquireLock(String key) throws MetaException;
-
- /**
- * Same as {@link #acquireLock(String)} but takes an already existing handle as input. This
- * will associate the lock on {@code key} with the same handle. All locks associated with
- * the same handle will be released together.
- * @param handle not NULL
- */
- void acquireLock(String key, LockHandle handle) throws MetaException;
- interface LockHandle {
- /**
- * Releases all locks associated with this handle.
- */
- void releaseLocks();
- }
- }
-
- /**
- * Once a {@link java.util.concurrent.ThreadPoolExecutor} Worker submits a job to the cluster,
- * it calls this to update the metadata.
- * @param id {@link CompactionInfo#id}
- */
- @RetrySemantics.Idempotent
- void setHadoopJobId(String hadoopJobId, long id);
-
- /**
- * Add the ACID write event information to the writeNotificationLog table.
- * @param acidWriteEvent ACID write event to record
- */
- @RetrySemantics.Idempotent
- void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException;
-}
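
For context on the MutexAPI removed above: it gives operations running inside the metastore (of which there can be several instances, e.g. the Compactor's sub-processes) coarse-grained mutual exclusion. Below is a minimal sketch of how a caller might use it, assuming a TxnStore obtained through the TxnUtils.getTxnStore factory that appears later in this diff; the class and method names in the sketch are illustrative, not part of the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class InitiatorMutexExample {
      public static void runExclusively(Configuration conf) throws MetaException {
        TxnStore txnStore = TxnUtils.getTxnStore(conf);
        TxnStore.MutexAPI mutex = txnStore.getMutexAPI();
        // Acquire the "Initiator" mutex; this blocks until the exclusive lock is granted.
        TxnStore.MutexAPI.LockHandle handle =
            mutex.acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
        try {
          // ... work that must not run concurrently in another metastore instance ...
        } finally {
          // releaseLocks() frees every lock associated with this handle.
          handle.releaseLocks();
        }
      }
    }

Each acquireLock call returns a fresh handle, and all locks tied to a handle are released together, which is why the release happens in a finally block.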
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
deleted file mode 100644
index fa291d5..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.utils.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.List;
-import java.util.Map;
-
-public class TxnUtils {
- private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
-
- /**
- * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
- * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
- * read the files, and thus treats both open and aborted transactions as invalid.
- * @param txns txn list from the metastore
- * @param currentTxn Current transaction that the user has open. If this is greater than 0 it
- * will be removed from the exceptions list so that the user sees his own
- * transaction as valid.
- * @return a valid txn list.
- */
- public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
- /*
- * The highWaterMark should be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
- * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which
- * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should
- * include the latest committed set.
- */
- long highWaterMark = (currentTxn > 0) ? Math.min(currentTxn, txns.getTxn_high_water_mark())
- : txns.getTxn_high_water_mark();
-
- // Open txns are already sorted in ascending order. This list may or may not include HWM
- // but it is guaranteed that list won't have txn > HWM. But, if we overwrite the HWM with currentTxn
- // then need to truncate the exceptions list accordingly.
- List<Long> openTxns = txns.getOpen_txns();
-
- // We care only about open/aborted txns below currentTxn, so the exceptions list is sized accordingly.
- // currentTxn will be missing from the openTxns list only in rare cases, e.g. when the txn was
- // aborted by AcidHouseKeeperService and the compactor has already cleaned up the aborted txns.
- // In such cases binarySearch returns a negative value encoding the would-be position of currentTxn,
- // so we just negate it to get the size.
- int sizeToHwm = (currentTxn > 0) ? Collections.binarySearch(openTxns, currentTxn) : openTxns.size();
- sizeToHwm = (sizeToHwm < 0) ? (-sizeToHwm) : sizeToHwm;
- long[] exceptions = new long[sizeToHwm];
- BitSet inAbortedBits = BitSet.valueOf(txns.getAbortedBits());
- BitSet outAbortedBits = new BitSet();
- long minOpenTxnId = Long.MAX_VALUE;
- int i = 0;
- for (long txn : openTxns) {
- // For snapshot isolation, we don't care about txns greater than current txn and so stop here.
- // Also, we need not include current txn to exceptions list.
- if ((currentTxn > 0) && (txn >= currentTxn)) {
- break;
- }
- if (inAbortedBits.get(i)) {
- outAbortedBits.set(i);
- } else if (minOpenTxnId == Long.MAX_VALUE) {
- minOpenTxnId = txn;
- }
- exceptions[i++] = txn;
- }
- return new ValidReadTxnList(exceptions, outAbortedBits, highWaterMark, minOpenTxnId);
- }
-
- /**
- * Transform a {@link org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse} to a
- * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}. This assumes that the caller intends to
- * read the files, and thus treats both open and aborted transactions as invalid.
- * @param currentTxnId current txn ID for which we get the valid write ids list
- * @param validIds valid write ids list from the metastore
- * @return a valid write IDs list for the whole transaction.
- */
- public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId,
- List<TableValidWriteIds> validIds) {
- ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId);
- for (TableValidWriteIds tableWriteIds : validIds) {
- validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds));
- }
- return validTxnWriteIdList;
- }
-
- /**
- * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
- * {@link org.apache.hadoop.hive.common.ValidReaderWriteIdList}. This assumes that the caller intends to
- * read the files, and thus treats both open and aborted write ids as invalid.
- * @param tableWriteIds valid write ids for the given table from the metastore
- * @return a valid write IDs list for the input table
- */
- public static ValidReaderWriteIdList createValidReaderWriteIdList(TableValidWriteIds tableWriteIds) {
- String fullTableName = tableWriteIds.getFullTableName();
- long highWater = tableWriteIds.getWriteIdHighWaterMark();
- List<Long> invalids = tableWriteIds.getInvalidWriteIds();
- BitSet abortedBits = BitSet.valueOf(tableWriteIds.getAbortedBits());
- long[] exceptions = new long[invalids.size()];
- int i = 0;
- for (long writeId : invalids) {
- exceptions[i++] = writeId;
- }
- if (tableWriteIds.isSetMinOpenWriteId()) {
- return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater,
- tableWriteIds.getMinOpenWriteId());
- } else {
- return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater);
- }
- }
-
- /**
- * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
- * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}. This assumes that the caller intends to
- * compact the files, and thus treats only open transactions/write ids as invalid. Additionally any
- * writeId > highestOpenWriteId is also invalid. This is to avoid creating something like
- * delta_17_120 where writeId 80, for example, is still open.
- * @param tableValidWriteIds table write id list from the metastore
- * @return a valid write id list.
- */
- public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValidWriteIds tableValidWriteIds) {
- String fullTableName = tableValidWriteIds.getFullTableName();
- long highWater = tableValidWriteIds.getWriteIdHighWaterMark();
- long minOpenWriteId = Long.MAX_VALUE;
- List<Long> invalids = tableValidWriteIds.getInvalidWriteIds();
- BitSet abortedBits = BitSet.valueOf(tableValidWriteIds.getAbortedBits());
- long[] exceptions = new long[invalids.size()];
- int i = 0;
- for (long writeId : invalids) {
- if (abortedBits.get(i)) {
- // Only need aborted since we don't consider anything above minOpenWriteId
- exceptions[i++] = writeId;
- } else {
- minOpenWriteId = Math.min(minOpenWriteId, writeId);
- }
- }
- if(i < exceptions.length) {
- exceptions = Arrays.copyOf(exceptions, i);
- }
- highWater = minOpenWriteId == Long.MAX_VALUE ? highWater : minOpenWriteId - 1;
- BitSet bitSet = new BitSet(exceptions.length);
- bitSet.set(0, exceptions.length); // for ValidCompactorWriteIdList, everything in exceptions are aborted
- if (minOpenWriteId == Long.MAX_VALUE) {
- return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater);
- } else {
- return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater, minOpenWriteId);
- }
- }
-
- public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) {
- // This is based on the existing valid write ID list that was built for a select query;
- // therefore we assume all the aborted txns, etc. were already accounted for.
- // All we do is adjust the high watermark to only include contiguous txns.
- Long minOpenWriteId = ids.getMinOpenWriteId();
- if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
- return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
- }
- return ids;
- }
-
- /**
- * Get an instance of the TxnStore that is appropriate for this store
- * @param conf configuration
- * @return txn store
- */
- public static TxnStore getTxnStore(Configuration conf) {
- String className = MetastoreConf.getVar(conf, ConfVars.TXN_STORE_IMPL);
- try {
- TxnStore handler = JavaUtils.getClass(className, TxnStore.class).newInstance();
- handler.setConf(conf);
- return handler;
- } catch (Exception e) {
- LOG.error("Unable to instantiate raw store directly in fastpath mode", e);
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Note, users are responsible for using the correct TxnManager. We do not look at
- * SessionState.get().getTxnMgr().supportsAcid() here
- * Should produce the same result as
- * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)}.
- * @return true if table is a transactional table, false otherwise
- */
- public static boolean isTransactionalTable(Table table) {
- if (table == null) {
- return false;
- }
- Map<String, String> parameters = table.getParameters();
- String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
- return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
- }
-
- /**
- * Should produce the same result as
- * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)}.
- */
- public static boolean isAcidTable(Table table) {
- return TxnUtils.isTransactionalTable(table) &&
- TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY.equals(table.getParameters()
- .get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES));
- }
-
- /**
- * Should produce the result as <dbName>.<tableName>.
- */
- public static String getFullTableName(String dbName, String tableName) {
- return dbName.toLowerCase() + "." + tableName.toLowerCase();
- }
-
- public static String[] getDbTableName(String fullTableName) {
- return fullTableName.split("\\.");
- }
-
-
-
- /**
- * Build a query with one or multiple 'IN' or 'NOT IN' clauses from the given input parameters.
- * Multiple queries are generated only if a single query would be too big, and only for the
- * 'IN' case; for 'NOT IN' clauses, splitting into multiple queries would change the semantics
- * of the intended query.
- * E.g., assume the input "inList" parameter has [5, 6] and that the
- * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause.
- * Then having two delete statements changes the semantics of the intended SQL statement,
- * i.e. the sequence 'delete from T where a not in (5)' and 'delete from T where a not in (6)'
- * is not equal to 'delete from T where a not in (5, 6)'.
- *
- * Note that this method currently supports only a single column for
- * IN/NOT IN clauses, and it only covers the OR-based composite 'IN' clause and
- * the AND-based composite 'NOT IN' clause.
- * For example, in the 'IN' clause case, the method will build a query with OR,
- * e.g. "id in (1,2,3) OR id in (4,5,6)".
- * In the 'NOT IN' case, the NOT IN list is broken into multiple 'NOT IN' clauses connected by AND.
- *
- * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
- * clauses in a query".
- *
- * @param queries OUT: Array of query strings
- * @param prefix IN: Part of the query that comes before IN list
- * @param suffix IN: Part of the query that comes after IN list
- * @param inList IN: the list with IN list values
- * @param inColumn IN: single column name of IN list operator
- * @param addParens IN: add a pair of parentheses around the IN lists
- * e.g. "(id in (1,2,3) OR id in (4,5,6))"
- * @param notIn IN: is this for building a 'NOT IN' composite clause?
- * @return OUT: a list of the count of IN list values that are in each of the corresponding queries
- */
- public static List<Integer> buildQueryWithINClause(Configuration conf,
- List<String> queries,
- StringBuilder prefix,
- StringBuilder suffix,
- List<Long> inList,
- String inColumn,
- boolean addParens,
- boolean notIn) {
- List<String> inListStrings = new ArrayList<>(inList.size());
- for (Long aLong : inList) {
- inListStrings.add(aLong.toString());
- }
- return buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
- inListStrings, inColumn, addParens, notIn);
-
- }
- /**
- * Build a query with one or multiple 'IN' or 'NOT IN' clauses from the given input parameters.
- * Multiple queries are generated only if a single query would be too big, and only for the
- * 'IN' case; for 'NOT IN' clauses, splitting into multiple queries would change the semantics
- * of the intended query.
- * E.g., assume the input "inList" parameter has [5, 6] and that the
- * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause.
- * Then having two delete statements changes the semantics of the intended SQL statement,
- * i.e. the sequence 'delete from T where a not in (5)' and 'delete from T where a not in (6)'
- * is not equal to 'delete from T where a not in (5, 6)'.
- *
- * Note that this method currently supports only a single column for
- * IN/NOT IN clauses, and it only covers the OR-based composite 'IN' clause and
- * the AND-based composite 'NOT IN' clause.
- * For example, in the 'IN' clause case, the method will build a query with OR,
- * e.g. "id in (1,2,3) OR id in (4,5,6)".
- * In the 'NOT IN' case, the NOT IN list is broken into multiple 'NOT IN' clauses connected by AND.
- *
- * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
- * clauses in a query".
- *
- * @param queries OUT: Array of query strings
- * @param prefix IN: Part of the query that comes before IN list
- * @param suffix IN: Part of the query that comes after IN list
- * @param inList IN: the list with IN list values
- * @param inColumn IN: single column name of IN list operator
- * @param addParens IN: add a pair of parentheses around the IN lists
- * e.g. "(id in (1,2,3) OR id in (4,5,6))"
- * @param notIn IN: is this for building a 'NOT IN' composite clause?
- * @return OUT: a list of the count of IN list values that are in each of the corresponding queries
- */
- public static List<Integer> buildQueryWithINClauseStrings(Configuration conf, List<String> queries, StringBuilder prefix,
- StringBuilder suffix, List<String> inList, String inColumn, boolean addParens, boolean notIn) {
- // Get configuration parameters
- int maxQueryLength = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH);
- int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
-
- // Check parameter set validity as a public method.
- if (inList == null || inList.size() == 0 || maxQueryLength <= 0 || batchSize <= 0) {
- throw new IllegalArgumentException("The IN list is empty!");
- }
-
- // Define constants and local variables.
- int inListSize = inList.size();
- StringBuilder buf = new StringBuilder();
-
- int cursor4InListArray = 0, // cursor for the "inList" array.
- cursor4InClauseElements = 0, // cursor for an element list per an 'IN'/'NOT IN'-clause.
- cursor4queryOfInClauses = 0; // cursor for in-clause lists per a query.
- boolean nextItemNeeded = true;
- boolean newInclausePrefixJustAppended = false;
- StringBuilder nextValue = new StringBuilder("");
- StringBuilder newInclausePrefix =
- new StringBuilder(notIn ? " and " + inColumn + " not in (":
- " or " + inColumn + " in (");
- List<Integer> ret = new ArrayList<>();
- int currentCount = 0;
-
- // Loop over the given inList elements.
- while( cursor4InListArray < inListSize || !nextItemNeeded) {
- if (cursor4queryOfInClauses == 0) {
- // Append prefix
- buf.append(prefix);
- if (addParens) {
- buf.append("(");
- }
- buf.append(inColumn);
-
- if (notIn) {
- buf.append(" not in (");
- } else {
- buf.append(" in (");
- }
- cursor4queryOfInClauses++;
- newInclausePrefixJustAppended = false;
- }
-
- // Get the next "inList" value element if needed.
- if (nextItemNeeded) {
- nextValue.setLength(0);
- nextValue.append(String.valueOf(inList.get(cursor4InListArray++)));
- nextItemNeeded = false;
- }
-
- // Compute the size of a query when the 'nextValue' is added to the current query.
- int querySize = querySizeExpected(buf.length(), nextValue.length(), suffix.length(), addParens);
-
- if (querySize > maxQueryLength * 1024) {
- // Check an edge case where the DIRECT_SQL_MAX_QUERY_LENGTH does not allow one 'IN' clause with a single value.
- if (cursor4queryOfInClauses == 1 && cursor4InClauseElements == 0) {
- throw new IllegalArgumentException("The current " + ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH.getVarname() + " is set too small to have one IN clause with single value!");
- }
-
- // Check an edge case: throw an exception if we cannot build a single query for the 'NOT IN' clause case, as mentioned in the method comments.
- if (notIn) {
- throw new IllegalArgumentException("The NOT IN list has too many elements for the current " + ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH.getVarname() + "!");
- }
-
- // Wrap up the current query string since we can not add another "inList" element value.
- if (newInclausePrefixJustAppended) {
- buf.delete(buf.length()-newInclausePrefix.length(), buf.length());
- }
-
- buf.setCharAt(buf.length() - 1, ')'); // replace the trailing comma to finish an 'IN' clause string.
-
- if (addParens) {
- buf.append(")");
- }
-
- buf.append(suffix);
- queries.add(buf.toString());
- ret.add(currentCount);
-
- // Prepare a new query string.
- buf.setLength(0);
- currentCount = 0;
- cursor4queryOfInClauses = cursor4InClauseElements = 0;
- querySize = 0;
- newInclausePrefixJustAppended = false;
- continue;
- } else if (cursor4InClauseElements >= batchSize-1 && cursor4InClauseElements != 0) {
- // Finish the current 'IN'/'NOT IN' clause and start a new clause.
- buf.setCharAt(buf.length() - 1, ')'); // replace the trailing comma.
- buf.append(newInclausePrefix.toString());
-
- newInclausePrefixJustAppended = true;
-
- // increment cursor for per-query IN-clause list
- cursor4queryOfInClauses++;
- cursor4InClauseElements = 0;
- } else {
- buf.append(nextValue.toString()).append(",");
- currentCount++;
- nextItemNeeded = true;
- newInclausePrefixJustAppended = false;
- // increment cursor for elements per 'IN'/'NOT IN' clause.
- cursor4InClauseElements++;
- }
- }
-
- // Finish the last query.
- if (newInclausePrefixJustAppended) {
- buf.delete(buf.length()-newInclausePrefix.length(), buf.length());
- }
- buf.setCharAt(buf.length() - 1, ')'); // replace the trailing comma.
- if (addParens) {
- buf.append(")");
- }
- buf.append(suffix);
- queries.add(buf.toString());
- ret.add(currentCount);
- return ret;
- }
-
- /**
- * Compute and return the size of a query statement with the given parameters as input variables.
- *
- * @param sizeSoFar size of the current contents of the buf
- * @param sizeNextItem size of the next 'IN' clause element value.
- * @param suffixSize size of the suffix for a query statement
- * @param addParens Do we add an additional parenthesis?
- */
- private static int querySizeExpected(int sizeSoFar,
- int sizeNextItem,
- int suffixSize,
- boolean addParens) {
-
- int size = sizeSoFar + sizeNextItem + suffixSize;
-
- if (addParens) {
- size++;
- }
-
- return size;
- }
-}
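
For context on the buildQueryWithINClause* methods removed above: they split a long IN list into multiple 'IN' clauses and, when the query-length or element limits are exceeded, into multiple queries (except in the 'NOT IN' case, which must stay in a single query). Below is a minimal sketch of a call, assuming the signature shown above; the table name, column name, and ids are purely illustrative.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class InClauseQueryExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        List<String> queries = new ArrayList<>();
        StringBuilder prefix = new StringBuilder("delete from TXN_COMPONENTS where ");
        StringBuilder suffix = new StringBuilder("");
        List<Long> txnIds = Arrays.asList(5L, 6L, 7L);

        // Builds one or more statements such as
        //   delete from TXN_COMPONENTS where tc_txnid in (5,6,7)
        // splitting into several clauses/queries only if the configured limits are hit.
        List<Integer> counts = TxnUtils.buildQueryWithINClause(
            conf, queries, prefix, suffix, txnIds, "tc_txnid", false, false);

        for (int i = 0; i < queries.size(); i++) {
          System.out.println(queries.get(i) + "   -- covers " + counts.get(i) + " ids");
        }
      }
    }

The returned list of counts is parallel to the queries list, so a caller can tell how many IN-list values each generated statement covers.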
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/CommonCliOptions.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/CommonCliOptions.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/CommonCliOptions.java
deleted file mode 100644
index 24e4ebe..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/CommonCliOptions.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.utils;
-
-import java.util.Properties;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.logging.log4j.Level;
-
-/**
- * Reusable code for Hive CLIs.
- * <p>
- * Basic usage is: create an instance (usually a subclass, if you want to
- * add your own options or processing instructions), parse, and then use
- * the resulting information.
- * <p>
- * See org.apache.hadoop.hive.service.HiveServer or
- * org.apache.hadoop.hive.metastore.HiveMetaStore
- * for examples of use.
- *
- */
-public class CommonCliOptions {
- /**
- * Options for parsing the command line.
- */
- protected final Options OPTIONS = new Options();
-
- protected CommandLine commandLine;
-
- /**
- * The name of this cli.
- */
- protected final String cliname;
-
- private boolean verbose = false;
-
- /**
- * Create an instance with common options (help, verbose, etc...).
- *
- * @param cliname the name of the command
- * @param includeHiveConf include "hiveconf" as an option if true
- */
- @SuppressWarnings("static-access")
- public CommonCliOptions(String cliname, boolean includeHiveConf) {
- this.cliname = cliname;
-
- // [-v|--verbose]
- OPTIONS.addOption(new Option("v", "verbose", false, "Verbose mode"));
-
- // [-h|--help]
- OPTIONS.addOption(new Option("h", "help", false, "Print help information"));
-
- if (includeHiveConf) {
- OPTIONS.addOption(OptionBuilder
- .withValueSeparator()
- .hasArgs(2)
- .withArgName("property=value")
- .withLongOpt("hiveconf")
- .withDescription("Use value for given property")
- .create());
- }
- }
-
- /**
- * Add the hiveconf properties to the Java system properties, overriding
- * anything already set there.
- *
- * @return a copy of the properties specified in hiveconf
- */
- public Properties addHiveconfToSystemProperties() {
- Properties confProps = commandLine.getOptionProperties("hiveconf");
- for (String propKey : confProps.stringPropertyNames()) {
- if (verbose) {
- System.err.println(
- "hiveconf: " + propKey + "=" + confProps.getProperty(propKey));
- }
- if (propKey.equalsIgnoreCase("hive.root.logger")) {
- splitAndSetLogger(propKey, confProps);
- } else {
- System.setProperty(propKey, confProps.getProperty(propKey));
- }
- }
- return confProps;
- }
-
- public static void splitAndSetLogger(final String propKey, final Properties confProps) {
- String propVal = confProps.getProperty(propKey);
- if (propVal.contains(",")) {
- String[] tokens = propVal.split(",");
- for (String token : tokens) {
- if (Level.getLevel(token) == null) {
- System.setProperty("hive.root.logger", token);
- } else {
- System.setProperty("hive.log.level", token);
- }
- }
- } else {
- System.setProperty(propKey, confProps.getProperty(propKey));
- }
- }
-
- /**
- * Print usage information for the CLI.
- */
- public void printUsage() {
- new HelpFormatter().printHelp(cliname, OPTIONS);
- }
-
- /**
- * Parse the arguments.
- * @param args
- */
- public void parse(String[] args) {
- try {
- commandLine = new GnuParser().parse(OPTIONS, args);
-
- if (commandLine.hasOption('h')) {
- printUsage();
- System.exit(1);
- }
- if (commandLine.hasOption('v')) {
- verbose = true;
- }
- } catch (ParseException e) {
- System.err.println(e.getMessage());
- printUsage();
- System.exit(1);
- }
-
- }
-
- /**
- * Should the client be verbose.
- */
- public boolean isVerbose() {
- return verbose;
- }
-
-}
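
For context on the CommonCliOptions class removed above, here is a minimal sketch of a hypothetical CLI entry point that uses only the constructor, parse, addHiveconfToSystemProperties, and isVerbose methods shown in the diff; the tool name and class are made up for illustration.

    import java.util.Properties;
    import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;

    public class ExampleTool {
      public static void main(String[] args) {
        // "exampletool" is the name printed in the usage message; true adds the --hiveconf option.
        CommonCliOptions cli = new CommonCliOptions("exampletool", true);
        cli.parse(args); // prints usage and exits on -h/--help or on a parse error

        // Copy any --hiveconf key=value pairs into the JVM system properties.
        Properties hiveconf = cli.addHiveconfToSystemProperties();

        if (cli.isVerbose()) {
          System.err.println("Parsed " + hiveconf.size() + " hiveconf properties");
        }
        // ... start the actual tool here ...
      }
    }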
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
deleted file mode 100644
index 154db4b..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.utils;
-
-import org.apache.curator.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-
-public class FileUtils {
- private static final PathFilter SNAPSHOT_DIR_PATH_FILTER = new PathFilter() {
- @Override
- public boolean accept(Path p) {
- return ".snapshot".equalsIgnoreCase(p.getName());
- }
- };
- private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
-
- public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
- @Override
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
- /**
- * Filter that filters out hidden files
- */
- private static final PathFilter hiddenFileFilter = new PathFilter() {
- @Override
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
- /**
- * Move a particular file or directory to the trash.
- * @param fs FileSystem to use
- * @param f path of file or directory to move to trash.
- * @param conf configuration object
- * @return true if move successful
- * @throws IOException
- */
- public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf, boolean purge)
- throws IOException {
- LOG.debug("deleting " + f);
- boolean result;
- try {
- if(purge) {
- LOG.debug("purge is set to true. Not moving to Trash " + f);
- } else {
- result = Trash.moveToAppropriateTrash(fs, f, conf);
- if (result) {
- LOG.trace("Moved to trash: " + f);
- return true;
- }
- }
- } catch (IOException ioe) {
- // for whatever failure reason including that trash has lower encryption zone
- // retry with force delete
- LOG.warn(ioe.getMessage() + "; Force to delete it.");
- }
-
- result = fs.delete(f, true);
- if (!result) {
- LOG.error("Failed to delete " + f);
- }
- return result;
- }
-
- /**
- * Copies files between filesystems.
- */
- public static boolean copy(FileSystem srcFS, Path src,
- FileSystem dstFS, Path dst,
- boolean deleteSource,
- boolean overwrite,
- Configuration conf) throws IOException {
- boolean copied = false;
- boolean triedDistcp = false;
-
- /* Run distcp if source file/dir is too big */
- if (srcFS.getUri().getScheme().equals("hdfs")) {
- ContentSummary srcContentSummary = srcFS.getContentSummary(src);
- if (srcContentSummary.getFileCount() >
- MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXNUMFILES)
- && srcContentSummary.getLength() >
- MetastoreConf.getLongVar(conf,ConfVars.REPL_COPYFILE_MAXSIZE)) {
-
- LOG.info("Source is " + srcContentSummary.getLength() + " bytes. (MAX: " +
- MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXSIZE) + ")");
- LOG.info("Source is " + srcContentSummary.getFileCount() + " files. (MAX: " +
- MetastoreConf.getLongVar(conf, ConfVars.REPL_COPYFILE_MAXNUMFILES) + ")");
- LOG.info("Launch distributed copy (distcp) job.");
- triedDistcp = true;
- copied = distCp(srcFS, Collections.singletonList(src), dst, deleteSource, null, conf);
- }
- }
- if (!triedDistcp) {
- // Note : Currently, this implementation does not "fall back" to regular copy if distcp
- // is tried and it fails. We depend upon that behaviour in cases like replication,
- // wherein if distcp fails, there is good reason to not plod along with a trivial
- // implementation, and fail instead.
- copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
- }
- return copied;
- }
-
- private static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
- boolean deleteSource, String doAsUser,
- Configuration conf) throws IOException {
- boolean copied;
- if (doAsUser == null){
- copied = HdfsUtils.runDistCp(srcPaths, dst, conf);
- } else {
- copied = HdfsUtils.runDistCpAs(srcPaths, dst, conf, doAsUser);
- }
- if (copied && deleteSource) {
- for (Path path : srcPaths) {
- srcFS.delete(path, true);
- }
- }
- return copied;
- }
-
- /**
- * Creates the directory and all necessary parent directories.
- * @param fs FileSystem to use
- * @param f path to create.
- * @return true if directory created successfully. False otherwise, including if it exists.
- * @throws IOException exception in creating the directory
- */
- public static boolean mkdir(FileSystem fs, Path f) throws IOException {
- LOG.info("Creating directory if it doesn't exist: " + f);
- return fs.mkdirs(f);
- }
-
- /**
- * Rename a file. Unlike {@link FileSystem#rename(Path, Path)}, if the destPath already exists
- * and is a directory, this will NOT move the sourcePath into it. It will throw an IOException
- * instead.
- * @param srcFs file system src paths are on
- * @param destFs file system dest paths are on
- * @param srcPath source file or directory to move
- * @param destPath destination file name. This must be a file and not an existing directory.
- * @return result of fs.rename.
- * @throws IOException if fs.rename throws it, or if destPath already exists.
- */
- public static boolean rename(FileSystem srcFs, FileSystem destFs, Path srcPath,
- Path destPath) throws IOException {
- LOG.info("Renaming " + srcPath + " to " + destPath);
-
- // If destPath directory exists, rename call will move the srcPath
- // into destPath without failing. So check it before renaming.
- if(destFs.exists(destPath)) {
- throw new IOException("Cannot rename the source path. The destination "
- + "path already exists.");
- }
-
- if (equalsFileSystem(srcFs, destFs)) {
- //just rename the directory
- return srcFs.rename(srcPath, destPath);
- } else {
- Configuration conf = new Configuration();
- return copy(srcFs, srcPath, destFs, destPath,
- true, // delete source
- false, // overwrite destination
- conf);
- }
- }
-
- // NOTE: This is for generating the internal path name for partitions. Users
- // should always use the MetaStore API to get the path name for a partition.
- // Users should not directly take partition values and turn them into a path
- // name by themselves, because the logic below may change in the future.
- //
- // In the future, it's OK to add new chars to the escape list, and old data
- // won't be corrupt, because the full path name in metastore is stored.
- // In that case, Hive will continue to read the old data, but when it creates
- // new partitions, it will use new names.
- // Edit: there are some use cases for which adding new chars is not
- // backward compatible - e.g. if a partition was created with a name containing
- // a special char that you later start escaping, and you then try to drop
- // the partition with a Hive version that now escapes that special char using
- // the list below, the drop partition fails to work.
-
- private static BitSet charToEscape = new BitSet(128);
- static {
- for (char c = 0; c < ' '; c++) {
- charToEscape.set(c);
- }
-
- /*
- * ASCII 01-1F are control characters that need to be escaped.
- * \u000A and \u000D are \n and \r, respectively.
- */
- char[] clist = new char[] {'\u0001', '\u0002', '\u0003', '\u0004',
- '\u0005', '\u0006', '\u0007', '\u0008', '\u0009', '\n', '\u000B',
- '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012',
- '\u0013', '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019',
- '\u001A', '\u001B', '\u001C', '\u001D', '\u001E', '\u001F',
- '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{',
- '[', ']', '^'};
-
- for (char c : clist) {
- charToEscape.set(c);
- }
- }
-
- private static boolean needsEscaping(char c) {
- return c >= 0 && c < charToEscape.size() && charToEscape.get(c);
- }
-
- public static String escapePathName(String path) {
- return escapePathName(path, null);
- }
-
- /**
- * Escapes a path name.
- * @param path The path to escape.
- * @param defaultPath
- * The default name for the path, if the given path is empty or null.
- * @return An escaped path name.
- */
- public static String escapePathName(String path, String defaultPath) {
-
- // __HIVE_DEFAULT_NULL__ is the system default value for null and empty string.
- // TODO: we should allow user to specify default partition or HDFS file location.
- if (path == null || path.length() == 0) {
- if (defaultPath == null) {
- //previously, when path is empty or null and no default path is specified,
- // __HIVE_DEFAULT_PARTITION__ was the return value for escapePathName
- return "__HIVE_DEFAULT_PARTITION__";
- } else {
- return defaultPath;
- }
- }
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < path.length(); i++) {
- char c = path.charAt(i);
- if (needsEscaping(c)) {
- sb.append('%');
- sb.append(String.format("%1$02X", (int) c));
- } else {
- sb.append(c);
- }
- }
- return sb.toString();
- }
-
- public static String unescapePathName(String path) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < path.length(); i++) {
- char c = path.charAt(i);
- if (c == '%' && i + 2 < path.length()) {
- int code = -1;
- try {
- code = Integer.parseInt(path.substring(i + 1, i + 3), 16);
- } catch (Exception e) {
- code = -1;
- }
- if (code >= 0) {
- sb.append((char) code);
- i += 2;
- continue;
- }
- }
- sb.append(c);
- }
- return sb.toString();
- }
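-
- // Illustrative sketch of the escaping above: '/' (0x2F) and '=' (0x3D) are in the escape
- // list, so partition values survive being embedded in a path component, e.g.
- //   escapePathName("2018/07/19")        -> "2018%2F07%2F19"
- //   escapePathName("key=value")         -> "key%3Dvalue"
- //   escapePathName(null)                -> "__HIVE_DEFAULT_PARTITION__"
- //   unescapePathName("2018%2F07%2F19")  -> "2018/07/19"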
-
- /**
- * Get all file status from a root path and recursively go deep into certain levels.
- *
- * @param path
- * the root path
- * @param level
- * the depth of directory to explore
- * @param fs
- * the file system
- * @return list of FileStatus for the matched files/directories
- * @throws IOException
- */
- public static List<FileStatus> getFileStatusRecurse(Path path, int level, FileSystem fs)
- throws IOException {
-
- // if level is < 0, then return all files/directories under the specified path
- if (level < 0) {
- List<FileStatus> result = new ArrayList<>();
- try {
- FileStatus fileStatus = fs.getFileStatus(path);
- FileUtils.listStatusRecursively(fs, fileStatus, result);
- } catch (IOException e) {
- // globStatus() returns an empty FileStatus[] when the specified path
- // does not exist, but getFileStatus() throws an IOException. To mimic that
- // behavior we return an empty list on exception. For external
- // tables, the table path may not exist at table-creation time.
- return new ArrayList<>(0);
- }
- return result;
- }
-
- // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
- StringBuilder sb = new StringBuilder(path.toUri().getPath());
- for (int i = 0; i < level; i++) {
- sb.append(Path.SEPARATOR).append("*");
- }
- Path pathPattern = new Path(path, sb.toString());
- return Lists.newArrayList(fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER));
- }
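-
- // Illustrative sketch (fs is an assumed FileSystem handle): for a table at /warehouse/t with
- // two partition columns, getFileStatusRecurse(new Path("/warehouse/t"), 2, fs) globs the
- // pattern /warehouse/t/*/* and returns the leaf partition directories with hidden files
- // filtered out, while a negative level lists everything under /warehouse/t recursively.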
-
- /**
- * Recursively lists status for all files starting from a particular directory (or individual file
- * as base case).
- *
- * @param fs
- * file system
- *
- * @param fileStatus
- * starting point in file system
- *
- * @param results
- * receives enumeration of all files found
- */
- public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
- List<FileStatus> results) throws IOException {
-
- if (fileStatus.isDirectory()) {
- for (FileStatus stat : fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_PATH_FILTER)) {
- listStatusRecursively(fs, stat, results);
- }
- } else {
- results.add(fileStatus);
- }
- }
-
- public static String makePartName(List<String> partCols, List<String> vals) {
- return makePartName(partCols, vals, null);
- }
-
- /**
- * Makes a valid partition name.
- * @param partCols The partition keys' names
- * @param vals The partition values
- * @param defaultStr
- * The default name given to a partition value if the respective value is empty or null.
- * @return An escaped, valid partition name.
- */
- public static String makePartName(List<String> partCols, List<String> vals,
- String defaultStr) {
- StringBuilder name = new StringBuilder();
- for (int i = 0; i < partCols.size(); i++) {
- if (i > 0) {
- name.append(Path.SEPARATOR);
- }
- name.append(escapePathName((partCols.get(i)).toLowerCase(), defaultStr));
- name.append('=');
- name.append(escapePathName(vals.get(i), defaultStr));
- }
- return name.toString();
- }
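-
- // Illustrative sketch: partition key names are lower-cased and both keys and values are
- // escaped before being joined, e.g.
- //   makePartName(Arrays.asList("Year", "Month"), Arrays.asList("2018", "07"))
- //     -> "year=2018/month=07"
- //   makePartName(Arrays.asList("ds"), Arrays.asList("2018/07/19"))
- //     -> "ds=2018%2F07%2F19"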
-
- /**
- * Determine if two objects reference the same file system.
- * @param fs1 first file system
- * @param fs2 second file system
- * @return true if both file system arguments point to the same file system
- */
- public static boolean equalsFileSystem(FileSystem fs1, FileSystem fs2) {
- //When the file system cache is disabled, you get different FileSystem objects
- // for the same file system, so '==' cannot be used in such cases.
- //The FileSystem API does not implement equals(), so the URI is used for
- //comparison; FileSystem itself already uses uri+Configuration for equality
- //in its CACHE.
- //Once equality has been added in HDFS-9159, we should make use of it.
- return fs1.getUri().equals(fs2.getUri());
- }
-
- /**
- * Check if the path contains a subdirectory named '.snapshot'
- * @param p path to check
- * @param fs filesystem of the path
- * @return true if p contains a subdirectory named '.snapshot'
- * @throws IOException
- */
- public static boolean pathHasSnapshotSubDir(Path p, FileSystem fs) throws IOException {
- // Hadoop is missing a public API to check for snapshotable directories. Check with the directory name
- // until a more appropriate API is provided by HDFS-12257.
- final FileStatus[] statuses = fs.listStatus(p, FileUtils.SNAPSHOT_DIR_PATH_FILTER);
- return statuses != null && statuses.length != 0;
- }
-
- public static void makeDir(Path path, Configuration conf) throws MetaException {
- FileSystem fs;
- try {
- fs = path.getFileSystem(conf);
- if (!fs.exists(path)) {
- fs.mkdirs(path);
- }
- } catch (IOException e) {
- throw new MetaException("Unable to : " + path);
- }
- }
-
- /**
- * Utility method that determines if a specified directory already has
- * contents (non-hidden files) or not - useful to determine if an
- * immutable table already has contents, for example.
- * @param fs file system the directory lives in
- * @param path directory to check
- * @return true if the path does not exist or contains no non-hidden files
- * @throws IOException if listing the directory fails
- */
- public static boolean isDirEmpty(FileSystem fs, Path path) throws IOException {
-
- if (fs.exists(path)) {
- FileStatus[] status = fs.globStatus(new Path(path, "*"), hiddenFileFilter);
- if (status.length > 0) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Variant of Path.makeQualified that qualifies the input path against the default file system
- * indicated by the configuration
- *
- * This does not require a FileSystem handle in most cases - only the FileSystem URI.
- * This saves the cost of opening the FileSystem, which can involve RPCs and can cause
- * errors.
- *
- * @param path
- * path to be fully qualified
- * @param conf
- * Configuration file
- * @return path qualified relative to default file system
- */
- public static Path makeQualified(Path path, Configuration conf) throws IOException {
-
- if (!path.isAbsolute()) {
- // in this case we need to get the working directory
- // and this requires a FileSystem handle. So revert to
- // original method.
- FileSystem fs = FileSystem.get(conf);
- return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
- }
-
- URI fsUri = FileSystem.getDefaultUri(conf);
- URI pathUri = path.toUri();
-
- String scheme = pathUri.getScheme();
- String authority = pathUri.getAuthority();
-
- // validate/fill-in scheme and authority. this follows logic
- // identical to FileSystem.get(URI, conf) - but doesn't actually
- // obtain a file system handle
-
- if (scheme == null) {
- // no scheme - use default file system uri
- scheme = fsUri.getScheme();
- authority = fsUri.getAuthority();
- if (authority == null) {
- authority = "";
- }
- } else {
- if (authority == null) {
- // no authority - use default one if it applies
- if (scheme.equals(fsUri.getScheme()) && fsUri.getAuthority() != null) {
- authority = fsUri.getAuthority();
- } else {
- authority = "";
- }
- }
- }
-
- return new Path(scheme, authority, pathUri.getPath());
- }
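-
- // Illustrative sketch, assuming fs.defaultFS is hdfs://nn:8020 in the passed Configuration:
- //   makeQualified(new Path("/warehouse/t"), conf)   -> hdfs://nn:8020/warehouse/t
- //   makeQualified(new Path("s3a://bucket/t"), conf) -> s3a://bucket/t (scheme already set)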
-
- /**
- * Returns a BEST GUESS as to whether or not other is a subdirectory of parent. It does not
- * take into account any intricacies of the underlying file system, which is assumed to be
- * HDFS. This should not return any false positives, but may return false negatives.
- *
- * @param parent Candidate parent directory
- * @param other Directory to check if it is a subdirectory of parent
- * @return true if other is a subdirectory of parent
- */
- public static boolean isSubdirectory(String parent, String other) {
- return other.startsWith(parent.endsWith(Path.SEPARATOR) ? parent : parent + Path.SEPARATOR);
- }
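-
- // Illustrative sketch: the check is purely textual, so callers should pass normalized paths:
- //   isSubdirectory("/warehouse/t", "/warehouse/t/p=1")  -> true
- //   isSubdirectory("/warehouse/t", "/warehouse/table2") -> false (no prefix false-positive)
- //   isSubdirectory("/warehouse/t", "/warehouse/t")      -> false (a path is not its own child)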
-
- public static Path getTransformedPath(String name, String subDir, String root) {
- if (root != null) {
- Path newPath = new Path(root);
- if (subDir != null) {
- newPath = new Path(newPath, subDir);
- }
- return new Path(newPath, name);
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
deleted file mode 100644
index 2122788..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/HdfsUtils.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.utils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclEntryScope;
-import org.apache.hadoop.fs.permission.AclEntryType;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.tools.DistCp;
-import org.apache.hadoop.tools.DistCpOptions;
-import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.LoginException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsUtils {
- private static final Logger LOG = LoggerFactory.getLogger(HdfsUtils.class);
- private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
- // TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this
- // is still going to work. Otherwise, file IDs can be turned off. Later, we should use
- // as public utility method in HDFS to obtain the inode-based path.
- private static final String HDFS_ID_PATH_PREFIX = "/.reserved/.inodes/";
-
- /**
- * Check the permissions on a file.
- * @param fs Filesystem the file is contained in
- * @param stat Stat info for the file
- * @param action action to be performed
- * @throws IOException If thrown by Hadoop
- * @throws LoginException if the current user's UGI cannot be determined
- * @throws AccessControlException if the file cannot be accessed
- */
- public static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action)
- throws IOException, LoginException {
- checkFileAccess(fs, stat, action, SecurityUtils.getUGI());
- }
-
- /**
- * Check the permissions on a file
- * @param fs Filesystem the file is contained in
- * @param stat Stat info for the file
- * @param action action to be performed
- * @param ugi user group info for the current user. This is passed in so that tests can pass
- * in mock ones.
- * @throws IOException If thrown by Hadoop
- * @throws AccessControlException if the file cannot be accessed
- */
- @VisibleForTesting
- static void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action,
- UserGroupInformation ugi) throws IOException {
-
- String user = ugi.getShortUserName();
- String[] groups = ugi.getGroupNames();
-
- if (groups != null) {
- String superGroupName = fs.getConf().get("dfs.permissions.supergroup", "");
- if (arrayContains(groups, superGroupName)) {
- LOG.debug("User \"" + user + "\" belongs to super-group \"" + superGroupName + "\". " +
- "Permission granted for action: " + action + ".");
- return;
- }
- }
-
- FsPermission dirPerms = stat.getPermission();
-
- if (user.equals(stat.getOwner())) {
- if (dirPerms.getUserAction().implies(action)) {
- return;
- }
- } else if (arrayContains(groups, stat.getGroup())) {
- if (dirPerms.getGroupAction().implies(action)) {
- return;
- }
- } else if (dirPerms.getOtherAction().implies(action)) {
- return;
- }
- throw new AccessControlException("action " + action + " not permitted on path "
- + stat.getPath() + " for user " + user);
- }
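-
- // Illustrative sketch: for a file owned by hive:hadoop with mode 750, a caller whose groups
- // include "hadoop" passes a READ_EXECUTE check through the group bits, while a caller that
- // matches neither the owner nor the group is denied READ via the AccessControlException above.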
-
- public static boolean isPathEncrypted(Configuration conf, URI fsUri, Path path)
- throws IOException {
- Path fullPath;
- if (path.isAbsolute()) {
- fullPath = path;
- } else {
- fullPath = path.getFileSystem(conf).makeQualified(path);
- }
- if(!"hdfs".equalsIgnoreCase(path.toUri().getScheme())) {
- return false;
- }
- try {
- HdfsAdmin hdfsAdmin = new HdfsAdmin(fsUri, conf);
- return (hdfsAdmin.getEncryptionZoneForPath(fullPath) != null);
- } catch (FileNotFoundException fnfe) {
- LOG.debug("Failed to get EZ for non-existent path: "+ fullPath, fnfe);
- return false;
- }
- }
-
- private static boolean arrayContains(String[] array, String value) {
- if (array == null) return false;
- for (String element : array) {
- if (element.equals(value)) return true;
- }
- return false;
- }
-
- public static boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf,
- String doAsUser) throws IOException {
- UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
- doAsUser, UserGroupInformation.getLoginUser());
- try {
- return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {
- @Override
- public Boolean run() throws Exception {
- return runDistCp(srcPaths, dst, conf);
- }
- });
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- public static boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf)
- throws IOException {
- DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst)
- .withSyncFolder(true)
- .withCRC(true)
- .preserve(FileAttribute.BLOCKSIZE)
- .build();
-
- // Creates the command-line parameters for distcp
- List<String> params = constructDistCpParams(srcPaths, dst, conf);
-
- try {
- conf.setBoolean("mapred.mapper.new-api", true);
- DistCp distcp = new DistCp(conf, options);
-
- // HIVE-13704 states that we should use run() instead of execute() due to a hadoop known issue
- // added by HADOOP-10459
- return distcp.run(params.toArray(new String[params.size()])) == 0;
- } catch (Exception e) {
- throw new IOException("Cannot execute DistCp process: " + e, e);
- } finally {
- conf.setBoolean("mapred.mapper.new-api", false);
- }
- }
-
- private static List<String> constructDistCpParams(List<Path> srcPaths, Path dst,
- Configuration conf) {
- List<String> params = new ArrayList<>();
- for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
- String distCpOption = entry.getKey();
- String distCpVal = entry.getValue();
- params.add("-" + distCpOption);
- if ((distCpVal != null) && (!distCpVal.isEmpty())){
- params.add(distCpVal);
- }
- }
- if (params.isEmpty()) {
- // if no entries were added via conf, fall back to the defaults
- params.add("-update");
- params.add("-pbx");
- }
- for (Path src : srcPaths) {
- params.add(src.toString());
- }
- params.add(dst.toString());
- return params;
- }
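-
- // Illustrative sketch: any "distcp.options.*" keys in the Configuration are forwarded, e.g.
- //   conf.set("distcp.options.m", "20"); conf.set("distcp.options.skipcrccheck", "");
- // yields "-m 20" and "-skipcrccheck" (in map-iteration order) followed by the sources and
- // the destination; if no such keys are set, the defaults "-update -pbx" are used instead.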
-
- public static Path getFileIdPath(
- FileSystem fileSystem, Path path, long fileId) {
- return (fileSystem instanceof DistributedFileSystem)
- ? new Path(HDFS_ID_PATH_PREFIX + fileId) : path;
- }
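-
- // Illustrative sketch (the inode id below is made up): on HDFS a file can be reopened by
- // inode id instead of by name, e.g.
- //   getFileIdPath(dfs, new Path("/warehouse/t/000000_0"), 16789) -> /.reserved/.inodes/16789
- // while for any non-DistributedFileSystem the original path is returned unchanged.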
-
- public static long getFileId(FileSystem fs, String path) throws IOException {
- return ensureDfs(fs).getClient().getFileInfo(path).getFileId();
- }
-
- private static DistributedFileSystem ensureDfs(FileSystem fs) {
- if (!(fs instanceof DistributedFileSystem)) {
- throw new UnsupportedOperationException("Only supported for DFS; got " + fs.getClass());
- }
- return (DistributedFileSystem)fs;
- }
-
- public static class HadoopFileStatus {
-
- private final FileStatus fileStatus;
- private final AclStatus aclStatus;
-
- public HadoopFileStatus(Configuration conf, FileSystem fs, Path file) throws IOException {
-
- FileStatus fileStatus = fs.getFileStatus(file);
- AclStatus aclStatus = null;
- if (Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true")) {
- //Attempt extended ACL operations only if ACLs are enabled, but don't fail the operation regardless.
- try {
- aclStatus = fs.getAclStatus(file);
- } catch (Exception e) {
- LOG.info("Skipping ACL inheritance: File system for path " + file + " " +
- "does not support ACLs but dfs.namenode.acls.enabled is set to true. ");
- LOG.debug("The details are: " + e, e);
- }
- }
- this.fileStatus = fileStatus;
- this.aclStatus = aclStatus;
- }
-
- public FileStatus getFileStatus() {
- return fileStatus;
- }
-
- List<AclEntry> getAclEntries() {
- return aclStatus == null ? null : Collections.unmodifiableList(aclStatus.getEntries());
- }
-
- @VisibleForTesting
- AclStatus getAclStatus() {
- return this.aclStatus;
- }
- }
-
- /**
- * Copy the permissions, group, and ACLs from a source {@link HadoopFileStatus} to a target {@link Path}. This method
- * will only log a warning if permissions cannot be set, no exception will be thrown.
- *
- * @param conf the {@link Configuration} used when setting permissions and ACLs
- * @param sourceStatus the source {@link HadoopFileStatus} to copy permissions and ACLs from
- * @param targetGroup the group of the target {@link Path}, if this is set and it is equal to the source group, an
- * extra set group operation is avoided
- * @param fs the {@link FileSystem} that contains the target {@link Path}
- * @param target the {@link Path} to copy permissions, group, and ACLs to
- * @param recursion recursively set permissions and ACLs on the target {@link Path}
- */
- public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus,
- String targetGroup, FileSystem fs, Path target, boolean recursion) {
- setFullFileStatus(conf, sourceStatus, targetGroup, fs, target, recursion, recursion ? new FsShell() : null);
- }
-
- @VisibleForTesting
- static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus,
- String targetGroup, FileSystem fs, Path target, boolean recursion, FsShell fsShell) {
- try {
- FileStatus fStatus = sourceStatus.getFileStatus();
- String group = fStatus.getGroup();
- boolean aclEnabled = Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true");
- FsPermission sourcePerm = fStatus.getPermission();
- List<AclEntry> aclEntries = null;
- if (aclEnabled) {
- if (sourceStatus.getAclEntries() != null) {
- LOG.trace(sourceStatus.getAclStatus().toString());
- aclEntries = new ArrayList<>(sourceStatus.getAclEntries());
- removeBaseAclEntries(aclEntries);
-
- //the ACL APIs also expect the traditional user/group/other permissions in the form of ACL entries
- aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, sourcePerm.getUserAction()));
- aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, sourcePerm.getGroupAction()));
- aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, sourcePerm.getOtherAction()));
- }
- }
-
- if (recursion) {
- //use FsShell to change group, permissions, and extended ACL's recursively
- fsShell.setConf(conf);
- //If the file has no group, there is no need to call chgrp
- if (group != null && !group.isEmpty()) {
- run(fsShell, new String[]{"-chgrp", "-R", group, target.toString()});
- }
- if (aclEnabled) {
- if (null != aclEntries) {
- //Attempt extended ACL operations only if ACLs are enabled, but don't fail the operation regardless.
- try {
- //construct the -setfacl command
- String aclEntry = Joiner.on(",").join(aclEntries);
- run(fsShell, new String[]{"-setfacl", "-R", "--set", aclEntry, target.toString()});
-
- } catch (Exception e) {
- LOG.info("Skipping ACL inheritance: File system for path " + target + " " +
- "does not support ACLs but dfs.namenode.acls.enabled is set to true. ");
- LOG.debug("The details are: " + e, e);
- }
- }
- } else {
- String permission = Integer.toString(sourcePerm.toShort(), 8);
- run(fsShell, new String[]{"-chmod", "-R", permission, target.toString()});
- }
- } else {
- if (group != null && !group.isEmpty()) {
- if (targetGroup == null ||
- !group.equals(targetGroup)) {
- fs.setOwner(target, null, group);
- }
- }
- if (aclEnabled) {
- if (null != aclEntries) {
- fs.setAcl(target, aclEntries);
- }
- } else {
- fs.setPermission(target, sourcePerm);
- }
- }
- } catch (Exception e) {
- LOG.warn("Unable to inherit permissions for file " + target + " from file "
- + sourceStatus.getFileStatus().getPath() + ": " + e.getMessage());
- LOG.debug("Exception while inheriting permissions", e);
- }
- }
-
- /**
- * Removes basic permission ACLs (unnamed ACLs) from the list of ACL entries.
- * @param entries acl entries to remove from.
- */
- private static void removeBaseAclEntries(List<AclEntry> entries) {
- Iterables.removeIf(entries, new Predicate<AclEntry>() {
- @Override
- public boolean apply(AclEntry input) {
- return input.getName() == null;
- }
- });
- }
-
- /**
- * Create a new AclEntry with scope, type and permission (no name).
- *
- * @param scope
- * AclEntryScope scope of the ACL entry
- * @param type
- * AclEntryType ACL entry type
- * @param permission
- * FsAction set of permissions in the ACL entry
- * @return AclEntry new AclEntry
- */
- private static AclEntry newAclEntry(AclEntryScope scope, AclEntryType type,
- FsAction permission) {
- return new AclEntry.Builder().setScope(scope).setType(type)
- .setPermission(permission).build();
- }
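-
- // Illustrative sketch: an unnamed ACCESS entry renders as the familiar setfacl spec, e.g.
- //   newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, FsAction.ALL).toString() -> "user::rwx"
- // which is how setFullFileStatus above turns the source permission bits into a "--set" spec.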
-
- private static void run(FsShell shell, String[] command) throws Exception {
- LOG.debug(ArrayUtils.toString(command));
- int retval = shell.run(command);
- LOG.debug("Return value is :" + retval);
- }
-}