Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:29 UTC
[12/61] [partial] incubator-impala git commit: IMPALA-3786: Replace
"cloudera" with "apache" (part 1)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/AvroSchemaParser.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaParser.java b/fe/src/main/java/com/cloudera/impala/util/AvroSchemaParser.java
deleted file mode 100644
index 60b0c7a..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaParser.java
+++ /dev/null
@@ -1,204 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import static org.apache.avro.Schema.Type.BOOLEAN;
-import static org.apache.avro.Schema.Type.DOUBLE;
-import static org.apache.avro.Schema.Type.FLOAT;
-import static org.apache.avro.Schema.Type.INT;
-import static org.apache.avro.Schema.Type.LONG;
-import static org.apache.avro.Schema.Type.STRING;
-
-import java.util.Collections;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaParseException;
-import org.codehaus.jackson.JsonNode;
-
-import com.cloudera.impala.analysis.ColumnDef;
-import com.cloudera.impala.analysis.TypeDef;
-import com.cloudera.impala.catalog.ArrayType;
-import com.cloudera.impala.catalog.MapType;
-import com.cloudera.impala.catalog.ScalarType;
-import com.cloudera.impala.catalog.StructField;
-import com.cloudera.impala.catalog.StructType;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.common.AnalysisException;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Utility class used to parse an Avro schema. Checks that the schema is valid
- * and performs mapping of Avro types to Impala types.
- * Note: This code is loosely based on the parsing code in the Hive AvroSerDe.
- */
-public class AvroSchemaParser {
- // Map of Avro to Impala primitive types.
- private static final Map<Schema.Type, Type> avroToImpalaPrimitiveTypeMap_;
- static {
- Map<Schema.Type, Type> typeMap = new Hashtable<Schema.Type, Type>();
- typeMap.put(STRING, Type.STRING);
- typeMap.put(INT, Type.INT);
- typeMap.put(BOOLEAN, Type.BOOLEAN);
- typeMap.put(LONG, Type.BIGINT);
- typeMap.put(FLOAT, Type.FLOAT);
- typeMap.put(DOUBLE, Type.DOUBLE);
- avroToImpalaPrimitiveTypeMap_ = Collections.unmodifiableMap(typeMap);
- }
-
- /**
- * Parses the Avro schema string literal, mapping the Avro types to Impala types.
- * Returns a list of ColumnDef objects with their name and type info set.
- * Throws an AnalysisException if the Avro type maps to a type that Impala
- * does not yet support.
- * Throws a SchemaParseException if the Avro schema was invalid.
- */
- public static List<ColumnDef> parse(String schemaStr)
- throws SchemaParseException, AnalysisException {
- Schema.Parser avroSchemaParser = new Schema.Parser();
- Schema schema = avroSchemaParser.parse(schemaStr);
- if (!schema.getType().equals(Schema.Type.RECORD)) {
- throw new UnsupportedOperationException("Schema for table must be of type " +
- "RECORD. Received type: " + schema.getType());
- }
- List<ColumnDef> colDefs = Lists.newArrayListWithCapacity(schema.getFields().size());
- for (Schema.Field field: schema.getFields()) {
- ColumnDef colDef = new ColumnDef(field.name(),
- new TypeDef(getTypeInfo(field.schema(), field.name())), field.doc());
- colDef.analyze();
- colDefs.add(colDef);
- }
- return colDefs;
- }
-
- /**
- * Parses the given Avro schema and returns the matching Impala type
- * for this field. Handles primitive and complex types.
- */
- private static Type getTypeInfo(Schema schema, String colName)
- throws AnalysisException {
- // Avro requires NULLable types to be defined as unions of some type T
- // and NULL. This is annoying and we're going to hide it from the user.
- if (isNullableType(schema)) {
- return getTypeInfo(getColumnType(schema), colName);
- }
-
- Schema.Type type = schema.getType();
- if (avroToImpalaPrimitiveTypeMap_.containsKey(type)) {
- return avroToImpalaPrimitiveTypeMap_.get(type);
- }
-
- switch(type) {
- case ARRAY:
- Type itemType = getTypeInfo(schema.getElementType(), colName);
- return new ArrayType(itemType);
- case MAP:
- Type valueType = getTypeInfo(schema.getValueType(), colName);
- return new MapType(Type.STRING, valueType);
- case RECORD:
- StructType structType = new StructType();
- for (Schema.Field field: schema.getFields()) {
- Type fieldType = getTypeInfo(field.schema(), colName);
- structType.addField(new StructField(field.name(), fieldType, field.doc()));
- }
- return structType;
- case BYTES:
- // Decimal is stored in Avro as a BYTES type.
- Type decimalType = getDecimalType(schema);
- if (decimalType != null) return decimalType;
- // TODO: Add support for stored Avro UNIONs by exposing them as STRUCTs in Impala.
- case UNION:
- case ENUM:
- case FIXED:
- case NULL:
- default: {
- throw new AnalysisException(String.format(
- "Unsupported type '%s' of column '%s'", type.getName(), colName));
- }
- }
- }
-
- /**
- * Returns true if this is a nullable type (a Union[T, Null]), false otherwise.
- */
- private static boolean isNullableType(Schema schema) {
- // [null, null] not allowed, so this check is ok.
- return schema.getType().equals(Schema.Type.UNION) && schema.getTypes().size() == 2 &&
- (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) ||
- schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
- }
-
- /**
- * If a nullable type, get the schema for the non-nullable type which will
- * provide Impala column type information.
- */
- private static Schema getColumnType(Schema schema) {
- List<Schema> types = schema.getTypes();
- return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
- }
-
- /**
- * Attempts to parse decimal type information from the Avro schema, returning
- * a decimal Type if successful or null if this schema does not map
- * to a decimal type.
- * Decimal is defined in Avro as a BYTES type with the logicalType property
- * set to "decimal" and a specified scale/precision.
- * Throws a SchemaParseException if logicalType=decimal but the scale/precision
- * is not specified or is in an incorrect format.
- */
- private static Type getDecimalType(Schema schema) {
- Preconditions.checkState(schema.getType() == Schema.Type.BYTES);
- String logicalType = schema.getProp("logicalType");
- if (logicalType != null && logicalType.equalsIgnoreCase("decimal")) {
- // Parse the scale/precision of the decimal type.
- Integer scale = getDecimalProp(schema, "scale");
- // The Avro spec states that scale should default to zero if not set.
- if (scale == null) scale = 0;
-
- // Precision is a required property according to the Avro spec.
- Integer precision = getDecimalProp(schema, "precision");
- if (precision == null) {
- throw new SchemaParseException(
- "No 'precision' property specified for 'decimal' logicalType");
- }
- return ScalarType.createDecimalType(precision, scale);
- }
- return null;
- }
-
- /**
- * Parses a decimal property and returns the value as an integer, or null
- * if the property isn't set. Used to parse decimal scale/precision.
- * Throws a SchemaParseException if the property doesn't parse to a
- * natural number.
- */
- private static Integer getDecimalProp(Schema schema, String propName)
- throws SchemaParseException {
- JsonNode node = schema.getJsonProp(propName);
- if (node == null) return null;
- int propValue = node.getValueAsInt(-1);
- if (propValue < 0) {
- throw new SchemaParseException(String.format("Invalid decimal '%s' " +
- "property value: %s", propName, node.getValueAsText()));
- }
- return propValue;
- }
-}
\ No newline at end of file
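
A minimal, self-contained sketch (not part of this patch) of the two Avro conventions the parser above handles, using only the standard Avro library: nullable columns written as [null, T] unions, and DECIMAL written as a BYTES schema carrying a "decimal" logicalType with precision and scale. The class name, record name, and field names are illustrative only.

import org.apache.avro.Schema;

public class AvroSchemaSketch {
  public static void main(String[] args) {
    String json =
        "{\"type\": \"record\", \"name\": \"t\", \"fields\": ["
      + "  {\"name\": \"id\", \"type\": [\"null\", \"int\"]},"
      + "  {\"name\": \"price\", \"type\": {\"type\": \"bytes\","
      + "   \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 2}}]}";
    Schema schema = new Schema.Parser().parse(json);
    for (Schema.Field f : schema.getFields()) {
      Schema s = f.schema();
      // Unwrap the [null, T] union the same way getColumnType() does.
      if (s.getType() == Schema.Type.UNION && s.getTypes().size() == 2) {
        s = s.getTypes().get(0).getType() == Schema.Type.NULL
            ? s.getTypes().get(1) : s.getTypes().get(0);
      }
      String logicalType = s.getProp("logicalType");
      System.out.println(f.name() + " -> " + s.getType()
          + (logicalType != null ? " (" + logicalType + ")" : ""));
    }
  }
}

Running it prints "id -> INT" and "price -> BYTES (decimal)", the shapes that parse() and getDecimalType() above translate into Impala column types.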
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java b/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java
deleted file mode 100644
index f86c347..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java
+++ /dev/null
@@ -1,189 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-
-import com.cloudera.impala.analysis.ColumnDef;
-import com.cloudera.impala.catalog.PrimitiveType;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.FileSystemUtil;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-/**
- * Contains utility functions for dealing with Avro schemas.
- */
-public class AvroSchemaUtils {
-
- /**
- * Gets an Avro table's JSON schema from the list of given table property search
- * locations. The schema may be specified as a string literal or provided as a
- * Hadoop FileSystem or http URL that points to the schema. Apart from ensuring
- * that the JSON schema is not SCHEMA_NONE, this function does not perform any
- * additional validation on the returned string (e.g., it may not be a valid
- * schema). Returns the Avro schema or null if none was specified in the search
- * locations. Throws an AnalysisException if a schema was specified, but could not
- * be retrieved, e.g., because of an invalid URL.
- */
- public static String getAvroSchema(List<Map<String, String>> schemaSearchLocations)
- throws AnalysisException {
- String url = null;
- // Search all locations and break out on the first valid schema found.
- for (Map<String, String> schemaLocation: schemaSearchLocations) {
- if (schemaLocation == null) continue;
-
- String literal =
- schemaLocation.get(
- AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
- if (literal != null && !literal.equals(AvroSerdeUtils.SCHEMA_NONE)) return literal;
-
- url = schemaLocation.get(
- AvroSerdeUtils.AvroTableProperties.SCHEMA_URL.getPropName());
- if (url != null && !url.equals(AvroSerdeUtils.SCHEMA_NONE)) {
- url = url.trim();
- break;
- }
- }
- if (url == null) return null;
-
- String schema = null;
- InputStream urlStream = null;
- try {
- // TODO: Add support for https:// here.
- if (url.toLowerCase().startsWith("http://")) {
- urlStream = new URL(url).openStream();
- schema = IOUtils.toString(urlStream);
- } else {
- Path path = new Path(url);
- FileSystem fs = null;
- fs = path.getFileSystem(FileSystemUtil.getConfiguration());
- if (!fs.exists(path)) {
- throw new AnalysisException(String.format(
- "Invalid avro.schema.url: %s. Path does not exist.", url));
- }
- schema = FileSystemUtil.readFile(path);
- }
- } catch (AnalysisException e) {
- throw e;
- } catch (IOException e) {
- throw new AnalysisException(String.format(
- "Failed to read Avro schema at: %s. %s ", url, e.getMessage()));
- } catch (Exception e) {
- throw new AnalysisException(String.format(
- "Invalid avro.schema.url: %s. %s", url, e.getMessage()));
- } finally {
- if (urlStream != null) IOUtils.closeQuietly(urlStream);
- }
- return schema;
- }
-
- /**
- * Reconciles differences in names/types between the given list of column definitions
- * and the column definitions corresponding to an Avro Schema. Populates 'warning'
- * if there are inconsistencies between the column definitions and the Avro schema.
- * Returns the reconciled column definitions according to the following conflict
- * resolution policy:
- *
- * Mismatched number of columns -> Prefer Avro columns.
- * Always prefer Avro schema except for column type CHAR/VARCHAR/STRING:
- * A CHAR/VARCHAR/STRING column definition maps to an Avro STRING. The reconciled
- * column will preserve the type in the column definition but use the column name
- * and comment from the Avro schema.
- */
- public static List<ColumnDef> reconcileSchemas(
- List<ColumnDef> colDefs, List<ColumnDef> avroCols, StringBuilder warning) {
- if (colDefs.size() != avroCols.size()) {
- warning.append(String.format(
- "Ignoring column definitions in favor of Avro schema.\n" +
- "The Avro schema has %s column(s) but %s column definition(s) were given.",
- avroCols.size(), colDefs.size()));
- return avroCols;
- }
-
- List<ColumnDef> result = Lists.newArrayListWithCapacity(colDefs.size());
- for (int i = 0; i < avroCols.size(); ++i) {
- ColumnDef colDef = colDefs.get(i);
- ColumnDef avroCol = avroCols.get(i);
- Preconditions.checkNotNull(colDef.getType());
- Preconditions.checkNotNull(avroCol.getType());
-
- // A CHAR/VARCHAR/STRING column definition maps to an Avro STRING, and is preserved
- // as a CHAR/VARCHAR/STRING in the reconciled schema. Column name and comment
- // are taken from the Avro schema.
- if ((colDef.getType().isStringType() && avroCol.getType().isStringType())) {
- Preconditions.checkState(
- avroCol.getType().getPrimitiveType() == PrimitiveType.STRING);
- ColumnDef reconciledColDef = new ColumnDef(
- avroCol.getColName(), colDef.getTypeDef(), avroCol.getComment());
- try {
- reconciledColDef.analyze();
- } catch (AnalysisException e) {
- Preconditions.checkNotNull(
- null, "reconciledColDef.analyze() should never throw.");
- }
- result.add(reconciledColDef);
- } else {
- result.add(avroCol);
- }
-
- // Populate warning string if there are name and/or type inconsistencies.
- if (!colDef.getColName().equals(avroCol.getColName()) ||
- !colDef.getType().equals(avroCol.getType())) {
- if (warning.length() == 0) {
- // Add warning preamble for the first mismatch.
- warning.append("Resolved the following name and/or type inconsistencies " +
- "between the column definitions and the Avro schema.\n");
- }
- warning.append(String.format("Column definition at position %s: %s %s\n",
- i, colDefs.get(i).getColName(), colDefs.get(i).getType().toSql()));
- warning.append(String.format("Avro schema column at position %s: %s %s\n",
- i, avroCols.get(i).getColName(), avroCols.get(i).getType().toSql()));
- warning.append(String.format("Resolution at position %s: %s %s\n",
- i, result.get(i).getColName(), result.get(i).getType().toSql()));
- }
- }
- Preconditions.checkState(result.size() == avroCols.size());
- Preconditions.checkState(result.size() == colDefs.size());
- return result;
- }
-
- /**
- * Sets the comment of each column definition to 'from deserializer' if not already
- * set. The purpose of this function is to provide behavioral consistency with
- * Hive ('deserializer' is not applicable to Impala) with respect to column comments
- * set for Avro tables.
- */
- public static void setFromSerdeComment(List<ColumnDef> colDefs) {
- for (ColumnDef colDef: colDefs) {
- if (Strings.isNullOrEmpty(colDef.getComment())) {
- colDef.setComment("from deserializer");
- }
- }
- }
-}
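
For illustration only (not part of this patch), a dependency-free sketch of the search order getAvroSchema() implements, assuming the Hive table property names 'avro.schema.literal' and 'avro.schema.url' and the "none" sentinel used by the Hive SerDe; the real method additionally fetches and validates the URL contents.

import java.util.List;
import java.util.Map;

public class SchemaLookupSketch {
  // Mirrors the search order of getAvroSchema(): a schema literal wins over a URL,
  // and the first non-"none" value found across the search locations is used.
  static String findSchemaSource(List<Map<String, String>> schemaSearchLocations) {
    String url = null;
    for (Map<String, String> props : schemaSearchLocations) {
      if (props == null) continue;
      String literal = props.get("avro.schema.literal");
      if (literal != null && !literal.equals("none")) return literal;
      String candidate = props.get("avro.schema.url");
      if (candidate != null && !candidate.equals("none")) {
        url = candidate.trim();
        break;
      }
    }
    return url;  // the caller still has to fetch and validate the URL contents
  }
}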
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/DisjointSet.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/DisjointSet.java b/fe/src/main/java/com/cloudera/impala/util/DisjointSet.java
deleted file mode 100644
index bce214e..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/DisjointSet.java
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Basic implementation of the disjoint-set data structure.
- * Stores a set of disjoint item sets and provides efficient implementations of mainly
- * two operations:
- * 1. Find the item set corresponding to a given member item (get() function)
- * 2. Compute the union of two item sets (union() function)
- */
-public class DisjointSet<T> {
- // Maps from an item to its item set.
- private final Map<T, Set<T>> itemSets_ = Maps.newHashMap();
- private final Set<Set<T>> uniqueSets_ = Sets.newHashSet();
-
- /**
- * Returns the item set corresponding to the given item or null if it
- * doesn't exist.
- */
- public Set<T> get(T item) { return itemSets_.get(item); }
-
- public Set<Set<T>> getSets() { return uniqueSets_; }
-
- /**
- * Registers a new item set with a single item. Returns the new item set.
- * Throws if such an item set already exists.
- */
- public Set<T> makeSet(T item) {
- if (itemSets_.containsKey(item)) {
- throw new IllegalStateException(
- "Item set for item already exists: " + item.toString());
- }
- Set<T> s = Sets.newHashSet(item);
- itemSets_.put(item, s);
- uniqueSets_.add(s);
- return s;
- }
-
- /**
- * Merges the two item sets belonging to the members a and b. The merged set contains
- * at least a and b even if a or b did not have an associated item set.
- * Returns false if the item sets of a and b are non-empty and already identical,
- * true otherwise.
- */
- public boolean union(T a, T b) {
- Set<T> aItems = itemSets_.get(a);
- Set<T> bItems = itemSets_.get(b);
- // check if the sets are already identical
- if (aItems != null && bItems != null && aItems == bItems) return false;
-
- // union(x, x) is equivalent to makeSet(x)
- if (a.equals(b) && aItems == null) {
- makeSet(a);
- return true;
- }
-
- // create sets for a or b if not present already
- if (aItems == null) aItems = makeSet(a);
- if (bItems == null) bItems = makeSet(b);
-
- // will contain the union of aItems and bItems
- Set<T> mergedItems = aItems;
- // always the smaller of the two sets to be merged
- Set<T> updateItems = bItems;
- if (bItems.size() > aItems.size()) {
- mergedItems = bItems;
- updateItems = aItems;
- }
- for (T item: updateItems) {
- mergedItems.add(item);
- itemSets_.put(item, mergedItems);
- }
- uniqueSets_.remove(updateItems);
- return true;
- }
-
- /**
- * Merges all the item sets corresponding to the given items. Returns true if any item
- * sets were merged or created, false otherwise (item sets are already identical).
- */
- public boolean bulkUnion(Collection<T> items) {
- if (items.isEmpty()) return false;
- Iterator<T> it = items.iterator();
- T head = it.next();
- // bulkUnion(x) is equivalent to makeSet(x)
- if (!it.hasNext()) {
- if (get(head) != null) return false;
- makeSet(head);
- return true;
- }
- boolean result = false;
- while(it.hasNext()) {
- boolean changed = union(head, it.next());
- result = result || changed;
- }
- return result;
- }
-
- /**
- * Checks the internal consistency of this data structure.
- * Throws an IllegalStateException if an inconsistency is detected.
- */
- public void checkConsistency() {
- Set<Set<T>> validatedSets = Sets.newHashSet();
- for (Set<T> itemSet: itemSets_.values()) {
- // Avoid checking the same item set multiple times.
- if (validatedSets.contains(itemSet)) continue;
- // Validate that all items in this set are properly mapped to
- // the set itself.
- for (T item: itemSet) {
- if (itemSet != itemSets_.get(item)) {
- throw new IllegalStateException("DisjointSet is in an inconsistent state.");
- }
- }
- validatedSets.add(itemSet);
- }
- }
-}
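
A short usage sketch (not part of this patch) against the DisjointSet API deleted above; the item values are arbitrary.

import java.util.Arrays;
import java.util.Set;

public class DisjointSetDemo {
  public static void main(String[] args) {
    DisjointSet<String> ds = new DisjointSet<String>();
    ds.makeSet("a");                          // {a}
    ds.union("a", "b");                       // {a, b}
    ds.union("c", "d");                       // {a, b}, {c, d}
    ds.bulkUnion(Arrays.asList("b", "c"));    // {a, b, c, d}
    Set<String> items = ds.get("a");
    System.out.println(items.contains("d"));  // prints: true
    ds.checkConsistency();                    // throws if the internal maps disagree
  }
}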
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/EventSequence.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/EventSequence.java b/fe/src/main/java/com/cloudera/impala/util/EventSequence.java
deleted file mode 100644
index 6b12c2e..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/EventSequence.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import java.util.List;
-
-import com.cloudera.impala.thrift.TEventSequence;
-
-import com.google.common.collect.Lists;
-
-/**
- * Wrapper around TEventSequence so that we can mark events with a single method call.
- * Events are 'marked' as they happen (so in order, with no time-travel backwards).
- */
-public class EventSequence {
- private final List<Long> timestamps_ = Lists.newArrayList();
- private final List<String> labels_ = Lists.newArrayList();
-
- private final long startTime_;
- private final String name_;
-
- public EventSequence(String name) {
- name_ = name;
- startTime_ = System.nanoTime();
- }
-
- /**
- * Saves an event at the current time with the given label.
- */
- public void markEvent(String label) {
- // Timestamps should be in ns resolution
- timestamps_.add(System.nanoTime() - startTime_);
- labels_.add(label);
- }
-
- public TEventSequence toThrift() {
- TEventSequence ret = new TEventSequence();
- ret.timestamps = timestamps_;
- ret.labels = labels_;
- ret.name = name_;
- return ret;
- }
-}
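
A brief usage sketch (not part of this patch) of the EventSequence wrapper deleted above; the timeline name and labels are illustrative.

public class TimelineDemo {
  public static void main(String[] args) throws InterruptedException {
    EventSequence timeline = new EventSequence("Query Compilation");
    timeline.markEvent("Analysis started");
    Thread.sleep(5);                          // stand-in for real work
    timeline.markEvent("Analysis finished");
    // toThrift() packages the labels and nanosecond offsets for the runtime profile.
    System.out.println(timeline.toThrift());
  }
}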
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/FileWatchService.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/FileWatchService.java b/fe/src/main/java/com/cloudera/impala/util/FileWatchService.java
deleted file mode 100644
index 88a456d..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/FileWatchService.java
+++ /dev/null
@@ -1,140 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import java.io.File;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Service to watch a file for changes. A thread periodically checks the file
- * modification time and uses the provided {@link FileChangeListener} to notify
- * a consumer.
- */
-public class FileWatchService {
- final static Logger LOG = LoggerFactory.getLogger(FileWatchService.class);
-
- // Default time to wait between checking the file.
- static final long DEFAULT_CHECK_INTERVAL_MS = 10 * 1000;
-
- // Time between checking for changes. Mutable for unit tests.
- private long checkIntervalMs_ = DEFAULT_CHECK_INTERVAL_MS;
-
- // Future returned by scheduleAtFixedRate(), needed to stop the checking thread.
- private ScheduledFuture<?> fileCheckFuture_;
-
- private final AtomicBoolean running_;
- private final FileChangeListener changeListener_; // Used to notify when changes occur.
- private final File file_; // The file to check for changes.
- private boolean alreadyWarned_; // Avoid repeatedly warning if the file is missing
- private long prevChange_; // Time of the last observed change
-
- /**
- * Listener used to notify of file changes.
- */
- public interface FileChangeListener {
-
- /**
- * Called when the file changes.
- */
- void onFileChange();
- }
-
- public FileWatchService(File file, FileChangeListener listener) {
- Preconditions.checkNotNull(file);
- Preconditions.checkNotNull(listener);
- Preconditions.checkArgument(file.exists());
- running_ = new AtomicBoolean(false);
- file_ = file;
- changeListener_ = listener;
- prevChange_ = 0L;
- alreadyWarned_ = false;
- }
-
- /**
- * Set the time (in milliseconds) to wait between checking the file for changes.
- * Only used in tests.
- */
- @VisibleForTesting
- public void setCheckIntervalMs(long checkIntervalMs) {
- checkIntervalMs_ = checkIntervalMs;
- }
-
- /**
- * Checks if the file has changed since the last observed change and if so,
- * notifies the listener.
- */
- private void checkFile() {
- if (file_.exists()) {
- long lastChange = file_.lastModified();
- if (lastChange > prevChange_) {
- changeListener_.onFileChange();
- prevChange_ = lastChange;
- alreadyWarned_ = false;
- }
- } else {
- if (!alreadyWarned_) {
- LOG.warn("File does not exist: {}", file_.getPath());
- alreadyWarned_ = true;
- }
- }
- }
-
- /**
- * Starts the thread to check for file changes. Continues checking for file changes
- * every 'checkIntervalMs_' milliseconds until stop() is called.
- */
- public synchronized void start() {
- Preconditions.checkState(!running_.get());
- running_.set(true);
-
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("FileWatchThread(" + file_.getPath() + ")-%d")
- .build());
- fileCheckFuture_ = executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- try {
- checkFile();
- } catch (SecurityException e) {
- LOG.warn("Not allowed to check read file existence: " + file_.getPath(), e);
- }
- }
- }, 0L, checkIntervalMs_, TimeUnit.MILLISECONDS);
- }
-
- /**
- * Stops the file watching thread.
- */
- public synchronized void stop() {
- Preconditions.checkState(running_.get());
- running_.set(false);
- fileCheckFuture_.cancel(false);
- }
-}
\ No newline at end of file
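
A usage sketch (not part of this patch) of the FileWatchService API deleted above; the watched path is hypothetical, and since the constructor requires an existing file, the sketch creates it first.

import java.io.File;

public class WatchDemo {
  public static void main(String[] args) throws Exception {
    final File configFile = new File("/tmp/example-config.conf");  // hypothetical path
    configFile.createNewFile();               // constructor requires the file to exist
    FileWatchService watcher = new FileWatchService(configFile,
        new FileWatchService.FileChangeListener() {
          @Override
          public void onFileChange() {
            System.out.println("Reloading " + configFile.getPath());
          }
        });
    watcher.setCheckIntervalMs(1000);         // shorten the default 10s poll for the demo
    watcher.start();
    Thread.sleep(5000);                       // touch the file in this window to see the callback
    watcher.stop();
  }
}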
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/FsPermissionChecker.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/FsPermissionChecker.java b/fe/src/main/java/com/cloudera/impala/util/FsPermissionChecker.java
deleted file mode 100644
index 7523cc8..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/FsPermissionChecker.java
+++ /dev/null
@@ -1,301 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Map;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclEntryType;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.AclEntryScope;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.hdfs.protocol.AclException;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Lists;
-
-/**
- * Singleton class that can check whether the current user has permission to access paths
- * in a FileSystem.
- */
-public class FsPermissionChecker {
- private final static Logger LOG = LoggerFactory.getLogger(FsPermissionChecker.class);
- private final static FsPermissionChecker instance_;
- private final static Configuration CONF;
- protected final String user_;
- private final Set<String> groups_ = new HashSet<String>();
- private final String supergroup_;
-
- static {
- CONF = new Configuration();
- try {
- instance_ = new FsPermissionChecker();
- } catch (IOException e) {
- throw new RuntimeException(
- "Error initializing FsPermissionChecker: " + e.getMessage(), e);
- }
- }
-
- private FsPermissionChecker() throws IOException {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- groups_.addAll(Arrays.asList(ugi.getGroupNames()));
- supergroup_ = CONF.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
- DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
- user_ = ugi.getShortUserName();
- }
-
- private boolean isSuperUser() { return groups_.contains(supergroup_); }
-
- private static List<AclEntryType> ACL_TYPE_PRIORITY =
- ImmutableList.of(AclEntryType.USER, AclEntryType.GROUP, AclEntryType.OTHER);
-
- /**
- * Allows checking different access permissions of a file without repeatedly accessing
- * the underlying filesystem by caching the results of a status call at construction.
- */
- public class Permissions {
- private final FileStatus fileStatus_;
- private final FsPermission permissions_;
- private final AclStatus aclStatus_;
- private Map<AclEntryType, List<AclEntry>> entriesByTypes_ = Maps.newHashMap();
- private AclEntry mask_;
-
- /**
- * If aclStatus is null, ACL permissions are not checked.
- */
- protected Permissions(FileStatus fileStatus, AclStatus aclStatus) {
- Preconditions.checkNotNull(fileStatus);
- fileStatus_ = fileStatus;
- permissions_ = fileStatus.getPermission();
- aclStatus_ = aclStatus;
- if (aclStatus_ == null) return;
-
- // Group the ACLs by type, so that we can apply them in correct priority order. Not
- // clear from documentation whether aclStatus_.getEntries() guarantees this
- // ordering, so this is defensive.
- for (AclEntryType t: ACL_TYPE_PRIORITY) {
- entriesByTypes_.put(t, Lists.<AclEntry>newArrayList());
- }
-
- List<AclEntry> fullAclList =
- getAclFromPermAndEntries(permissions_, aclStatus_.getEntries());
- for (AclEntry e: fullAclList) {
- if (e.getType() == AclEntryType.MASK && e.getScope() != AclEntryScope.DEFAULT) {
- mask_ = e;
- } else if (isApplicableAcl(e)) {
- entriesByTypes_.get(e.getType()).add(e);
- }
- }
- }
-
- /**
- * Returns true if the mask should apply. The mask ACL applies only to named user
- * ACLs (e.g. user:foo:r-x) and to all group ACLs.
- */
- private boolean shouldApplyMask(AclEntry acl) {
- if (mask_ == null) return false;
-
- switch (acl.getType()) {
- case USER:
- return acl.getName() != null;
- case GROUP:
- return true;
- }
- return false;
- }
-
- /**
- * Returns true if this ACL applies to the current user and / or group
- */
- private boolean isApplicableAcl(AclEntry e) {
- // Default ACLs are not used for permission checking, but instead control the
- // permissions received by child directories
- if (e.getScope() == AclEntryScope.DEFAULT) return false;
-
- switch (e.getType()) {
- case USER:
- String aclUser = e.getName() == null ? aclStatus_.getOwner() : e.getName();
- return FsPermissionChecker.this.user_.equals(aclUser);
- case GROUP:
- String aclGroup = e.getName() == null ? aclStatus_.getGroup() : e.getName();
- return FsPermissionChecker.this.groups_.contains(aclGroup);
- case OTHER:
- return true;
- case MASK:
- return false;
- default:
- LOG.warn("Unknown Acl type: " + e.getType());
- return false;
- }
- }
-
- /**
- * Returns true if ACLs allow 'action', false if they explicitly disallow 'action',
- * and 'null' if no ACLs are available.
- * See http://users.suse.com/~agruen/acl/linux-acls/online for more details about
- * acl access check algorithm.
- */
- private Boolean checkAcls(FsAction action) {
- // ACLs may not be enabled, so we need this ternary logic. If no ACLs are available,
- // returning null causes us to fall back to standard ugo permissions.
- if (aclStatus_ == null) return null;
-
- // Remember if there is an applicable ACL entry, including owner user, named user,
- // owning group, named group.
- boolean foundMatch = false;
- for (AclEntryType t: ACL_TYPE_PRIORITY) {
- for (AclEntry e: entriesByTypes_.get(t)) {
- if (t == AclEntryType.OTHER) {
- // We have processed all ACL entries except the OTHER entry.
- // If we found applicable ACL entries but none of them granted the requested
- // permission, deny access. Otherwise check the OTHER entry.
- return foundMatch ? false : e.getPermission().implies(action);
- }
- // If there is an applicable mask, 'action' is allowed iff both the mask and
- // the underlying ACL permit it.
- if (e.getPermission().implies(action)) {
- if (shouldApplyMask(e)) {
- if (mask_.getPermission().implies(action)) return true;
- } else {
- return true;
- }
- }
- // User ACL entry has priority, no need to continue check.
- if (t == AclEntryType.USER) return false;
-
- foundMatch = true;
- }
- }
- return false;
- }
-
- /**
- * Returns true if the current user can perform the given action given these
- * permissions.
- */
- public boolean checkPermissions(FsAction action) {
- if (FsPermissionChecker.this.isSuperUser()) return true;
- Boolean aclPerms = checkAcls(action);
- if (aclPerms != null) return aclPerms;
-
- // Check user, group and then 'other' permissions in turn.
- if (FsPermissionChecker.this.user_.equals(fileStatus_.getOwner())) {
- // If the user matches, we must return the user's access rights without falling
- // back to the group permissions. This is counter-intuitive when the user cannot
- // access the file but the group permissions would allow it, but it is consistent
- // with UNIX behaviour.
- return permissions_.getUserAction().implies(action);
- }
-
- if (FsPermissionChecker.this.groups_.contains(fileStatus_.getGroup())) {
- return permissions_.getGroupAction().implies(action);
- }
- return permissions_.getOtherAction().implies(action);
- }
-
- public boolean canRead() { return checkPermissions(FsAction.READ); }
- public boolean canWrite() { return checkPermissions(FsAction.WRITE); }
- public boolean canReadAndWrite() { return canRead() && canWrite(); }
-
- // This was originally lifted from Hadoop. Won't need it if HDFS-7177 is resolved.
- // getAclStatus() returns just the extended ACL entries; the default file permissions
- // like "user::,group::,other::" are not included. We need to combine them to get the
- // full logical ACL list.
- private List<AclEntry> getAclFromPermAndEntries(FsPermission perm,
- List<AclEntry> entries) {
- // File permissions always have 3 items.
- List<AclEntry> aclEntries = Lists.newArrayListWithCapacity(entries.size() + 3);
-
- // Owner entry implied by owner permission bits.
- aclEntries.add(new AclEntry.Builder()
- .setScope(AclEntryScope.ACCESS)
- .setType(AclEntryType.USER)
- .setPermission(perm.getUserAction())
- .build());
-
- // Add all extended access ACL entries set by "setfacl", other than the default
- // file permissions.
- boolean hasAccessAcl = false;
- for (AclEntry entry: entries) {
- // The AclEntry list should be ordered: all ACCESS entries are in the first half and
- // DEFAULT entries are in the second half, so there is no need to continue here.
- if (entry.getScope() == AclEntryScope.DEFAULT) break;
- hasAccessAcl = true;
- aclEntries.add(entry);
- }
-
- // Mask entry implied by group permission bits, or group entry if there is
- // no access ACL (only default ACL).
- aclEntries.add(new AclEntry.Builder()
- .setScope(AclEntryScope.ACCESS)
- .setType(hasAccessAcl ? AclEntryType.MASK : AclEntryType.GROUP)
- .setPermission(perm.getGroupAction())
- .build());
-
- // Other entry implied by other bits.
- aclEntries.add(new AclEntry.Builder()
- .setScope(AclEntryScope.ACCESS)
- .setType(AclEntryType.OTHER)
- .setPermission(perm.getOtherAction())
- .build());
-
- return aclEntries;
- }
- }
-
- /**
- * Returns a Permissions object that can answer all access permission queries for the
- * given path.
- */
- public Permissions getPermissions(FileSystem fs, Path path) throws IOException {
- Preconditions.checkNotNull(fs);
- Preconditions.checkNotNull(path);
- AclStatus aclStatus = null;
- try {
- aclStatus = fs.getAclStatus(path);
- } catch (AclException ex) {
- LOG.trace("No ACLs retrieved, skipping ACLs check (HDFS will enforce ACLs)", ex);
- } catch (UnsupportedOperationException ex) {
- LOG.trace("No ACLs retrieved, unsupported", ex);
- }
- return new Permissions(fs.getFileStatus(path), aclStatus);
- }
-
- /**
- * Returns the FsPermissionChecker singleton.
- */
- public static FsPermissionChecker getInstance() { return instance_; }
-}
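
A short sketch (not part of this patch) of how a caller uses the FsPermissionChecker API deleted above; the path is hypothetical and a default Hadoop Configuration is assumed.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;

public class PermCheckDemo {
  public static void main(String[] args) throws IOException {
    Path dir = new Path("/user/hive/warehouse/t");   // hypothetical path
    FileSystem fs = dir.getFileSystem(new Configuration());
    // One status/ACL fetch at construction answers several access questions.
    FsPermissionChecker.Permissions perms =
        FsPermissionChecker.getInstance().getPermissions(fs, dir);
    System.out.println("read:  " + perms.canRead());
    System.out.println("write: " + perms.canWrite());
    System.out.println("exec:  " + perms.checkPermissions(FsAction.EXECUTE));
  }
}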
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/GlogAppender.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/GlogAppender.java b/fe/src/main/java/com/cloudera/impala/util/GlogAppender.java
deleted file mode 100644
index a5e1eb1..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/GlogAppender.java
+++ /dev/null
@@ -1,129 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import java.util.Properties;
-
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.spi.LoggingEvent;
-
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.thrift.TLogLevel;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-
-/**
- * log4j appender which calls into C++ code to log messages at their correct severities
- * via glog.
- */
-public class GlogAppender extends AppenderSkeleton {
- // GLOG takes care of formatting, so we don't require a layout
- public boolean requiresLayout() { return false; }
-
- // Required implementation by superclass.
- public void activateOptions() { }
-
- // Required implementation by superclass
- public void close() { }
-
- private TLogLevel levelToSeverity(Level level) {
- Preconditions.checkState(!level.equals(Level.OFF));
- // TODO: Level does not work well in a HashMap or switch statement due to some
- // strangeness with equality testing.
- if (level.equals(Level.TRACE)) return TLogLevel.VLOG_3;
- if (level.equals(Level.ALL)) return TLogLevel.VLOG_3;
- if (level.equals(Level.DEBUG)) return TLogLevel.VLOG;
- if (level.equals(Level.ERROR)) return TLogLevel.ERROR;
- if (level.equals(Level.FATAL)) return TLogLevel.FATAL;
- if (level.equals(Level.INFO)) return TLogLevel.INFO;
- if (level.equals(Level.WARN)) return TLogLevel.WARN;
-
- throw new IllegalStateException("Unknown log level: " + level.toString());
- }
-
- @Override
- public void append(LoggingEvent event) {
- Level level = event.getLevel();
- if (level.equals(Level.OFF)) return;
-
- String msg = event.getRenderedMessage();
- if (event.getThrowableInformation() != null) {
- msg = msg + "\nJava exception follows:\n" +
- Joiner.on("\n").join(event.getThrowableStrRep());
- }
- int lineNumber = Integer.parseInt(event.getLocationInformation().getLineNumber());
- String fileName = event.getLocationInformation().getFileName();
- NativeLogger.LogToGlog(
- levelToSeverity(level).getValue(), msg, fileName, lineNumber);
- }
-
- /**
- * Returns a log4j level string corresponding to the Glog log level
- */
- private static String log4jLevelForTLogLevel(TLogLevel logLevel)
- throws InternalException {
- switch (logLevel) {
- case INFO: return "INFO";
- case WARN: return "WARN";
- case ERROR: return "ERROR";
- case FATAL: return "FATAL";
- case VLOG:
- case VLOG_2: return "DEBUG";
- case VLOG_3: return "TRACE";
- default: throw new InternalException("Unknown log level:" + logLevel);
- }
- }
-
- /**
- * Manually override Log4j root logger configuration. Any values in log4j.properties
- * not overridden (that is, anything but the root logger and its default level) will
- * continue to have effect.
- * - impalaLogLevel - the maximum log level for com.cloudera.impala.* classes
- * - otherLogLevel - the maximum log level for all other classes
- */
- public static void Install(TLogLevel impalaLogLevel, TLogLevel otherLogLevel)
- throws InternalException {
- Properties properties = new Properties();
- properties.setProperty("log4j.appender.glog", GlogAppender.class.getName());
-
- // These settings are relatively subtle. log4j provides many ways to filter log
- // messages, and configuring them in the right order is a bit of black magic.
- //
- // The 'Threshold' property supersedes everything, so must be set to its most
- // permissive and applies to any message sent to the glog appender.
- //
- // The 'rootLogger' property controls the default maximum logging level (where more
- // verbose->larger logging level) for the entire space of classes. This will apply to
- // all non-Impala classes, so is set to otherLogLevel.
- //
- // Finally we can configure per-package logging which overrides the rootLogger
- // setting. In order to control Impala's logging independently of the rest of the
- // world, we set the log level for com.cloudera.impala.
- properties.setProperty("log4j.rootLogger",
- log4jLevelForTLogLevel(otherLogLevel) + ",glog");
- properties.setProperty("log4j.appender.glog.Threshold", "TRACE");
- properties.setProperty("log4j.logger.com.cloudera.impala",
- log4jLevelForTLogLevel(impalaLogLevel));
- PropertyConfigurator.configure(properties);
- Logger.getLogger(GlogAppender.class).info(String.format("Logging initialized. " +
- "Impala: %s, All other: %s", impalaLogLevel, otherLogLevel));
- }
-};
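
A tiny usage sketch (not part of this patch) of the GlogAppender.Install() entry point deleted above, assuming Impala's native logging library is already loaded in the JVM; the chosen levels are illustrative.

import com.cloudera.impala.thrift.TLogLevel;

public class LoggingSetupDemo {
  public static void main(String[] args) throws Exception {
    // Route log4j output through glog: DEBUG for com.cloudera.impala classes
    // (VLOG maps to DEBUG) and WARN for every other package.
    GlogAppender.Install(TLogLevel.VLOG, TLogLevel.WARN);
  }
}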
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/HdfsCachingUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/HdfsCachingUtil.java b/fe/src/main/java/com/cloudera/impala/util/HdfsCachingUtil.java
deleted file mode 100644
index a3a1fa0..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/HdfsCachingUtil.java
+++ /dev/null
@@ -1,515 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.log4j.Logger;
-
-import com.cloudera.impala.analysis.TableName;
-import com.cloudera.impala.catalog.HdfsPartition;
-import com.cloudera.impala.common.FileSystemUtil;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.thrift.JniCatalogConstants;
-import com.cloudera.impala.thrift.THdfsCachingOp;
-import com.google.common.base.Preconditions;
-
-/**
- * Utility class for submitting and dropping HDFS cache requests.
- */
-public class HdfsCachingUtil {
- private static final Logger LOG = Logger.getLogger(HdfsCachingUtil.class);
-
- // The key name used to save cache directive IDs in table/partition properties.
- public final static String CACHE_DIR_ID_PROP_NAME = "cache_directive_id";
-
- // The key name used to store the replication factor for cached files
- public final static String CACHE_DIR_REPLICATION_PROP_NAME = "cache_replication";
-
- // The number of caching refresh intervals that can go by when waiting for data to
- // become cached before assuming no more progress is being made.
- private final static int MAX_UNCHANGED_CACHING_REFRESH_INTERVALS = 5;
-
- private static DistributedFileSystem dfs = null;
-
- /**
- * Returns the dfs singleton object.
- */
- private static DistributedFileSystem getDfs() throws ImpalaRuntimeException {
- if (dfs == null) {
- try {
- dfs = FileSystemUtil.getDistributedFileSystem();
- } catch (IOException e) {
- throw new ImpalaRuntimeException("HdfsCachingUtil failed to initialize the " +
- "DistributedFileSystem: ", e);
- }
- }
- return dfs;
- }
-
- /**
- * Caches the location of the given Hive Metastore Table and updates the
- * table's properties with the submitted cache directive ID. The caller is
- * responsible for not caching the same table twice, as HDFS will create a second
- * cache directive even if it is similar to an already existing one.
- *
- * Returns the ID of the submitted cache directive and throws if there is an error
- * submitting.
- */
- public static long submitCacheTblDirective(
- org.apache.hadoop.hive.metastore.api.Table table,
- String poolName, short replication) throws ImpalaRuntimeException {
- long id = HdfsCachingUtil.submitDirective(new Path(table.getSd().getLocation()),
- poolName, replication);
- table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
- table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
- return id;
- }
-
- /**
- * Caches the location of the given partition and updates the
- * partition's properties with the submitted cache directive ID. The caller is
- * responsible for not caching the same partition twice, as HDFS will create a second
- * cache directive even if it is similar to an already existing one.
- *
- * Returns the ID of the submitted cache directive and throws if there is an error
- * submitting the directive.
- */
- public static long submitCachePartitionDirective(HdfsPartition part,
- String poolName, short replication) throws ImpalaRuntimeException {
- long id = HdfsCachingUtil.submitDirective(new Path(part.getLocation()),
- poolName, replication);
- part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
- part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
- return id;
- }
-
- /**
- * Convenience method for working directly on a metastore partition. See
- * submitCachePartitionDirective(HdfsPartition, String, short) for more details.
- */
- public static long submitCachePartitionDirective(
- org.apache.hadoop.hive.metastore.api.Partition part,
- String poolName, short replication) throws ImpalaRuntimeException {
- long id = HdfsCachingUtil.submitDirective(new Path(part.getSd().getLocation()),
- poolName, replication);
- part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
- part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
- return id;
- }
-
- /**
- * Removes the cache directive associated with the table from HDFS, uncaching all
- * data. Also updates the table's metadata. No-op if the table is not cached.
- */
- public static void uncacheTbl(org.apache.hadoop.hive.metastore.api.Table table)
- throws ImpalaRuntimeException {
- Preconditions.checkNotNull(table);
- LOG.debug("Uncaching table: " + table.getDbName() + "." + table.getTableName());
- Long id = getCacheDirectiveId(table.getParameters());
- if (id == null) return;
- HdfsCachingUtil.removeDirective(id);
- table.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
- table.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
- }
-
- /**
- * Removes the cache directive associated with the partition from HDFS, uncaching all
- * data. Also updates the partition's metadata to remove the cache directive ID.
- * No-op if the table is not cached.
- */
- public static void uncachePartition(HdfsPartition part) throws ImpalaException {
- Preconditions.checkNotNull(part);
- Long id = getCacheDirectiveId(part.getParameters());
- if (id == null) return;
- HdfsCachingUtil.removeDirective(id);
- part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
- part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
- }
-
- /**
- * Convenience method for working directly on a metastore partition. See
- * uncachePartition(HdfsPartition) for more details.
- */
- public static void uncachePartition(
- org.apache.hadoop.hive.metastore.api.Partition part) throws ImpalaException {
- Preconditions.checkNotNull(part);
- Long id = getCacheDirectiveId(part.getParameters());
- if (id == null) return;
- HdfsCachingUtil.removeDirective(id);
- part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
- part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
- }
-
- /**
- * Returns the cache directive ID from the given table/partition parameter
- * map. Returns null if the CACHE_DIR_ID_PROP_NAME key was not set or if
- * there was an error parsing the associated ID.
- */
- public static Long getCacheDirectiveId(Map<String, String> params) {
- if (params == null) return null;
- String idStr = params.get(CACHE_DIR_ID_PROP_NAME);
- if (idStr == null) return null;
- try {
- return Long.parseLong(idStr);
- } catch (NumberFormatException e) {
- return null;
- }
- }
-
- /**
- * Given a cache directive ID, returns the pool the directive is cached in.
- * Returns null if no outstanding cache directives match this ID.
- */
- public static String getCachePool(long directiveId)
- throws ImpalaRuntimeException {
- CacheDirectiveEntry entry = getDirective(directiveId);
- return entry == null ? null : entry.getInfo().getPool();
- }
-
- /**
- * Given a cache directive ID, returns the replication factor for the directive.
- * Returns null if no outstanding cache directives match this ID.
- */
- public static Short getCacheReplication(long directiveId)
- throws ImpalaRuntimeException {
- CacheDirectiveEntry entry = getDirective(directiveId);
- return entry != null ? entry.getInfo().getReplication() : null;
- }
-
- /**
- * Returns the cache replication value from the parameters map. We assume that only
- * cached table parameters are used and the property is always present.
- */
- public static Short getCachedCacheReplication(Map<String, String> params) {
- Preconditions.checkNotNull(params);
- String replication = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
- if (replication == null) {
- return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
- }
- try {
- return Short.parseShort(replication);
- } catch (NumberFormatException e) {
- return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
- }
- }
-
- /**
- * Waits on a cache directive to either complete or stop making progress. Progress is
- * checked by polling the HDFS caching stats every
- * DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS. We verify the request's
- * "currentBytesCached" is increasing compared to "bytesNeeded".
- * If "currentBytesCached" == "bytesNeeded" or if no progress is made for a
- * MAX_UNCHANGED_CACHING_REFRESH_INTERVALS, this function returns.
- */
- public static void waitForDirective(long directiveId)
- throws ImpalaRuntimeException {
- long bytesNeeded = 0L;
- long currentBytesCached = 0L;
- CacheDirectiveEntry cacheDir = getDirective(directiveId);
- if (cacheDir == null) return;
-
- bytesNeeded = cacheDir.getStats().getBytesNeeded();
- currentBytesCached = cacheDir.getStats().getBytesCached();
- LOG.debug(String.format("Waiting on cache directive id: %d. Bytes " +
- "cached (%d) / needed (%d)", directiveId, currentBytesCached, bytesNeeded));
- // All the bytes are cached, just return.
- if (bytesNeeded == currentBytesCached) return;
-
- // The refresh interval is how often HDFS will update cache directive stats. We use
- // this value to determine how frequently we should poll for changes.
- long hdfsRefreshIntervalMs = getDfs().getConf().getLong(
- DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
- DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
- Preconditions.checkState(hdfsRefreshIntervalMs > 0);
-
- // Loop until either MAX_UNCHANGED_CACHING_REFRESH_INTERVALS have passed with no
- // changes or all required data is cached.
- int unchangedCounter = 0;
- while (unchangedCounter < MAX_UNCHANGED_CACHING_REFRESH_INTERVALS) {
- long previousBytesCached = currentBytesCached;
- cacheDir = getDirective(directiveId);
- if (cacheDir == null) return;
- currentBytesCached = cacheDir.getStats().getBytesCached();
- bytesNeeded = cacheDir.getStats().getBytesNeeded();
- if (currentBytesCached == bytesNeeded) {
- LOG.debug(String.format("Cache directive id: %d has completed." +
- "Bytes cached (%d) / needed (%d)", directiveId, currentBytesCached,
- bytesNeeded));
- return;
- }
-
- if (currentBytesCached == previousBytesCached) {
- ++unchangedCounter;
- } else {
- unchangedCounter = 0;
- }
- try {
- // Sleep for the refresh interval + a little bit more to ensure a full interval
- // has completed. A value of 25% the refresh interval was arbitrarily chosen.
- Thread.sleep((long) (hdfsRefreshIntervalMs * 1.25));
- } catch (InterruptedException e) { /* ignore */ }
- }
- LOG.warn(String.format("No changes in cached bytes in: %d(ms). All data may not " +
- "be cached. Final stats for cache directive id: %d. Bytes cached (%d)/needed " +
- "(%d)", hdfsRefreshIntervalMs * MAX_UNCHANGED_CACHING_REFRESH_INTERVALS,
- directiveId, currentBytesCached, bytesNeeded));
- }
-
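The stall-detection loop above can be sketched independently of HDFS. The following standalone snippet (hypothetical names; it uses only the JDK, not the Hadoop or Impala APIs) illustrates the same pattern: poll at the refresh interval, reset the counter on progress, and give up after a fixed number of unchanged polls.

  import java.util.function.LongSupplier;

  class CachingWaitSketch {
    // Returns true if bytesNeeded was reached, false if caching stalled for
    // maxUnchangedPolls consecutive polls of refreshIntervalMs each.
    static boolean waitForProgress(LongSupplier bytesCached, long bytesNeeded,
        long refreshIntervalMs, int maxUnchangedPolls) throws InterruptedException {
      long current = bytesCached.getAsLong();
      int unchanged = 0;
      while (unchanged < maxUnchangedPolls) {
        if (current == bytesNeeded) return true;
        // Sleep a little longer than the refresh interval so a full interval elapses.
        Thread.sleep((long) (refreshIntervalMs * 1.25));
        long previous = current;
        current = bytesCached.getAsLong();
        unchanged = (current == previous) ? unchanged + 1 : 0;
      }
      return false;
    }
  }
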
- /**
- * Submits a new caching directive for the specified cache pool name, path and
- * replication. Returns the directive ID if the submission was successful, or throws
- * an ImpalaRuntimeException if the submission fails.
- */
- private static long submitDirective(Path path, String poolName, short replication)
- throws ImpalaRuntimeException {
- Preconditions.checkNotNull(path);
- Preconditions.checkState(poolName != null && !poolName.isEmpty());
- CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
- .setExpiration(Expiration.NEVER)
- .setPool(poolName)
- .setReplication(replication)
- .setPath(path).build();
- LOG.debug("Submitting cache directive: " + info.toString());
- try {
- return getDfs().addCacheDirective(info);
- } catch (IOException e) {
- throw new ImpalaRuntimeException(e.getMessage(), e);
- }
- }
-
- /**
- * Updates the cache directive for a table and updates the metastore parameters.
- * Returns the cache directive ID.
- */
- public static long modifyCacheDirective(Long id,
- org.apache.hadoop.hive.metastore.api.Table table,
- String poolName, short replication) throws ImpalaRuntimeException {
- Preconditions.checkNotNull(id);
- HdfsCachingUtil.modifyCacheDirective(id, new Path(table.getSd().getLocation()),
- poolName, replication);
- table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
- table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
- return id;
- }
-
- /**
- * Updates the cache directive for a partition and updates the metastore parameters.
- * Returns the cache directive ID.
- */
- public static long modifyCacheDirective(Long id, HdfsPartition part, String poolName,
- short replication) throws ImpalaRuntimeException {
- Preconditions.checkNotNull(id);
- HdfsCachingUtil.modifyCacheDirective(id, new Path(part.getLocation()),
- poolName, replication);
- part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
- part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
- return id;
- }
-
- /**
- * Updates an existing cache directive in place to avoid creating multiple entries
- * for the same path.
- */
- private static void modifyCacheDirective(Long id, Path path, String poolName,
- short replication) throws ImpalaRuntimeException {
- Preconditions.checkNotNull(path);
- Preconditions.checkNotNull(id);
- Preconditions.checkState(poolName != null && !poolName.isEmpty());
- CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
- .setId(id)
- .setExpiration(Expiration.NEVER)
- .setPool(poolName)
- .setReplication(replication)
- .setPath(path).build();
- LOG.debug("Modifying cache directive: " + info.toString());
- try {
- getDfs().modifyCacheDirective(info);
- } catch (IOException e) {
- throw new ImpalaRuntimeException(e.getMessage(), e);
- }
- }
-
- /**
- * Removes the given cache directive if it exists, uncaching the data. If the
- * cache request does not exist in HDFS, no error is returned.
- * Throws an ImpalaRuntimeException if there was any problem removing the
- * directive.
- */
- private static void removeDirective(long directiveId) throws ImpalaRuntimeException {
- LOG.debug("Removing cache directive id: " + directiveId);
- try {
- getDfs().removeCacheDirective(directiveId);
- } catch (IOException e) {
- // There is no special exception type for the case where a directive ID does not
- // exist so we must inspect the error message.
- if (e.getMessage().contains("No directive with ID")) return;
- throw new ImpalaRuntimeException(e.getMessage(), e);
- }
- }
-
- /**
- * Gets the cache directive matching the given ID. Throws an ImpalaRuntimeException
- * if the lookup fails or no matching directive exists.
- */
- private static CacheDirectiveEntry getDirective(long directiveId)
- throws ImpalaRuntimeException {
- LOG.trace("Getting cache directive id: " + directiveId);
- CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder()
- .setId(directiveId)
- .build();
- try {
- RemoteIterator<CacheDirectiveEntry> itr = getDfs().listCacheDirectives(filter);
- if (itr.hasNext()) return itr.next();
- } catch (IOException e) {
- // Covers connectivity problems with HDFS as well as "directive not found" errors.
- throw new ImpalaRuntimeException(e.getMessage(), e);
- }
- throw new ImpalaRuntimeException(
- "HDFS cache directive filter returned empty result. This must not happen");
- }
-
- /**
- * Checks whether 'poolName' matches the pool of the cache directive identified
- * by 'directiveId'.
- */
- public static boolean isSamePool(String poolName, Long directiveId)
- throws ImpalaRuntimeException {
- return poolName.equals(getCachePool(directiveId));
- }
-
- /**
- * Returns the replication factor from the Thrift caching operation, or the default
- * cache replication factor if none is set.
- */
- public static short getReplicationOrDefault(THdfsCachingOp op) {
- return op.isSetReplication() ? op.getReplication() :
- JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
- }
-
- /**
- * Returns a boolean indicating if the given thrift caching operation would perform an
- * update on an already existing cache directive.
- */
- public static boolean isUpdateOp(THdfsCachingOp op, Map<String, String> params)
- throws ImpalaRuntimeException {
-
- Long directiveId = Long.parseLong(params.get(CACHE_DIR_ID_PROP_NAME));
- CacheDirectiveEntry entry = getDirective(directiveId);
- Preconditions.checkNotNull(entry);
-
- // Verify cache pool
- if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) {
- return false;
- }
-
- // Check cache replication factor
- if ((op.isSetReplication() && op.getReplication() !=
- entry.getInfo().getReplication()) || ( !op.isSetReplication() &&
- entry.getInfo().getReplication() !=
- JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR)) {
- return true;
- }
- return false;
- }
-
- /**
- * Validates the properties of the chosen cache pool. Throws on error.
- */
- public static void validateCachePool(THdfsCachingOp op, Long directiveId,
- TableName table, HdfsPartition partition)
- throws ImpalaRuntimeException {
-
- CacheDirectiveEntry entry = getDirective(directiveId);
- Preconditions.checkNotNull(entry);
-
- if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) {
- throw new ImpalaRuntimeException(String.format("Cannot cache partition in " +
- "pool '%s' because it is already cached in '%s'. To change the cache " +
- "pool for this partition, first uncache using: ALTER TABLE %s.%s " +
- "%sSET UNCACHED", op.getCache_pool_name(),
- entry.getInfo().getPool(), table.getDb(), table,
- // Insert the partition clause only if a partition was given.
- partition != null ? String.format(" PARTITION(%s) ",
- partition.getPartitionName().replaceAll("/", ", ")) : ""));
- }
- }
-
- /**
- * Validates the properties of the chosen cache pool. Throws on error.
- */
- public static void validateCachePool(THdfsCachingOp op, Long directiveId,
- TableName table) throws ImpalaRuntimeException {
- validateCachePool(op, directiveId, table, null);
- }
-
- /**
- * Returns true if the parameter map contains a cache directive ID that still
- * exists on the NameNode. If the cache directive ID does not exist, we remove the
- * value from the parameter map, issue a log message and return false. As the value
- * is not written back to the Hive MS from this method, the result is only valid
- * until the next metadata fetch. Lastly, we update the cache replication factor in
- * the parameters with the value read from HDFS.
- */
- public static boolean validateCacheParams(Map<String, String> params) {
- Long directiveId = getCacheDirectiveId(params);
- if (directiveId == null) return false;
-
- CacheDirectiveEntry entry = null;
- try {
- entry = getDirective(directiveId);
- } catch (ImpalaRuntimeException e) {
- if (e.getCause() != null && e.getCause() instanceof RemoteException) {
- // This exception signals that the cache directive no longer exists.
- LOG.error("Cache directive does not exist", e);
- params.remove(CACHE_DIR_ID_PROP_NAME);
- params.remove(CACHE_DIR_REPLICATION_PROP_NAME);
- } else {
- // This exception signals that there was a connection problem with HDFS.
- LOG.error("IO Exception, possible connectivity issues with HDFS", e);
- }
- return false;
- }
- Preconditions.checkNotNull(entry);
-
- // On the upgrade path the property might not exist yet. If it exists and differs
- // from the replication factor reported by HDFS, issue a warning.
- String replicationFactor = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
- if (replicationFactor != null &&
- Short.parseShort(replicationFactor) != entry.getInfo().getReplication()) {
- LOG.info("Replication factor for entry in HDFS differs from value in Hive MS: " +
- entry.getInfo().getPath().toString() + " " +
- entry.getInfo().getReplication().toString() + " != " +
- params.get(CACHE_DIR_REPLICATION_PROP_NAME));
- }
- params.put(CACHE_DIR_REPLICATION_PROP_NAME,
- String.valueOf(entry.getInfo().getReplication()));
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java b/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java
deleted file mode 100644
index 4f627d8..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java
+++ /dev/null
@@ -1,268 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import java.io.StringReader;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import javax.json.Json;
-import javax.json.JsonArray;
-import javax.json.JsonReader;
-
-import com.cloudera.impala.catalog.ScalarType;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.thrift.TDistributeByRangeParam;
-import com.cloudera.impala.thrift.TRangeLiteral;
-import com.cloudera.impala.thrift.TRangeLiteralList;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.PartialRow;
-
-import static com.cloudera.impala.catalog.Type.parseColumnType;
-import static java.lang.String.format;
-
-public class KuduUtil {
-
- private static final String SPLIT_KEYS_ERROR_MESSAGE = "Error parsing split keys.";
-
- /**
- * Compares the schema of an HMS table with that of a Kudu table. Returns true if
- * the schemas match.
- */
- public static boolean compareSchema(Table msTable, KuduTable kuduTable)
- throws ImpalaRuntimeException {
- List<FieldSchema> msFields = msTable.getSd().getCols();
- List<ColumnSchema> kuduFields = kuduTable.getSchema().getColumns();
- if (msFields.size() != kuduFields.size()) return false;
-
- HashMap<String, ColumnSchema> kuduFieldMap = Maps.newHashMap();
- for (ColumnSchema kuduField : kuduFields) {
- kuduFieldMap.put(kuduField.getName().toUpperCase(), kuduField);
- }
-
- for (FieldSchema msField : msFields) {
- ColumnSchema kuduField = kuduFieldMap.get(msField.getName().toUpperCase());
- if (kuduField == null
- || fromImpalaType(parseColumnType(msField.getType())) != kuduField.getType()) {
- return false;
- }
- }
-
- return true;
- }
-
- /**
- * Parses split keys from statements.
- *
- * Split keys are expected to be in JSON, as an array of arrays, in the form:
- * '[[value1_col1, value1_col2, ...], [value2_col1, value2_col2, ...], ...]'
- *
- * Each inner array corresponds to a split key and should have one matching entry for
- * each key column specified in 'schema'.
- */
- public static List<PartialRow> parseSplits(Schema schema, String kuduSplits)
- throws ImpalaRuntimeException {
-
- // If there are no splits return early.
- if (kuduSplits == null || kuduSplits.isEmpty()) return ImmutableList.of();
-
- ImmutableList.Builder<PartialRow> splitRows = ImmutableList.builder();
-
- // ...Otherwise parse the splits. We're expecting splits in the format of a list of
- // lists of keys. We only support specifying splits for int and string keys
- // (currently those are the only types of keys allowed in Kudu too).
- try {
- JsonReader jr = Json.createReader(new StringReader(kuduSplits));
- JsonArray keysList = jr.readArray();
- for (int i = 0; i < keysList.size(); i++) {
- PartialRow splitRow = new PartialRow(schema);
- JsonArray compoundKey = keysList.getJsonArray(i);
- if (compoundKey.size() != schema.getPrimaryKeyColumnCount()) {
- throw new ImpalaRuntimeException(SPLIT_KEYS_ERROR_MESSAGE +
- " Wrong number of keys.");
- }
- for (int j = 0; j < compoundKey.size(); j++) {
- setKey(schema.getColumnByIndex(j).getType(), compoundKey, j, splitRow);
- }
- splitRows.add(splitRow);
- }
- } catch (ImpalaRuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new ImpalaRuntimeException(SPLIT_KEYS_ERROR_MESSAGE + " Problem parsing json"
- + ": " + e.getMessage(), e);
- }
-
- return splitRows.build();
- }
-
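The split-keys format can be exercised with the javax.json API alone. The standalone sketch below (made-up key values; assumes a JSR-353 implementation is on the runtime classpath) shows how a '[[...], [...]]' string decomposes into one inner array per split row, mirroring the parsing above.

  import java.io.StringReader;
  import javax.json.Json;
  import javax.json.JsonArray;

  class SplitKeysSketch {
    public static void main(String[] args) {
      // Two split keys for a hypothetical (INT64, STRING) primary key.
      String kuduSplits = "[[100, \"a\"], [200, \"b\"]]";
      JsonArray keysList = Json.createReader(new StringReader(kuduSplits)).readArray();
      for (int i = 0; i < keysList.size(); i++) {
        JsonArray compoundKey = keysList.getJsonArray(i);
        long intKey = compoundKey.getJsonNumber(0).longValue();
        String stringKey = compoundKey.getString(1);
        System.out.println("split " + i + ": " + intKey + ", " + stringKey);
      }
    }
  }
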
- /**
- * Given the TDistributeByRangeParam from the CREATE statement, creates the
- * appropriate split rows.
- */
- public static List<PartialRow> parseSplits(Schema schema,
- TDistributeByRangeParam param) throws ImpalaRuntimeException {
- ImmutableList.Builder<PartialRow> splitRows = ImmutableList.builder();
- for (TRangeLiteralList literals : param.getSplit_rows()) {
- PartialRow splitRow = new PartialRow(schema);
- List<TRangeLiteral> literalValues = literals.getValues();
- for (int i = 0; i < literalValues.size(); ++i) {
- String colName = param.getColumns().get(i);
- ColumnSchema col = schema.getColumn(colName);
- setKey(col.getType(), literalValues.get(i), schema.getColumnIndex(colName),
- colName, splitRow);
- }
- splitRows.add(splitRow);
- }
- return splitRows.build();
- }
-
- /**
- * Sets the value in 'key' at 'pos', given the json representation.
- */
- private static void setKey(Type type, JsonArray array, int pos, PartialRow key)
- throws ImpalaRuntimeException {
- switch (type) {
- case BOOL: key.addBoolean(pos, array.getBoolean(pos)); break;
- case INT8: key.addByte(pos, (byte) array.getInt(pos)); break;
- case INT16: key.addShort(pos, (short) array.getInt(pos)); break;
- case INT32: key.addInt(pos, array.getInt(pos)); break;
- case INT64: key.addLong(pos, array.getJsonNumber(pos).longValue()); break;
- case STRING: key.addString(pos, array.getString(pos)); break;
- default:
- throw new ImpalaRuntimeException("Key columns not supported for type: "
- + type.toString());
- }
- }
-
- /**
- * Sets the value in 'key' at 'pos', given the range literal.
- */
- private static void setKey(Type type, TRangeLiteral literal, int pos, String colName,
- PartialRow key) throws ImpalaRuntimeException {
- switch (type) {
- case BOOL:
- checkCorrectType(literal.isSetBool_literal(), type, colName, literal);
- key.addBoolean(pos, literal.isBool_literal());
- break;
- case INT8:
- checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
- key.addByte(pos, (byte) literal.getInt_literal());
- break;
- case INT16:
- checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
- key.addShort(pos, (short) literal.getInt_literal());
- break;
- case INT32:
- checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
- key.addInt(pos, (int) literal.getInt_literal());
- break;
- case INT64:
- checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
- key.addLong(pos, literal.getInt_literal());
- break;
- case STRING:
- checkCorrectType(literal.isSetString_literal(), type, colName, literal);
- key.addString(pos, literal.getString_literal());
- break;
- default:
- throw new ImpalaRuntimeException("Key columns not supported for type: "
- + type.toString());
- }
- }
-
- /**
- * If correctType is true, returns. Otherwise throws an exception with a formatted
- * message indicating that the range literal has the wrong type for the column.
- */
- private static void checkCorrectType(boolean correctType, Type t, String colName,
- TRangeLiteral literal) throws ImpalaRuntimeException {
- if (correctType) return;
- throw new ImpalaRuntimeException(
- format("Expected %s literal for column '%s' got '%s'", t.getName(), colName,
- toString(literal)));
- }
-
- /**
- * Parses a string of the form "a, b, c" and returns a set of values split by ',' and
- * stripped of surrounding whitespace.
- */
- public static HashSet<String> parseKeyColumns(String cols) {
- return Sets.newHashSet(Splitter.on(",").trimResults().split(cols.toLowerCase()));
- }
-
- public static List<String> parseKeyColumnsAsList(String cols) {
- return Lists.newArrayList(Splitter.on(",").trimResults().split(cols.toLowerCase()));
- }
-
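As a usage note (hypothetical input; assumes the class above is on the classpath), both helpers lower-case the input, split on ',' and trim each token:

  // "id, NAME , ts" -> {"id", "name", "ts"}; the HashSet does not preserve order,
  // the List variant does.
  java.util.HashSet<String> keyCols = KuduUtil.parseKeyColumns("id, NAME , ts");
  java.util.List<String> orderedCols = KuduUtil.parseKeyColumnsAsList("id, NAME , ts");
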
- /**
- * Converts a given Impala catalog type to the Kudu type. Throws an exception if the
- * type cannot be converted.
- */
- public static Type fromImpalaType(com.cloudera.impala.catalog.Type t)
- throws ImpalaRuntimeException {
- if (!t.isScalarType()) {
- throw new ImpalaRuntimeException(format(
- "Non-scalar type %s is not supported in Kudu", t.toSql()));
- }
- ScalarType s = (ScalarType) t;
- switch (s.getPrimitiveType()) {
- case TINYINT: return Type.INT8;
- case SMALLINT: return Type.INT16;
- case INT: return Type.INT32;
- case BIGINT: return Type.INT64;
- case BOOLEAN: return Type.BOOL;
- case CHAR: return Type.STRING;
- case STRING: return Type.STRING;
- case VARCHAR: return Type.STRING;
- case DOUBLE: return Type.DOUBLE;
- case FLOAT: return Type.FLOAT;
- /* Fall through below */
- case INVALID_TYPE:
- case NULL_TYPE:
- case TIMESTAMP:
- case BINARY:
- case DATE:
- case DATETIME:
- case DECIMAL:
- default:
- throw new ImpalaRuntimeException(format(
- "Type %s is not supported in Kudu", s.toSql()));
- }
- }
-
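A brief illustration of the mapping above (hypothetical call site inside a method declaring throws ImpalaRuntimeException; assumes the pre-rename com.cloudera.impala classes are on the classpath):

  // Scalar Impala types map directly onto Kudu types:
  org.apache.kudu.Type k1 =
      KuduUtil.fromImpalaType(com.cloudera.impala.catalog.Type.BIGINT);  // INT64
  org.apache.kudu.Type k2 =
      KuduUtil.fromImpalaType(com.cloudera.impala.catalog.Type.STRING);  // STRING
  // TIMESTAMP, DECIMAL, DATE, etc. throw ImpalaRuntimeException.
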
- /**
- * Returns the string value of the RANGE literal.
- */
- static String toString(TRangeLiteral l) throws ImpalaRuntimeException {
- if (l.isSetBool_literal()) return String.valueOf(l.bool_literal);
- if (l.isSetString_literal()) return String.valueOf(l.string_literal);
- if (l.isSetInt_literal()) return String.valueOf(l.int_literal);
- throw new ImpalaRuntimeException("Unsupported type for RANGE literal.");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/ListMap.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/ListMap.java b/fe/src/main/java/com/cloudera/impala/util/ListMap.java
deleted file mode 100644
index 989a510..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/ListMap.java
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Lists;
-
-/**
- * Implementation of a bi-directional map between an index of type
- * Integer and an object of type T. The indices are allocated on
- * demand when a reverse lookup occurs for an object not already in
- * the map.
- *
- * The forward mapping is implemented as a List<> so that it can be
- * directly used as a Thrift structure.
- */
-public class ListMap<T> {
- // Maps from Integer to T.
- private ArrayList<T> list_ = Lists.newArrayList();
- // Maps from T to Integer.
- private final Map<T, Integer> map_ = Maps.newHashMap();
-
- public ArrayList<T> getList() { return list_; }
- public int size() { return list_.size(); }
-
- /**
- * Map from Integer index to T object.
- */
- public T getEntry(int index) { return list_.get(index); }
-
- /**
- * Map from T t to Integer index. If the mapping from t doesn't
- * exist, then create a new mapping from t to a unique index.
- */
- public int getIndex(T t) {
- Integer index = map_.get(t);
- if (index == null) {
- // No match was found, add a new entry.
- list_.add(t);
- index = list_.size() - 1;
- map_.put(t, index);
- }
- return index;
- }
-
- /**
- * Populates the bi-map from the given list. Does not perform a copy
- * of the list.
- */
- public void populate(ArrayList<T> list) {
- Preconditions.checkState(list_.isEmpty() && map_.isEmpty());
- list_ = list;
- for (int i = 0; i < list_.size(); ++i) {
- map_.put(list_.get(i), i);
- }
- }
-}
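A minimal usage sketch of the bi-map (hypothetical host names; assumes the class above is on the classpath). Indices are handed out in insertion order, so getList() can be shipped directly as a Thrift list:

  ListMap<String> hostIndex = new ListMap<String>();
  int a = hostIndex.getIndex("host-a");   // 0, new entry
  int b = hostIndex.getIndex("host-b");   // 1, new entry
  int a2 = hostIndex.getIndex("host-a");  // 0 again, existing entry
  String host = hostIndex.getEntry(1);    // "host-b"
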
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java
deleted file mode 100644
index ac85ff8..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.util;
-
-import com.cloudera.impala.planner.NestedLoopJoinNode;
-import com.cloudera.impala.planner.HashJoinNode;
-import com.cloudera.impala.planner.PlanNode;
-import com.cloudera.impala.planner.ScanNode;
-
-/**
- * Returns the maximum number of rows processed by any node in a given plan tree.
- */
-public class MaxRowsProcessedVisitor implements Visitor<PlanNode> {
-
- private boolean abort_ = false;
- private long result_ = -1L;
-
- @Override
- public void visit(PlanNode caller) {
- if (abort_) return;
-
- if (caller instanceof ScanNode) {
- long tmp = caller.getInputCardinality();
- ScanNode scan = (ScanNode) caller;
- boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats();
- // In the absence of collection stats, treat scans on collections as if they
- // have no limit.
- if (scan.isAccessingCollectionType() || (missingStats && !scan.hasLimit())) {
- abort_ = true;
- return;
- }
- result_ = Math.max(result_, tmp);
- } else if (caller instanceof HashJoinNode || caller instanceof NestedLoopJoinNode) {
- // Revisit when multiple scan nodes can be executed in a single fragment, IMPALA-561
- abort_ = true;
- return;
- } else {
- long in = caller.getInputCardinality();
- long out = caller.getCardinality();
- if ((in == -1) || (out == -1)) {
- abort_ = true;
- return;
- }
- result_ = Math.max(result_, Math.max(in, out));
- }
- }
-
- public long get() {
- return abort_ ? -1 : result_;
- }
-}
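How the visitor is driven is not shown in this file. The fragment below is only a hypothetical sketch of the calling contract, not the actual Impala PlanNode traversal API: each node of the plan tree is visited once, then get() yields the maximum row count, or -1 if it could not be determined.

  // Hypothetical driver; 'planNodesInPreOrder' is an assumed Iterable<PlanNode>.
  MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
  for (PlanNode node : planNodesInPreOrder) visitor.visit(node);
  long maxRows = visitor.get();  // -1 means the estimate is unknown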