Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:45 UTC
[28/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
deleted file mode 100644
index 2464376..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
+++ /dev/null
@@ -1,1958 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.BlockStorageLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.VolumeId;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.ColumnDef;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.LiteralExpr;
-import com.cloudera.impala.analysis.NullLiteral;
-import com.cloudera.impala.analysis.NumericLiteral;
-import com.cloudera.impala.analysis.PartitionKeyValue;
-import com.cloudera.impala.catalog.HdfsPartition.BlockReplica;
-import com.cloudera.impala.catalog.HdfsPartition.FileBlock;
-import com.cloudera.impala.catalog.HdfsPartition.FileDescriptor;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.FileSystemUtil;
-import com.cloudera.impala.common.Pair;
-import com.cloudera.impala.common.PrintUtils;
-import com.cloudera.impala.thrift.ImpalaInternalServiceConstants;
-import com.cloudera.impala.thrift.TAccessLevel;
-import com.cloudera.impala.thrift.TCatalogObjectType;
-import com.cloudera.impala.thrift.TColumn;
-import com.cloudera.impala.thrift.THdfsFileBlock;
-import com.cloudera.impala.thrift.THdfsPartition;
-import com.cloudera.impala.thrift.THdfsPartitionLocation;
-import com.cloudera.impala.thrift.THdfsTable;
-import com.cloudera.impala.thrift.TNetworkAddress;
-import com.cloudera.impala.thrift.TPartitionKeyValue;
-import com.cloudera.impala.thrift.TResultRow;
-import com.cloudera.impala.thrift.TResultSet;
-import com.cloudera.impala.thrift.TResultSetMetadata;
-import com.cloudera.impala.thrift.TTable;
-import com.cloudera.impala.thrift.TTableDescriptor;
-import com.cloudera.impala.thrift.TTableType;
-import com.cloudera.impala.util.AvroSchemaConverter;
-import com.cloudera.impala.util.AvroSchemaParser;
-import com.cloudera.impala.util.AvroSchemaUtils;
-import com.cloudera.impala.util.FsPermissionChecker;
-import com.cloudera.impala.util.HdfsCachingUtil;
-import com.cloudera.impala.util.ListMap;
-import com.cloudera.impala.util.MetaStoreUtil;
-import com.cloudera.impala.util.TAccessLevelUtil;
-import com.cloudera.impala.util.TResultRowBuilder;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Internal representation of table-related metadata of a file-resident table on a
- * Hadoop filesystem. The table data can be accessed through libHDFS (which is really
- * an abstraction over Hadoop's FileSystem class rather than being specific to DFS). A
- * partitioned table can even span multiple filesystems.
- *
- * This class is not thread-safe. Clients of this class need to protect against
- * concurrent updates using external locking (see CatalogOpExecutor class).
- *
- * Owned by Catalog instance.
- * The partition keys constitute the clustering columns.
- *
- */
-public class HdfsTable extends Table {
- // hive's default value for table property 'serialization.null.format'
- private static final String DEFAULT_NULL_COLUMN_VALUE = "\\N";
-
- // Name of default partition for unpartitioned tables
- private static final String DEFAULT_PARTITION_NAME = "";
-
- // Number of times to retry fetching the partitions from the HMS should an error occur.
- private final static int NUM_PARTITION_FETCH_RETRIES = 5;
-
- // Table property key for skip.header.line.count
- public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
-
- // An invalid network address, which will always be treated as remote.
- private final static TNetworkAddress REMOTE_NETWORK_ADDRESS =
- new TNetworkAddress("remote*addr", 0);
-
- // Minimum block size in bytes allowed for synthetic file blocks (other than the last
- // block, which may be shorter).
- private final static long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024;
-
- // String used to indicate NULL. Set in load() from table properties.
- private String nullColumnValue_;
-
- // Hive uses this string for NULL partition keys. Set in load().
- private String nullPartitionKeyValue_;
-
- // Avro schema of this table if this is an Avro table, otherwise null. Set in load().
- private String avroSchema_ = null;
-
- // Set to true if any of the partitions have Avro data.
- private boolean hasAvroData_ = false;
-
- // True if this table's metadata is marked as cached. Does not necessarily mean the
- // data is cached or that all/any partitions are cached.
- private boolean isMarkedCached_ = false;
-
- private static boolean hasLoggedDiskIdFormatWarning_ = false;
-
- // Array of sorted maps storing the association between partition values and
- // partition ids. There is one sorted map per partition key.
- // TODO: We should not populate this for HdfsTable objects stored in the catalog
- // server.
- private ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> partitionValuesMap_ =
- Lists.newArrayList();
-
- // Array of partition id sets that correspond to partitions with null values
- // in the partition keys; one set per partition key.
- private ArrayList<HashSet<Long>> nullPartitionIds_ = Lists.newArrayList();
-
- // Map of partition ids to HdfsPartitions.
- private HashMap<Long, HdfsPartition> partitionMap_ = Maps.newHashMap();
-
- // Map of partition name to HdfsPartition object. Used for speeding up
- // table metadata loading.
- private HashMap<String, HdfsPartition> nameToPartitionMap_ = Maps.newHashMap();
-
- // Store all the partition ids of an HdfsTable.
- private HashSet<Long> partitionIds_ = Sets.newHashSet();
-
- // Maximum size (in bytes) of incremental stats the catalog is allowed to serialize per
- // table. This limit is set as a safety check, to prevent the JVM from hitting a
- // maximum array limit of 1GB (or OOM) while building the thrift objects to send to
- // impalads.
- public static final long MAX_INCREMENTAL_STATS_SIZE_BYTES = 200 * 1024 * 1024;
-
- // Estimate (in bytes) of the incremental stats size per column per partition
- public static final long STATS_SIZE_PER_COLUMN_BYTES = 400;
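-
- // For intuition: at 400 bytes of incremental stats per column per partition,
- // the 200 MB cap above is reached around 200 * 1024 * 1024 / 400 = ~524,000
- // column/partition pairs, e.g. ~5,000 partitions with ~100 columns each.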
-
- // Bi-directional map between integer indices and the unique datanode
- // TNetworkAddresses that host blocks of one or more files in this
- // table. The network addresses are stored using the IP address as the
- // host name. Each FileBlock specifies a list of indices within this
- // hostIndex_ to identify which nodes contain replicas of the block.
- private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<TNetworkAddress>();
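-
- // Usage sketch (see loadBlockMetadata() below): hostIndex_.getIndex(addr)
- // returns the stable index for 'addr', adding it on first sight; block
- // replicas then store that small index instead of repeating the address.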
-
- private HdfsPartitionLocationCompressor partitionLocationCompressor_;
-
- // Map of each partition location (directory) to a map of file names to the
- // file descriptors for the files in that directory.
- private Map<String, Map<String, FileDescriptor>>
- perPartitionFileDescMap_ = Maps.newHashMap();
-
- // Total number of Hdfs files in this table. Set in load().
- private long numHdfsFiles_;
-
- // Sum of sizes of all Hdfs files in this table. Set in load().
- private long totalHdfsBytes_;
-
- // True iff the table's partitions are located on more than one filesystem.
- private boolean multipleFileSystems_ = false;
-
- // Base Hdfs directory where files of this table are stored.
- // For unpartitioned tables it is simply the path where all files live.
- // For partitioned tables it is the root directory
- // under which partition dirs are placed.
- protected String hdfsBaseDir_;
-
- // List of FieldSchemas that correspond to the non-partition columns. Used when
- // describing this table and its partitions to the HMS (e.g. as part of an alter table
- // operation), when only non-partition columns are required.
- private final List<FieldSchema> nonPartFieldSchemas_ = Lists.newArrayList();
-
- // Flag to check if the table schema has been loaded. Used as a precondition
- // for setAvroSchema().
- private boolean isSchemaLoaded_ = false;
-
- private final static Logger LOG = LoggerFactory.getLogger(HdfsTable.class);
-
- // Caching this configuration object makes calls to getFileSystem much quicker
- // (saves ~50ms on a standard plan)
- // TODO(henry): confirm that this is thread safe - cursory inspection of the class
- // and its usage in getFileSystem suggests it should be.
- private static final Configuration CONF = new Configuration();
-
- private static final boolean SUPPORTS_VOLUME_ID;
-
- // Wrapper around a FileSystem object to hash based on the underlying FileSystem's
- // scheme and authority.
- private static class FsKey {
- FileSystem filesystem;
-
- public FsKey(FileSystem fs) { filesystem = fs; }
-
- @Override
- public int hashCode() { return filesystem.getUri().hashCode(); }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) return true;
-    if (o instanceof FsKey) {
- URI uri = filesystem.getUri();
- URI otherUri = ((FsKey)o).filesystem.getUri();
- return uri.equals(otherUri);
- }
- return false;
- }
-
- @Override
- public String toString() { return filesystem.getUri().toString(); }
- }
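-
- // Example: FileSystem handles for hdfs://nameservice1/warehouse/a and
- // hdfs://nameservice1/warehouse/b share the URI "hdfs://nameservice1",
- // so their FsKeys compare equal and their blocks are batched together.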
-
- // Keeps track of newly added THdfsFileBlock metadata and its corresponding
- // BlockLocation. For each i, blocks.get(i) corresponds to locations.get(i). Once
- // all the new file blocks are collected, the disk volume IDs are retrieved in one
- // batched DFS call.
- private static class FileBlocksInfo {
- final List<THdfsFileBlock> blocks = Lists.newArrayList();
- final List<BlockLocation> locations = Lists.newArrayList();
-
- public void addBlocks(List<THdfsFileBlock> b, List<BlockLocation> l) {
- Preconditions.checkState(b.size() == l.size());
- blocks.addAll(b);
- locations.addAll(l);
- }
- }
-
- public HdfsTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTbl,
- Db db, String name, String owner) {
- super(id, msTbl, db, name, owner);
- partitionLocationCompressor_ =
- new HdfsPartitionLocationCompressor(numClusteringCols_);
- }
-
- static {
- SUPPORTS_VOLUME_ID =
- CONF.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
- }
-
- /**
- * Returns a disk id (0-based) index from the Hdfs VolumeId object.
- * There is currently no public API to get at the volume id. We'll have to get it
- * by accessing the internals.
- */
- private static int getDiskId(VolumeId hdfsVolumeId) {
- // Initialize the diskId as -1 to indicate it is unknown
- int diskId = -1;
-
- if (hdfsVolumeId != null) {
- // TODO: this is a hack and we'll have to address this by getting the
- // public API. Also, we need to be very mindful of this when we change
- // the version of HDFS.
- String volumeIdString = hdfsVolumeId.toString();
- // This is the hacky part. The toString is currently the underlying id
- // encoded as hex.
- byte[] volumeIdBytes = StringUtils.hexStringToByte(volumeIdString);
- if (volumeIdBytes != null && volumeIdBytes.length == 4) {
- diskId = Bytes.toInt(volumeIdBytes);
- } else if (!hasLoggedDiskIdFormatWarning_) {
- LOG.warn("wrong disk id format: " + volumeIdString);
- hasLoggedDiskIdFormatWarning_ = true;
- }
- }
- return diskId;
- }
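-
- // Worked example (assuming the current 4-byte hex toString() format): a
- // VolumeId printing as "00000002" decodes to bytes {0, 0, 0, 2} and thus
- // to disk id 2; any string that is not exactly 4 bytes of hex yields -1.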
-
- public boolean spansMultipleFileSystems() { return multipleFileSystems_; }
-
- /**
- * Returns true if the table resides at a location which supports caching (e.g. HDFS).
- */
- public boolean isLocationCacheable() {
- return FileSystemUtil.isPathCacheable(new Path(getLocation()));
- }
-
- /**
- * Returns true if the table and all its partitions reside at locations which
- * support caching (e.g. HDFS).
- */
- public boolean isCacheable() {
- if (!isLocationCacheable()) return false;
- if (!isMarkedCached() && numClusteringCols_ > 0) {
- for (HdfsPartition partition: getPartitions()) {
- if (partition.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
- continue;
- }
- if (!partition.isCacheable()) {
- return false;
- }
- }
- }
- return true;
- }
-
- /**
- * Queries the filesystem to load the file block metadata (e.g. DFS blocks) for the
- * given file. Adds the newly created block metadata and block location to the
- * perFsFileBlocks, so that the disk IDs for each block can be retrieved with one
- * call to DFS.
- */
- private void loadBlockMetadata(FileSystem fs, FileStatus file, FileDescriptor fd,
- HdfsFileFormat fileFormat, Map<FsKey, FileBlocksInfo> perFsFileBlocks) {
- Preconditions.checkNotNull(fd);
- Preconditions.checkNotNull(perFsFileBlocks);
- Preconditions.checkArgument(!file.isDirectory());
- LOG.debug("load block md for " + name_ + " file " + fd.getFileName());
-
- if (!FileSystemUtil.hasGetFileBlockLocations(fs)) {
- synthesizeBlockMetadata(fs, fd, fileFormat);
- return;
- }
- try {
- BlockLocation[] locations = fs.getFileBlockLocations(file, 0, file.getLen());
- Preconditions.checkNotNull(locations);
-
- // Loop over all blocks in the file.
- for (BlockLocation loc: locations) {
- Preconditions.checkNotNull(loc);
- // Get the location of all block replicas in ip:port format.
- String[] blockHostPorts = loc.getNames();
- // Get the hostnames for all block replicas. Used to resolve which hosts
- // contain cached data. The results are returned in the same order as
- // block.getNames() so it allows us to match a host specified as ip:port to
- // corresponding hostname using the same array index.
- String[] blockHostNames = loc.getHosts();
- Preconditions.checkState(blockHostNames.length == blockHostPorts.length);
- // Get the hostnames that contain cached replicas of this block.
- Set<String> cachedHosts =
- Sets.newHashSet(Arrays.asList(loc.getCachedHosts()));
- Preconditions.checkState(cachedHosts.size() <= blockHostNames.length);
-
- // Now enumerate all replicas of the block, adding any unknown hosts
-        // to hostIndex_. The host ID (index into hostIndex_) for each
- // replica is stored in replicaHostIdxs.
- List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
- blockHostPorts.length);
- for (int i = 0; i < blockHostPorts.length; ++i) {
- TNetworkAddress networkAddress = BlockReplica.parseLocation(blockHostPorts[i]);
- Preconditions.checkState(networkAddress != null);
- replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
- cachedHosts.contains(blockHostNames[i])));
- }
- fd.addFileBlock(new FileBlock(loc.getOffset(), loc.getLength(), replicas));
- }
- // Remember the THdfsFileBlocks and corresponding BlockLocations. Once all the
- // blocks are collected, the disk IDs will be queried in one batch per filesystem.
- addPerFsFileBlocks(perFsFileBlocks, fs, fd.getFileBlocks(),
- Arrays.asList(locations));
- } catch (IOException e) {
- throw new RuntimeException("couldn't determine block locations for path '" +
- file.getPath() + "':\n" + e.getMessage(), e);
- }
- }
-
- /**
- * For filesystems that don't override getFileBlockLocations, synthesize file blocks
- * by manually splitting the file range into fixed-size blocks. That way, scan
- * ranges can be derived from file blocks as usual. All synthesized blocks are given
- * an invalid network address so that the scheduler will treat them as remote.
- */
- private void synthesizeBlockMetadata(FileSystem fs, FileDescriptor fd,
- HdfsFileFormat fileFormat) {
- long start = 0;
- long remaining = fd.getFileLength();
- // Workaround HADOOP-11584 by using the filesystem default block size rather than
- // the block size from the FileStatus.
- // TODO: after HADOOP-11584 is resolved, get the block size from the FileStatus.
- long blockSize = fs.getDefaultBlockSize();
- if (blockSize < MIN_SYNTHETIC_BLOCK_SIZE) blockSize = MIN_SYNTHETIC_BLOCK_SIZE;
- if (!fileFormat.isSplittable(HdfsCompression.fromFileName(fd.getFileName()))) {
- blockSize = remaining;
- }
- while (remaining > 0) {
- long len = Math.min(remaining, blockSize);
- List<BlockReplica> replicas = Lists.newArrayList(
- new BlockReplica(hostIndex_.getIndex(REMOTE_NETWORK_ADDRESS), false));
- fd.addFileBlock(new FileBlock(start, len, replicas));
- remaining -= len;
- start += len;
- }
- }
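-
- // Worked example: a 300 MB splittable file on a filesystem whose default
- // block size is 128 MB is synthesized as blocks [0, 128 MB), [128 MB, 256 MB)
- // and [256 MB, 300 MB); a non-splittable file (e.g. gzip) becomes a single
- // 300 MB block. Block sizes below 1 MB are rounded up to
- // MIN_SYNTHETIC_BLOCK_SIZE.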
-
- /**
- * Populates disk/volume ID metadata inside the newly created THdfsFileBlocks.
- * perFsFileBlocks maps from each filesystem to a FileBlocksInfo, whose 'blocks'
- * list contains the newly created THdfsFileBlocks and whose 'locations' list
- * contains the corresponding BlockLocations.
- */
- private void loadDiskIds(Map<FsKey, FileBlocksInfo> perFsFileBlocks) {
- if (!SUPPORTS_VOLUME_ID) return;
- // Loop over each filesystem. If the filesystem is DFS, retrieve the volume IDs
- // for all the blocks.
- for (FsKey fsKey: perFsFileBlocks.keySet()) {
- FileSystem fs = fsKey.filesystem;
- // Only DistributedFileSystem has getFileBlockStorageLocations(). It's not even
- // part of the FileSystem interface, so we'll need to downcast.
- if (!(fs instanceof DistributedFileSystem)) continue;
-
- LOG.trace("Loading disk ids for: " + getFullName() + ". nodes: " +
- hostIndex_.size() + ". filesystem: " + fsKey);
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- FileBlocksInfo blockLists = perFsFileBlocks.get(fsKey);
- Preconditions.checkNotNull(blockLists);
- BlockStorageLocation[] storageLocs = null;
- try {
- // Get the BlockStorageLocations for all the blocks
- storageLocs = dfs.getFileBlockStorageLocations(blockLists.locations);
- } catch (IOException e) {
- LOG.error("Couldn't determine block storage locations for filesystem " +
- fs + ":\n" + e.getMessage());
- continue;
- }
- if (storageLocs == null || storageLocs.length == 0) {
- LOG.warn("Attempted to get block locations for filesystem " + fs +
- " but the call returned no results");
- continue;
- }
- if (storageLocs.length != blockLists.locations.size()) {
- // Block locations and storage locations didn't match up.
- LOG.error("Number of block storage locations not equal to number of blocks: "
- + "#storage locations=" + Long.toString(storageLocs.length)
- + " #blocks=" + Long.toString(blockLists.locations.size()));
- continue;
- }
- long unknownDiskIdCount = 0;
- // Attach volume IDs given by the storage location to the corresponding
- // THdfsFileBlocks.
- for (int locIdx = 0; locIdx < storageLocs.length; ++locIdx) {
- VolumeId[] volumeIds = storageLocs[locIdx].getVolumeIds();
- THdfsFileBlock block = blockLists.blocks.get(locIdx);
- // Convert opaque VolumeId to 0 based ids.
- // TODO: the diskId should be eventually retrievable from Hdfs when the
- // community agrees this API is useful.
- int[] diskIds = new int[volumeIds.length];
- for (int i = 0; i < volumeIds.length; ++i) {
- diskIds[i] = getDiskId(volumeIds[i]);
- if (diskIds[i] < 0) ++unknownDiskIdCount;
- }
- FileBlock.setDiskIds(diskIds, block);
- }
- if (unknownDiskIdCount > 0) {
- LOG.warn("Unknown disk id count for filesystem " + fs + ":" + unknownDiskIdCount);
- }
- }
- }
-
- @Override
- public TCatalogObjectType getCatalogObjectType() {
- return TCatalogObjectType.TABLE;
- }
- public boolean isMarkedCached() { return isMarkedCached_; }
-
- public Collection<HdfsPartition> getPartitions() { return partitionMap_.values(); }
- public Map<Long, HdfsPartition> getPartitionMap() { return partitionMap_; }
- public Map<String, HdfsPartition> getNameToPartitionMap() {
- return nameToPartitionMap_;
- }
- public Set<Long> getNullPartitionIds(int i) { return nullPartitionIds_.get(i); }
- public HdfsPartitionLocationCompressor getPartitionLocationCompressor() {
- return partitionLocationCompressor_;
- }
- public Set<Long> getPartitionIds() { return partitionIds_; }
- public TreeMap<LiteralExpr, HashSet<Long>> getPartitionValueMap(int i) {
- return partitionValuesMap_.get(i);
- }
-
- /**
- * Returns the value Hive is configured to use for NULL partition key values.
- * Set during load.
- */
- public String getNullPartitionKeyValue() { return nullPartitionKeyValue_; }
- public String getNullColumnValue() { return nullColumnValue_; }
-
- /**
- * Returns the storage location (HDFS path) of this table.
- */
- public String getLocation() {
- return super.getMetaStoreTable().getSd().getLocation();
- }
-
- List<FieldSchema> getNonPartitionFieldSchemas() { return nonPartFieldSchemas_; }
-
- // True if Impala has HDFS write permissions on the hdfsBaseDir (for an unpartitioned
- // table) or if Impala has write permissions on all partition directories (for
- // a partitioned table).
- public boolean hasWriteAccess() {
- return TAccessLevelUtil.impliesWriteAccess(accessLevel_);
- }
-
- /**
- * Returns the first location (HDFS path) that Impala does not have WRITE access
- * to, or null if none is found. For an unpartitioned table, this just
- * checks the hdfsBaseDir. For a partitioned table it checks all partition directories.
- */
- public String getFirstLocationWithoutWriteAccess() {
- if (getMetaStoreTable() == null) return null;
-
- if (getMetaStoreTable().getPartitionKeysSize() == 0) {
- if (!TAccessLevelUtil.impliesWriteAccess(accessLevel_)) {
- return hdfsBaseDir_;
- }
- } else {
- for (HdfsPartition partition: partitionMap_.values()) {
- if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
- return partition.getLocation();
- }
- }
- }
- return null;
- }
-
- /**
- * Gets the HdfsPartition matching the given partition spec. Returns null if no match
- * was found.
- */
- public HdfsPartition getPartition(
- List<PartitionKeyValue> partitionSpec) {
- List<TPartitionKeyValue> partitionKeyValues = Lists.newArrayList();
- for (PartitionKeyValue kv: partitionSpec) {
- String value = PartitionKeyValue.getPartitionKeyValueString(
- kv.getLiteralValue(), getNullPartitionKeyValue());
- partitionKeyValues.add(new TPartitionKeyValue(kv.getColName(), value));
- }
- return getPartitionFromThriftPartitionSpec(partitionKeyValues);
- }
-
- /**
- * Gets the HdfsPartition matching the Thrift version of the partition spec.
- * Returns null if no match was found.
- */
- public HdfsPartition getPartitionFromThriftPartitionSpec(
- List<TPartitionKeyValue> partitionSpec) {
- // First, build a list of the partition values to search for in the same order they
- // are defined in the table.
- List<String> targetValues = Lists.newArrayList();
- Set<String> keys = Sets.newHashSet();
- for (FieldSchema fs: getMetaStoreTable().getPartitionKeys()) {
- for (TPartitionKeyValue kv: partitionSpec) {
- if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
- targetValues.add(kv.getValue().toLowerCase());
- // Same key was specified twice
- if (!keys.add(kv.getName().toLowerCase())) {
- return null;
- }
- }
- }
- }
-
-    // Make sure the number of values matches up and that some values were found.
- if (targetValues.size() == 0 ||
- (targetValues.size() != getMetaStoreTable().getPartitionKeysSize())) {
- return null;
- }
-
- // Search through all the partitions and check if their partition key values
- // match the values being searched for.
- for (HdfsPartition partition: partitionMap_.values()) {
- if (partition.isDefaultPartition()) continue;
- List<LiteralExpr> partitionValues = partition.getPartitionValues();
- Preconditions.checkState(partitionValues.size() == targetValues.size());
- boolean matchFound = true;
- for (int i = 0; i < targetValues.size(); ++i) {
- String value;
- if (partitionValues.get(i) instanceof NullLiteral) {
- value = getNullPartitionKeyValue();
- } else {
- value = partitionValues.get(i).getStringValue();
- Preconditions.checkNotNull(value);
- // See IMPALA-252: we deliberately map empty strings on to
- // NULL when they're in partition columns. This is for
- // backwards compatibility with Hive, and is clearly broken.
- if (value.isEmpty()) value = getNullPartitionKeyValue();
- }
- if (!targetValues.get(i).equals(value.toLowerCase())) {
- matchFound = false;
- break;
- }
- }
- if (matchFound) {
- return partition;
- }
- }
- return null;
- }
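-
- // Usage sketch: for a table partitioned by (year, month), a spec of
- // [("year", "2016"), ("month", "9")] matches the partition whose values
- // stringify (case-insensitively) to "2016" and "9". A duplicated key, a
- // missing key, or an unknown key name yields null.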
-
- /**
- * Create columns corresponding to fieldSchemas. Throws a TableLoadingException if the
- * metadata is incompatible with what we support.
- */
- private void addColumnsFromFieldSchemas(List<FieldSchema> fieldSchemas)
- throws TableLoadingException {
- int pos = colsByPos_.size();
- for (FieldSchema s: fieldSchemas) {
- Type type = parseColumnType(s);
- // Check if we support partitioning on columns of such a type.
- if (pos < numClusteringCols_ && !type.supportsTablePartitioning()) {
- throw new TableLoadingException(
- String.format("Failed to load metadata for table '%s' because of " +
- "unsupported partition-column type '%s' in partition column '%s'",
- getFullName(), type.toString(), s.getName()));
- }
-
- Column col = new Column(s.getName(), type, s.getComment(), pos);
- addColumn(col);
- ++pos;
- }
- }
-
- /**
- * Clear the partitions of an HdfsTable and the associated metadata.
- */
- private void resetPartitions() {
- partitionIds_.clear();
- partitionMap_.clear();
- nameToPartitionMap_.clear();
- partitionValuesMap_.clear();
- nullPartitionIds_.clear();
- perPartitionFileDescMap_.clear();
- // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
- for (int i = 0; i < numClusteringCols_; ++i) {
- getColumns().get(i).getStats().setNumNulls(0);
- getColumns().get(i).getStats().setNumDistinctValues(0);
- partitionValuesMap_.add(Maps.<LiteralExpr, HashSet<Long>>newTreeMap());
- nullPartitionIds_.add(Sets.<Long>newHashSet());
- }
- numHdfsFiles_ = 0;
- totalHdfsBytes_ = 0;
- }
-
- /**
- * Resets any partition metadata, creates the default partition and sets the base
- * table directory path as well as the caching info from the HMS table.
- */
- private void initializePartitionMetadata(
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws CatalogException {
- Preconditions.checkNotNull(msTbl);
- resetPartitions();
- hdfsBaseDir_ = msTbl.getSd().getLocation();
-    // INSERT statements need to refer to this if they try to write to new partitions.
- // Scans don't refer to this because by definition all partitions they refer to
- // exist.
- addDefaultPartition(msTbl.getSd());
-
- // We silently ignore cache directives that no longer exist in HDFS, and remove
- // non-existing cache directives from the parameters.
- isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
- }
-
- /**
- * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this
- * table's partition list. Any partition metadata will be reset and loaded from
- * scratch.
- *
- * If there are no partitions in the Hive metadata, a single partition is added with no
- * partition keys.
- */
- private void loadAllPartitions(
- List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException,
- CatalogException {
- Preconditions.checkNotNull(msTbl);
- initializePartitionMetadata(msTbl);
- // Map of filesystem to the file blocks for new/modified FileDescriptors. Blocks in
-    // this map will have their disk volume ID information (re)loaded. This is used to
- // speed up the incremental refresh of a table's metadata by skipping unmodified,
- // previously loaded blocks.
- Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap();
- if (msTbl.getPartitionKeysSize() == 0) {
- Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
- // This table has no partition key, which means it has no declared partitions.
- // We model partitions slightly differently to Hive - every file must exist in a
- // partition, so add a single partition with no keys which will get all the
- // files in the table's root directory.
- HdfsPartition part = createPartition(msTbl.getSd(), null, blocksToLoad);
- if (isMarkedCached_) part.markCached();
- addPartition(part);
- Path location = new Path(hdfsBaseDir_);
- FileSystem fs = location.getFileSystem(CONF);
- if (fs.exists(location)) {
- accessLevel_ = getAvailableAccessLevel(fs, location);
- }
- } else {
- for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
- HdfsPartition partition = createPartition(msPartition.getSd(), msPartition,
- blocksToLoad);
-      // If the partition is null, its HDFS path does not exist; skip it
-      // instead of adding it to this table's partition list.
-      if (partition == null) continue;
-      addPartition(partition);
- if (msPartition.getParameters() != null) {
- partition.setNumRows(getRowCount(msPartition.getParameters()));
- }
- if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
-        // TODO: READ_ONLY isn't exactly correct because it's possible the
- // partition does not have READ permissions either. When we start checking
- // whether we can READ from a table, this should be updated to set the
- // table's access level to the "lowest" effective level across all
- // partitions. That is, if one partition has READ_ONLY and another has
- // WRITE_ONLY the table's access level should be NONE.
- accessLevel_ = TAccessLevel.READ_ONLY;
- }
- }
- }
- loadDiskIds(blocksToLoad);
- }
-
- /**
- * Gets the AccessLevel that is available for Impala for this table based on the
- * permissions Impala has on the given path. If the path does not exist, recurses up
- * the path until an existing parent directory is found, and inherits access
- * permissions from it.
- */
- private TAccessLevel getAvailableAccessLevel(FileSystem fs, Path location)
- throws IOException {
- FsPermissionChecker permissionChecker = FsPermissionChecker.getInstance();
- while (location != null) {
- if (fs.exists(location)) {
- FsPermissionChecker.Permissions perms =
- permissionChecker.getPermissions(fs, location);
- if (perms.canReadAndWrite()) {
- return TAccessLevel.READ_WRITE;
- } else if (perms.canRead()) {
- return TAccessLevel.READ_ONLY;
- } else if (perms.canWrite()) {
- return TAccessLevel.WRITE_ONLY;
- }
- return TAccessLevel.NONE;
- }
- location = location.getParent();
- }
- // Should never get here.
- Preconditions.checkNotNull(location, "Error: no path ancestor exists");
- return TAccessLevel.NONE;
- }
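-
- // Example: if a partition location /warehouse/t/year=2016 does not exist yet
- // (say, before an INSERT creates it), the permissions of the nearest existing
- // ancestor, e.g. /warehouse/t, determine the returned access level.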
-
- /**
- * Creates a new HdfsPartition object to be added to HdfsTable's partition list.
- * Partitions may be empty, or may not even exist in the filesystem (a partition's
- * location may have been changed to a new path that is about to be created by an
- * INSERT). Also loads the block metadata for this partition. Returns new partition
- * if successful or null if none was created.
- *
- * Throws CatalogException if the supplied storage descriptor contains metadata that
- * Impala can't understand.
- */
- public HdfsPartition createPartition(StorageDescriptor storageDescriptor,
- org.apache.hadoop.hive.metastore.api.Partition msPartition)
- throws CatalogException {
- Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap();
- HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition,
- blocksToLoad);
- loadDiskIds(blocksToLoad);
- return hdfsPartition;
- }
-
- /**
- * Creates a new HdfsPartition from a specified StorageDescriptor and an HMS partition
- * object. It populates 'perFsFileBlocks' with the blocks to be loaded for each file in
- * the partition directory.
- */
- private HdfsPartition createPartition(StorageDescriptor storageDescriptor,
- org.apache.hadoop.hive.metastore.api.Partition msPartition,
- Map<FsKey, FileBlocksInfo> perFsFileBlocks)
- throws CatalogException {
- HdfsStorageDescriptor fileFormatDescriptor =
- HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
- List<LiteralExpr> keyValues = Lists.newArrayList();
- boolean isMarkedCached = isMarkedCached_;
- if (msPartition != null) {
- isMarkedCached = HdfsCachingUtil.validateCacheParams(msPartition.getParameters());
- // Load key values
- for (String partitionKey: msPartition.getValues()) {
- Type type = getColumns().get(keyValues.size()).getType();
- // Deal with Hive's special NULL partition key.
- if (partitionKey.equals(nullPartitionKeyValue_)) {
- keyValues.add(NullLiteral.create(type));
- } else {
- try {
- keyValues.add(LiteralExpr.create(partitionKey, type));
- } catch (Exception ex) {
- LOG.warn("Failed to create literal expression of type: " + type, ex);
- throw new CatalogException("Invalid partition key value of type: " + type,
- ex);
- }
- }
- }
- try {
- Expr.analyze(keyValues, null);
- } catch (AnalysisException e) {
- // should never happen
- throw new IllegalStateException(e);
- }
- }
-
- Path partDirPath = new Path(storageDescriptor.getLocation());
- try {
- FileSystem fs = partDirPath.getFileSystem(CONF);
- multipleFileSystems_ = multipleFileSystems_ ||
- !FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs);
- updatePartitionFds(partDirPath, isMarkedCached,
- fileFormatDescriptor.getFileFormat(), perFsFileBlocks);
- HdfsPartition partition =
- new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor,
- perPartitionFileDescMap_.get(partDirPath.toString()).values(),
- getAvailableAccessLevel(fs, partDirPath));
- partition.checkWellFormed();
- return partition;
- } catch (IOException e) {
- throw new CatalogException("Error initializing partition", e);
- }
- }
-
- /**
- * Add the given THdfsFileBlocks and BlockLocations to the FileBlockInfo for the
- * given filesystem.
- */
- private void addPerFsFileBlocks(Map<FsKey, FileBlocksInfo> fsToBlocks, FileSystem fs,
- List<THdfsFileBlock> blocks, List<BlockLocation> locations) {
- FsKey fsKey = new FsKey(fs);
- FileBlocksInfo infos = fsToBlocks.get(fsKey);
- if (infos == null) {
- infos = new FileBlocksInfo();
- fsToBlocks.put(fsKey, infos);
- }
- infos.addBlocks(blocks, locations);
- }
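-
- // The get-or-create above could equivalently be written in Java 8+ as:
- //   fsToBlocks.computeIfAbsent(new FsKey(fs), k -> new FileBlocksInfo())
- //       .addBlocks(blocks, locations);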
-
- /**
- * Adds the partition to the HdfsTable. Throws a CatalogException if the partition
- * already exists in this table.
- */
- public void addPartition(HdfsPartition partition) throws CatalogException {
- if (partitionMap_.containsKey(partition.getId())) {
- throw new CatalogException(String.format("Partition %s already exists in table %s",
- partition.getPartitionName(), getFullName()));
- }
- partitionMap_.put(partition.getId(), partition);
- totalHdfsBytes_ += partition.getSize();
- numHdfsFiles_ += partition.getNumFileDescriptors();
- updatePartitionMdAndColStats(partition);
- }
-
- /**
- * Updates the HdfsTable's partition metadata, i.e. adds the id to the HdfsTable and
- * populates structures used for speeding up partition pruning/lookup. Also updates
- * column stats.
- */
- private void updatePartitionMdAndColStats(HdfsPartition partition) {
- if (partition.getPartitionValues().size() != numClusteringCols_) return;
- partitionIds_.add(partition.getId());
- for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
- ColumnStats stats = getColumns().get(i).getStats();
- LiteralExpr literal = partition.getPartitionValues().get(i);
- // Store partitions with null partition values separately
- if (literal instanceof NullLiteral) {
- stats.setNumNulls(stats.getNumNulls() + 1);
- if (nullPartitionIds_.get(i).isEmpty()) {
- stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
- }
- nullPartitionIds_.get(i).add(partition.getId());
- continue;
- }
- HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
- if (partitionIds == null) {
- partitionIds = Sets.newHashSet();
- partitionValuesMap_.get(i).put(literal, partitionIds);
- stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
- }
- partitionIds.add(partition.getId());
- }
- nameToPartitionMap_.put(partition.getPartitionName(), partition);
- }
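-
- // Example: for a table partitioned by (year, month), adding partitions
- // (2016, 1), (2016, 2) and (NULL, 3) leaves the 'year' column stats with
- // numDistinctValues = 2 (2016 and NULL) and numNulls = 1; both ids for
- // year=2016 share a single entry in partitionValuesMap_.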
-
- /**
- * Drops the partition having the given partition spec from HdfsTable. Cleans up its
- * metadata from all the mappings used to speed up partition pruning/lookup.
- * Also updates partition column statistics. The given partitionSpec must match
- * exactly one partition.
- * Returns the HdfsPartition that was dropped. If the partition does not exist, returns
- * null.
- */
- public HdfsPartition dropPartition(List<TPartitionKeyValue> partitionSpec) {
- return dropPartition(getPartitionFromThriftPartitionSpec(partitionSpec));
- }
-
- /**
- * Drops a partition and updates partition column statistics. Returns the
- * HdfsPartition that was dropped or null if the partition does not exist.
- */
- private HdfsPartition dropPartition(HdfsPartition partition) {
- if (partition == null) return null;
- totalHdfsBytes_ -= partition.getSize();
- numHdfsFiles_ -= partition.getNumFileDescriptors();
- Preconditions.checkArgument(partition.getPartitionValues().size() ==
- numClusteringCols_);
- Long partitionId = partition.getId();
- // Remove the partition id from the list of partition ids and other mappings.
- partitionIds_.remove(partitionId);
- partitionMap_.remove(partitionId);
- nameToPartitionMap_.remove(partition.getPartitionName());
- perPartitionFileDescMap_.remove(partition.getLocation());
- for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
- ColumnStats stats = getColumns().get(i).getStats();
- LiteralExpr literal = partition.getPartitionValues().get(i);
- // Check if this is a null literal.
- if (literal instanceof NullLiteral) {
- nullPartitionIds_.get(i).remove(partitionId);
- stats.setNumNulls(stats.getNumNulls() - 1);
- if (nullPartitionIds_.get(i).isEmpty()) {
- stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
- }
- continue;
- }
- HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
- // If there are multiple partition ids corresponding to a literal, remove
- // only this id. Otherwise, remove the <literal, id> pair.
- if (partitionIds.size() > 1) partitionIds.remove(partitionId);
- else {
- partitionValuesMap_.get(i).remove(literal);
- stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
- }
- }
- return partition;
- }
-
- private void addDefaultPartition(StorageDescriptor storageDescriptor)
- throws CatalogException {
- // Default partition has no files and is not referred to by scan nodes. Data sinks
- // refer to this to understand how to create new partitions.
- HdfsStorageDescriptor hdfsStorageDescriptor =
- HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
- HdfsPartition partition = HdfsPartition.defaultPartition(this,
- hdfsStorageDescriptor);
- partitionMap_.put(partition.getId(), partition);
- }
-
- @Override
- public void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
- load(reuseMetadata, client, msTbl, true, true, null);
- }
-
- /**
- * Loads table metadata from the Hive Metastore.
- *
- * If 'reuseMetadata' is false, performs a full metadata load from the Hive Metastore,
- * including partition and file metadata. Otherwise, loads metadata incrementally and
- * updates this HdfsTable in place so that it is in sync with the Hive Metastore.
- *
- * Depending on the operation that triggered the table metadata load, not all the
- * metadata may need to be updated. If 'partitionsToUpdate' is not null, it specifies a
- * list of partitions for which metadata should be updated. Otherwise, all partition
- * metadata will be updated from the Hive Metastore.
- *
- * If 'loadFileMetadata' is true, file metadata of the specified partitions are
- * reloaded while reusing existing file descriptors to avoid loading metadata for files
- * that haven't changed. If 'partitionsToUpdate' is not specified, file metadata of all
- * the partitions are loaded.
- *
- * If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore.
- *
- * There are several cases where existing file descriptors might be reused incorrectly:
- * 1. an ALTER TABLE ADD PARTITION or dynamic partition insert is executed through
- * Hive. This does not update the lastDdlTime.
- * 2. Hdfs rebalancer is executed. This changes the block locations but doesn't update
- * the mtime (file modification time).
- * If any of these occur, the user has to execute "invalidate metadata" to invalidate the
- * metadata cache of the table and trigger a fresh load.
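- *
- * Example (hypothetical partition name): load(true, client, msTbl, true, false,
- * Sets.newHashSet("year=2016/month=9/")) incrementally refreshes file metadata
- * for just that partition (getPartitionsByName() trims the trailing '/'),
- * without reloading the table schema.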
- */
- public void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl, boolean loadFileMetadata,
- boolean loadTableSchema, Set<String> partitionsToUpdate)
- throws TableLoadingException {
- // turn all exceptions into TableLoadingException
- msTable_ = msTbl;
- try {
- if (loadTableSchema) loadSchema(client, msTbl);
- if (reuseMetadata && getCatalogVersion() == Catalog.INITIAL_CATALOG_VERSION) {
- // This is the special case of CTAS that creates a 'temp' table that does not
- // actually exist in the Hive Metastore.
- initializePartitionMetadata(msTbl);
- updateStatsFromHmsTable(msTbl);
- return;
- }
- // Load partition and file metadata
- if (!reuseMetadata) {
- // Load all partitions from Hive Metastore, including file metadata.
- LOG.debug("load table from Hive Metastore: " + db_.getName() + "." + name_);
- List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
- Lists.newArrayList();
- msPartitions.addAll(MetaStoreUtil.fetchAllPartitions(
- client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES));
- loadAllPartitions(msPartitions, msTbl);
- } else {
- // Incrementally update this table's partitions and file metadata
- LOG.debug("incremental update for table: " + db_.getName() + "." + name_);
- Preconditions.checkState(partitionsToUpdate == null || loadFileMetadata);
- updateMdFromHmsTable(msTbl);
- if (msTbl.getPartitionKeysSize() == 0) {
- if (loadFileMetadata) updateUnpartitionedTableFileMd();
- } else {
- updatePartitionsFromHms(client, partitionsToUpdate, loadFileMetadata);
- }
- }
- if (loadTableSchema) setAvroSchema(client, msTbl);
- updateStatsFromHmsTable(msTbl);
- } catch (TableLoadingException e) {
- throw e;
- } catch (Exception e) {
- throw new TableLoadingException("Failed to load metadata for table: " + name_, e);
- }
- }
-
- /**
- * Updates the table metadata, including 'hdfsBaseDir_', 'isMarkedCached_',
- * and 'accessLevel_' from 'msTbl'. Throws an IOException if there was an error
- * accessing the table location path.
- */
- private void updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
- throws IOException {
- Preconditions.checkNotNull(msTbl);
- hdfsBaseDir_ = msTbl.getSd().getLocation();
- isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
- if (msTbl.getPartitionKeysSize() == 0) {
- Path location = new Path(hdfsBaseDir_);
- FileSystem fs = location.getFileSystem(CONF);
- if (fs.exists(location)) {
- accessLevel_ = getAvailableAccessLevel(fs, location);
- }
- }
- setMetaStoreTable(msTbl);
- }
-
- /**
- * Updates the file metadata of an unpartitioned HdfsTable.
- */
- private void updateUnpartitionedTableFileMd() throws CatalogException {
- LOG.debug("update unpartitioned table: " + name_);
- resetPartitions();
- org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
- Preconditions.checkNotNull(msTbl);
- addDefaultPartition(msTbl.getSd());
- Map<FsKey, FileBlocksInfo> fileBlocksToLoad = Maps.newHashMap();
- HdfsPartition part = createPartition(msTbl.getSd(), null, fileBlocksToLoad);
- addPartition(part);
- loadDiskIds(fileBlocksToLoad);
- if (isMarkedCached_) part.markCached();
- }
-
- /**
- * Updates the partitions of an HdfsTable so that they are in sync with the Hive
- * Metastore. It reloads partitions that were marked 'dirty' by doing a DROP + CREATE.
- * It removes from this table partitions that no longer exist in the Hive Metastore and
- * adds partitions that were added externally (e.g. using Hive) to the Hive Metastore
- * but do not exist in this table. If 'loadFileMetadata' is true, it triggers
- * file/block metadata reload for the partitions specified in 'partitionsToUpdate', if
- * any, or for all the table partitions if 'partitionsToUpdate' is null.
- */
- private void updatePartitionsFromHms(IMetaStoreClient client,
- Set<String> partitionsToUpdate, boolean loadFileMetadata) throws Exception {
- LOG.debug("sync table partitions: " + name_);
- org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
- Preconditions.checkNotNull(msTbl);
- Preconditions.checkState(msTbl.getPartitionKeysSize() != 0);
- Preconditions.checkState(loadFileMetadata || partitionsToUpdate == null);
-
- // Retrieve all the partition names from the Hive Metastore. We need this to
- // identify the delta between partitions of the local HdfsTable and the table entry
- // in the Hive Metastore. Note: This is a relatively "cheap" operation
- // (~.3 secs for 30K partitions).
- Set<String> msPartitionNames = Sets.newHashSet();
- msPartitionNames.addAll(
- client.listPartitionNames(db_.getName(), name_, (short) -1));
- // Names of loaded partitions in this table
- Set<String> partitionNames = Sets.newHashSet();
- // Partitions for which file metadata must be loaded
- List<HdfsPartition> partitionsToUpdateFileMd = Lists.newArrayList();
- // Partitions that need to be dropped and recreated from scratch
- List<HdfsPartition> dirtyPartitions = Lists.newArrayList();
- // Partitions that need to be removed from this table. That includes dirty
- // partitions as well as partitions that were removed from the Hive Metastore.
- List<HdfsPartition> partitionsToRemove = Lists.newArrayList();
- // Identify dirty partitions that need to be loaded from the Hive Metastore and
- // partitions that no longer exist in the Hive Metastore.
- for (HdfsPartition partition: partitionMap_.values()) {
- // Ignore the default partition
- if (partition.isDefaultPartition()) continue;
- // Remove partitions that don't exist in the Hive Metastore. These are partitions
- // that were removed from HMS using some external process, e.g. Hive.
- if (!msPartitionNames.contains(partition.getPartitionName())) {
- partitionsToRemove.add(partition);
- }
- if (partition.isDirty()) {
- // Dirty partitions are updated by removing them from table's partition
- // list and loading them from the Hive Metastore.
- dirtyPartitions.add(partition);
- } else {
- if (partitionsToUpdate == null && loadFileMetadata) {
- partitionsToUpdateFileMd.add(partition);
- }
- }
- Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
- partitionNames.add(partition.getPartitionName());
- }
- partitionsToRemove.addAll(dirtyPartitions);
- for (HdfsPartition partition: partitionsToRemove) dropPartition(partition);
- // Load dirty partitions from Hive Metastore
- loadPartitionsFromMetastore(dirtyPartitions, client);
-
- // Identify and load partitions that were added in the Hive Metastore but don't
- // exist in this table.
- Set<String> newPartitionsInHms = Sets.difference(msPartitionNames, partitionNames);
- loadPartitionsFromMetastore(newPartitionsInHms, client);
- // If a list of modified partitions (old and new) is specified, don't reload file
- // metadata for the new ones as they have already been detected in HMS and have been
- // reloaded by loadPartitionsFromMetastore().
- if (partitionsToUpdate != null) {
- partitionsToUpdate.removeAll(newPartitionsInHms);
- }
-
- // Load file metadata. Until we have a notification mechanism for when a
- // file changes in hdfs, it is sometimes required to reload all the file
- // descriptors and block metadata of a table (e.g. REFRESH statement).
- if (loadFileMetadata) {
- if (partitionsToUpdate != null) {
- // Only reload file metadata of partitions specified in 'partitionsToUpdate'
- Preconditions.checkState(partitionsToUpdateFileMd.isEmpty());
- partitionsToUpdateFileMd = getPartitionsByName(partitionsToUpdate);
- }
- loadPartitionFileMetadata(partitionsToUpdateFileMd);
- }
- }
-
- /**
- * Returns the HdfsPartition objects associated with the specified list of partition
- * names.
- */
- private List<HdfsPartition> getPartitionsByName(Collection<String> partitionNames) {
- List<HdfsPartition> partitions = Lists.newArrayList();
- for (String partitionName: partitionNames) {
- String partName = DEFAULT_PARTITION_NAME;
- if (partitionName.length() > 0) {
- // Trim the last trailing char '/' from each partition name
- partName = partitionName.substring(0, partitionName.length()-1);
- }
- Preconditions.checkState(nameToPartitionMap_.containsKey(partName),
- "Invalid partition name: " + partName);
- partitions.add(nameToPartitionMap_.get(partName));
- }
- return partitions;
- }
-
- /**
- * Updates the cardinality of this table from an HMS table. Sets the cardinalities of
- * dummy/default partitions for the case of unpartitioned tables.
- */
- private void updateStatsFromHmsTable(
- org.apache.hadoop.hive.metastore.api.Table msTbl) {
- numRows_ = getRowCount(msTbl.getParameters());
- // For unpartitioned tables set the numRows in its partitions
- // to the table's numRows.
- if (numClusteringCols_ == 0 && !partitionMap_.isEmpty()) {
- // Unpartitioned tables have a 'dummy' partition and a default partition.
- // Temp tables used in CTAS statements have one partition.
- Preconditions.checkState(partitionMap_.size() == 2 || partitionMap_.size() == 1);
- for (HdfsPartition p: partitionMap_.values()) {
- p.setNumRows(numRows_);
- }
- }
- }
-
- /**
- * Returns whether the table has the 'skip.header.line.count' property set.
- */
- private boolean hasSkipHeaderLineCount() {
- String key = TBL_PROP_SKIP_HEADER_LINE_COUNT;
- org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
- if (msTbl == null) return false;
-    return msTbl.getParameters().containsKey(key);
- }
-
- /**
- * Parses and returns the value of the 'skip.header.line.count' table property. If the
- * value is not set for the table, returns 0. If parsing fails or a value < 0 is found,
- * the error parameter is updated to contain an error message.
- */
- public int parseSkipHeaderLineCount(StringBuilder error) {
- if (!hasSkipHeaderLineCount()) return 0;
- return parseSkipHeaderLineCount(getMetaStoreTable().getParameters(), error);
- }
-
- /**
- * Parses and returns the value of the 'skip.header.line.count' table property. The
- * caller must ensure that the property is contained in the 'tblProperties' map. If
- * parsing fails or a value < 0 is found, the error parameter is updated to contain an
- * error message.
- */
- public static int parseSkipHeaderLineCount(Map<String, String> tblProperties,
- StringBuilder error) {
- Preconditions.checkState(tblProperties != null);
- String key = TBL_PROP_SKIP_HEADER_LINE_COUNT;
- Preconditions.checkState(tblProperties.containsKey(key));
-    // Try to parse.
-    String stringValue = tblProperties.get(key);
-    int skipHeaderLineCount = 0;
-    String errorMsg = String.format("Invalid value for table property %s: %s (value " +
-        "must be an integer >= 0)", key, stringValue);
-    try {
-      skipHeaderLineCount = Integer.parseInt(stringValue);
-    } catch (NumberFormatException exc) {
-      error.append(errorMsg);
-    }
-    if (skipHeaderLineCount < 0) error.append(errorMsg);
- return skipHeaderLineCount;
- }
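-
- // For example, "skip.header.line.count"="2" parses to 2 and leaves 'error'
- // untouched, while "two" or "-1" appends the error message (returning 0 and
- // -1, respectively), so callers should check 'error' before using the result.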
-
- /**
- * Sets avroSchema_ if the table or any of the partitions in the table are stored
- * as Avro. Additionally, this method also reconciles the schema if the column
- * definitions from the metastore differ from the Avro schema.
- */
- private void setAvroSchema(IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
- Preconditions.checkState(isSchemaLoaded_);
- String inputFormat = msTbl.getSd().getInputFormat();
- if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO
- || hasAvroData_) {
- // Look for Avro schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
- // taking precedence.
- List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
- schemaSearchLocations.add(
- getMetaStoreTable().getSd().getSerdeInfo().getParameters());
- schemaSearchLocations.add(getMetaStoreTable().getParameters());
-
- avroSchema_ = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
-
- if (avroSchema_ == null) {
- // No Avro schema was explicitly set in the table metadata, so infer the Avro
- // schema from the column definitions.
- Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas(
- msTbl.getSd().getCols(), getFullName());
- avroSchema_ = inferredSchema.toString();
- }
- String serdeLib = msTbl.getSd().getSerdeInfo().getSerializationLib();
- if (serdeLib == null ||
- serdeLib.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) {
-        // If the SerDe library is null or set to LazySimpleSerDe, it
-        // indicates there is an issue with the table metadata since Avro tables
-        // need a non-native SerDe. Instead of failing to load the table, fall
-        // back to using the fields from the storage descriptor (same as Hive).
- return;
- } else {
- // Generate new FieldSchemas from the Avro schema. This step reconciles
- // differences in the column definitions and the Avro schema. For
- // Impala-created tables this step is not necessary because the same
- // resolution is done during table creation. But Hive-created tables
- // store the original column definitions, and not the reconciled ones.
- List<ColumnDef> colDefs =
- ColumnDef.createFromFieldSchemas(msTbl.getSd().getCols());
- List<ColumnDef> avroCols = AvroSchemaParser.parse(avroSchema_);
- StringBuilder warning = new StringBuilder();
- List<ColumnDef> reconciledColDefs =
- AvroSchemaUtils.reconcileSchemas(colDefs, avroCols, warning);
- if (warning.length() != 0) {
- LOG.warn(String.format("Warning while loading table %s:\n%s",
- getFullName(), warning.toString()));
- }
- AvroSchemaUtils.setFromSerdeComment(reconciledColDefs);
-        // Reset and update nonPartFieldSchemas_ to the reconciled colDefs.
- nonPartFieldSchemas_.clear();
- nonPartFieldSchemas_.addAll(ColumnDef.toFieldSchemas(reconciledColDefs));
- // Update the columns as per the reconciled colDefs and re-load stats.
- clearColumns();
- addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
- addColumnsFromFieldSchemas(nonPartFieldSchemas_);
- loadAllColumnStats(client);
- }
- }
- }
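-
- // For example, an explicit schema supplied via TBLPROPERTIES or
- // SERDEPROPERTIES ('avro.schema.literal'='{"type": "record", ...}' or
- // 'avro.schema.url'='hdfs://...') is picked up by getAvroSchema() above,
- // with SERDEPROPERTIES taking precedence.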
-
- /**
- * Loads table schema and column stats from Hive Metastore.
- */
- private void loadSchema(IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
- nonPartFieldSchemas_.clear();
- // set nullPartitionKeyValue from the hive conf.
- nullPartitionKeyValue_ = client.getConfigValue(
- "hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__");
-
- // set NULL indicator string from table properties
- nullColumnValue_ =
- msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
- if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE;
-
- // Excludes partition columns.
- nonPartFieldSchemas_.addAll(msTbl.getSd().getCols());
-
- // The number of clustering columns is the number of partition keys.
- numClusteringCols_ = msTbl.getPartitionKeys().size();
- partitionLocationCompressor_.setClusteringColumns(numClusteringCols_);
- clearColumns();
- // Add all columns to the table. Ordering is important: partition columns first,
- // then all other columns.
- addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
- addColumnsFromFieldSchemas(nonPartFieldSchemas_);
- loadAllColumnStats(client);
- isSchemaLoaded_ = true;
- }
-
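loadSchema() establishes the column-ordering invariant the rest of the class relies on: clustering (partition) columns occupy positions [0, numClusteringCols_) and all other columns follow. A small sketch of that invariant, which getColumnNamesWithHmsStats() below exploits by sublisting past the clustering columns (column names here are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ColumnOrderSketch {
      public static void main(String[] args) {
        List<String> partitionKeys = Arrays.asList("year", "month");
        List<String> nonPartCols = Arrays.asList("id", "payload");

        // Partition columns first, then all other columns.
        List<String> columns = new ArrayList<>(partitionKeys);
        columns.addAll(nonPartCols);

        // Stats lookups skip the clustering columns by sublisting past them.
        int numClusteringCols = partitionKeys.size();
        System.out.println(columns.subList(numClusteringCols, columns.size()));
        // Prints: [id, payload]
      }
    }
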
- /**
- * Loads partitions from the Hive Metastore and adds them to the internal list of
- * table partitions.
- */
- private void loadPartitionsFromMetastore(List<HdfsPartition> partitions,
- IMetaStoreClient client) throws Exception {
- Preconditions.checkNotNull(partitions);
- if (partitions.isEmpty()) return;
- LOG.info(String.format("Incrementally updating %d/%d partitions.",
- partitions.size(), partitionMap_.size()));
- Set<String> partitionNames = Sets.newHashSet();
- for (HdfsPartition part: partitions) {
- partitionNames.add(part.getPartitionName());
- }
- loadPartitionsFromMetastore(partitionNames, client);
- }
-
- /**
- * Loads from the Hive Metastore the partitions that correspond to the specified
- * 'partitionNames' and adds them to the internal list of table partitions.
- */
- private void loadPartitionsFromMetastore(Set<String> partitionNames,
- IMetaStoreClient client) throws Exception {
- Preconditions.checkNotNull(partitionNames);
- if (partitionNames.isEmpty()) return;
- // Load partition metadata from Hive Metastore.
- List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
- Lists.newArrayList();
- msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
- Lists.newArrayList(partitionNames), db_.getName(), name_));
-
- Map<FsKey, FileBlocksInfo> fileBlocksToLoad = Maps.newHashMap();
- for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
- HdfsPartition partition =
- createPartition(msPartition.getSd(), msPartition, fileBlocksToLoad);
- // If the partition is null, its HDFS path does not exist; skip it without
- // adding it to this table's partition list.
- if (partition == null) continue;
- addPartition(partition);
- if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true;
- if (msPartition.getParameters() != null) {
- partition.setNumRows(getRowCount(msPartition.getParameters()));
- }
- if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
- // TODO: READ_ONLY isn't exactly correct because it's possible the
- // partition does not have READ permissions either. When we start checking
- // whether we can READ from a table, this should be updated to set the
- // table's access level to the "lowest" effective level across all
- // partitions. That is, if one partition has READ_ONLY and another has
- // WRITE_ONLY the table's access level should be NONE.
- accessLevel_ = TAccessLevel.READ_ONLY;
- }
- }
- loadDiskIds(fileBlocksToLoad);
- }
-
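The TODO above describes combining per-partition access levels into a table-wide "lowest" effective level. That combine is a lattice meet over (read, write) capabilities; a sketch with illustrative enum values (the real levels are the thrift-generated TAccessLevel constants):

    public class AccessLevelMeet {
      // Illustrative lattice: each level is a pair of (read, write) capabilities.
      enum Access {
        READ_WRITE(true, true), READ_ONLY(true, false),
        WRITE_ONLY(false, true), NONE(false, false);

        final boolean read, write;
        Access(boolean read, boolean write) { this.read = read; this.write = write; }

        // Keep only the capabilities shared by both operands, so
        // READ_ONLY meet WRITE_ONLY == NONE, as the TODO describes.
        Access meet(Access other) {
          boolean r = read && other.read;
          boolean w = write && other.write;
          if (r && w) return READ_WRITE;
          if (r) return READ_ONLY;
          if (w) return WRITE_ONLY;
          return NONE;
        }
      }

      public static void main(String[] args) {
        System.out.println(Access.READ_ONLY.meet(Access.WRITE_ONLY)); // NONE
      }
    }
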
- /**
- * Loads the file descriptors and block metadata of a list of partitions.
- */
- private void loadPartitionFileMetadata(List<HdfsPartition> partitions)
- throws Exception {
- Preconditions.checkNotNull(partitions);
- LOG.info(String.format("loading file metadata for %d partitions",
- partitions.size()));
- org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
- Preconditions.checkNotNull(msTbl);
- HdfsStorageDescriptor fileFormatDescriptor =
- HdfsStorageDescriptor.fromStorageDescriptor(this.name_, msTbl.getSd());
- Map<FsKey, FileBlocksInfo> perFsFileBlocks = Maps.newHashMap();
- for (HdfsPartition part: partitions) {
- org.apache.hadoop.hive.metastore.api.Partition msPart =
- part.toHmsPartition();
- StorageDescriptor sd = null;
- if (msPart == null) {
- // If this partition is not stored in the Hive Metastore (e.g. default partition
- // of an unpartitioned table), use the table's storage descriptor to load file
- // metadata.
- sd = msTbl.getSd();
- } else {
- sd = msPart.getSd();
- }
- loadPartitionFileMetadata(sd, part, fileFormatDescriptor.getFileFormat(),
- perFsFileBlocks);
- }
- loadDiskIds(perFsFileBlocks);
- }
-
- /**
- * Loads the file descriptors and block metadata of a partition from its
- * StorageDescriptor. If 'partition' does not have an entry in the Hive Metastore,
- * 'storageDescriptor' is the StorageDescriptor of the associated table. Populates
- * 'perFsFileBlocks' with file block info and updates table metadata.
- */
- private void loadPartitionFileMetadata(StorageDescriptor storageDescriptor,
- HdfsPartition partition, HdfsFileFormat fileFormat,
- Map<FsKey, FileBlocksInfo> perFsFileBlocks) throws Exception {
- Preconditions.checkNotNull(storageDescriptor);
- Preconditions.checkNotNull(partition);
- org.apache.hadoop.hive.metastore.api.Partition msPart =
- partition.toHmsPartition();
- boolean isMarkedCached = isMarkedCached_;
- if (msPart != null) {
- isMarkedCached = HdfsCachingUtil.validateCacheParams(msPart.getParameters());
- }
- Path partDirPath = new Path(storageDescriptor.getLocation());
- FileSystem fs = partDirPath.getFileSystem(CONF);
- if (!fs.exists(partDirPath)) return;
-
- String partitionDir = partDirPath.toString();
- numHdfsFiles_ -= partition.getNumFileDescriptors();
- totalHdfsBytes_ -= partition.getSize();
- Preconditions.checkState(numHdfsFiles_ >= 0 && totalHdfsBytes_ >= 0);
- updatePartitionFds(partDirPath, isMarkedCached, fileFormat, perFsFileBlocks);
- List<FileDescriptor> fileDescs = Lists.newArrayList(
- perPartitionFileDescMap_.get(partDirPath.toString()).values());
- partition.setFileDescriptors(fileDescs);
- totalHdfsBytes_ += partition.getSize();
- numHdfsFiles_ += fileDescs.size();
- }
-
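Note the subtract-reload-add pattern above: the partition's old contribution is removed from numHdfsFiles_/totalHdfsBytes_ before its file descriptors are replaced, then the new contribution is added back. A simplified sketch of the invariant, with hypothetical names standing in for the real fields:

    // Hypothetical sketch of the refresh invariant; not the real HdfsTable fields.
    class TotalsSketch {
      long numFiles;
      long totalBytes;

      static class Part { long numFiles; long bytes; }

      void refresh(Part part, long reloadedNumFiles, long reloadedBytes) {
        numFiles -= part.numFiles;        // 1. Remove the old contribution.
        totalBytes -= part.bytes;
        assert numFiles >= 0 && totalBytes >= 0;
        part.numFiles = reloadedNumFiles; // 2. Swap in the reloaded metadata.
        part.bytes = reloadedBytes;
        numFiles += part.numFiles;        // 3. Add the new contribution back.
        totalBytes += part.bytes;
      }
    }
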
- /**
- * Updates the file descriptors of a partition directory specified by 'partitionPath'
- * and loads block metadata of new/modified files. Reuses existing FileDescriptors for
- * unchanged files (indicated by unchanged length and mtime). The one exception is if the
- * partition is marked as cached (HDFS caching) in which case the block metadata
- * cannot be reused. Otherwise, creates new FileDescriptors and adds them to
- * perPartitionFileDescMap_. 'fileFormat' is the file format of the files in this
- * partition directory. 'perFsFileBlocks' is populated with the loaded block metadata.
- */
- private void updatePartitionFds(Path partitionPath,
- boolean isMarkedCached, HdfsFileFormat fileFormat,
- Map<FsKey, FileBlocksInfo> perFsFileBlocks) throws CatalogException {
- Preconditions.checkNotNull(partitionPath);
- String partPathStr = partitionPath.toString();
- try {
- FileSystem fs = partitionPath.getFileSystem(CONF);
- if (!fs.exists(partitionPath)) {
- perPartitionFileDescMap_.put(
- partPathStr, Maps.<String, FileDescriptor>newHashMap());
- return;
- }
- Map<String, FileDescriptor> fileDescMap =
- perPartitionFileDescMap_.get(partPathStr);
- Map<String, FileDescriptor> newFileDescMap = Maps.newHashMap();
- // Get all the files in the partition directory
- for (FileStatus fileStatus: fs.listStatus(partitionPath)) {
- String fileName = fileStatus.getPath().getName();
- if (fileStatus.isDirectory() || FileSystemUtil.isHiddenFile(fileName) ||
- HdfsCompression.fromFileName(fileName) == HdfsCompression.LZO_INDEX) {
- // Ignore directories, hidden files (names starting with . or _), and LZO
- // index files. If a directory is erroneously created as a subdirectory of a
- // partition dir we should ignore it and move on; Hive will not recurse into
- // directories. LZO index files are read directly by the LZO scanner.
- continue;
- }
- FileDescriptor fd = fileDescMap != null ? fileDescMap.get(fileName) : null;
- if (fd == null || isMarkedCached || fd.getFileLength() != fileStatus.getLen()
- || fd.getModificationTime() != fileStatus.getModificationTime()) {
- // Metadata of cached or modified files is not reused.
- fd = new FileDescriptor(fileName, fileStatus.getLen(),
- fileStatus.getModificationTime());
- loadBlockMetadata(fs, fileStatus, fd, fileFormat, perFsFileBlocks);
- }
- newFileDescMap.put(fileName, fd);
- }
- perPartitionFileDescMap_.put(partPathStr, newFileDescMap);
- } catch (Exception e) {
- throw new CatalogException("Failed to retrieve file descriptors from path " +
- partitionPath, e);
- }
- }
-
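The reuse test above is worth spelling out: a cached FileDescriptor is kept only if the partition is not HDFS-cached and both the file length and the modification time are unchanged. A self-contained sketch of that check (illustrative names, not the real FileDescriptor API):

    import java.util.HashMap;
    import java.util.Map;

    public class FdReuseSketch {
      // Illustrative stand-in for the real FileDescriptor.
      static class Fd {
        final long length;
        final long mtime;
        Fd(long length, long mtime) { this.length = length; this.mtime = mtime; }
      }

      // Reuses the cached descriptor only if the partition is not HDFS-cached and
      // the file's length and modification time are both unchanged.
      static Fd reuseOrReload(Map<String, Fd> cache, String fileName, long length,
          long mtime, boolean partitionIsHdfsCached) {
        Fd fd = cache.get(fileName);
        boolean reusable = fd != null && !partitionIsHdfsCached
            && fd.length == length && fd.mtime == mtime;
        if (!reusable) fd = new Fd(length, mtime); // Real code also loads block metadata.
        return fd;
      }

      public static void main(String[] args) {
        Map<String, Fd> cache = new HashMap<>();
        cache.put("a.parq", new Fd(100, 5));
        System.out.println(reuseOrReload(cache, "a.parq", 100, 5, false)
            == cache.get("a.parq")); // true: unchanged file is reused
        System.out.println(reuseOrReload(cache, "a.parq", 100, 9, false)
            == cache.get("a.parq")); // false: changed mtime forces a reload
      }
    }
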
- @Override
- protected List<String> getColumnNamesWithHmsStats() {
- List<String> ret = Lists.newArrayList();
- // Only non-partition columns have column stats in the HMS.
- for (Column column: getColumns().subList(numClusteringCols_, getColumns().size())) {
- ret.add(column.getName().toLowerCase());
- }
- return ret;
- }
-
- @Override
- protected synchronized void loadFromThrift(TTable thriftTable)
- throws TableLoadingException {
- super.loadFromThrift(thriftTable);
- THdfsTable hdfsTable = thriftTable.getHdfs_table();
- Preconditions.checkState(hdfsTable.getPartition_prefixes() instanceof ArrayList<?>);
- partitionLocationCompressor_ = new HdfsPartitionLocationCompressor(
- numClusteringCols_, (ArrayList<String>)hdfsTable.getPartition_prefixes());
- hdfsBaseDir_ = hdfsTable.getHdfsBaseDir();
- nullColumnValue_ = hdfsTable.nullColumnValue;
- nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
- multipleFileSystems_ = hdfsTable.multiple_filesystems;
- Preconditions.checkState(hdfsTable.getNetwork_addresses() instanceof ArrayList<?>);
- hostIndex_.populate((ArrayList<TNetworkAddress>)hdfsTable.getNetwork_addresses());
- resetPartitions();
-
- try {
- for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
- HdfsPartition hdfsPart =
- HdfsPartition.fromThrift(this, part.getKey(), part.getValue());
- addPartition(hdfsPart);
- }
- } catch (CatalogException e) {
- throw new TableLoadingException(e.getMessage());
- }
- avroSchema_ = hdfsTable.isSetAvroSchema() ? hdfsTable.getAvroSchema() : null;
- isMarkedCached_ =
- HdfsCachingUtil.validateCacheParams(getMetaStoreTable().getParameters());
- }
-
- @Override
- public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
- // Create thrift descriptors to send to the BE. The BE does not
- // need any information below the THdfsPartition level.
- TTableDescriptor tableDesc = new TTableDescriptor(id_.asInt(), TTableType.HDFS_TABLE,
- getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
- tableDesc.setHdfsTable(getTHdfsTable(false, referencedPartitions));
- return tableDesc;
- }
-
- @Override
- public TTable toThrift() {
- // Send all metadata between the catalog service and the FE.
- TTable table = super.toThrift();
- table.setTable_type(TTableType.HDFS_TABLE);
- table.setHdfs_table(getTHdfsTable(true, null));
- return table;
- }
-
- /**
- * Create a THdfsTable corresponding to this HdfsTable. If includeFileDesc is true,
- * then all partitions and THdfsFileDescs of each partition should be included.
- * Otherwise, don't include any THdfsFileDescs, and include only those partitions in
- * the refPartitions set (the backend doesn't need metadata for unreferenced
- * partitions). To prevent the catalog from hitting an OOM error while trying to
- * serialize large partition incremental stats, we estimate the stats size and filter
- * the incremental stats data from partition objects if the estimate exceeds
- * MAX_INCREMENTAL_STATS_SIZE_BYTES.
- */
- private THdfsTable getTHdfsTable(boolean includeFileDesc, Set<Long> refPartitions) {
- // includeFileDesc implies all partitions should be included (refPartitions == null).
- Preconditions.checkState(!includeFileDesc || refPartitions == null);
- int numPartitions =
- (refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
- long statsSizeEstimate =
- numPartitions * getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
- boolean includeIncrementalStats =
- (statsSizeEstimate < MAX_INCREMENTAL_STATS_SIZE_BYTES);
- Map<Long, THdfsPartition> idToPartition = Maps.newHashMap();
- for (HdfsPartition partition: partitionMap_.values()) {
- long id = partition.getId();
- if (refPartitions == null || refPartitions.contains(id)) {
- idToPartition.put(id,
- partition.toThrift(includeFileDesc, includeIncrementalStats));
- }
- }
- THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
- nullPartitionKeyValue_, nullColumnValue_, idToPartition);
- hdfsTable.setAvroSchema(avroSchema_);
- hdfsTable.setMultiple_filesystems(multipleFileSystems_);
- if (includeFileDesc) {
- // Network addresses are used only by THdfsFileBlocks which are inside
- // THdfsFileDesc, so include network addresses only when including THdfsFileDesc.
- hdfsTable.setNetwork_addresses(hostIndex_.getList());
- }
- hdfsTable.setPartition_prefixes(partitionLocationCompressor_.getPrefixes());
- return hdfsTable;
- }
-
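The incremental-stats guard above is a straight product estimate. A runnable sketch with illustrative constants (the real values are HdfsTable constants, not the numbers below); with these numbers a 10,000-partition, 50-column table is estimated at 100 MB and would have its incremental stats filtered from the serialized THdfsTable:

    public class StatsSizeEstimateSketch {
      // Illustrative values only.
      static final long STATS_SIZE_PER_COLUMN_BYTES = 200;
      static final long MAX_INCREMENTAL_STATS_SIZE_BYTES = 64L * 1024 * 1024;

      public static void main(String[] args) {
        long numPartitions = 10_000;
        long numColumns = 50;
        long estimate = numPartitions * numColumns * STATS_SIZE_PER_COLUMN_BYTES;
        boolean includeIncrementalStats = estimate < MAX_INCREMENTAL_STATS_SIZE_BYTES;
        // Prints: estimate=100000000 bytes, include=false
        System.out.printf("estimate=%d bytes, include=%b%n",
            estimate, includeIncrementalStats);
      }
    }
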
- public long getNumHdfsFiles() { return numHdfsFiles_; }
- public long getTotalHdfsBytes() { return totalHdfsBytes_; }
- public String getHdfsBaseDir() { return hdfsBaseDir_; }
- public boolean isAvroTable() { return avroSchema_ != null; }
-
- /**
- * Get the index of hosts that store replicas of blocks of this table.
- */
- public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
-
- /**
- * Returns the file format that the majority of partitions are stored in.
- */
- public HdfsFileFormat getMajorityFormat() {
- Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newHashMap();
- for (HdfsPartition partition: partitionMap_.values()) {
- HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
- Integer numPartitions = numPartitionsByFormat.get(format);
- if (numPartitions == null) {
- numPartitions = Integer.valueOf(1);
- } else {
- numPartitions = Integer.valueOf(numPartitions.intValue() + 1);
- }
- numPartitionsByFormat.put(format, numPartitions);
- }
-
- int maxNumPartitions = Integer.MIN_VALUE;
- HdfsFileFormat majorityFormat = null;
- for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) {
- if (entry.getValue().intValue() > maxNumPartitions) {
- majorityFormat = entry.getKey();
- maxNumPartitions = entry.getValue().intValue();
- }
- }
- Preconditions.checkNotNull(majorityFormat);
- return majorityFormat;
- }
-
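getMajorityFormat() is a plain frequency count followed by an argmax over the counts. On Java 8 the tally can be written more compactly with Map.merge(); a generic sketch of the same logic:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class MajoritySketch {
      // Frequency count via Map.merge(), then an argmax over the counts.
      static <T> T majority(List<T> items) {
        Map<T, Integer> counts = new HashMap<>();
        for (T item : items) counts.merge(item, 1, Integer::sum);
        T best = null;
        int bestCount = Integer.MIN_VALUE;
        for (Map.Entry<T, Integer> e : counts.entrySet()) {
          if (e.getValue() > bestCount) {
            best = e.getKey();
            bestCount = e.getValue();
          }
        }
        return best;
      }

      public static void main(String[] args) {
        System.out.println(majority(Arrays.asList("PARQUET", "TEXT", "PARQUET")));
        // Prints: PARQUET
      }
    }
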
- /**
- * Returns the HDFS paths corresponding to HdfsTable partitions that don't exist in
- * the Hive Metastore. An HDFS path is represented as a list of string values, one per
- * partition key column.
- */
- public List<List<String>> getPathsWithoutPartitions() throws CatalogException {
- List<List<LiteralExpr>> existingPartitions = new ArrayList<List<LiteralExpr>>();
- // Get the list of partition values of existing partitions in Hive Metastore.
- for (HdfsPartition partition: partitionMap_.values()) {
- if (partition.isDefaultPartition()) continue;
- existingPartitions.add(partition.getPartitionValues());
- }
-
- List<String> partitionKeys = Lists.newArrayList();
- for (int i = 0; i < numClusteringCols_; ++i) {
- partitionKeys.add(getColumns().get(i).getName());
- }
- Path basePath = new Path(hdfsBaseDir_);
- List<List<String>> partitionsNotInHms = new ArrayList<List<String>>();
- try {
- getAllPartitionsNotInHms(basePath, partitionKeys, existingPartitions,
- partitionsNotInHms);
- } catch (Exception e) {
- throw new CatalogException(String.format("Failed to recover partitions for %s " +
- "with exception: %s.", getFullName(), e));
- }
- return partitionsNotInHms;
- }
-
- /**
- * Finds all partitions that match the partition keys directory structure, pass the
- * type compatibility check, and are not already part of the table, and adds them to
- * 'partitionsNotInHms'.
- */
- private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys,
- List<List<LiteralExpr>> existingPartitions,
- List<List<String>> partitionsNotInHms) throws IOException {
- FileSystem fs = path.getFileSystem(CONF);
- // Check whether the base directory exists.
- if (!fs.exists(path)) return;
-
- List<String> partitionValues = Lists.newArrayList();
- List<LiteralExpr> partitionExprs = Lists.newArrayList();
- getAllPartitionsNotInHms(path, partitionKeys, 0, fs, partitionValues,
- partitionExprs, existingPartitions, partitionsNotInHms);
- }
-
- /**
- * Recursively finds all partitions which match the partition keys directory
- * structure and pass the type compatibility check.
- *
- * path e.g. c1=1/c2=2/c3=3
- * partitionKeys The ordered partition keys. e.g.("c1", "c2", "c3")
- * depth The start position in partitionKeys to match the path name.
- * partitionValues The partition values used to create a partition.
- * partitionExprs The list of LiteralExprs which is used to avoid duplicate partitions.
- * E.g. Having /c1=0001 and /c1=01, we should make sure only one partition
- * will be added.
- * existingPartitions All partitions which exist in Hive Metastore or newly added.
- * partitionsNotInHms Contains all the recovered partitions.
- */
- private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys,
- int depth, FileSystem fs, List<String> partitionValues,
- List<LiteralExpr> partitionExprs, List<List<LiteralExpr>> existingPartitions,
- List<List<String>> partitionsNotInHms) throws IOException {
- if (depth == partitionKeys.size()) {
- if (existingPartitions.contains(partitionExprs)) {
- LOG.trace(String.format("Skip recovery of path '%s' because it already exists " +
- "in metastore", path.toString()));
- } else {
- partitionsNotInHms.add(partitionValues);
- existingPartitions.add(partitionExprs);
- }
- return;
- }
-
- FileStatus[] statuses = fs.listStatus(path);
- for (FileStatus status: statuses) {
- if (!status.isDirectory()) continue;
- Pair<String, LiteralExpr> keyValues =
- getTypeCompatibleValue(status.getPath(), partitionKeys.get(depth));
- if (keyValues == null) continue;
-
- List<String> currentPartitionValues = Lists.newArrayList(partitionValues);
- List<LiteralExpr> currentPartitionExprs = Lists.newArrayList(partitionExprs);
- currentPartitionValues.add(keyValues.first);
- currentPartitionExprs.add(keyValues.second);
- getAllPartitionsNotInHms(status.getPath(), partitionKeys, depth + 1, fs,
- currentPartitionValues, currentPartitionExprs,
- existingPartitions, partitionsNotInHms);
- }
- }
-
- /**
- * Checks that the last component of 'path' is of the form "<partitionkey>=<v>"
- * where 'v' is a type-compatible value from the domain of the 'partitionKey' column.
- * If not, returns null, otherwise returns a Pair instance, the first element is the
- * original value, the second element is the LiteralExpr created from the original
- * value.
- */
- private Pair<String, LiteralExpr> getTypeCompatibleValue(Path path,
- String partitionKey) {
- String[] partName = path.getName().split("=");
- if (partName.length != 2 || !partName[0].equals(partitionKey)) return null;
-
- // Check Type compatibility for Partition value.
- Column column = getColumn(partName[0]);
- Preconditions.checkNotNull(column);
- Type type = column.getType();
- LiteralExpr expr = null;
- if (!partName[1].equals(getNullPartitionKeyValue())) {
- try {
- expr = LiteralExpr.create(partName[1], type);
- // Skip values that overflow the maximum value of the specified type.
- if (expr instanceof NumericLiteral) {
- if (NumericLiteral.isOverflow(((NumericLiteral)expr).getValue(), type)) {
- LOG.warn(String.format("Skip the overflow value (%s) for Type (%s).",
- partName[1], type.toSql()));
- return null;
- }
- }
- } catch (Exception ex) {
- LOG.debug(String.format("Invalid partition value (%s) for Type (%s).",
- partName[1], type.toSql()));
- return null;
- }
- } else {
- expr = new NullLiteral();
- }
- return new Pair<String, LiteralExpr>(partName[1], expr);
- }
-
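Taken together, the recursion and getTypeCompatibleValue() accept a directory only if its name is exactly "<key>=<value>" for the expected key at that depth. A standalone sketch of the name check (string values only; the real code also builds a LiteralExpr and rejects numeric overflow):

    public class PartitionDirNameSketch {
      // Returns the value component if 'dirName' is exactly "<expectedKey>=<value>",
      // otherwise null (mirroring the rejection path in getTypeCompatibleValue()).
      static String parsePartitionDir(String dirName, String expectedKey) {
        String[] parts = dirName.split("=");
        if (parts.length != 2 || !parts[0].equals(expectedKey)) return null;
        return parts[1];
      }

      public static void main(String[] args) {
        System.out.println(parsePartitionDir("c1=42", "c1"));    // 42
        System.out.println(parsePartitionDir("c1=4=2", "c1"));   // null: extra '='
        System.out.println(parsePartitionDir("other=42", "c1")); // null: wrong key
      }
    }
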
- /**
- * Returns statistics on this table as a tabular result set. Used for the
- * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
- * inside this method.
- */
- public TResultSet getTableStats() {
- TResultSet result = new TResultSet();
- TResultSetMetadata resultSchema = new TResultSetMetadata();
- result.setSchema(resultSchema);
-
- for (int i = 0; i < numClusteringCols_; ++i) {
- // Add the partition-key values as strings for simplicity.
- Column partCol = getColumns().get(i);
- TColumn colDesc = new TColumn(partCol.getName(), Type.STRING.toThrift());
- resultSchema.addToColumns(colDesc);
- }
-
- resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
- resultSchema.addToColumns(new TColumn("#Files", Type.BIGINT.toThrift()));
- resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
- resultSchema.addToColumns(new TColumn("Bytes Cached", Type.STRING.toThrift()));
- resultSchema.addToColumns(new TColumn("Cache Replication", Type.STRING.toThrift()));
- resultSchema.addToColumns(new TColumn("Format", Type.STRING.toThrift()));
- resultSchema.addToColumns(new TColumn("Incremental stats", Type.STRING.toThrift()));
- resultSchema.addToColumns(new TColumn("Location", Type.STRING.toThrift()));
-
- // Pretty print partitions and their stats.
- ArrayList<HdfsPartition> orderedPartitions =
- Lists.newArrayList(partitionMap_.values());
- Collections.sort(orderedPartitions);
-
- long totalCachedBytes = 0L;
- for (HdfsPartition p: orderedPartitions) {
- // Ignore dummy default partition.
- if (p.isDefaultPartition()) continue;
- TResultRowBuilder rowBuilder = new TResultRowBuilder();
-
- // Add the partition-key values (as strings for simplicity).
- for (LiteralExpr expr: p.getPartitionValues()) {
- rowBuilder.add(expr.getStringValue());
- }
-
- // Add number of rows, files, bytes, cache stats, and file format.
- rowBuilder.add(p.getNumRows()).add(p.getFileDescriptors().size())
- .addBytes(p.getSize());
- if (!p.isMarkedCached()) {
- // Helps to differentiate partitions that have 0B cached versus partitions
- // that are not marked as cached.
- rowBuilder.add("NOT CACHED");
- rowBuilder.add("NOT CACHED");
- } else {
- // Calculate the number of bytes that are cached.
- long cachedBytes = 0L;
- for (FileDescriptor fd: p.getFileDescriptors()) {
- for (THdfsFileBlock fb: fd.getFileBlocks()) {
- if (fb.getIs_replica_cached().contains(true)) {
- cachedBytes += fb.getLength();
- }
- }
- }
- totalCachedBytes += cachedBytes;
- rowBuilder.addBytes(cachedBytes);
-
- // Extract the cache replication factor from the table's parameters if the
- // table is not partitioned, otherwise directly from the partition.
- Short rep = HdfsCachingUtil.getCachedCacheReplication(
- numClusteringCols_ == 0 ?
- p.getTable().getMetaStoreTable().getParameters() :
- p.getParameters());
- rowBuilder.add(rep.toString());
- }
- rowBuilder.add(p.getInputFormatDescriptor().getFileFormat().toString());
-
- rowBuilder.add(String.valueOf(p.hasIncrementalStats()));
- rowBuilder.add(p.getLocation());
- result.addToRows(rowBuilder.get());
- }
-
- // For partitioned tables add a summary row at the bottom.
- if (numClusteringCols_ > 0) {
- TResultRowBuilder rowBuilder = new TResultRowBuilder();
- int numEmptyCells = numClusteringCols_ - 1;
- rowBuilder.add("Total");
- for (int i = 0; i < numEmptyCells; ++i) {
- rowBuilder.add("");
- }
-
- // Total num rows, files, and bytes (leave format empty).
- rowBuilder.add(numRows_).add(numHdfsFiles_).addBytes(totalHdfsBytes_)
- .addBytes(totalCachedBytes).add("").add("").add("").add("");
- result.addToRows(rowBuilder.get());
- }
- return result;
- }
-
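The "Bytes Cached" column above counts a block as cached when any of its replicas is cached. A minimal sketch of the per-partition sum, with a hypothetical Block record standing in for THdfsFileBlock:

    import java.util.Arrays;
    import java.util.List;

    public class CachedBytesSketch {
      // Hypothetical stand-in for THdfsFileBlock.
      static class Block {
        final long length;
        final List<Boolean> replicaCached;
        Block(long length, List<Boolean> replicaCached) {
          this.length = length;
          this.replicaCached = replicaCached;
        }
      }

      // A block counts as cached if at least one of its replicas is cached.
      static long cachedBytes(List<Block> blocks) {
        long total = 0;
        for (Block b : blocks) {
          if (b.replicaCached.contains(true)) total += b.length;
        }
        return total;
      }

      public static void main(String[] args) {
        List<Block> blocks = Arrays.asList(
            new Block(128, Arrays.asList(true, false, false)),
            new Block(64, Arrays.asList(false, false, false)));
        System.out.println(cachedBytes(blocks)); // 128
      }
    }
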
- /**
- * Returns file info for the given partition spec of this table. If the partition
- * spec is null, returns file info for all partitions, ordered by partition.
- */
- public TResultSet getFiles(List<TPartitionKeyValue> partitionSpec)
- throws CatalogException {
- TResultSet result = new TResultSet();
- TResultSetMetadata resultSchema = new TResultSetMetadata();
- result.setSchema(resultSchema);
- resultSchema.addToColumns(new TColumn("Path", Type.STRING.toThrift()));
- resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
- resultSchema.addToColumns(new TColumn("Partition", Type.STRING.toThrift()));
- result.setRows(Lists.<TResultRow>newArrayList());
-
- List<HdfsPartition> orderedPartitions = null;
- if (partitionSpec == null) {
- orderedPartitions = Lists.newArrayList(partitionMap_.values());
- Collections.sort(orderedPartitions);
- } else {
- // Get the HdfsPartition object for the given partition spec.
- HdfsPartition partition = getPartitionFromThriftPartitionSpec(partitionSpec);
- Preconditions.checkState(partition != null);
- orderedPartitions = Lists.newArrayList(partition);
- }
-
- for (HdfsPartition p: orderedPartitions) {
- List<FileDescriptor> orderedFds = Lists.newArrayList(p.getFileDescriptors());
- Collections.sort(orderedFds);
- for (FileDescriptor fd: orderedFds) {
- TResultRowBuilder rowBuilder = new TResultRowBuilder();
- rowBuilder.add(p.getLocation() + "/" + fd.getFileName());
- rowBuilder.add(PrintUtils.printBytes(fd.getFileLength()));
- rowBuilder.add(p.getPartitionName());
- result.addToRows(rowBuilder.get());
- }
- }
- return result;
- }
-
- /**
- * Constructs a partition name from a list of TPartitionKeyValue objects.
- */
- public static String constructPartitionName(List<TPartitionKeyValue> partitionSpec) {
- List<String> partitionCols = Lists.newArrayList();
- List<String> partitionVals = Lists.newArrayList();
- for (TPartitionKeyValue kv: partitionSpec) {
- partitionCols.add(kv.getName());
- partitionVals.add(kv.getValue());
- }
- return org.apache.hadoop.hive.common.FileUtils.makePartName(partitionCols,
- partitionVals);
- }
-
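For reference, a minimal usage sketch of the Hive helper that constructPartitionName() delegates to; makePartName() joins keys and values into the canonical partition path and escapes characters that are unsafe in directory names (assumes Hive's FileUtils is on the classpath):

    import java.util.Arrays;
    import org.apache.hadoop.hive.common.FileUtils;

    public class PartNameExample {
      public static void main(String[] args) {
        // Expected output: year=2016/month=9
        System.out.println(FileUtils.makePartName(
            Arrays.asList("year", "month"), Arrays.asList("2016", "9")));
      }
    }
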
- /**
- * Reloads the metadata of partition 'oldPartition' by removing
- * it from the table and reconstructing it from the HMS partition object
- * 'hmsPartition'. If 'oldPartition' is null, nothing is removed and the partition
- * constructed from 'hmsPartition' is simply added.
- */
- public void reloadPartition(HdfsPartition oldPartition, Partition hmsPartition)
- throws CatalogException {
- HdfsPartition refreshedPartition = createPartition(
- hmsPartition.getSd(), hmsPartition);
- Preconditions.checkArgument(oldPartition == null
- || oldPartition.compareTo(refreshedPart
<TRUNCATED>