Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:46 UTC
[29/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java
deleted file mode 100644
index 475a8d7..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java
+++ /dev/null
@@ -1,261 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.List;
-import java.util.Map;
-
-import com.cloudera.impala.thrift.THdfsFileFormat;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-/**
- * Supported HDFS file formats. Every file format specifies:
- * 1) the input format class
- * 2) the output format class
- * 3) the serialization library class
- * 4) whether scanning complex types from it is supported
- *
- * Important note: Always keep this enum consistent with the classes used in Hive.
- */
-public enum HdfsFileFormat {
- RC_FILE("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
- "org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
- "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
- false, true),
- TEXT("org.apache.hadoop.mapred.TextInputFormat",
- "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
- "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
- false, false),
- LZO_TEXT("com.hadoop.mapred.DeprecatedLzoTextInputFormat",
- "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
- "", false, false),
- SEQUENCE_FILE("org.apache.hadoop.mapred.SequenceFileInputFormat",
- "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
- "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", false,
- true),
- AVRO("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat",
- "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat",
- "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
- false, false),
- PARQUET("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
- "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
- "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
- true, true);
-
- private final String inputFormat_;
- private final String outputFormat_;
- private final String serializationLib_;
-
- // Indicates whether we support scanning complex types for this file format.
- private final boolean isComplexTypesSupported_;
-
- // Indicates whether the file format can skip complex columns in scans and just
- // materialize scalar typed columns. Ignored if isComplexTypesSupported_ is true.
- // TODO: Remove this once we support complex types for all file formats.
- private final boolean canSkipColumnTypes_;
-
- HdfsFileFormat(String inputFormat, String outputFormat, String serializationLib,
- boolean isComplexTypesSupported, boolean canSkipColumnTypes) {
- inputFormat_ = inputFormat;
- outputFormat_ = outputFormat;
- serializationLib_ = serializationLib;
- isComplexTypesSupported_ = isComplexTypesSupported;
- canSkipColumnTypes_ = canSkipColumnTypes;
- }
-
- public String inputFormat() { return inputFormat_; }
- public String outputFormat() { return outputFormat_; }
- public String serializationLib() { return serializationLib_; }
-
- // Impala supports legacy Parquet input formats and treats them internally as the most
- // modern Parquet input format.
- private static final String[] PARQUET_LEGACY_INPUT_FORMATS = {
- "com.cloudera.impala.hive.serde.ParquetInputFormat",
- "parquet.hive.DeprecatedParquetInputFormat",
- "parquet.hive.MapredParquetInputFormat"
- };
-
- private static final Map<String, HdfsFileFormat> VALID_INPUT_FORMATS =
- ImmutableMap.<String, HdfsFileFormat>builder()
- .put(RC_FILE.inputFormat(), RC_FILE)
- .put(TEXT.inputFormat(), TEXT)
- .put(LZO_TEXT.inputFormat(), TEXT)
- .put(SEQUENCE_FILE.inputFormat(), SEQUENCE_FILE)
- .put(AVRO.inputFormat(), AVRO)
- .put(PARQUET.inputFormat(), PARQUET)
- .put(PARQUET_LEGACY_INPUT_FORMATS[0], PARQUET)
- .put(PARQUET_LEGACY_INPUT_FORMATS[1], PARQUET)
- .put(PARQUET_LEGACY_INPUT_FORMATS[2], PARQUET)
- .build();
-
- /**
- * Returns true if the string describes an input format class that we support.
- */
- public static boolean isHdfsInputFormatClass(String inputFormatClass) {
- return VALID_INPUT_FORMATS.containsKey(inputFormatClass);
- }
-
- /**
- * Returns the file format associated with the input format class, or null if
- * the input format class is not supported.
- */
- public static HdfsFileFormat fromHdfsInputFormatClass(String inputFormatClass) {
- Preconditions.checkNotNull(inputFormatClass);
- return VALID_INPUT_FORMATS.get(inputFormatClass);
- }
-
- /**
- * Returns the corresponding enum for an input format class name. If className is
- * not one of our supported formats, throws an IllegalArgumentException like
- * Enum.valueOf().
- */
- public static HdfsFileFormat fromJavaClassName(String className) {
- Preconditions.checkNotNull(className);
- if (isHdfsInputFormatClass(className)) return VALID_INPUT_FORMATS.get(className);
- throw new IllegalArgumentException(className);
- }
-
- public static HdfsFileFormat fromThrift(THdfsFileFormat thriftFormat) {
- switch (thriftFormat) {
- case RC_FILE: return HdfsFileFormat.RC_FILE;
- case TEXT: return HdfsFileFormat.TEXT;
- case SEQUENCE_FILE: return HdfsFileFormat.SEQUENCE_FILE;
- case AVRO: return HdfsFileFormat.AVRO;
- case PARQUET: return HdfsFileFormat.PARQUET;
- default:
- throw new RuntimeException("Unknown THdfsFileFormat: "
- + thriftFormat + " - should never happen!");
- }
- }
-
- public THdfsFileFormat toThrift() {
- switch (this) {
- case RC_FILE: return THdfsFileFormat.RC_FILE;
- case TEXT: return THdfsFileFormat.TEXT;
- case SEQUENCE_FILE: return THdfsFileFormat.SEQUENCE_FILE;
- case AVRO: return THdfsFileFormat.AVRO;
- case PARQUET: return THdfsFileFormat.PARQUET;
- default:
- throw new RuntimeException("Unknown HdfsFormat: "
- + this + " - should never happen!");
- }
- }
-
- public String toSql(HdfsCompression compressionType) {
- switch (this) {
- case RC_FILE: return "RCFILE";
- case TEXT:
- if (compressionType == HdfsCompression.LZO ||
- compressionType == HdfsCompression.LZO_INDEX) {
- // TODO: Update this when we can write LZO text.
- // It is not currently possible to create a table with LZO compressed text files
- // in Impala, but this is valid in Hive.
- return String.format("INPUTFORMAT '%s' OUTPUTFORMAT '%s'",
- LZO_TEXT.inputFormat(), LZO_TEXT.outputFormat());
- }
- return "TEXTFILE";
- case SEQUENCE_FILE: return "SEQUENCEFILE";
- case AVRO: return "AVRO";
- case PARQUET: return "PARQUET";
- default:
- throw new RuntimeException("Unknown HdfsFormat: "
- + this + " - should never happen!");
- }
- }
-
- /**
- * Checks whether a file is supported in Impala based on the file extension.
- * Returns true if the file format is supported. If the file format is not
- * supported, then it returns false and 'errorMsg' contains details on the
- * incompatibility.
- *
- * Impala supports LZO, GZIP, SNAPPY and BZIP2 on text files for partitions that have
- * been declared in the metastore as TEXT. LZO files can have their own input format.
- * For now, raise an error on any other type.
- */
- public boolean isFileCompressionTypeSupported(String fileName,
- StringBuilder errorMsg) {
- // Check to see if the file has a compression suffix.
- // TODO: Add LZ4
- HdfsCompression compressionType = HdfsCompression.fromFileName(fileName);
- switch (compressionType) {
- case LZO:
- case LZO_INDEX:
- // Index files are read by the LZO scanner directly.
- case GZIP:
- case SNAPPY:
- case BZIP2:
- case NONE:
- return true;
- case DEFLATE:
- // TODO: Ensure that text/deflate works correctly
- if (this == TEXT) {
- errorMsg.append("Expected compressed text file with {.lzo,.gzip,.snappy,.bz2} "
- + "suffix: " + fileName);
- return false;
- } else {
- return true;
- }
- default:
- errorMsg.append("Unknown compression suffix: " + fileName);
- return false;
- }
- }
-
- /**
- * Returns true if this file format with the given compression format is splittable.
- */
- public boolean isSplittable(HdfsCompression compression) {
- switch (this) {
- case TEXT:
- return compression == HdfsCompression.NONE;
- case RC_FILE:
- case SEQUENCE_FILE:
- case AVRO:
- case PARQUET:
- return true;
- default:
- throw new RuntimeException("Unknown HdfsFormat: "
- + this + " - should never happen!");
- }
- }
-
- /**
- * Returns true if Impala supports scanning complex-typed columns
- * from a table/partition with this file format.
- */
- public boolean isComplexTypesSupported() { return isComplexTypesSupported_; }
-
- /**
- * Returns true if this file format can skip complex typed columns and materialize
- * only scalar typed columns.
- */
- public boolean canSkipComplexTypes() { return canSkipColumnTypes_; }
-
- /**
- * Returns a list with all formats for which isComplexTypesSupported() is true.
- */
- public static List<HdfsFileFormat> complexTypesFormats() {
- List<HdfsFileFormat> result = Lists.newArrayList();
- for (HdfsFileFormat f: values()) {
- if (f.isComplexTypesSupported()) result.add(f);
- }
- return result;
- }
-}
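
For context, a minimal usage sketch of the format lookup above (not part of this
commit; it assumes the pre-rename com.cloudera.impala classes are on the classpath):

    import com.cloudera.impala.catalog.HdfsFileFormat;

    public class FileFormatLookupSketch {
      public static void main(String[] args) {
        // Legacy Parquet input format classes map to the modern PARQUET value.
        HdfsFileFormat fmt = HdfsFileFormat.fromHdfsInputFormatClass(
            "parquet.hive.DeprecatedParquetInputFormat");
        System.out.println(fmt);                            // PARQUET
        System.out.println(fmt.isComplexTypesSupported());  // true
        // Unrecognized class names are rejected by the validity check.
        System.out.println(
            HdfsFileFormat.isHdfsInputFormatClass("some.Unknown"));  // false
      }
    }
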
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java
deleted file mode 100644
index f408468..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java
+++ /dev/null
@@ -1,791 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.LiteralExpr;
-import com.cloudera.impala.analysis.NullLiteral;
-import com.cloudera.impala.analysis.PartitionKeyValue;
-import com.cloudera.impala.analysis.ToSqlUtils;
-import com.cloudera.impala.common.FileSystemUtil;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.thrift.ImpalaInternalServiceConstants;
-import com.cloudera.impala.thrift.TAccessLevel;
-import com.cloudera.impala.thrift.TExpr;
-import com.cloudera.impala.thrift.TExprNode;
-import com.cloudera.impala.thrift.THdfsCompression;
-import com.cloudera.impala.thrift.THdfsFileBlock;
-import com.cloudera.impala.thrift.THdfsFileDesc;
-import com.cloudera.impala.thrift.THdfsPartition;
-import com.cloudera.impala.thrift.TNetworkAddress;
-import com.cloudera.impala.thrift.TPartitionStats;
-import com.cloudera.impala.thrift.TTableStats;
-import com.cloudera.impala.util.HdfsCachingUtil;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Query-relevant information for one table partition. Partitions are comparable
- * based on their partition-key values. The comparison orders partitions in ascending
- * order with NULLs sorting last. The ordering is useful for displaying partitions
- * in SHOW statements.
- */
-public class HdfsPartition implements Comparable<HdfsPartition> {
- /**
- * Metadata for a single file in this partition.
- * TODO: Do we even need this class? Just get rid of it and use the Thrift version?
- */
- public static class FileDescriptor implements Comparable<FileDescriptor> {
- private final THdfsFileDesc fileDescriptor_;
-
- public String getFileName() { return fileDescriptor_.getFile_name(); }
- public long getFileLength() { return fileDescriptor_.getLength(); }
- public THdfsCompression getFileCompression() {
- return fileDescriptor_.getCompression();
- }
- public long getModificationTime() {
- return fileDescriptor_.getLast_modification_time();
- }
- public List<THdfsFileBlock> getFileBlocks() {
- return fileDescriptor_.getFile_blocks();
- }
-
- public THdfsFileDesc toThrift() { return fileDescriptor_; }
-
- public FileDescriptor(String fileName, long fileLength, long modificationTime) {
- Preconditions.checkNotNull(fileName);
- Preconditions.checkArgument(fileLength >= 0);
- fileDescriptor_ = new THdfsFileDesc();
- fileDescriptor_.setFile_name(fileName);
- fileDescriptor_.setLength(fileLength);
- fileDescriptor_.setLast_modification_time(modificationTime);
- fileDescriptor_.setCompression(
- HdfsCompression.fromFileName(fileName).toThrift());
- List<THdfsFileBlock> emptyFileBlockList = Lists.newArrayList();
- fileDescriptor_.setFile_blocks(emptyFileBlockList);
- }
-
- private FileDescriptor(THdfsFileDesc fileDesc) {
- this(fileDesc.getFile_name(), fileDesc.length, fileDesc.last_modification_time);
- for (THdfsFileBlock block: fileDesc.getFile_blocks()) {
- fileDescriptor_.addToFile_blocks(block);
- }
- }
-
- public void addFileBlock(FileBlock blockMd) {
- fileDescriptor_.addToFile_blocks(blockMd.toThrift());
- }
-
- public static FileDescriptor fromThrift(THdfsFileDesc desc) {
- return new FileDescriptor(desc);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("FileName", getFileName())
- .add("Length", getFileLength()).toString();
- }
-
- /**
- * Orders file descriptors lexicographically by file name.
- */
- @Override
- public int compareTo(FileDescriptor otherFd) {
- return getFileName().compareTo(otherFd.getFileName());
- }
- }
-
- /**
- * Represents metadata of a single block replica.
- */
- public static class BlockReplica {
- private final boolean isCached_;
- private final int hostIdx_;
-
- /**
- * Creates a BlockReplica given a host ID/index and a flag specifying whether this
- * replica is cached. Host IDs are assigned when loading the block metadata in
- * HdfsTable.
- */
- public BlockReplica(int hostIdx, boolean isCached) {
- hostIdx_ = hostIdx;
- isCached_ = isCached;
- }
-
- /**
- * Parses the location (an ip address:port string) of the replica and returns a
- * TNetworkAddress with this information, or null if parsing fails.
- */
- public static TNetworkAddress parseLocation(String location) {
- Preconditions.checkNotNull(location);
- String[] ip_port = location.split(":");
- if (ip_port.length != 2) return null;
- try {
- return new TNetworkAddress(ip_port[0], Integer.parseInt(ip_port[1]));
- } catch (NumberFormatException e) {
- return null;
- }
- }
-
- public boolean isCached() { return isCached_; }
- public int getHostIdx() { return hostIdx_; }
- }
-
- /**
- * File Block metadata
- */
- public static class FileBlock {
- private final THdfsFileBlock fileBlock_;
- private boolean isCached_; // Set to true if there is at least one cached replica.
-
- private FileBlock(THdfsFileBlock fileBlock) {
- fileBlock_ = fileBlock;
- isCached_ = false;
- for (boolean isCached: fileBlock.getIs_replica_cached()) {
- isCached_ |= isCached;
- }
- }
-
- /**
- * Construct a FileBlock given this block's start offset within its file (in
- * bytes), the length of the block (in bytes), and a list of BlockReplicas.
- * Does not fill diskIds.
- */
- public FileBlock(long offset, long blockLength,
- List<BlockReplica> replicaHostIdxs) {
- Preconditions.checkNotNull(replicaHostIdxs);
- fileBlock_ = new THdfsFileBlock();
- fileBlock_.setOffset(offset);
- fileBlock_.setLength(blockLength);
-
- fileBlock_.setReplica_host_idxs(new ArrayList<Integer>(replicaHostIdxs.size()));
- fileBlock_.setIs_replica_cached(new ArrayList<Boolean>(replicaHostIdxs.size()));
- isCached_ = false;
- for (BlockReplica replica: replicaHostIdxs) {
- fileBlock_.addToReplica_host_idxs(replica.getHostIdx());
- fileBlock_.addToIs_replica_cached(replica.isCached());
- isCached_ |= replica.isCached();
- }
- }
-
- public long getOffset() { return fileBlock_.getOffset(); }
- public long getLength() { return fileBlock_.getLength(); }
- // Returns true if there is at least one cached replica.
- public boolean isCached() { return isCached_; }
- public List<Integer> getReplicaHostIdxs() {
- return fileBlock_.getReplica_host_idxs();
- }
-
- /**
- * Populates the given THdfsFileBlock's list of disk ids with the given disk id
- * values. The number of disk ids must match the number of network addresses
- * set in the file block.
- */
- public static void setDiskIds(int[] diskIds, THdfsFileBlock fileBlock) {
- Preconditions.checkArgument(
- diskIds.length == fileBlock.getReplica_host_idxs().size());
- fileBlock.setDisk_ids(Arrays.asList(ArrayUtils.toObject(diskIds)));
- }
-
- /**
- * Return the disk id of the block in BlockLocation.getNames()[hostIndex]; -1 if
- * disk id is not supported.
- */
- public int getDiskId(int hostIndex) {
- if (fileBlock_.disk_ids == null) return -1;
- return fileBlock_.getDisk_ids().get(hostIndex);
- }
-
- public boolean isCached(int hostIndex) {
- return fileBlock_.getIs_replica_cached().get(hostIndex);
- }
-
- public THdfsFileBlock toThrift() { return fileBlock_; }
-
- public static FileBlock fromThrift(THdfsFileBlock thriftFileBlock) {
- return new FileBlock(thriftFileBlock);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("offset", fileBlock_.offset)
- .add("length", fileBlock_.length)
- .add("#disks", fileBlock_.getDisk_idsSize())
- .toString();
- }
- }
-
- private final HdfsTable table_;
- private final List<LiteralExpr> partitionKeyValues_;
- // estimated number of rows in partition; -1: unknown
- private long numRows_ = -1;
- private static AtomicLong partitionIdCounter_ = new AtomicLong();
-
- // A unique ID for each partition, used to identify a partition in the thrift
- // representation of a table.
- private final long id_;
-
- /*
- * Note: Although you can write multiple formats to a single partition (by changing
- * the format before each write), Hive won't let you read that data and neither should
- * we. We should therefore treat mixing formats inside one partition as user error.
- * It's easy to add per-file metadata to FileDescriptor if this changes.
- */
- private final HdfsStorageDescriptor fileFormatDescriptor_;
- private List<FileDescriptor> fileDescriptors_;
- private HdfsPartitionLocationCompressor.Location location_;
- private final static Logger LOG = LoggerFactory.getLogger(HdfsPartition.class);
- private boolean isDirty_ = false;
- // True if this partition is marked as cached. Does not necessarily mean the data is
- // cached.
- private boolean isMarkedCached_ = false;
- private final TAccessLevel accessLevel_;
-
- // (k,v) pairs of parameters for this partition, stored in the HMS. Used by Impala to
- // store intermediate state for statistics computations.
- private Map<String, String> hmsParameters_;
-
- public HdfsStorageDescriptor getInputFormatDescriptor() {
- return fileFormatDescriptor_;
- }
-
- public boolean isDefaultPartition() {
- return id_ == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID;
- }
-
- /**
- * Returns true if the partition resides at a location which can be cached (e.g. HDFS).
- */
- public boolean isCacheable() {
- return FileSystemUtil.isPathCacheable(new Path(getLocation()));
- }
-
- /**
- * Return a partition name formed by concatenating partition keys and their values,
- * compatible with the way Hive names partitions. Reuses Hive's
- * org.apache.hadoop.hive.common.FileUtils.makePartName() function to build the name
- * string because there are a number of special cases for how partition names are URL
- * escaped.
- * TODO: Consider storing the PartitionKeyValue in HdfsPartition. It would simplify
- * this code and would be useful in other places, such as fromThrift().
- */
- public String getPartitionName() {
- List<String> partitionCols = Lists.newArrayList();
- for (int i = 0; i < getTable().getNumClusteringCols(); ++i) {
- partitionCols.add(getTable().getColumns().get(i).getName());
- }
-
- return org.apache.hadoop.hive.common.FileUtils.makePartName(
- partitionCols, getPartitionValuesAsStrings(true));
- }
-
- /**
- * Returns a list of partition values as strings. If mapNullsToHiveKey is true, any
- * NULL value is returned as the table's default null partition key string value;
- * otherwise it is returned as 'NULL'.
- */
- public List<String> getPartitionValuesAsStrings(boolean mapNullsToHiveKey) {
- List<String> ret = Lists.newArrayList();
- for (LiteralExpr partValue: getPartitionValues()) {
- if (mapNullsToHiveKey) {
- ret.add(PartitionKeyValue.getPartitionKeyValueString(
- partValue, getTable().getNullPartitionKeyValue()));
- } else {
- ret.add(partValue.getStringValue());
- }
- }
- return ret;
- }
-
- /**
- * Utility method which returns a string of conjuncts of equality exprs to exactly
- * select this partition, e.g. ((month=2009) AND (year=2012)).
- * TODO: Remove this when the TODO elsewhere in this file to save and expose the
- * list of TPartitionKeyValues has been resolved.
- */
- public String getConjunctSql() {
- List<String> partColSql = Lists.newArrayList();
- for (Column partCol: getTable().getClusteringColumns()) {
- partColSql.add(ToSqlUtils.getIdentSql(partCol.getName()));
- }
-
- List<String> conjuncts = Lists.newArrayList();
- for (int i = 0; i < partColSql.size(); ++i) {
- LiteralExpr partVal = getPartitionValues().get(i);
- String partValSql = partVal.toSql();
- if (partVal instanceof NullLiteral || partValSql.isEmpty()) {
- conjuncts.add(partColSql.get(i) + " IS NULL");
- } else {
- conjuncts.add(partColSql.get(i) + "=" + partValSql);
- }
- }
- return "(" + Joiner.on(" AND " ).join(conjuncts) + ")";
- }
-
- /**
- * Returns a string of the form part_key1=value1/part_key2=value2...
- */
- public String getValuesAsString() {
- StringBuilder partDescription = new StringBuilder();
- for (int i = 0; i < getTable().getNumClusteringCols(); ++i) {
- String columnName = getTable().getColumns().get(i).getName();
- String value = PartitionKeyValue.getPartitionKeyValueString(
- getPartitionValues().get(i),
- getTable().getNullPartitionKeyValue());
- partDescription.append(columnName + "=" + value);
- if (i != getTable().getNumClusteringCols() - 1) partDescription.append("/");
- }
- return partDescription.toString();
- }
-
- /**
- * Returns the storage location (HDFS path) of this partition. Should only be called
- * for partitioned tables.
- */
- public String getLocation() {
- return (location_ != null) ? location_.toString() : null;
- }
- public long getId() { return id_; }
- public HdfsTable getTable() { return table_; }
- public void setNumRows(long numRows) { numRows_ = numRows; }
- public long getNumRows() { return numRows_; }
- public boolean isMarkedCached() { return isMarkedCached_; }
- void markCached() { isMarkedCached_ = true; }
-
- /**
- * Updates the file format of this partition and sets the corresponding input/output
- * format classes.
- */
- public void setFileFormat(HdfsFileFormat fileFormat) {
- fileFormatDescriptor_.setFileFormat(fileFormat);
- cachedMsPartitionDescriptor_.sdInputFormat = fileFormat.inputFormat();
- cachedMsPartitionDescriptor_.sdOutputFormat = fileFormat.outputFormat();
- cachedMsPartitionDescriptor_.sdSerdeInfo.setSerializationLib(
- fileFormatDescriptor_.getFileFormat().serializationLib());
- }
-
- public HdfsFileFormat getFileFormat() {
- return fileFormatDescriptor_.getFileFormat();
- }
-
- public void setLocation(String place) {
- location_ = table_.getPartitionLocationCompressor().new Location(place);
- }
-
- public org.apache.hadoop.hive.metastore.api.SerDeInfo getSerdeInfo() {
- return cachedMsPartitionDescriptor_.sdSerdeInfo;
- }
-
- // May return null if no per-partition stats were recorded, or if the per-partition
- // stats could not be deserialised from the parameter map.
- public TPartitionStats getPartitionStats() {
- try {
- return PartitionStatsUtil.partStatsFromParameters(hmsParameters_);
- } catch (ImpalaException e) {
- LOG.warn("Could not deserialise incremental stats state for " + getPartitionName() +
- ", consider DROP INCREMENTAL STATS ... PARTITION ... and recomputing " +
- "incremental stats for this table.");
- return null;
- }
- }
-
- public boolean hasIncrementalStats() {
- TPartitionStats partStats = getPartitionStats();
- return partStats != null && partStats.intermediate_col_stats != null;
- }
-
- /**
- * Returns the HDFS permissions Impala has to this partition's directory - READ_ONLY,
- * READ_WRITE, etc.
- */
- public TAccessLevel getAccessLevel() { return accessLevel_; }
-
- /**
- * Returns the HMS parameter with key 'key' if it exists, otherwise returns null.
- */
- public String getParameter(String key) {
- return hmsParameters_.get(key);
- }
-
- public Map<String, String> getParameters() { return hmsParameters_; }
-
- public void putToParameters(String k, String v) { hmsParameters_.put(k, v); }
-
- /**
- * Marks this partition's metadata as "dirty" indicating that changes have been
- * made and this partition's metadata should not be reused during the next
- * incremental metadata refresh.
- */
- public void markDirty() { isDirty_ = true; }
- public boolean isDirty() { return isDirty_; }
-
- /**
- * Returns an immutable list of partition key expressions
- */
- public List<LiteralExpr> getPartitionValues() { return partitionKeyValues_; }
- public LiteralExpr getPartitionValue(int i) { return partitionKeyValues_.get(i); }
- public List<HdfsPartition.FileDescriptor> getFileDescriptors() {
- return fileDescriptors_;
- }
- public void setFileDescriptors(List<FileDescriptor> descriptors) {
- fileDescriptors_ = descriptors;
- }
- public long getNumFileDescriptors() {
- return fileDescriptors_ == null ? 0 : fileDescriptors_.size();
- }
-
- public boolean hasFileDescriptors() { return !fileDescriptors_.isEmpty(); }
-
- // Struct-style class for caching all the information we need to reconstruct an
- // HMS-compatible Partition object, for use in RPCs to the metastore. We do this rather
- // than cache the Thrift partition object itself as the latter can be large - thanks
- // mostly to the inclusion of the full FieldSchema list. This class is read-only - if
- // any field can be mutated by Impala it should belong to HdfsPartition itself (see
- // HdfsPartition.location_ for an example).
- //
- // TODO: Cache this descriptor in HdfsTable so that identical descriptors are shared
- // between HdfsPartition instances.
- // TODO: sdInputFormat and sdOutputFormat can be mutated by Impala when the file format
- // of a partition changes; move these fields to HdfsPartition.
- private static class CachedHmsPartitionDescriptor {
- public String sdInputFormat;
- public String sdOutputFormat;
- public final boolean sdCompressed;
- public final int sdNumBuckets;
- public final org.apache.hadoop.hive.metastore.api.SerDeInfo sdSerdeInfo;
- public final List<String> sdBucketCols;
- public final List<org.apache.hadoop.hive.metastore.api.Order> sdSortCols;
- public final Map<String, String> sdParameters;
- public final int msCreateTime;
- public final int msLastAccessTime;
-
- public CachedHmsPartitionDescriptor(
- org.apache.hadoop.hive.metastore.api.Partition msPartition) {
- org.apache.hadoop.hive.metastore.api.StorageDescriptor sd = null;
- if (msPartition != null) {
- sd = msPartition.getSd();
- msCreateTime = msPartition.getCreateTime();
- msLastAccessTime = msPartition.getLastAccessTime();
- } else {
- msCreateTime = msLastAccessTime = 0;
- }
- if (sd != null) {
- sdInputFormat = sd.getInputFormat();
- sdOutputFormat = sd.getOutputFormat();
- sdCompressed = sd.isCompressed();
- sdNumBuckets = sd.getNumBuckets();
- sdSerdeInfo = sd.getSerdeInfo();
- sdBucketCols = ImmutableList.copyOf(sd.getBucketCols());
- sdSortCols = ImmutableList.copyOf(sd.getSortCols());
- sdParameters = ImmutableMap.copyOf(sd.getParameters());
- } else {
- sdInputFormat = "";
- sdOutputFormat = "";
- sdCompressed = false;
- sdNumBuckets = 0;
- sdSerdeInfo = null;
- sdBucketCols = ImmutableList.of();
- sdSortCols = ImmutableList.of();
- sdParameters = ImmutableMap.of();
- }
- }
- }
-
- private final CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_;
-
- public CachedHmsPartitionDescriptor getCachedMsPartitionDescriptor() {
- return cachedMsPartitionDescriptor_;
- }
-
- /**
- * Returns a Hive-compatible partition object that may be used in calls to the
- * metastore.
- */
- public org.apache.hadoop.hive.metastore.api.Partition toHmsPartition() {
- if (cachedMsPartitionDescriptor_ == null) return null;
- Preconditions.checkNotNull(table_.getNonPartitionFieldSchemas());
- // Update the serde library class based on the currently used file format.
- org.apache.hadoop.hive.metastore.api.StorageDescriptor storageDescriptor =
- new org.apache.hadoop.hive.metastore.api.StorageDescriptor(
- table_.getNonPartitionFieldSchemas(),
- getLocation(),
- cachedMsPartitionDescriptor_.sdInputFormat,
- cachedMsPartitionDescriptor_.sdOutputFormat,
- cachedMsPartitionDescriptor_.sdCompressed,
- cachedMsPartitionDescriptor_.sdNumBuckets,
- cachedMsPartitionDescriptor_.sdSerdeInfo,
- cachedMsPartitionDescriptor_.sdBucketCols,
- cachedMsPartitionDescriptor_.sdSortCols,
- cachedMsPartitionDescriptor_.sdParameters);
- org.apache.hadoop.hive.metastore.api.Partition partition =
- new org.apache.hadoop.hive.metastore.api.Partition(
- getPartitionValuesAsStrings(true), getTable().getDb().getName(),
- getTable().getName(), cachedMsPartitionDescriptor_.msCreateTime,
- cachedMsPartitionDescriptor_.msLastAccessTime, storageDescriptor,
- getParameters());
- return partition;
- }
-
- private HdfsPartition(HdfsTable table,
- org.apache.hadoop.hive.metastore.api.Partition msPartition,
- List<LiteralExpr> partitionKeyValues,
- HdfsStorageDescriptor fileFormatDescriptor,
- Collection<HdfsPartition.FileDescriptor> fileDescriptors, long id,
- HdfsPartitionLocationCompressor.Location location, TAccessLevel accessLevel) {
- table_ = table;
- if (msPartition == null) {
- cachedMsPartitionDescriptor_ = null;
- } else {
- cachedMsPartitionDescriptor_ = new CachedHmsPartitionDescriptor(msPartition);
- }
- location_ = location;
- partitionKeyValues_ = ImmutableList.copyOf(partitionKeyValues);
- fileDescriptors_ = ImmutableList.copyOf(fileDescriptors);
- fileFormatDescriptor_ = fileFormatDescriptor;
- id_ = id;
- accessLevel_ = accessLevel;
- if (msPartition != null && msPartition.getParameters() != null) {
- isMarkedCached_ = HdfsCachingUtil.getCacheDirectiveId(
- msPartition.getParameters()) != null;
- hmsParameters_ = msPartition.getParameters();
- } else {
- hmsParameters_ = Maps.newHashMap();
- }
-
- // TODO: instead of raising an exception, we should consider marking this partition
- // invalid and moving on, so that table loading won't fail and user can query other
- // partitions.
- for (FileDescriptor fileDescriptor: fileDescriptors_) {
- StringBuilder errorMsg = new StringBuilder();
- if (!getInputFormatDescriptor().getFileFormat().isFileCompressionTypeSupported(
- fileDescriptor.getFileName(), errorMsg)) {
- throw new RuntimeException(errorMsg.toString());
- }
- }
- }
-
- public HdfsPartition(HdfsTable table,
- org.apache.hadoop.hive.metastore.api.Partition msPartition,
- List<LiteralExpr> partitionKeyValues,
- HdfsStorageDescriptor fileFormatDescriptor,
- Collection<HdfsPartition.FileDescriptor> fileDescriptors,
- TAccessLevel accessLevel) {
- this(table, msPartition, partitionKeyValues, fileFormatDescriptor, fileDescriptors,
- partitionIdCounter_.getAndIncrement(),
- table.getPartitionLocationCompressor().new Location(msPartition != null
- ? msPartition.getSd().getLocation()
- : table.getLocation()),
- accessLevel);
- }
-
- public static HdfsPartition defaultPartition(
- HdfsTable table, HdfsStorageDescriptor storageDescriptor) {
- List<LiteralExpr> emptyExprList = Lists.newArrayList();
- List<FileDescriptor> emptyFileDescriptorList = Lists.newArrayList();
- return new HdfsPartition(table, null, emptyExprList,
- storageDescriptor, emptyFileDescriptorList,
- ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID, null,
- TAccessLevel.READ_WRITE);
- }
-
- /**
- * Return the size (in bytes) of all the files inside this partition
- */
- public long getSize() {
- long result = 0;
- for (HdfsPartition.FileDescriptor fileDescriptor: fileDescriptors_) {
- result += fileDescriptor.getFileLength();
- }
- return result;
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("fileDescriptors", fileDescriptors_)
- .toString();
- }
-
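- // Note: despite its name, this predicate returns true for keys that are NOT
- // incremental stats keys; getFilteredHmsParameters() uses it to drop stats keys.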
- private static Predicate<String> isIncrementalStatsKey = new Predicate<String>() {
- @Override
- public boolean apply(String key) {
- return !(key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_NUM_CHUNKS)
- || key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_CHUNK_PREFIX));
- }
- };
-
- /**
- * Returns hmsParameters_ after filtering out all the partition
- * incremental stats information.
- */
- private Map<String, String> getFilteredHmsParameters() {
- return Maps.filterKeys(hmsParameters_, isIncrementalStatsKey);
- }
-
- public static HdfsPartition fromThrift(HdfsTable table,
- long id, THdfsPartition thriftPartition) {
- HdfsStorageDescriptor storageDesc = new HdfsStorageDescriptor(table.getName(),
- HdfsFileFormat.fromThrift(thriftPartition.getFileFormat()),
- thriftPartition.lineDelim,
- thriftPartition.fieldDelim,
- thriftPartition.collectionDelim,
- thriftPartition.mapKeyDelim,
- thriftPartition.escapeChar,
- (byte) '"', // TODO: We should probably add quoteChar to THdfsPartition.
- thriftPartition.blockSize);
-
- List<LiteralExpr> literalExpr = Lists.newArrayList();
- if (id != ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
- List<Column> clusterCols = Lists.newArrayList();
- for (int i = 0; i < table.getNumClusteringCols(); ++i) {
- clusterCols.add(table.getColumns().get(i));
- }
-
- List<TExprNode> exprNodes = Lists.newArrayList();
- for (TExpr expr: thriftPartition.getPartitionKeyExprs()) {
- for (TExprNode node: expr.getNodes()) {
- exprNodes.add(node);
- }
- }
- Preconditions.checkState(clusterCols.size() == exprNodes.size(),
- String.format("Number of partition columns (%d) does not match number " +
- "of partition key expressions (%d)",
- clusterCols.size(), exprNodes.size()));
-
- for (int i = 0; i < exprNodes.size(); ++i) {
- literalExpr.add(LiteralExpr.fromThrift(
- exprNodes.get(i), clusterCols.get(i).getType()));
- }
- }
-
- List<HdfsPartition.FileDescriptor> fileDescriptors = Lists.newArrayList();
- if (thriftPartition.isSetFile_desc()) {
- for (THdfsFileDesc desc: thriftPartition.getFile_desc()) {
- fileDescriptors.add(HdfsPartition.FileDescriptor.fromThrift(desc));
- }
- }
-
- TAccessLevel accessLevel = thriftPartition.isSetAccess_level() ?
- thriftPartition.getAccess_level() : TAccessLevel.READ_WRITE;
- HdfsPartitionLocationCompressor.Location location = thriftPartition.isSetLocation()
- ? table.getPartitionLocationCompressor().new Location(
- thriftPartition.getLocation())
- : null;
- HdfsPartition partition = new HdfsPartition(table, null, literalExpr, storageDesc,
- fileDescriptors, id, location, accessLevel);
- if (thriftPartition.isSetStats()) {
- partition.setNumRows(thriftPartition.getStats().getNum_rows());
- }
- if (thriftPartition.isSetIs_marked_cached()) {
- partition.isMarkedCached_ = thriftPartition.isIs_marked_cached();
- }
-
- if (thriftPartition.isSetHms_parameters()) {
- partition.hmsParameters_ = thriftPartition.getHms_parameters();
- } else {
- partition.hmsParameters_ = Maps.newHashMap();
- }
-
- return partition;
- }
-
- /**
- * Checks that this partition's metadata is well formed. This does not necessarily
- * mean the partition is supported by Impala.
- * Throws a CatalogException if there are any errors in the partition metadata.
- */
- public void checkWellFormed() throws CatalogException {
- try {
- // Validate all the partition key/values to ensure you can convert them toThrift()
- Expr.treesToThrift(getPartitionValues());
- } catch (Exception e) {
- throw new CatalogException("Partition (" + getPartitionName() +
- ") has invalid partition column values: ", e);
- }
- }
-
- public THdfsPartition toThrift(boolean includeFileDesc,
- boolean includeIncrementalStats) {
- List<TExpr> thriftExprs = Expr.treesToThrift(getPartitionValues());
-
- THdfsPartition thriftHdfsPart = new THdfsPartition(
- fileFormatDescriptor_.getLineDelim(),
- fileFormatDescriptor_.getFieldDelim(),
- fileFormatDescriptor_.getCollectionDelim(),
- fileFormatDescriptor_.getMapKeyDelim(),
- fileFormatDescriptor_.getEscapeChar(),
- fileFormatDescriptor_.getFileFormat().toThrift(), thriftExprs,
- fileFormatDescriptor_.getBlockSize());
- if (location_ != null) thriftHdfsPart.setLocation(location_.toThrift());
- thriftHdfsPart.setStats(new TTableStats(numRows_));
- thriftHdfsPart.setAccess_level(accessLevel_);
- thriftHdfsPart.setIs_marked_cached(isMarkedCached_);
- thriftHdfsPart.setId(getId());
- thriftHdfsPart.setHms_parameters(
- includeIncrementalStats ? hmsParameters_ : getFilteredHmsParameters());
- if (includeFileDesc) {
- // Add block location information
- for (FileDescriptor fd: fileDescriptors_) {
- thriftHdfsPart.addToFile_desc(fd.toThrift());
- }
- }
-
- return thriftHdfsPart;
- }
-
- /**
- * Comparison method to allow ordering of HdfsPartitions by their partition-key values.
- */
- @Override
- public int compareTo(HdfsPartition o) {
- return comparePartitionKeyValues(partitionKeyValues_, o.getPartitionValues());
- }
-
- @VisibleForTesting
- public static int comparePartitionKeyValues(List<LiteralExpr> lhs,
- List<LiteralExpr> rhs) {
- int sizeDiff = lhs.size() - rhs.size();
- if (sizeDiff != 0) return sizeDiff;
- for(int i = 0; i < lhs.size(); ++i) {
- int cmp = lhs.get(i).compareTo(rhs.get(i));
- if (cmp != 0) return cmp;
- }
- return 0;
- }
-}
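
Similarly, a small sketch of the BlockReplica.parseLocation() contract from the file
above (not part of this commit; getHostname()/getPort() are assumed to be the usual
Thrift-generated getters on TNetworkAddress):

    import com.cloudera.impala.catalog.HdfsPartition.BlockReplica;
    import com.cloudera.impala.thrift.TNetworkAddress;

    public class ParseLocationSketch {
      public static void main(String[] args) {
        TNetworkAddress addr = BlockReplica.parseLocation("10.20.30.40:50010");
        System.out.println(addr.getHostname() + ":" + addr.getPort());
        // A missing or non-numeric port makes parsing fail and yields null.
        System.out.println(BlockReplica.parseLocation("10.20.30.40"));      // null
        System.out.println(BlockReplica.parseLocation("10.20.30.40:abc"));  // null
      }
    }
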
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartitionLocationCompressor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartitionLocationCompressor.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartitionLocationCompressor.java
deleted file mode 100644
index b72b846..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartitionLocationCompressor.java
+++ /dev/null
@@ -1,153 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import com.cloudera.impala.common.Pair;
-import com.cloudera.impala.thrift.THdfsPartitionLocation;
-import com.cloudera.impala.util.ListMap;
-import com.google.common.base.Preconditions;
-
-/**
- * Utility class for storing HdfsPartition locations in a compressed format. Each
- * instance of this class is owned by a single HdfsTable instance.
- *
- * This class is not thread-safe by itself since it is only modified when the lock on an
- * HdfsTable object is held.
- *
- * TODO: Generalize this to compress other sets of Strings that are likely to share common
- * prefixes, like table locations.
- *
- */
-class HdfsPartitionLocationCompressor {
- int numClusteringColumns_;
-
- // A bi-directional map between partition location prefixes and their compressed
- // representation, an int.
- final private ListMap<String> prefixMap_ = new ListMap<String>();
-
- public HdfsPartitionLocationCompressor(int numClusteringColumns) {
- numClusteringColumns_ = numClusteringColumns;
- }
-
- // Construct an HdfsPartitionLocationCompressor with a pre-filled bidirectional
- // prefix map (prefixMap_).
- public HdfsPartitionLocationCompressor(
- int numClusteringColumns, ArrayList<String> prefixes) {
- numClusteringColumns_ = numClusteringColumns;
- prefixMap_.populate(prefixes);
- }
-
- public void setClusteringColumns(int numClusteringColumns) {
- numClusteringColumns_ = numClusteringColumns;
- }
-
- public List<String> getPrefixes() {
- return prefixMap_.getList();
- }
-
- // One direction of the map: returns the prefix associated with an index, or "" if
- // the index is -1. Indexes less than -1 or greater than prefixMap_.size()-1 are
- // invalid and cause an IndexOutOfBoundsException to be thrown.
- private String indexToPrefix(int i) {
- // Uncompressed locations are represented by -1:
- if (i == -1) return "";
- Preconditions.checkElementIndex(i, prefixMap_.size());
- return prefixMap_.getEntry(i);
- }
-
- // Compress a location prefix, adding it to the bidirectional map (prefixMap_)
- // if it is not already present.
- private int prefixToIndex(String s) {
- return prefixMap_.getIndex(s);
- }
-
- // A surrogate for THdfsPartitionLocation, which represents a partition's location
- // relative to its parent table's list of partition prefixes.
- public class Location {
- // 'prefix_index_' represents the portion of the partition's location that comes before
- // the last N directories, where N is the number of partitioning columns.
- // 'prefix_index_' is an index into
- // HdfsPartitionLocationCompressor.this.prefixMap_. 'suffix_' is the rest of the
- // partition location.
- //
- // TODO: Since each partition stores the literal values for the partitioning columns,
- // we could also elide the column names and values from suffix_ when a partition is in
- // the canonical location "/partitioning_column_name_1=value_1/..."
- private final int prefix_index_;
- private final String suffix_;
-
- public Location(String location) {
- Preconditions.checkNotNull(location);
- Pair<String,String> locationParts = decompose(location);
- prefix_index_ =
- HdfsPartitionLocationCompressor.this.prefixToIndex(locationParts.first);
- suffix_ = locationParts.second;
- }
-
- public Location(THdfsPartitionLocation thrift) {
- Preconditions.checkNotNull(thrift);
- prefix_index_ = thrift.prefix_index;
- suffix_ = thrift.getSuffix();
- }
-
- public THdfsPartitionLocation toThrift() {
- return new THdfsPartitionLocation(prefix_index_, suffix_);
- }
-
- @Override
- public String toString() {
- return HdfsPartitionLocationCompressor.this.indexToPrefix(prefix_index_) + suffix_;
- }
-
- @Override
- public int hashCode() { return toString().hashCode(); }
-
- @Override
- public boolean equals(Object obj) {
- return (obj instanceof Location) && (toString().equals(obj.toString()));
- }
-
- // Decompose a location string by removing its last N directories, where N is the
- // number of clustering columns. The result is a Pair<String,String> where the first
- // String is the prefix and the second is the suffix. (In other words, their
- // concatenation equals the input.) If the input does not have at least N '/'
- // characters, the prefix is empty and the suffix is the entire input.
- private Pair<String,String> decompose(String s) {
- Preconditions.checkNotNull(s);
- int numClusteringColumns =
- HdfsPartitionLocationCompressor.this.numClusteringColumns_;
- if (numClusteringColumns == 0) return new Pair<String,String>(s, "");
- // Iterate backwards over the input until we have passed 'numClusteringColumns'
- // directories. What is left is the prefix.
- int i = s.length() - 1;
- // If the string ends in '/', iterating past it does not pass a clustering column.
- if (i >= 0 && s.charAt(i) == '/') --i;
- for (; numClusteringColumns > 0 && i >= 0; --i) {
- if (s.charAt(i) == '/') --numClusteringColumns;
- }
- // If we successfully removed all the partition directories, s.charAt(i+1) is '/'
- // and we can include it in the prefix.
- if (0 == numClusteringColumns) ++i;
- return new Pair<String,String>(s.substring(0, i + 1), s.substring(i + 1));
- }
- }
-}
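
The decompose() logic above is easy to check by hand; here is a standalone sketch of
it (not part of this commit; the sample path is hypothetical):

    public class DecomposeSketch {
      // Mirrors Location.decompose(): split 's' before its last 'n' directories.
      static String[] decompose(String s, int n) {
        if (n == 0) return new String[] {s, ""};
        int i = s.length() - 1;
        // A trailing '/' does not count toward a clustering column.
        if (i >= 0 && s.charAt(i) == '/') --i;
        for (; n > 0 && i >= 0; --i) {
          if (s.charAt(i) == '/') --n;
        }
        // If all partition directories were passed, include the '/' in the prefix.
        if (n == 0) ++i;
        return new String[] {s.substring(0, i + 1), s.substring(i + 1)};
      }

      public static void main(String[] args) {
        String[] parts =
            decompose("hdfs://nn:8020/warehouse/t/year=2012/month=2", 2);
        System.out.println(parts[0]);  // hdfs://nn:8020/warehouse/t/
        System.out.println(parts[1]);  // year=2012/month=2
      }
    }
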
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/catalog/HdfsStorageDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsStorageDescriptor.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsStorageDescriptor.java
deleted file mode 100644
index f018ce3..0000000
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsStorageDescriptor.java
+++ /dev/null
@@ -1,240 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.catalog;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-
-/**
- * Represents the file format metadata for files stored in a table or partition.
- */
-public class HdfsStorageDescriptor {
- public static final char DEFAULT_LINE_DELIM = '\n';
- // hive by default uses ctrl-a as field delim
- public static final char DEFAULT_FIELD_DELIM = '\u0001';
- // hive by default has no escape char
- public static final char DEFAULT_ESCAPE_CHAR = '\u0000';
-
- // Serde parameters that are recognized by table writers.
- private static final String BLOCK_SIZE = "blocksize";
- private static final String COMPRESSION = "compression";
-
- // Important: don't change the ordering of these keys - if e.g. COLLECTION_DELIM is
- // not found, the value of FIELD_DELIM is used, so FIELD_DELIM must be found first.
- // Package visible for testing.
- final static List<String> DELIMITER_KEYS = ImmutableList.of(
- serdeConstants.LINE_DELIM, serdeConstants.FIELD_DELIM,
- serdeConstants.COLLECTION_DELIM, serdeConstants.MAPKEY_DELIM,
- serdeConstants.ESCAPE_CHAR, serdeConstants.QUOTE_CHAR);
-
- // The Parquet serde shows up multiple times as the location of the implementation
- // has changed between Impala versions.
- final static List<String> COMPATIBLE_SERDES = ImmutableList.of(
- "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", // (seq / text / parquet)
- "org.apache.hadoop.hive.serde2.avro.AvroSerDe", // (avro)
- "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", // (rc)
- "parquet.hive.serde.ParquetHiveSerDe", // (parquet - legacy)
- // TODO: Verify the following Parquet SerDe works with Impala and add
- // support for the new input/output format classes. See CDH-17085.
- "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"); // (parquet)
-
- private final static Logger LOG = LoggerFactory.getLogger(HdfsStorageDescriptor.class);
-
- private HdfsFileFormat fileFormat_;
- private final byte lineDelim_;
- private final byte fieldDelim_;
- private final byte collectionDelim_;
- private final byte mapKeyDelim_;
- private final byte escapeChar_;
- private final byte quoteChar_;
- private final int blockSize_;
-
- public void setFileFormat(HdfsFileFormat fileFormat) {
- fileFormat_ = fileFormat;
- }
-
- /**
- * Returns a map from delimiter key to a single delimiter character,
- * filling in defaults if explicit values are not found in the supplied
- * serde descriptor.
- *
- * @throws InvalidStorageDescriptorException - if an invalid delimiter is found
- */
- private static Map<String, Byte> extractDelimiters(SerDeInfo serdeInfo)
- throws InvalidStorageDescriptorException {
- // The metastore may return null for delimiter parameters,
- // which means we need to use a default instead.
- // We tried long and hard to find default values for delimiters in Hive,
- // but could not find them.
- Map<String, Byte> delimMap = Maps.newHashMap();
-
- for (String delimKey: DELIMITER_KEYS) {
- String delimValue = serdeInfo.getParameters().get(delimKey);
- if (delimValue == null) {
- if (delimKey.equals(serdeConstants.FIELD_DELIM)) {
- delimMap.put(delimKey, (byte) DEFAULT_FIELD_DELIM);
- } else if (delimKey.equals(serdeConstants.ESCAPE_CHAR)) {
- delimMap.put(delimKey, (byte) DEFAULT_ESCAPE_CHAR);
- } else if (delimKey.equals(serdeConstants.LINE_DELIM)) {
- delimMap.put(delimKey, (byte) DEFAULT_LINE_DELIM);
- } else {
- delimMap.put(delimKey, delimMap.get(serdeConstants.FIELD_DELIM));
- }
- } else {
- Byte delimByteValue = parseDelim(delimValue);
- if (delimByteValue == null) {
- throw new InvalidStorageDescriptorException("Invalid delimiter: '" +
- delimValue + "'. Delimiter must be specified as a single character or " +
- "as a decimal value in the range [-128:127]");
- }
- delimMap.put(delimKey, delimByteValue);
- }
- }
- return delimMap;
- }
-
- /**
- * Parses a delimiter in a similar way as Hive, with some additional error checking.
- * A delimiter must fit in a single byte and can be specified in the following
- * formats, as far as I can tell (there isn't documentation):
- * - A single ASCII or unicode character (ex. '|')
- * - An escape character in octal format (ex. \001; stored in the metastore as a
- * unicode character: \u0001).
- * - A signed decimal integer in the range [-128:127]. Used to support delimiters
- * for ASCII character values between 128-255 (-2 maps to ASCII 254).
- *
- * The delimiter is first parsed as a decimal number. If the parsing succeeds AND
- * the resulting value fits in a signed byte, the byte value of the parsed int is
- * returned. Otherwise, if the string has a single char, the byte value of this
- * char is returned.
- * If the delimiter is invalid, null will be returned.
- */
- public static Byte parseDelim(String delimVal) {
- Preconditions.checkNotNull(delimVal);
- try {
- // In the future we could support delimiters specified in hex format, but we would
- // need support from the Hive side.
- return Byte.parseByte(delimVal);
- } catch (NumberFormatException e) {
- if (delimVal.length() == 1) return (byte) delimVal.charAt(0);
- }
- return null;
- }
-
- public HdfsStorageDescriptor(String tblName, HdfsFileFormat fileFormat, byte lineDelim,
- byte fieldDelim, byte collectionDelim, byte mapKeyDelim, byte escapeChar,
- byte quoteChar, int blockSize) {
- this.fileFormat_ = fileFormat;
- this.lineDelim_ = lineDelim;
- this.fieldDelim_ = fieldDelim;
- this.collectionDelim_ = collectionDelim;
- this.mapKeyDelim_ = mapKeyDelim;
- this.quoteChar_ = quoteChar;
- this.blockSize_ = blockSize;
-
- // You can set the escape character to the same character as a tuple or row delim.
- // Empirically, this is ignored by Hive.
- if (escapeChar == fieldDelim ||
- escapeChar == lineDelim ||
- escapeChar == collectionDelim) {
- this.escapeChar_ = DEFAULT_ESCAPE_CHAR;
- LOG.warn("Escape character for table, " + tblName + " is set to "
- + "the same character as one of the delimiters. Ignoring escape character.");
- } else {
- this.escapeChar_ = escapeChar;
- }
- }
-
- /**
- * Thrown when constructing an HdfsStorageDescriptor from an invalid/unsupported
- * metastore storage descriptor.
- * TODO: Get rid of this class.
- */
- public static class InvalidStorageDescriptorException extends CatalogException {
- // Mandatory since Exception implements Serializable
- private static final long serialVersionUID = -555234913768134760L;
- public InvalidStorageDescriptorException(String s) { super(s); }
- public InvalidStorageDescriptorException(Exception ex) {
- super(ex.getMessage(), ex);
- }
- public InvalidStorageDescriptorException(String msg, Throwable cause) {
- super(msg, cause);
- }
- }
-
- /**
- * Constructs a new HdfsStorageDescriptor from a StorageDescriptor retrieved from the
- * metastore.
- *
- * @throws InvalidStorageDescriptorException - if the storage descriptor has invalid
- * delimiters, an unsupported SerDe, or an unknown file format.
- */
- public static HdfsStorageDescriptor fromStorageDescriptor(String tblName,
- StorageDescriptor sd)
- throws InvalidStorageDescriptorException {
- Map<String, Byte> delimMap = extractDelimiters(sd.getSerdeInfo());
- if (!COMPATIBLE_SERDES.contains(sd.getSerdeInfo().getSerializationLib())) {
- throw new InvalidStorageDescriptorException(String.format("Impala does not " +
- "support tables of this type. REASON: SerDe library '%s' is not " +
- "supported.", sd.getSerdeInfo().getSerializationLib()));
- }
- // Extract the blocksize and compression specification from the SerDe parameters,
- // if present.
- Map<String, String> parameters = sd.getSerdeInfo().getParameters();
- int blockSize = 0;
- String blockValue = parameters.get(BLOCK_SIZE);
- if (blockValue != null) {
- blockSize = Integer.parseInt(blockValue);
- }
-
- try {
- return new HdfsStorageDescriptor(tblName,
- HdfsFileFormat.fromJavaClassName(sd.getInputFormat()),
- delimMap.get(serdeConstants.LINE_DELIM),
- delimMap.get(serdeConstants.FIELD_DELIM),
- delimMap.get(serdeConstants.COLLECTION_DELIM),
- delimMap.get(serdeConstants.MAPKEY_DELIM),
- delimMap.get(serdeConstants.ESCAPE_CHAR),
- delimMap.get(serdeConstants.QUOTE_CHAR),
- blockSize);
- } catch (IllegalArgumentException ex) {
- // Thrown by fromJavaClassName
- throw new InvalidStorageDescriptorException(ex);
- }
- }
-
- public byte getLineDelim() { return lineDelim_; }
- public byte getFieldDelim() { return fieldDelim_; }
- public byte getCollectionDelim() { return collectionDelim_; }
- public byte getMapKeyDelim() { return mapKeyDelim_; }
- public byte getEscapeChar() { return escapeChar_; }
- public byte getQuoteChar() { return quoteChar_; }
- public HdfsFileFormat getFileFormat() { return fileFormat_; }
- public int getBlockSize() { return blockSize_; }
-}
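
To round out the parseDelim() documentation above, a short sketch of its behavior
(not part of this commit; the output comments assume Byte.toString() formatting):

    import com.cloudera.impala.catalog.HdfsStorageDescriptor;

    public class ParseDelimSketch {
      public static void main(String[] args) {
        // A single character: its byte value is returned.
        System.out.println(HdfsStorageDescriptor.parseDelim("|"));       // 124
        // A decimal in [-128:127]: parsed directly (-2 maps to ASCII 254).
        System.out.println(HdfsStorageDescriptor.parseDelim("-2"));      // -2
        // The metastore's unicode form of ctrl-A is a single character.
        System.out.println(HdfsStorageDescriptor.parseDelim("\u0001"));  // 1
        // Neither a valid byte nor a single character: null.
        System.out.println(HdfsStorageDescriptor.parseDelim("256"));     // null
      }
    }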