You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2016/09/30 02:14:29 UTC

[12/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index 60b0c7a..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,204 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import static org.apache.avro.Schema.Type.BOOLEAN;
-import static org.apache.avro.Schema.Type.DOUBLE;
-import static org.apache.avro.Schema.Type.FLOAT;
-import static org.apache.avro.Schema.Type.INT;
-import static org.apache.avro.Schema.Type.LONG;
-import static org.apache.avro.Schema.Type.STRING;
-import java.util.Collections;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaParseException;
-import org.codehaus.jackson.JsonNode;
-import com.cloudera.impala.analysis.ColumnDef;
-import com.cloudera.impala.analysis.TypeDef;
-import com.cloudera.impala.catalog.ArrayType;
-import com.cloudera.impala.catalog.MapType;
-import com.cloudera.impala.catalog.ScalarType;
-import com.cloudera.impala.catalog.StructField;
-import com.cloudera.impala.catalog.StructType;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.common.AnalysisException;
- * Utility class used to parse Avro schema. Checks that the schema is valid
- * and performs mapping of Avro types to Impala types.
- * Note: This code is loosely based off the parsing code in the Hive AvroSerDe.
- */
-public class AvroSchemaParser {
-  // Map of Avro to Impala primitive types.
-  private static final Map<Schema.Type, Type> avroToImpalaPrimitiveTypeMap_;
-  static {
-    Map<Schema.Type, Type> typeMap = new Hashtable<Schema.Type, Type>();
-    typeMap.put(STRING, Type.STRING);
-    typeMap.put(INT, Type.INT);
-    typeMap.put(BOOLEAN, Type.BOOLEAN);
-    typeMap.put(LONG, Type.BIGINT);
-    typeMap.put(FLOAT, Type.FLOAT);
-    typeMap.put(DOUBLE, Type.DOUBLE);
-    avroToImpalaPrimitiveTypeMap_ = Collections.unmodifiableMap(typeMap);
-  }
-  /**
-   * Parses the Avro schema string literal, mapping the Avro types to Impala types.
-   * Returns a list of ColumnDef objects with their name and type info set.
-   * Throws an AnalysisException if the Avro type maps to a type that Impala
-   * does not yet support.
-   * Throws a SchemaParseException if the Avro schema was invalid.
-   */
-  public static List<ColumnDef> parse(String schemaStr)
-      throws SchemaParseException, AnalysisException {
-    Schema.Parser avroSchemaParser = new Schema.Parser();
-    Schema schema = avroSchemaParser.parse(schemaStr);
-    if (!schema.getType().equals(Schema.Type.RECORD)) {
-      throw new UnsupportedOperationException("Schema for table must be of type " +
-          "RECORD. Received type: " + schema.getType());
-    }
-    List<ColumnDef> colDefs = Lists.newArrayListWithCapacity(schema.getFields().size());
-    for (Schema.Field field: schema.getFields()) {
-      ColumnDef colDef = new ColumnDef(,
-          new TypeDef(getTypeInfo(field.schema(),, field.doc());
-      colDef.analyze();
-      colDefs.add(colDef);
-    }
-    return colDefs;
-  }
-  /**
-   * Parses the given Avro schema and returns the matching Impala type
-   * for this field. Handles primitive and complex types.
-   */
-  private static Type getTypeInfo(Schema schema, String colName)
-      throws AnalysisException {
-    // Avro requires NULLable types to be defined as unions of some type T
-    // and NULL.  This is annoying and we're going to hide it from the user.
-    if (isNullableType(schema)) {
-      return getTypeInfo(getColumnType(schema), colName);
-    }
-    Schema.Type type = schema.getType();
-    if (avroToImpalaPrimitiveTypeMap_.containsKey(type)) {
-      return avroToImpalaPrimitiveTypeMap_.get(type);
-    }
-    switch(type) {
-      case ARRAY:
-        Type itemType = getTypeInfo(schema.getElementType(), colName);
-        return new ArrayType(itemType);
-      case MAP:
-        Type valueType = getTypeInfo(schema.getValueType(), colName);
-        return new MapType(Type.STRING, valueType);
-      case RECORD:
-        StructType structType = new StructType();
-        for (Schema.Field field: schema.getFields()) {
-          Type fieldType = getTypeInfo(field.schema(), colName);
-          structType.addField(new StructField(, fieldType, field.doc()));
-        }
-        return structType;
-      case BYTES:
-        // Decimal is stored in Avro as a BYTE.
-        Type decimalType = getDecimalType(schema);
-        if (decimalType != null) return decimalType;
-      // TODO: Add support for stored Avro UNIONs by exposing them as STRUCTs in Impala.
-      case UNION:
-      case ENUM:
-      case FIXED:
-      case NULL:
-      default: {
-        throw new AnalysisException(String.format(
-            "Unsupported type '%s' of column '%s'", type.getName(), colName));
-      }
-    }
-  }
-  /**
-   * Returns true if this is a nullable type (a Union[T, Null]), false otherwise.
-   */
-  private static boolean isNullableType(Schema schema) {
-    // [null, null] not allowed, so this check is ok.
-    return schema.getType().equals(Schema.Type.UNION) && schema.getTypes().size() == 2 &&
-        (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) ||
-         schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
-  }
-  /**
-   * If a nullable type, get the schema for the non-nullable type which will
-   * provide Impala column type information.
-   */
-  private static Schema getColumnType(Schema schema) {
-    List<Schema> types = schema.getTypes();
-    return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
-  }
-  /**
-   * Attempts to parse decimal type information from the Avro schema, returning
-   * a decimal ColumnType if successful or null if this schema does not map
-   * to a decimal type.
-   * Decimal is defined in Avro as a BYTE type with the logicalType property
-   * set to "decimal" and a specified scale/precision.
-   * Throws a SchemaParseException if the logicType=decimal, but scale/precision
-   * is not specified or in the incorrect format.
-   */
-  private static Type getDecimalType(Schema schema) {
-    Preconditions.checkState(schema.getType() == Schema.Type.BYTES);
-    String logicalType = schema.getProp("logicalType");
-    if (logicalType != null && logicalType.equalsIgnoreCase("decimal")) {
-      // Parse the scale/precision of the decimal type.
-      Integer scale = getDecimalProp(schema, "scale");
-      // The Avro spec states that scale should default to zero if not set.
-      if (scale == null) scale = 0;
-      // Precision is a required property according to the Avro spec.
-      Integer precision = getDecimalProp(schema, "precision");
-      if (precision == null) {
-        throw new SchemaParseException(
-            "No 'precision' property specified for 'decimal' logicalType");
-      }
-      return ScalarType.createDecimalType(precision, scale);
-    }
-    return null;
-  }
-  /**
-   * Parses a decimal property and returns the value as an integer, or null
-   * if the property isn't set. Used to parse decimal scale/precision.
-   * Throws a SchemaParseException if the property doesn't parse to a
-   * natural number.
-   */
-  private static Integer getDecimalProp(Schema schema, String propName)
-      throws SchemaParseException {
-    JsonNode node = schema.getJsonProp(propName);
-    if (node == null) return null;
-    int propValue = node.getValueAsInt(-1);
-    if (propValue < 0) {
-      throw new SchemaParseException(String.format("Invalid decimal '%s' " +
-          "property value: %s", propName, node.getValueAsText()));
-    }
-    return propValue;
-  }
\ No newline at end of file
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index f86c347..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,189 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import com.cloudera.impala.analysis.ColumnDef;
-import com.cloudera.impala.catalog.PrimitiveType;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.FileSystemUtil;
- * Contains utility functions for dealing with Avro schemas.
- */
-public class AvroSchemaUtils {
-  /**
-   * Gets an Avro table's JSON schema from the list of given table property search
-   * locations. The schema may be specified as a string literal or provided as a
-   * Hadoop FileSystem or http URL that points to the schema. Apart from ensuring
-   * that the JSON schema is not SCHEMA_NONE, this function does not perform any
-   * additional validation on the returned string (e.g., it may not be a valid
-   * schema). Returns the Avro schema or null if none was specified in the search
-   * locations. Throws an AnalysisException if a schema was specified, but could not
-   * be retrieved, e.g., because of an invalid URL.
-   */
-  public static String getAvroSchema(List<Map<String, String>> schemaSearchLocations)
-      throws AnalysisException {
-    String url = null;
-    // Search all locations and break out on the first valid schema found.
-    for (Map<String, String> schemaLocation: schemaSearchLocations) {
-      if (schemaLocation == null) continue;
-      String literal =
-          schemaLocation.get(
-              AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
-      if (literal != null && !literal.equals(AvroSerdeUtils.SCHEMA_NONE)) return literal;
-      url = schemaLocation.get(
-          AvroSerdeUtils.AvroTableProperties.SCHEMA_URL.getPropName());
-      if (url != null && !url.equals(AvroSerdeUtils.SCHEMA_NONE)) {
-        url = url.trim();
-        break;
-      }
-    }
-    if (url == null) return null;
-    String schema = null;
-    InputStream urlStream = null;
-    try {
-      // TODO: Add support for https:// here.
-      if (url.toLowerCase().startsWith("http://")) {
-        urlStream = new URL(url).openStream();
-        schema = IOUtils.toString(urlStream);
-      } else {
-        Path path = new Path(url);
-        FileSystem fs = null;
-        fs = path.getFileSystem(FileSystemUtil.getConfiguration());
-        if (!fs.exists(path)) {
-          throw new AnalysisException(String.format(
-              "Invalid avro.schema.url: %s. Path does not exist.", url));
-        }
-        schema = FileSystemUtil.readFile(path);
-      }
-    } catch (AnalysisException e) {
-      throw e;
-    } catch (IOException e) {
-      throw new AnalysisException(String.format(
-          "Failed to read Avro schema at: %s. %s ", url, e.getMessage()));
-    } catch (Exception e) {
-      throw new AnalysisException(String.format(
-          "Invalid avro.schema.url: %s. %s", url, e.getMessage()));
-    } finally {
-      if (urlStream != null) IOUtils.closeQuietly(urlStream);
-    }
-    return schema;
-  }
-  /**
-   * Reconciles differences in names/types between the given list of column definitions
-   * and the column definitions corresponding to an Avro Schema. Populates 'warning'
-   * if there are inconsistencies between the column definitions and the Avro schema,
-   * Returns the reconciled column definitions according to the following conflict
-   * resolution policy:
-   *
-   * Mismatched number of columns -> Prefer Avro columns.
-   * Always prefer Avro schema except for column type CHAR/VARCHAR/STRING:
-   *   A CHAR/VARCHAR/STRING column definition maps to an Avro STRING. The reconciled
-   *   column will preserve the type in the column definition but use the column name
-   *   and comment from the Avro schema.
-   */
-  public static List<ColumnDef> reconcileSchemas(
-      List<ColumnDef> colDefs, List<ColumnDef> avroCols, StringBuilder warning) {
-    if (colDefs.size() != avroCols.size()) {
-      warning.append(String.format(
-          "Ignoring column definitions in favor of Avro schema.\n" +
-          "The Avro schema has %s column(s) but %s column definition(s) were given.",
-           avroCols.size(), colDefs.size()));
-      return avroCols;
-    }
-    List<ColumnDef> result = Lists.newArrayListWithCapacity(colDefs.size());
-    for (int i = 0; i < avroCols.size(); ++i) {
-      ColumnDef colDef = colDefs.get(i);
-      ColumnDef avroCol = avroCols.get(i);
-      Preconditions.checkNotNull(colDef.getType());
-      Preconditions.checkNotNull(avroCol.getType());
-      // A CHAR/VARCHAR/STRING column definition maps to an Avro STRING, and is preserved
-      // as a CHAR/VARCHAR/STRING in the reconciled schema. Column name and comment
-      // are taken from the Avro schema.
-      if ((colDef.getType().isStringType() && avroCol.getType().isStringType())) {
-        Preconditions.checkState(
-            avroCol.getType().getPrimitiveType() == PrimitiveType.STRING);
-        ColumnDef reconciledColDef = new ColumnDef(
-            avroCol.getColName(), colDef.getTypeDef(), avroCol.getComment());
-        try {
-          reconciledColDef.analyze();
-        } catch (AnalysisException e) {
-          Preconditions.checkNotNull(
-              null, "reconciledColDef.analyze() should never throw.");
-        }
-        result.add(reconciledColDef);
-      } else {
-        result.add(avroCol);
-      }
-      // Populate warning string if there are name and/or type inconsistencies.
-      if (!colDef.getColName().equals(avroCol.getColName()) ||
-          !colDef.getType().equals(avroCol.getType())) {
-        if (warning.length() == 0) {
-          // Add warning preamble for the first mismatch.
-          warning.append("Resolved the following name and/or type inconsistencies " +
-              "between the column definitions and the Avro schema.\n");
-        }
-        warning.append(String.format("Column definition at position %s:  %s %s\n",
-            i, colDefs.get(i).getColName(), colDefs.get(i).getType().toSql()));
-        warning.append(String.format("Avro schema column at position %s: %s %s\n",
-            i, avroCols.get(i).getColName(), avroCols.get(i).getType().toSql()));
-        warning.append(String.format("Resolution at position %s: %s %s\n",
-            i, result.get(i).getColName(), result.get(i).getType().toSql()));
-      }
-    }
-    Preconditions.checkState(result.size() == avroCols.size());
-    Preconditions.checkState(result.size() == colDefs.size());
-    return result;
-  }
-  /**
-   * Sets the comment of each column definition to 'from deserializer' if not already
-   * set. The purpose of this function is to provide behavioral consistency with
-   * Hive ('deserializer' is not applicable to Impala) with respect to column comments
-   * set for Avro tables.
-   */
-  public static void setFromSerdeComment(List<ColumnDef> colDefs) {
-    for (ColumnDef colDef: colDefs) {
-      if (Strings.isNullOrEmpty(colDef.getComment())) {
-        colDef.setComment("from deserializer");
-      }
-    }
-  }
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index bce214e..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
- * Basic implementation of the disjoint-set data structure.
- * Stores a set of disjoint item sets and provides efficient implementations of mainly
- * two operations:
- * 1. Find the item set corresponding to a given member item (get() function)
- * 2. Compute the union of two item sets (union() function)
- */
-public class DisjointSet<T> {
-  // Maps from an item to its item set.
-  private final Map<T, Set<T>> itemSets_ = Maps.newHashMap();
-  private final Set<Set<T>> uniqueSets_ = Sets.newHashSet();
-  /**
-   * Returns the item set corresponding to the given item or null if it
-   * doesn't exist.
-   */
-  public Set<T> get(T item) { return itemSets_.get(item); }
-  public Set<Set<T>> getSets() { return uniqueSets_; }
-  /**
-   * Registers a new item set with a single item. Returns the new item set.
-   * Throws if such an item set already exists.
-   */
-  public Set<T> makeSet(T item) {
-    if (itemSets_.containsKey(item)) {
-      throw new IllegalStateException(
-          "Item set for item already exists: " + item.toString());
-    }
-    Set<T> s = Sets.newHashSet(item);
-    itemSets_.put(item, s);
-    uniqueSets_.add(s);
-    return s;
-  }
-  /**
-   * Merges the two item sets belonging to the members a and b. The merged set contains
-   * at least a and b even if a or b did not have an associated item set.
-   * Returns false if the item sets of a and b are non-empty and already identical,
-   * true otherwise.
-   */
-  public boolean union(T a, T b) {
-    Set<T> aItems = itemSets_.get(a);
-    Set<T> bItems = itemSets_.get(b);
-    // check if the sets are already identical
-    if (aItems != null && bItems != null && aItems == bItems) return false;
-    // union(x, x) is equivalent to makeSet(x)
-    if (a.equals(b) && aItems == null) {
-      makeSet(a);
-      return true;
-    }
-    // create sets for a or b if not present already
-    if (aItems == null) aItems = makeSet(a);
-    if (bItems == null) bItems = makeSet(b);
-    // will contain the union of aItems and bItems
-    Set<T> mergedItems = aItems;
-    // always the smaller of the two sets to be merged
-    Set<T> updateItems = bItems;
-    if (bItems.size() > aItems.size()) {
-      mergedItems = bItems;
-      updateItems = aItems;
-    }
-    for (T item: updateItems) {
-      mergedItems.add(item);
-      itemSets_.put(item, mergedItems);
-    }
-    uniqueSets_.remove(updateItems);
-    return true;
-  }
-  /**
-   * Merges all the item sets corresponding to the given items. Returns true if any item
-   * sets were merged or created, false otherwise (item sets are already identical).
-   */
-  public boolean bulkUnion(Collection<T> items) {
-    if (items.isEmpty()) return false;
-    Iterator<T> it = items.iterator();
-    T head =;
-    // bulkUnion(x) is equivalent to makeSet(x)
-    if (!it.hasNext()) {
-      if (get(head) != null) return false;
-      makeSet(head);
-      return true;
-    }
-    boolean result = false;
-    while(it.hasNext()) {
-      boolean changed = union(head,;
-      result = result || changed;
-    }
-    return result;
-  }
-  /**
-   * Checks the internal consistency of this data structure.
-   * Throws an IllegalStateException if an inconsistency is detected.
-   */
-  public void checkConsistency() {
-    Set<Set<T>> validatedSets = Sets.newHashSet();
-    for (Set<T> itemSet: itemSets_.values()) {
-      // Avoid checking the same item set multiple times.
-      if (validatedSets.contains(itemSet)) continue;
-      // Validate that all items in this set are properly mapped to
-      // the set itself.
-      for (T item: itemSet) {
-        if (itemSet != itemSets_.get(item)) {
-          throw new IllegalStateException("DisjointSet is in an inconsistent state.");
-        }
-      }
-      validatedSets.add(itemSet);
-    }
-  }
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index 6b12c2e..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import java.util.List;
-import com.cloudera.impala.thrift.TEventSequence;
- * Wrapper around TEventSequence so that we can mark events with a single method call.
- * Events are 'marked' as they happen (so in order, with no time-travel backwards).
- */
-public class EventSequence {
-  private final List<Long> timestamps_ = Lists.newArrayList();
-  private final List<String> labels_ = Lists.newArrayList();
-  private final long startTime_;
-  private final String name_;
-  public EventSequence(String name) {
-    name_ = name;
-    startTime_ = System.nanoTime();
-  }
-  /**
-   * Saves an event at the current time with the given label.
-   */
-  public void markEvent(String label) {
-    // Timestamps should be in ns resolution
-    timestamps_.add(System.nanoTime() - startTime_);
-    labels_.add(label);
-  }
-  public TEventSequence toThrift() {
-    TEventSequence ret = new TEventSequence();
-    ret.timestamps = timestamps_;
-    ret.labels = labels_;
- = name_;
-    return ret;
-  }
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index 88a456d..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,140 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- * Service to watch a file for changes. A thread periodically checks the file
- * modification time and uses the provided {@link FileChangeListener} to notify
- * a consumer.
- */
-public class FileWatchService {
-  final static Logger LOG = LoggerFactory.getLogger(FileWatchService.class);
-  // Default time to wait between checking the file.
-  static final long DEFAULT_CHECK_INTERVAL_MS = 10 * 1000;
-  // Time between checking for changes. Mutable for unit tests.
-  private long checkIntervalMs_ = DEFAULT_CHECK_INTERVAL_MS;
-  // Future returned by scheduleAtFixedRate(), needed to stop the checking thread.
-  private ScheduledFuture<?> fileCheckFuture_;
-  private final AtomicBoolean running_;
-  private final FileChangeListener changeListener_; // Used to notify when changes occur.
-  private final File file_; // The file to check for changes.
-  private boolean alreadyWarned_; // Avoid repeatedly warning if the file is missing
-  private long prevChange_; // Time of the last observed change
-  /**
-   * Listener used to notify of file changes.
-   */
-  public interface FileChangeListener {
-    /**
-     * Called when the file changes.
-     */
-    void onFileChange();
-  }
-  public FileWatchService(File file, FileChangeListener listener) {
-    Preconditions.checkNotNull(file);
-    Preconditions.checkNotNull(listener);
-    Preconditions.checkArgument(file.exists());
-    running_ = new AtomicBoolean(false);
-    file_ = file;
-    changeListener_ = listener;
-    prevChange_ = 0L;
-    alreadyWarned_ = false;
-  }
-  /**
-   * Set the time (in milliseconds) to wait between checking the file for changes.
-   * Only used in tests.
-   */
-  @VisibleForTesting
-  public void setCheckIntervalMs(long checkIntervalMs) {
-    checkIntervalMs_ = checkIntervalMs;
-  }
-  /**
-   * Checks if the file has changed since the last observed change and if so,
-   * notifies the listener.
-   */
-  private void checkFile() {
-    if (file_.exists()) {
-      long lastChange = file_.lastModified();
-      if (lastChange > prevChange_) {
-        changeListener_.onFileChange();
-        prevChange_ = lastChange;
-        alreadyWarned_ = false;
-      }
-    } else {
-      if (!alreadyWarned_) {
-        LOG.warn("File does not exist: {}", file_.getPath());
-        alreadyWarned_ = true;
-      }
-    }
-  }
-  /**
-   * Starts the thread to check for file changes. Continues checking for file changes
-   * every 'checkIntervalMs_' milliseconds until stop() is called.
-   */
-  public synchronized void start() {
-    Preconditions.checkState(!running_.get());
-    running_.set(true);
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder()
-        .setDaemon(true)
-        .setNameFormat("FileWatchThread(" + file_.getPath() + ")-%d")
-        .build());
-    fileCheckFuture_ = executor.scheduleAtFixedRate(new Runnable() {
-      public void run() {
-        try {
-          checkFile();
-        } catch (SecurityException e) {
-          LOG.warn("Not allowed to check read file existence: " + file_.getPath(), e);
-        }
-      }
-    }, 0L, checkIntervalMs_, TimeUnit.MILLISECONDS);
-  }
-  /**
-   * Stops the file watching thread.
-   */
-  public synchronized void stop() {
-    Preconditions.checkState(running_.get());
-    running_.set(false);
-    fileCheckFuture_.cancel(false);
-  }
\ No newline at end of file
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index 7523cc8..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,301 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Map;
-import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclEntryType;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.AclEntryScope;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.protocol.AclException;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
- * Singleton class that can check whether the current user has permission to access paths
- * in a FileSystem.
- */
-public class FsPermissionChecker {
-  private final static Logger LOG = LoggerFactory.getLogger(FsPermissionChecker.class);
-  private final static FsPermissionChecker instance_;
-  private final static Configuration CONF;
-  protected final String user_;
-  private final Set<String> groups_ = new HashSet<String>();
-  private final String supergroup_;
-  static {
-    CONF = new Configuration();
-    try {
-      instance_ = new FsPermissionChecker();
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Error initializing FsPermissionChecker: " + e.getMessage(), e);
-    }
-  }
-  private FsPermissionChecker() throws IOException {
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    groups_.addAll(Arrays.asList(ugi.getGroupNames()));
-    user_ = ugi.getShortUserName();
-  }
-  private boolean isSuperUser() { return groups_.contains(supergroup_); }
-  private static List<AclEntryType> ACL_TYPE_PRIORITY =
-      ImmutableList.of(AclEntryType.USER, AclEntryType.GROUP, AclEntryType.OTHER);
-  /**
-   * Allows checking different access permissions of a file without repeatedly accessing
-   * the underlying filesystem by caching the results of a status call at construction.
-   */
-  public class Permissions {
-    private final FileStatus fileStatus_;
-    private final FsPermission permissions_;
-    private final AclStatus aclStatus_;
-    private Map<AclEntryType, List<AclEntry>> entriesByTypes_ = Maps.newHashMap();
-    private AclEntry mask_;
-    /**
-     * If aclStatus is null, ACL permissions are not checked.
-     */
-    protected Permissions(FileStatus fileStatus, AclStatus aclStatus) {
-      Preconditions.checkNotNull(fileStatus);
-      fileStatus_ = fileStatus;
-      permissions_ = fileStatus.getPermission();
-      aclStatus_ = aclStatus;
-      if (aclStatus_ == null) return;
-      // Group the ACLs by type, so that we can apply them in correct priority order. Not
-      // clear from documentation whether aclStatus_.getEntries() guarantees this
-      // ordering, so this is defensive.
-      for (AclEntryType t: ACL_TYPE_PRIORITY) {
-        entriesByTypes_.put(t, Lists.<AclEntry>newArrayList());
-      }
-      List<AclEntry> fullAclList =
-          getAclFromPermAndEntries(permissions_, aclStatus_.getEntries());
-      for (AclEntry e: fullAclList) {
-        if (e.getType() == AclEntryType.MASK && e.getScope() != AclEntryScope.DEFAULT) {
-          mask_ = e;
-        } else if (isApplicableAcl(e)) {
-          entriesByTypes_.get(e.getType()).add(e);
-        }
-      }
-    }
-    /**
-     * Returns true if the mask should apply. The mask ACL applies only to unnamed user
-     * ACLs (e.g. user::r-x), and all group ACLs.
-     */
-    private boolean shouldApplyMask(AclEntry acl) {
-      if (mask_ == null) return false;
-      switch (acl.getType()) {
-        case USER:
-          return acl.getName() != null;
-        case GROUP:
-          return true;
-      }
-      return false;
-    }
-    /**
-     * Returns true if this ACL applies to the current user and / or group
-     */
-    private boolean isApplicableAcl(AclEntry e) {
-      // Default ACLs are not used for permission checking, but instead control the
-      // permissions received by child directories
-      if (e.getScope() == AclEntryScope.DEFAULT) return false;
-      switch (e.getType()) {
-        case USER:
-          String aclUser = e.getName() == null ? aclStatus_.getOwner() : e.getName();
-          return FsPermissionChecker.this.user_.equals(aclUser);
-        case GROUP:
-          String aclGroup = e.getName() == null ? aclStatus_.getGroup() : e.getName();
-          return FsPermissionChecker.this.groups_.contains(aclGroup);
-        case OTHER:
-          return true;
-        case MASK:
-          return false;
-        default:
-          LOG.warn("Unknown Acl type: " + e.getType());
-          return false;
-      }
-    }
-    /**
-     * Returns true if ACLs allow 'action', false if they explicitly disallow 'action',
-     * and 'null' if no ACLs are available.
-     * See for more details about
-     * acl access check algorithm.
-     */
-    private Boolean checkAcls(FsAction action) {
-      // ACLs may not be enabled, so we need this ternary logic. If no ACLs are available,
-      // returning null causes us to fall back to standard ugo permissions.
-      if (aclStatus_ == null) return null;
-      // Remember if there is an applicable ACL entry, including owner user, named user,
-      // owning group, named group.
-      boolean foundMatch = false;
-      for (AclEntryType t: ACL_TYPE_PRIORITY) {
-        for (AclEntry e: entriesByTypes_.get(t)) {
-          if (t == AclEntryType.OTHER) {
-            // Processed all ACL entries except the OTHER entry.
-            // If found applicable ACL entries but none of them contain requested
-            // permission, deny access. Otherwise check OTHER entry.
-            return foundMatch ? false : e.getPermission().implies(action);
-          }
-          // If there is an applicable mask, 'action' is allowed iff both the mask and
-          // the underlying ACL permit it.
-          if (e.getPermission().implies(action)) {
-            if (shouldApplyMask(e)) {
-              if (mask_.getPermission().implies(action)) return true;
-            } else {
-              return true;
-            }
-          }
-          // User ACL entry has priority, no need to continue check.
-          if (t == AclEntryType.USER) return false;
-          foundMatch = true;
-        }
-      }
-      return false;
-    }
-    /**
-     * Returns true if the current user can perform the given action given these
-     * permissions.
-     */
-    public boolean checkPermissions(FsAction action) {
-      if (FsPermissionChecker.this.isSuperUser()) return true;
-      Boolean aclPerms = checkAcls(action);
-      if (aclPerms != null) return aclPerms;
-      // Check user, group and then 'other' permissions in turn.
-      if (FsPermissionChecker.this.user_.equals(fileStatus_.getOwner())) {
-        // If the user matches, we must return their access rights whether or not the user
-        // is allowed to access without checking the group. This is counter-intuitive if
-        // the user cannot access the file, but the group permissions would allow it, but
-        // is consistent with UNIX behaviour.
-        return permissions_.getUserAction().implies(action);
-      }
-      if (FsPermissionChecker.this.groups_.contains(fileStatus_.getGroup())) {
-        return permissions_.getGroupAction().implies(action);
-      }
-      return permissions_.getOtherAction().implies(action);
-    }
-    public boolean canRead() { return checkPermissions(FsAction.READ); }
-    public boolean canWrite() { return checkPermissions(FsAction.WRITE); }
-    public boolean canReadAndWrite() { return canRead() && canWrite(); }
-    // This was originally lifted from Hadoop. Won't need it if HDFS-7177 is resolved.
-    // getAclStatus() returns just extended ACL entries, the default file permissions
-    // like "user::,group::,other::" are not included. We need to combine them together
-    // to get full logic ACL list.
-    private List<AclEntry> getAclFromPermAndEntries(FsPermission perm,
-        List<AclEntry> entries) {
-      // File permission always have 3 items.
-      List<AclEntry> aclEntries = Lists.newArrayListWithCapacity(entries.size() + 3);
-      // Owner entry implied by owner permission bits.
-      aclEntries.add(new AclEntry.Builder()
-          .setScope(AclEntryScope.ACCESS)
-          .setType(AclEntryType.USER)
-          .setPermission(perm.getUserAction())
-          .build());
-      // All extended access ACL entries add by "-setfacl" other than default file
-      // permission.
-      boolean hasAccessAcl = false;
-      for (AclEntry entry: entries) {
-        // AclEntry list should be ordered, all ACCESS one are in first half, DEFAULT one
-        // are in second half, so no need to continue here.
-        if (entry.getScope() == AclEntryScope.DEFAULT) break;
-        hasAccessAcl = true;
-        aclEntries.add(entry);
-      }
-      // Mask entry implied by group permission bits, or group entry if there is
-      // no access ACL (only default ACL).
-      aclEntries.add(new AclEntry.Builder()
-          .setScope(AclEntryScope.ACCESS)
-          .setType(hasAccessAcl ? AclEntryType.MASK : AclEntryType.GROUP)
-          .setPermission(perm.getGroupAction())
-          .build());
-      // Other entry implied by other bits.
-      aclEntries.add(new AclEntry.Builder()
-          .setScope(AclEntryScope.ACCESS)
-          .setType(AclEntryType.OTHER)
-          .setPermission(perm.getOtherAction())
-          .build());
-      return aclEntries;
-    }
-  }
-  /**
-   * Returns a Permissions object that can answer all access permission queries for the
-   * given path.
-   */
-  public Permissions getPermissions(FileSystem fs, Path path) throws IOException {
-    Preconditions.checkNotNull(fs);
-    Preconditions.checkNotNull(path);
-    AclStatus aclStatus = null;
-    try {
-      aclStatus = fs.getAclStatus(path);
-    } catch (AclException ex) {
-      LOG.trace("No ACLs retrieved, skipping ACLs check (HDFS will enforce ACLs)", ex);
-    } catch (UnsupportedOperationException ex) {
-      LOG.trace("No ACLs retrieved, unsupported", ex);
-    }
-    return new Permissions(fs.getFileStatus(path), aclStatus);
-  }
-  /**
-   * Returns the FsPermissionChecker singleton.
-   */
-  public static FsPermissionChecker getInstance() { return instance_; }
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index a5e1eb1..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,129 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import java.util.Properties;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.spi.LoggingEvent;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.thrift.TLogLevel;
- * log4j appender which calls into C++ code to log messages at their correct severities
- * via glog.
- */
-public class GlogAppender extends AppenderSkeleton {
-  // GLOG takes care of formatting, so we don't require a layout
-  public boolean requiresLayout() { return false; }
-  // Required implementation by superclass.
-  public void ActivateOptions() { }
-  // Required implementation by superclass
-  public void close() { }
-  private TLogLevel levelToSeverity(Level level) {
-    Preconditions.checkState(!level.equals(Level.OFF));
-    // TODO: Level does not work well in a HashMap or switch statement due to some
-    // strangeness with equality testing.
-    if (level.equals(Level.TRACE)) return TLogLevel.VLOG_3;
-    if (level.equals(Level.ALL)) return TLogLevel.VLOG_3;
-    if (level.equals(Level.DEBUG)) return TLogLevel.VLOG;
-    if (level.equals(Level.ERROR)) return TLogLevel.ERROR;
-    if (level.equals(Level.FATAL)) return TLogLevel.FATAL;
-    if (level.equals(Level.INFO)) return TLogLevel.INFO;
-    if (level.equals(Level.WARN)) return TLogLevel.WARN;
-    throw new IllegalStateException("Unknown log level: " + level.toString());
-  }
-  @Override
-  public void append(LoggingEvent event) {
-    Level level = event.getLevel();
-    if (level.equals(Level.OFF)) return;
-    String msg = event.getRenderedMessage();
-    if (event.getThrowableInformation() != null) {
-      msg = msg + "\nJava exception follows:\n" +
-          Joiner.on("\n").join(event.getThrowableStrRep());
-    }
-    int lineNumber = Integer.parseInt(event.getLocationInformation().getLineNumber());
-    String fileName = event.getLocationInformation().getFileName();
-    NativeLogger.LogToGlog(
-        levelToSeverity(level).getValue(), msg, fileName, lineNumber);
-  }
-  /**
-   * Returns a log4j level string corresponding to the Glog log level
-   */
-  private static String log4jLevelForTLogLevel(TLogLevel logLevel)
-      throws InternalException {
-    switch (logLevel) {
-      case INFO: return "INFO";
-      case WARN: return "WARN";
-      case ERROR: return "ERROR";
-      case FATAL: return "FATAL";
-      case VLOG:
-      case VLOG_2: return "DEBUG";
-      case VLOG_3: return "TRACE";
-      default: throw new InternalException("Unknown log level:" + logLevel);
-    }
-  }
-  /**
-   * Manually override Log4j root logger configuration. Any values in
-   * not overridden (that is, anything but the root logger and its default level) will
-   * continue to have effect.
-   *  - impalaLogLevel - the maximum log level for com.cloudera.impala.* classes
-   *  - otherLogLevel - the maximum log level for all other classes
-   */
-  public static void Install(TLogLevel impalaLogLevel, TLogLevel otherLogLevel)
-      throws InternalException {
-    Properties properties = new Properties();
-    properties.setProperty("log4j.appender.glog", GlogAppender.class.getName());
-    // These settings are relatively subtle. log4j provides many ways to filter log
-    // messages, and configuring them in the right order is a bit of black magic.
-    //
-    // The 'Threshold' property supercedes everything, so must be set to its most
-    // permissive and applies to any message sent to the glog appender.
-    //
-    // The 'rootLogger' property controls the default maximum logging level (where more
-    // verbose->larger logging level) for the entire space of classes. This will apply to
-    // all non-Impala classes, so is set to otherLogLevel.
-    //
-    // Finally we can configure per-package logging which overrides the rootLogger
-    // setting. In order to control Impala's logging independently of the rest of the
-    // world, we set the log level for com.cloudera.impala.
-    properties.setProperty("log4j.rootLogger",
-        log4jLevelForTLogLevel(otherLogLevel) + ",glog");
-    properties.setProperty("log4j.appender.glog.Threshold", "TRACE");
-    properties.setProperty("",
-        log4jLevelForTLogLevel(impalaLogLevel));
-    PropertyConfigurator.configure(properties);
-    Logger.getLogger(GlogAppender.class).info(String.format("Logging initialized. " +
-        "Impala: %s, All other: %s", impalaLogLevel, otherLogLevel));
-  }
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index a3a1fa0..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,515 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import java.util.Map;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.log4j.Logger;
-import com.cloudera.impala.analysis.TableName;
-import com.cloudera.impala.catalog.HdfsPartition;
-import com.cloudera.impala.common.FileSystemUtil;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.thrift.JniCatalogConstants;
-import com.cloudera.impala.thrift.THdfsCachingOp;
- * Utility class for submitting and dropping HDFS cache requests.
- */
-public class HdfsCachingUtil {
-  private static final Logger LOG = Logger.getLogger(HdfsCachingUtil.class);
-  // The key name used to save cache directive IDs in table/partition properties.
-  public final static String CACHE_DIR_ID_PROP_NAME = "cache_directive_id";
-  // The key name used to store the replication factor for cached files
-  public final static String CACHE_DIR_REPLICATION_PROP_NAME = "cache_replication";
-  // The number of caching refresh intervals that can go by when waiting for data to
-  // become cached before assuming no more progress is being made.
-  private final static int MAX_UNCHANGED_CACHING_REFRESH_INTERVALS = 5;
-  private static DistributedFileSystem dfs = null;
-  /**
-   * Returns the dfs singleton object.
-   */
-  private static DistributedFileSystem getDfs() throws ImpalaRuntimeException {
-    if (dfs == null) {
-      try {
-        dfs = FileSystemUtil.getDistributedFileSystem();
-      } catch (IOException e) {
-        throw new ImpalaRuntimeException("HdfsCachingUtil failed to initialize the " +
-            "DistributedFileSystem: ", e);
-      }
-    }
-    return dfs;
-  }
-  /**
-   * Caches the location of the given Hive Metastore Table and updates the
-   * table's properties with the submitted cache directive ID. The caller is
-   * responsible for not caching the same table twice, as HDFS will create a second
-   * cache directive even if it is similar to an already existing one.
-   *
-   * Returns the ID of the submitted cache directive and throws if there is an error
-   * submitting.
-   */
-  public static long submitCacheTblDirective(
-      org.apache.hadoop.hive.metastore.api.Table table,
-      String poolName, short replication) throws ImpalaRuntimeException {
-    long id = HdfsCachingUtil.submitDirective(new Path(table.getSd().getLocation()),
-        poolName, replication);
-    table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
-    table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
-    return id;
-  }
-  /**
-   * Caches the location of the given partition and updates the
-   * partitions's properties with the submitted cache directive ID. The caller is
-   * responsible for not caching the same partition twice, as HDFS will create a second
-   * cache directive even if it is similar to an already existing one.
-   *
-   * Returns the ID of the submitted cache directive and throws if there is an error
-   * submitting the directive.
-   */
-  public static long submitCachePartitionDirective(HdfsPartition part,
-      String poolName, short replication) throws ImpalaRuntimeException {
-    long id = HdfsCachingUtil.submitDirective(new Path(part.getLocation()),
-        poolName, replication);
-    part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
-    part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
-    return id;
-  }
-  /**
-   * Convenience method for working directly on a metastore partition. See
-   * submitCachePartitionDirective(HdfsPartition, String, short) for more details.
-   */
-  public static long submitCachePartitionDirective(
-      org.apache.hadoop.hive.metastore.api.Partition part,
-      String poolName, short replication) throws ImpalaRuntimeException {
-    long id = HdfsCachingUtil.submitDirective(new Path(part.getSd().getLocation()),
-        poolName, replication);
-    part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
-    part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
-    return id;
-  }
-  /**
-   * Removes the cache directive associated with the table from HDFS, uncaching all
-   * data. Also updates the table's metadata. No-op if the table is not cached.
-   */
-  public static void uncacheTbl(org.apache.hadoop.hive.metastore.api.Table table)
-      throws ImpalaRuntimeException {
-    Preconditions.checkNotNull(table);
-    LOG.debug("Uncaching table: " + table.getDbName() + "." + table.getTableName());
-    Long id = getCacheDirectiveId(table.getParameters());
-    if (id == null) return;
-    HdfsCachingUtil.removeDirective(id);
-    table.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
-    table.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
-  }
-  /**
-   * Removes the cache directive associated with the partition from HDFS, uncaching all
-   * data. Also updates the partition's metadata to remove the cache directive ID.
-   * No-op if the table is not cached.
-   */
-  public static void uncachePartition(HdfsPartition part) throws ImpalaException {
-    Preconditions.checkNotNull(part);
-    Long id = getCacheDirectiveId(part.getParameters());
-    if (id == null) return;
-    HdfsCachingUtil.removeDirective(id);
-    part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
-    part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
-  }
-  /**
-   * Convenience method for working directly on a metastore partition. See
-   * uncachePartition(HdfsPartition) for more details.
-   */
-  public static void uncachePartition(
-    org.apache.hadoop.hive.metastore.api.Partition part) throws ImpalaException {
-    Preconditions.checkNotNull(part);
-    Long id = getCacheDirectiveId(part.getParameters());
-    if (id == null) return;
-    HdfsCachingUtil.removeDirective(id);
-    part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
-    part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
-  }
-  /**
-   * Returns the cache directive ID from the given table/partition parameter
-   * map. Returns null if the CACHE_DIR_ID_PROP_NAME key was not set or if
-   * there was an error parsing the associated ID.
-   */
-  public static Long getCacheDirectiveId(Map<String, String> params) {
-    if (params == null) return null;
-    String idStr = params.get(CACHE_DIR_ID_PROP_NAME);
-    if (idStr == null) return null;
-    try {
-      return Long.parseLong(idStr);
-    } catch (NumberFormatException e) {
-      return null;
-    }
-  }
-  /**
-   * Given a cache directive ID, returns the pool the directive is cached in.
-   * Returns null if no outstanding cache directive match this ID.
-   */
-  public static String getCachePool(long directiveId)
-      throws ImpalaRuntimeException {
-    CacheDirectiveEntry entry = getDirective(directiveId);
-    return entry == null ? null : entry.getInfo().getPool();
-  }
-  /**
-   * Given a cache directive ID, returns the replication factor for the directive.
-   * Returns null if no outstanding cache directives match this ID.
-   */
-  public static Short getCacheReplication(long directiveId)
-      throws ImpalaRuntimeException {
-    CacheDirectiveEntry entry = getDirective(directiveId);
-    return entry != null ? entry.getInfo().getReplication() : null;
-  }
-  /**
-   * Returns the cache replication value from the parameters map. We assume that only
-   * cached table parameters are used and the property is always present.
-   */
-  public static Short getCachedCacheReplication(Map<String, String> params) {
-    Preconditions.checkNotNull(params);
-    String replication = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
-    if (replication == null) {
-      return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
-    }
-    try {
-      return Short.parseShort(replication);
-    } catch (NumberFormatException e) {
-      return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
-    }
-  }
-  /**
-   * Waits on a cache directive to either complete or stop making progress. Progress is
-   * checked by polling the HDFS caching stats every
-   * "currentBytesCached" is increasing compared to "bytesNeeded".
-   * If "currentBytesCached" == "bytesNeeded" or if no progress is made for a
-   */
-  public static void waitForDirective(long directiveId)
-      throws ImpalaRuntimeException  {
-    long bytesNeeded = 0L;
-    long currentBytesCached = 0L;
-    CacheDirectiveEntry cacheDir = getDirective(directiveId);
-    if (cacheDir == null) return;
-    bytesNeeded = cacheDir.getStats().getBytesNeeded();
-    currentBytesCached = cacheDir.getStats().getBytesCached();
-    LOG.debug(String.format("Waiting on cache directive id: %d. Bytes " +
-        "cached (%d) / needed (%d)", directiveId, currentBytesCached, bytesNeeded));
-    // All the bytes are cached, just return.
-    if (bytesNeeded == currentBytesCached) return;
-    // The refresh interval is how often HDFS will update cache directive stats. We use
-    // this value to determine how frequently we should poll for changes.
-    long hdfsRefreshIntervalMs = getDfs().getConf().getLong(
-    Preconditions.checkState(hdfsRefreshIntervalMs > 0);
-    // Loop until either MAX_UNCHANGED_CACHING_REFRESH_INTERVALS have passed with no
-    // changes or all required data is cached.
-    int unchangedCounter = 0;
-    while (unchangedCounter < MAX_UNCHANGED_CACHING_REFRESH_INTERVALS) {
-      long previousBytesCached = currentBytesCached;
-      cacheDir = getDirective(directiveId);
-      if (cacheDir == null) return;
-      currentBytesCached = cacheDir.getStats().getBytesCached();
-      bytesNeeded = cacheDir.getStats().getBytesNeeded();
-      if (currentBytesCached == bytesNeeded) {
-        LOG.debug(String.format("Cache directive id: %d has completed." +
-            "Bytes cached (%d) / needed (%d)", directiveId, currentBytesCached,
-            bytesNeeded));
-        return;
-      }
-      if (currentBytesCached == previousBytesCached) {
-        ++unchangedCounter;
-      } else {
-        unchangedCounter = 0;
-      }
-      try {
-        // Sleep for the refresh interval + a little bit more to ensure a full interval
-        // has completed. A value of 25% the refresh interval was arbitrarily chosen.
-        Thread.sleep((long) (hdfsRefreshIntervalMs * 1.25));
-      } catch (InterruptedException e) { /* ignore */ }
-    }
-    LOG.warn(String.format("No changes in cached bytes in: %d(ms). All data may not " +
-        "be cached. Final stats for cache directive id: %d. Bytes cached (%d)/needed " +
-        "(%d)", hdfsRefreshIntervalMs * MAX_UNCHANGED_CACHING_REFRESH_INTERVALS,
-        directiveId, currentBytesCached, bytesNeeded));
-  }
-  /**
-   * Submits a new caching directive for the specified cache pool name, path and
-   * replication. Returns the directive ID if the submission was successful or an
-   * ImpalaRuntimeException if the submission fails.
-   */
-  private static long submitDirective(Path path, String poolName, short replication)
-      throws ImpalaRuntimeException {
-    Preconditions.checkNotNull(path);
-    Preconditions.checkState(poolName != null && !poolName.isEmpty());
-    CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
-        .setExpiration(Expiration.NEVER)
-        .setPool(poolName)
-        .setReplication(replication)
-        .setPath(path).build();
-    LOG.debug("Submitting cache directive: " + info.toString());
-    try {
-      return getDfs().addCacheDirective(info);
-    } catch (IOException e) {
-      throw new ImpalaRuntimeException(e.getMessage(), e);
-    }
-  }
-  /**
-   * Update cache directive for a table and updates the metastore parameters.
-   * Returns the cache directive ID
-   */
-  public static long modifyCacheDirective(Long id,
-      org.apache.hadoop.hive.metastore.api.Table table,
-      String poolName, short replication) throws ImpalaRuntimeException {
-    Preconditions.checkNotNull(id);
-    HdfsCachingUtil.modifyCacheDirective(id, new Path(table.getSd().getLocation()),
-        poolName, replication);
-    table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
-    table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
-    return id;
-  }
-  /**
-   * Update cache directive for a partition and update the metastore parameters.
-   * Returns the cache directive ID
-   */
-  public static long modifyCacheDirective(Long id, HdfsPartition part, String poolName,
-      short replication) throws ImpalaRuntimeException {
-    Preconditions.checkNotNull(id);
-    HdfsCachingUtil.modifyCacheDirective(id, new Path(part.getLocation()),
-        poolName, replication);
-    part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id));
-    part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication));
-    return id;
-  }
-  /**
-   * Update an existing cache directive to avoid having the same entry multiple
-   * times
-   */
-  private static void modifyCacheDirective(Long id, Path path, String poolName,
-      short replication) throws ImpalaRuntimeException {
-    Preconditions.checkNotNull(path);
-    Preconditions.checkNotNull(id);
-    Preconditions.checkState(poolName != null && !poolName.isEmpty());
-    CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
-        .setId(id)
-        .setExpiration(Expiration.NEVER)
-        .setPool(poolName)
-        .setReplication(replication)
-        .setPath(path).build();
-    LOG.debug("Modifying cache directive: " + info.toString());
-    try {
-      getDfs().modifyCacheDirective(info);
-    } catch (IOException e) {
-      throw new ImpalaRuntimeException(e.getMessage(), e);
-    }
-  }
-  /**
-   * Removes the given cache directive if it exists, uncaching the data. If the
-   * cache request does not exist in HDFS no error is returned.
-   * Throws an ImpalaRuntimeException if there was any problem removing the
-   * directive.
-   */
-  private static void removeDirective(long directiveId) throws ImpalaRuntimeException {
-    LOG.debug("Removing cache directive id: " + directiveId);
-    try {
-      getDfs().removeCacheDirective(directiveId);
-    } catch (IOException e) {
-      // There is no special exception type for the case where a directive ID does not
-      // exist so we must inspect the error message.
-      if (e.getMessage().contains("No directive with ID")) return;
-      throw new ImpalaRuntimeException(e.getMessage(), e);
-    }
-  }
-  /**
-   * Gets the cache directive matching the given ID. Returns null if no matching
-   * directives were found.
-   */
-  private static CacheDirectiveEntry getDirective(long directiveId)
-      throws ImpalaRuntimeException {
-    LOG.trace("Getting cache directive id: " + directiveId);
-    CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder()
-        .setId(directiveId)
-        .build();
-    try {
-      RemoteIterator<CacheDirectiveEntry> itr = getDfs().listCacheDirectives(filter);
-      if (itr.hasNext()) return;
-    } catch (IOException e) {
-      // Handle connection issues with e.g. HDFS and possible not found errors
-      throw new ImpalaRuntimeException(e.getMessage(), e);
-    }
-    throw new ImpalaRuntimeException(
-        "HDFS cache directive filter returned empty result. This must not happen");
-  }
-  /**
-   * Check if the poolName matches the pool of the cache directive
-   * identified by directiveId
-   */
-  public static boolean isSamePool(String poolName, Long directiveId)
-      throws ImpalaRuntimeException {
-    return poolName.equals(getCachePool(directiveId));
-  }
-  /**
-   * Helper method for frequent lookup of replication factor in the thrift caching
-   * structure.
-   */
-  public static short getReplicationOrDefault(THdfsCachingOp op) {
-    return op.isSetReplication() ? op.getReplication() :
-  }
-  /**
-   * Returns a boolean indicating if the given thrift caching operation would perform an
-   * update on an already existing cache directive.
-   */
-  public static boolean isUpdateOp(THdfsCachingOp op, Map<String, String> params)
-      throws ImpalaRuntimeException {
-    Long directiveId = Long.parseLong(params.get(CACHE_DIR_ID_PROP_NAME));
-    CacheDirectiveEntry entry = getDirective(directiveId);
-    Preconditions.checkNotNull(entry);
-    // Verify cache pool
-    if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) {
-      return false;
-    }
-    // Check cache replication factor
-    if ((op.isSetReplication() && op.getReplication() !=
-        entry.getInfo().getReplication()) || ( !op.isSetReplication() &&
-        entry.getInfo().getReplication() !=
-      return true;
-    }
-    return false;
-  }
-  /**
-   * Validates the properties of the chosen cache pool. Throws on error.
-   */
-  public static void validateCachePool(THdfsCachingOp op, Long directiveId,
-      TableName table, HdfsPartition partition)
-      throws ImpalaRuntimeException {
-    CacheDirectiveEntry entry = getDirective(directiveId);
-    Preconditions.checkNotNull(entry);
-    if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) {
-      throw new ImpalaRuntimeException(String.format("Cannot cache partition in " +
-          "pool '%s' because it is already cached in '%s'. To change the cache " +
-          "pool for this partition, first uncache using: ALTER TABLE %s.%s " +
-          "%sSET UNCACHED", op.getCache_pool_name(),
-          entry.getInfo().getPool(), table.getDb(), table,
-          // Insert partition string if partition non null
-          partition != null ? String.format(" PARTITION(%s) ",
-          partition.getPartitionName().replaceAll("/", ", ")) : ""));
-    }
-  }
-  /**
-   * Validates the properties of the chosen cache pool. Throws on error.
-   */
-  public static void validateCachePool(THdfsCachingOp op, Long directiveId,
-      TableName table) throws ImpalaRuntimeException {
-    validateCachePool(op, directiveId, table, null);
-  }
-  /**
-   * Validates and returns true if a parameter map contains a cache directive ID and
-   * validates it against the NameNode to make sure it exists. If the cache
-   * directive ID does not exist, we remove the value from the parameter map,
-   * issue a log message and return false. As the value is not written back to the
-   * Hive MS from this method, the result will be only valid until the next metadata
-   * fetch. Lastly, we update the cache replication factor in the parameters with the
-   * value read from HDFS.
-   */
-  public static boolean validateCacheParams(Map<String, String> params) {
-    Long directiveId = getCacheDirectiveId(params);
-    if (directiveId == null) return false;
-    CacheDirectiveEntry entry = null;
-    try {
-      entry = getDirective(directiveId);
-    } catch (ImpalaRuntimeException e) {
-      if (e.getCause() != null && e.getCause() instanceof RemoteException) {
-        // This exception signals that the cache directive no longer exists.
-        LOG.error("Cache directive does not exist", e);
-        params.remove(CACHE_DIR_ID_PROP_NAME);
-        params.remove(CACHE_DIR_REPLICATION_PROP_NAME);
-      } else {
-        // This exception signals that there was a connection problem with HDFS.
-        LOG.error("IO Exception, possible connectivity issues with HDFS", e);
-      }
-      return false;
-    }
-    Preconditions.checkNotNull(entry);
-    // On the upgrade path the property might not exist, if it exists
-    // and is different from the one from the meta store, issue a warning.
-    String replicationFactor = params.get(CACHE_DIR_REPLICATION_PROP_NAME);
-    if (replicationFactor != null &&
-        Short.parseShort(replicationFactor) != entry.getInfo().getReplication()) {
-"Replication factor for entry in HDFS differs from value in Hive MS: " +
-          entry.getInfo().getPath().toString() + " " +
-          entry.getInfo().getReplication().toString() + " != " +
-          params.get(CACHE_DIR_REPLICATION_PROP_NAME));
-    }
-        String.valueOf(entry.getInfo().getReplication()));
-    return true;
-  }
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index 4f627d8..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,268 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import javax.json.Json;
-import javax.json.JsonArray;
-import javax.json.JsonReader;
-import com.cloudera.impala.catalog.ScalarType;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.thrift.TDistributeByRangeParam;
-import com.cloudera.impala.thrift.TRangeLiteral;
-import com.cloudera.impala.thrift.TRangeLiteralList;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.PartialRow;
-import static com.cloudera.impala.catalog.Type.parseColumnType;
-import static java.lang.String.format;
-public class KuduUtil {
-  private static final String SPLIT_KEYS_ERROR_MESSAGE = "Error parsing splits keys.";
-  /**
-   * Compare the schema of a HMS table and a Kudu table. Returns true if both tables have
-   * a matching schema.
-   */
-  public static boolean compareSchema(Table msTable, KuduTable kuduTable)
-      throws ImpalaRuntimeException {
-    List<FieldSchema> msFields = msTable.getSd().getCols();
-    List<ColumnSchema> kuduFields = kuduTable.getSchema().getColumns();
-    if (msFields.size() != kuduFields.size()) return false;
-    HashMap<String, ColumnSchema> kuduFieldMap = Maps.newHashMap();
-    for (ColumnSchema kuduField : kuduFields) {
-      kuduFieldMap.put(kuduField.getName().toUpperCase(), kuduField);
-    }
-    for (FieldSchema msField : msFields) {
-      ColumnSchema kuduField = kuduFieldMap.get(msField.getName().toUpperCase());
-      if (kuduField == null
-          || fromImpalaType(parseColumnType(msField.getType())) != kuduField.getType()) {
-        return false;
-      }
-    }
-    return true;
-  }
-  /**
-   * Parses split keys from statements.
-   *
-   * Split keys are expected to be in json, as an array of arrays, in the form:
-   * '[[value1_col1, value1_col2, ...], [value2_col1, value2_col2, ...], ...]'
-   *
-   * Each inner array corresponds to a split key and should have one matching entry for
-   * each key column specified in 'schema'.
-   */
-  public static List<PartialRow> parseSplits(Schema schema, String kuduSplits)
-      throws ImpalaRuntimeException {
-    // If there are no splits return early.
-    if (kuduSplits == null || kuduSplits.isEmpty()) return ImmutableList.of();
-    ImmutableList.Builder<PartialRow> splitRows = ImmutableList.builder();
-    // ...Otherwise parse the splits. We're expecting splits in the format of a list of
-    // lists of keys. We only support specifying splits for int and string keys
-    // (currently those are the only type of keys allowed in Kudu too).
-    try {
-      JsonReader jr = Json.createReader(new StringReader(kuduSplits));
-      JsonArray keysList = jr.readArray();
-      for (int i = 0; i < keysList.size(); i++) {
-        PartialRow splitRow = new PartialRow(schema);
-        JsonArray compoundKey = keysList.getJsonArray(i);
-        if (compoundKey.size() != schema.getPrimaryKeyColumnCount()) {
-          throw new ImpalaRuntimeException(SPLIT_KEYS_ERROR_MESSAGE +
-              " Wrong number of keys.");
-        }
-        for (int j = 0; j < compoundKey.size(); j++) {
-          setKey(schema.getColumnByIndex(j).getType(), compoundKey, j, splitRow);
-        }
-        splitRows.add(splitRow);
-      }
-    } catch (ImpalaRuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new ImpalaRuntimeException(SPLIT_KEYS_ERROR_MESSAGE + " Problem parsing json"
-          + ": " + e.getMessage(), e);
-    }
-    return;
-  }
-  /**
-   * Given the TDistributeByRangeParam from the CREATE statement, creates the
-   * appropriate split rows.
-   */
-  public static List<PartialRow> parseSplits(Schema schema,
-      TDistributeByRangeParam param) throws ImpalaRuntimeException {
-    ImmutableList.Builder<PartialRow> splitRows = ImmutableList.builder();
-    for (TRangeLiteralList literals : param.getSplit_rows()) {
-      PartialRow splitRow = new PartialRow(schema);
-      List<TRangeLiteral> literalValues = literals.getValues();
-      for (int i = 0; i < literalValues.size(); ++i) {
-        String colName = param.getColumns().get(i);
-        ColumnSchema col = schema.getColumn(colName);
-        setKey(col.getType(), literalValues.get(i), schema.getColumnIndex(colName),
-            colName, splitRow);
-      }
-      splitRows.add(splitRow);
-    }
-    return;
-  }
-  /**
-   * Sets the value in 'key' at 'pos', given the json representation.
-   */
-  private static void setKey(Type type, JsonArray array, int pos, PartialRow key)
-      throws ImpalaRuntimeException {
-    switch (type) {
-      case BOOL: key.addBoolean(pos, array.getBoolean(pos)); break;
-      case INT8: key.addByte(pos, (byte) array.getInt(pos)); break;
-      case INT16: key.addShort(pos, (short) array.getInt(pos)); break;
-      case INT32: key.addInt(pos, array.getInt(pos)); break;
-      case INT64: key.addLong(pos, array.getJsonNumber(pos).longValue()); break;
-      case STRING: key.addString(pos, array.getString(pos)); break;
-      default:
-        throw new ImpalaRuntimeException("Key columns not supported for type: "
-            + type.toString());
-    }
-  }
-  /**
-   * Sets the value in 'key' at 'pos', given the range literal.
-   */
-  private static void setKey(Type type, TRangeLiteral literal, int pos, String colName,
-      PartialRow key) throws ImpalaRuntimeException {
-    switch (type) {
-      case BOOL:
-        checkCorrectType(literal.isSetBool_literal(), type, colName, literal);
-        key.addBoolean(pos, literal.isBool_literal());
-        break;
-      case INT8:
-        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
-        key.addByte(pos, (byte) literal.getInt_literal());
-        break;
-      case INT16:
-        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
-        key.addShort(pos, (short) literal.getInt_literal());
-        break;
-      case INT32:
-        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
-        key.addInt(pos, (int) literal.getInt_literal());
-        break;
-      case INT64:
-        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
-        key.addLong(pos, literal.getInt_literal());
-        break;
-      case STRING:
-        checkCorrectType(literal.isSetString_literal(), type, colName, literal);
-        key.addString(pos, literal.getString_literal());
-        break;
-      default:
-        throw new ImpalaRuntimeException("Key columns not supported for type: "
-            + type.toString());
-    }
-  }
-  /**
-   * If correctType is true, returns. Otherwise throws a formatted error message
-   * indicating problems with the type of the literal of the range literal.
-   */
-  private static void checkCorrectType(boolean correctType, Type t, String colName,
-      TRangeLiteral literal) throws ImpalaRuntimeException {
-    if (correctType) return;
-    throw new ImpalaRuntimeException(
-        format("Expected %s literal for column '%s' got '%s'", t.getName(), colName,
-            toString(literal)));
-  }
-  /**
-   * Parses a string of the form "a, b, c" and returns a set of values split by ',' and
-   * stripped of the whitespace.
-   */
-  public static HashSet<String> parseKeyColumns(String cols) {
-    return Sets.newHashSet(Splitter.on(",").trimResults().split(cols.toLowerCase()));
-  }
-  public static List<String> parseKeyColumnsAsList(String cols) {
-    return Lists.newArrayList(Splitter.on(",").trimResults().split(cols.toLowerCase()));
-  }
-  /**
-   * Converts a given Impala catalog type to the Kudu type. Throws an exception if the
-   * type cannot be converted.
-   */
-  public static Type fromImpalaType(com.cloudera.impala.catalog.Type t)
-      throws ImpalaRuntimeException {
-    if (!t.isScalarType()) {
-      throw new ImpalaRuntimeException(format(
-          "Non-scalar type %s is not supported in Kudu", t.toSql()));
-    }
-    ScalarType s = (ScalarType) t;
-    switch (s.getPrimitiveType()) {
-      case TINYINT: return Type.INT8;
-      case SMALLINT: return Type.INT16;
-      case INT: return Type.INT32;
-      case BIGINT: return Type.INT64;
-      case BOOLEAN: return Type.BOOL;
-      case CHAR: return Type.STRING;
-      case STRING: return Type.STRING;
-      case VARCHAR: return Type.STRING;
-      case DOUBLE: return Type.DOUBLE;
-      case FLOAT: return Type.FLOAT;
-        /* Fall through below */
-      case INVALID_TYPE:
-      case NULL_TYPE:
-      case TIMESTAMP:
-      case BINARY:
-      case DATE:
-      case DATETIME:
-      case DECIMAL:
-      default:
-        throw new ImpalaRuntimeException(format(
-            "Type %s is not supported in Kudu", s.toSql()));
-    }
-  }
-  /**
-   * Returns the string value of the RANGE literal.
-   */
-  static String toString(TRangeLiteral l) throws ImpalaRuntimeException {
-    if (l.isSetBool_literal()) return String.valueOf(l.bool_literal);
-    if (l.isSetString_literal()) return String.valueOf(l.string_literal);
-    if (l.isSetInt_literal()) return String.valueOf(l.int_literal);
-    throw new ImpalaRuntimeException("Unsupported type for RANGE literal.");
-  }
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index 989a510..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
- * Implementation of a bi-directional map between an index of type
- * Integer and an object of type T.  The indices are allocated on
- * demand when a reverse lookup occurs for an object not already in
- * the map.
- *
- * The forward mapping is implemented as a List<> so that it can be
- * directly used as a Thrift structure.
- */
-public class ListMap<T> {
-  // Maps from Integer to T.
-  private ArrayList<T> list_ = Lists.newArrayList();
-  // Maps from T to Integer.
-  private final Map<T, Integer> map_ = Maps.newHashMap();
-  public ArrayList<T> getList() { return list_; }
-  public int size() { return list_.size(); }
-  /**
-   * Map from Integer index to T object.
-   */
-  public T getEntry(int index) { return list_.get(index); }
-  /**
-   * Map from T t to Integer index. If the mapping from t doesn't
-   * exist, then create a new mapping from t to a unique index.
-   */
-  public int getIndex(T t) {
-    Integer index = map_.get(t);
-    if (index == null) {
-      // No match was found, add a new entry.
-      list_.add(t);
-      index = list_.size() - 1;
-      map_.put(t, index);
-    }
-    return index;
-  }
-  /**
-   * Populate the bi-map from the given list.  Does not perform a copy
-   * of the list.
-   */
-  public void populate(ArrayList<T> list) {
-    Preconditions.checkState(list_.isEmpty() && map_.isEmpty());
-    list_ = list;
-    for (int i = 0; i < list_.size(); ++i) {
-      map_.put(list_.get(i), i);
-    }
-  }
diff --git a/fe/src/main/java/com/cloudera/impala/util/ b/fe/src/main/java/com/cloudera/impala/util/
deleted file mode 100644
index ac85ff8..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-import com.cloudera.impala.planner.NestedLoopJoinNode;
-import com.cloudera.impala.planner.HashJoinNode;
-import com.cloudera.impala.planner.PlanNode;
-import com.cloudera.impala.planner.ScanNode;
- * Returns the maximum number of rows processed by any node in a given plan tree
- */
-public class MaxRowsProcessedVisitor implements Visitor<PlanNode> {
-  private boolean abort_ = false;
-  private long result_ = -1l;
-  @Override
-  public void visit(PlanNode caller) {
-    if (abort_) return;
-    if (caller instanceof ScanNode) {
-      long tmp = caller.getInputCardinality();
-      ScanNode scan = (ScanNode) caller;
-      boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats();
-      // In the absence of collection stats, treat scans on collections as if they
-      // have no limit.
-      if (scan.isAccessingCollectionType() || (missingStats && !scan.hasLimit())) {
-        abort_ = true;
-        return;
-      }
-      result_ = Math.max(result_, tmp);
-    } else if (caller instanceof HashJoinNode || caller instanceof NestedLoopJoinNode) {
-      // Revisit when multiple scan nodes can be executed in a single fragment, IMPALA-561
-      abort_ = true;
-      return;
-    } else {
-      long in = caller.getInputCardinality();
-      long out = caller.getCardinality();
-      if ((in == -1) || (out == -1)) {
-        abort_ = true;
-        return;
-      }
-      result_ = Math.max(result_, Math.max(in, out));
-    }
-  }
-  public long get() {
-    return abort_ ? -1 : result_;
-  }