Posted to commits@impala.apache.org by to...@apache.org on 2018/08/07 18:10:04 UTC

[1/5] impala git commit: IMPALA-7140 (part 9): add support for SHOW FILES in LocalFsTable

Repository: impala
Updated Branches:
  refs/heads/master b8222003f -> bf06e38df


IMPALA-7140 (part 9): add support for SHOW FILES in LocalFsTable

This adds support for the SHOW FILES IN ... command for LocalFsTable
instances. Tested manually against functional.alltypesagg and also
covered by existing e2e tests.
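
For reference, SHOW FILES now routes through a static helper shared by
both catalog implementations. A minimal sketch of the new call path
(the 'catalog', 'dbName', 'tableName', and 'partitionSet' names are
illustrative, not the actual Frontend code):

    // Resolve the table, then delegate to the shared helper. This works
    // for both HdfsTable and LocalFsTable, since the logic now lives in
    // FeFsTable.Utils rather than in each FeFsTable implementation.
    FeTable table = catalog.getTable(dbName, tableName);
    if (!(table instanceof FeFsTable)) {
      throw new InternalException("SHOW FILES only supports Hdfs table.");
    }
    // A null partitionSet means "all partitions".
    TResultSet files = FeFsTable.Utils.getFiles((FeFsTable) table, partitionSet);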

Change-Id: I143bea9f72b6a25ae2545663e06f4849b58533ba
Reviewed-on: http://gerrit.cloudera.org:8080/10973
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bf06e38d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bf06e38d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bf06e38d

Branch: refs/heads/master
Commit: bf06e38df7101e7c504c58373f48c7537f381f2d
Parents: 4a049c6
Author: Todd Lipcon <to...@cloudera.com>
Authored: Tue Jul 17 12:30:30 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 7 17:38:04 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/FeFsTable.java    | 125 +++++++++++++++++--
 .../org/apache/impala/catalog/HdfsTable.java    | 101 +--------------
 .../impala/catalog/local/LocalCatalog.java      |   3 +-
 .../impala/catalog/local/LocalFsTable.java      |   8 --
 .../org/apache/impala/service/Frontend.java     |   2 +-
 5 files changed, 123 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bf06e38d/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 19bebfc..fbb5267 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -25,18 +25,26 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartitionKeyValue;
+import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.ListMap;
+import org.apache.impala.util.TResultRowBuilder;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * Frontend interface for interacting with a filesystem-backed table.
@@ -78,14 +86,6 @@ public interface FeFsTable extends FeTable {
    */
   public String getNullPartitionKeyValue();
 
-  /**
-   * Get file info for the given set of partitions, or all partitions if
-   * partitionSet is null.
-   *
-   * @return partition file info, ordered by partition
-   */
-  TResultSet getFiles(List<List<TPartitionKeyValue>> partitionSet)
-      throws CatalogException;
 
   /**
    * @return the base HDFS directory where files of this table are stored.
@@ -224,6 +224,45 @@ public interface FeFsTable extends FeTable {
     }
 
     /**
+     * Get file info for the given set of partitions, or all partitions if
+     * partitionSet is null.
+     *
+     * @return partition file info, ordered by partition
+     */
+    public static TResultSet getFiles(FeFsTable table,
+        List<List<TPartitionKeyValue>> partitionSet) {
+      TResultSet result = new TResultSet();
+      TResultSetMetadata resultSchema = new TResultSetMetadata();
+      result.setSchema(resultSchema);
+      resultSchema.addToColumns(new TColumn("Path", Type.STRING.toThrift()));
+      resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
+      resultSchema.addToColumns(new TColumn("Partition", Type.STRING.toThrift()));
+      result.setRows(Lists.<TResultRow>newArrayList());
+
+      List<? extends FeFsPartition> orderedPartitions;
+      if (partitionSet == null) {
+        orderedPartitions = Lists.newArrayList(FeCatalogUtils.loadAllPartitions(table));
+      } else {
+        // Get a list of HdfsPartition objects for the given partition set.
+        orderedPartitions = getPartitionsFromPartitionSet(table, partitionSet);
+      }
+      Collections.sort(orderedPartitions, HdfsPartition.KV_COMPARATOR);
+
+      for (FeFsPartition p: orderedPartitions) {
+        List<FileDescriptor> orderedFds = Lists.newArrayList(p.getFileDescriptors());
+        Collections.sort(orderedFds);
+        for (FileDescriptor fd: orderedFds) {
+          TResultRowBuilder rowBuilder = new TResultRowBuilder();
+          rowBuilder.add(p.getLocation() + "/" + fd.getFileName());
+          rowBuilder.add(PrintUtils.printBytes(fd.getFileLength()));
+          rowBuilder.add(p.getPartitionName());
+          result.addToRows(rowBuilder.get());
+        }
+      }
+      return result;
+    }
+
+    /**
      * Selects a random sample of files from the given list of partitions such that the
      * sum of file sizes is at least 'percentBytes' percent of the total number of bytes
      * in those partitions and at least 'minSampleBytes'. The sample is returned as a map
@@ -314,5 +353,75 @@ public interface FeFsTable extends FeTable {
       }
       return result;
     }
+
+    /**
+     * Get and load the specified partitions from the table.
+     */
+    public static List<? extends FeFsPartition> getPartitionsFromPartitionSet(
+        FeFsTable table, List<List<TPartitionKeyValue>> partitionSet) {
+      List<Long> partitionIds = Lists.newArrayList();
+      for (List<TPartitionKeyValue> kv : partitionSet) {
+        PrunablePartition partition = getPartitionFromThriftPartitionSpec(table, kv);
+        if (partition != null) partitionIds.add(partition.getId());
+      }
+      return table.loadPartitions(partitionIds);
+    }
+
+    /**
+     * Get the specified partition from the table, or null if no such partition
+     * exists.
+     */
+    public static PrunablePartition getPartitionFromThriftPartitionSpec(
+        FeFsTable table,
+        List<TPartitionKeyValue> partitionSpec) {
+      // First, build a list of the partition values to search for in the same order they
+      // are defined in the table.
+      List<String> targetValues = Lists.newArrayList();
+      Set<String> keys = Sets.newHashSet();
+      for (FieldSchema fs: table.getMetaStoreTable().getPartitionKeys()) {
+        for (TPartitionKeyValue kv: partitionSpec) {
+          if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
+            targetValues.add(kv.getValue());
+            // Same key was specified twice
+            if (!keys.add(kv.getName().toLowerCase())) {
+              return null;
+            }
+          }
+        }
+      }
+
+      // Make sure the number of values match up and that some values were found.
+      if (targetValues.size() == 0 ||
+          (targetValues.size() != table.getMetaStoreTable().getPartitionKeysSize())) {
+        return null;
+      }
+
+      // Search through all the partitions and check if their partition key values
+      // match the values being searched for.
+      for (PrunablePartition partition: table.getPartitions()) {
+        List<LiteralExpr> partitionValues = partition.getPartitionValues();
+        Preconditions.checkState(partitionValues.size() == targetValues.size());
+        boolean matchFound = true;
+        for (int i = 0; i < targetValues.size(); ++i) {
+          String value;
+          if (partitionValues.get(i) instanceof NullLiteral) {
+            value = table.getNullPartitionKeyValue();
+          } else {
+            value = partitionValues.get(i).getStringValue();
+            Preconditions.checkNotNull(value);
+            // See IMPALA-252: we deliberately map empty strings on to
+            // NULL when they're in partition columns. This is for
+            // backwards compatibility with Hive, and is clearly broken.
+            if (value.isEmpty()) value = table.getNullPartitionKeyValue();
+          }
+          if (!targetValues.get(i).equals(value)) {
+            matchFound = false;
+            break;
+          }
+        }
+        if (matchFound) return partition;
+      }
+      return null;
+    }
   }
 }
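
The partition-spec helpers above can also be driven directly. A hedged
sketch of looking up a single partition (the column names and values are
illustrative; names are matched case-insensitively, and an empty string
value matches the table's NULL partition key value, per the IMPALA-252
note in the code):

    // e.g. for a table partitioned by (year, month):
    //   ... PARTITION (year=2010, month=1)
    List<TPartitionKeyValue> spec = Lists.newArrayList(
        new TPartitionKeyValue("year", "2010"),
        new TPartitionKeyValue("month", "1"));
    // Returns null if a key is duplicated, if not every partition key is
    // specified, or if no partition matches the given values.
    PrunablePartition p =
        FeFsTable.Utils.getPartitionFromThriftPartitionSpec(table, spec);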

http://git-wip-us.apache.org/repos/asf/impala/blob/bf06e38d/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0b66f25..2b7967f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -59,7 +59,6 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
-import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.Reference;
 import org.apache.impala.compat.HdfsShim;
 import org.apache.impala.fb.FbFileBlock;
@@ -72,7 +71,6 @@ import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartitionKeyValue;
-import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.thrift.TTable;
@@ -672,7 +670,7 @@ public class HdfsTable extends Table implements FeFsTable {
           kv.getLiteralValue(), table.getNullPartitionKeyValue());
       partitionKeyValues.add(new TPartitionKeyValue(kv.getColName(), value));
     }
-    return getPartitionFromThriftPartitionSpec(table, partitionKeyValues);
+    return Utils.getPartitionFromThriftPartitionSpec(table, partitionKeyValues);
   }
 
   /**
@@ -681,73 +679,16 @@ public class HdfsTable extends Table implements FeFsTable {
    */
   public HdfsPartition getPartitionFromThriftPartitionSpec(
       List<TPartitionKeyValue> partitionSpec) {
-    return (HdfsPartition)getPartitionFromThriftPartitionSpec(this, partitionSpec);
-  }
-
-  public static PrunablePartition getPartitionFromThriftPartitionSpec(
-      FeFsTable table,
-      List<TPartitionKeyValue> partitionSpec) {
-      // First, build a list of the partition values to search for in the same order they
-    // are defined in the table.
-    List<String> targetValues = Lists.newArrayList();
-    Set<String> keys = Sets.newHashSet();
-    for (FieldSchema fs: table.getMetaStoreTable().getPartitionKeys()) {
-      for (TPartitionKeyValue kv: partitionSpec) {
-        if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
-          targetValues.add(kv.getValue());
-          // Same key was specified twice
-          if (!keys.add(kv.getName().toLowerCase())) {
-            return null;
-          }
-        }
-      }
-    }
-
-    // Make sure the number of values match up and that some values were found.
-    if (targetValues.size() == 0 ||
-        (targetValues.size() != table.getMetaStoreTable().getPartitionKeysSize())) {
-      return null;
-    }
-
-    // Search through all the partitions and check if their partition key values
-    // match the values being searched for.
-    for (PrunablePartition partition: table.getPartitions()) {
-      List<LiteralExpr> partitionValues = partition.getPartitionValues();
-      Preconditions.checkState(partitionValues.size() == targetValues.size());
-      boolean matchFound = true;
-      for (int i = 0; i < targetValues.size(); ++i) {
-        String value;
-        if (partitionValues.get(i) instanceof NullLiteral) {
-          value = table.getNullPartitionKeyValue();
-        } else {
-          value = partitionValues.get(i).getStringValue();
-          Preconditions.checkNotNull(value);
-          // See IMPALA-252: we deliberately map empty strings on to
-          // NULL when they're in partition columns. This is for
-          // backwards compatibility with Hive, and is clearly broken.
-          if (value.isEmpty()) value = table.getNullPartitionKeyValue();
-        }
-        if (!targetValues.get(i).equals(value)) {
-          matchFound = false;
-          break;
-        }
-      }
-      if (matchFound) return partition;
-    }
-    return null;
+    return (HdfsPartition)Utils.getPartitionFromThriftPartitionSpec(this, partitionSpec);
   }
 
   /**
    * Gets hdfs partitions by the given partition set.
    */
+  @SuppressWarnings("unchecked")
   public List<HdfsPartition> getPartitionsFromPartitionSet(
       List<List<TPartitionKeyValue>> partitionSet) {
-    List<HdfsPartition> partitions = Lists.newArrayList();
-    for (List<TPartitionKeyValue> kv : partitionSet) {
-      HdfsPartition partition = getPartitionFromThriftPartitionSpec(kv);
-      if (partition != null) partitions.add(partition);
-    }
-    return partitions;
+    return (List<HdfsPartition>)Utils.getPartitionsFromPartitionSet(this, partitionSet);
   }
 
   /**
@@ -2065,40 +2006,6 @@ public class HdfsTable extends Table implements FeFsTable {
     return result;
   }
 
-  @Override // FeFsTable
-  public TResultSet getFiles(List<List<TPartitionKeyValue>> partitionSet)
-      throws CatalogException {
-    TResultSet result = new TResultSet();
-    TResultSetMetadata resultSchema = new TResultSetMetadata();
-    result.setSchema(resultSchema);
-    resultSchema.addToColumns(new TColumn("Path", Type.STRING.toThrift()));
-    resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
-    resultSchema.addToColumns(new TColumn("Partition", Type.STRING.toThrift()));
-    result.setRows(Lists.<TResultRow>newArrayList());
-
-    List<HdfsPartition> orderedPartitions;
-    if (partitionSet == null) {
-      orderedPartitions = Lists.newArrayList(partitionMap_.values());
-    } else {
-      // Get a list of HdfsPartition objects for the given partition set.
-      orderedPartitions = getPartitionsFromPartitionSet(partitionSet);
-    }
-    Collections.sort(orderedPartitions, HdfsPartition.KV_COMPARATOR);
-
-    for (HdfsPartition p: orderedPartitions) {
-      List<FileDescriptor> orderedFds = Lists.newArrayList(p.getFileDescriptors());
-      Collections.sort(orderedFds);
-      for (FileDescriptor fd: orderedFds) {
-        TResultRowBuilder rowBuilder = new TResultRowBuilder();
-        rowBuilder.add(p.getLocation() + "/" + fd.getFileName());
-        rowBuilder.add(PrintUtils.printBytes(fd.getFileLength()));
-        rowBuilder.add(p.getPartitionName());
-        result.addToRows(rowBuilder.get());
-      }
-    }
-    return result;
-  }
-
   /**
    * Constructs a partition name from a list of TPartitionKeyValue objects.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/bf06e38d/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index c630b5c..681d852 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -40,7 +40,6 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.Function.CompareMode;
 import org.apache.impala.catalog.HdfsCachePool;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.PartitionNotFoundException;
 import org.apache.impala.catalog.PrunablePartition;
 import org.apache.impala.thrift.TCatalogObject;
@@ -166,7 +165,7 @@ public class LocalCatalog implements FeCatalog {
       throwPartitionNotFound(partitionSpec);
     }
     // Get the FeFsPartition object for the given partition spec.
-    PrunablePartition partition = HdfsTable.getPartitionFromThriftPartitionSpec(
+    PrunablePartition partition = FeFsTable.Utils.getPartitionFromThriftPartitionSpec(
         (FeFsTable)table, partitionSpec);
     if (partition == null) throwPartitionNotFound(partitionSpec);
     return FeCatalogUtils.loadPartition((FeFsTable)table, partition.getId());

http://git-wip-us.apache.org/repos/asf/impala/blob/bf06e38d/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 7f016b5..86dce6b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -49,7 +49,6 @@ import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TNetworkAddress;
-import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
@@ -217,13 +216,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public TResultSet getFiles(List<List<TPartitionKeyValue>> partitionSet)
-      throws CatalogException {
-    // TODO(todd): implement for SHOW FILES.
-    return null;
-  }
-
-  @Override
   public String getHdfsBaseDir() {
     // TODO(todd): this is redundant with getLocation, it seems.
     return getLocation();

http://git-wip-us.apache.org/repos/asf/impala/blob/bf06e38d/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 477d130..29d917e 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1225,7 +1225,7 @@ public class Frontend {
     FeTable table = getCatalog().getTable(request.getTable_name().getDb_name(),
         request.getTable_name().getTable_name());
     if (table instanceof FeFsTable) {
-      return ((FeFsTable) table).getFiles(request.getPartition_set());
+      return FeFsTable.Utils.getFiles((FeFsTable)table, request.getPartition_set());
     } else {
       throw new InternalException("SHOW FILES only supports Hdfs table. " +
           "Unsupported table class: " + table.getClass());


[4/5] impala git commit: IMPALA-7307 (part 2). Support TABLESAMPLE in LocalCatalog

Posted by to...@apache.org.
IMPALA-7307 (part 2). Support TABLESAMPLE in LocalCatalog

Tested with 'run-tests.py -k tablesample' and the tests passed.
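
Call sites now go through the shared helper instead of a virtual method
on the table. A minimal sketch mirroring the HdfsScanNode change below
('tbl' is any FeFsTable and 'partitions' is the pruned partition list):

    // Sample at least percentBytes percent of the table's bytes; the
    // planner passes a minimum sample size of 0 because users cannot set
    // one for scans directly. The result maps partition id -> sampled
    // file descriptors within that partition.
    Map<Long, List<FileDescriptor>> sampledFiles =
        FeFsTable.Utils.getFilesSample(tbl, partitions, percentBytes,
            /* minSampleBytes= */ 0, randomSeed);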

Change-Id: I2f7baf05f16c6389ed900e0459708005ab44491e
Reviewed-on: http://gerrit.cloudera.org:8080/10972
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4a049c61
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4a049c61
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4a049c61

Branch: refs/heads/master
Commit: 4a049c61562e6fc5562e3cb41c78161ec32e74f0
Parents: aa26087
Author: Todd Lipcon <to...@cloudera.com>
Authored: Mon Jul 16 17:50:00 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 7 17:38:04 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/ComputeStatsStmt.java       |   2 +-
 .../org/apache/impala/catalog/FeFsTable.java    | 113 ++++++++++++++++---
 .../org/apache/impala/catalog/HdfsTable.java    |  82 --------------
 .../impala/catalog/local/LocalFsTable.java      |   9 --
 .../org/apache/impala/planner/HdfsScanNode.java |   3 +-
 5 files changed, 101 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 098c862..3836e4b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -616,7 +616,7 @@ public class ComputeStatsStmt extends StatementBase {
     // TODO(todd): can we avoid loading all the partitions for this?
     Collection<? extends FeFsPartition> partitions =
         FeCatalogUtils.loadAllPartitions(hdfsTable);
-    Map<Long, List<FileDescriptor>> sample = hdfsTable.getFilesSample(
+    Map<Long, List<FileDescriptor>> sample = FeFsTable.Utils.getFilesSample(hdfsTable,
         partitions, samplePerc, minSampleBytes, sampleSeed);
     long sampleFileBytes = 0;
     for (List<FileDescriptor> fds: sample.values()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index faaf5bd..19bebfc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -17,9 +17,11 @@
 package org.apache.impala.catalog;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -32,6 +34,10 @@ import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.ListMap;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * Frontend interface for interacting with a filesystem-backed table.
  *
@@ -170,21 +176,6 @@ public interface FeFsTable extends FeTable {
   int parseSkipHeaderLineCount(StringBuilder error);
 
   /**
-   * Selects a random sample of files from the given list of partitions such that the sum
-   * of file sizes is at least 'percentBytes' percent of the total number of bytes in
-   * those partitions and at least 'minSampleBytes'. The sample is returned as a map from
-   * partition id to a list of file descriptors selected from that partition.
-   * This function allocates memory proportional to the number of files in 'inputParts'.
-   * Its implementation tries to minimize the constant factor and object generation.
-   * The given 'randomSeed' is used for random number generation.
-   * The 'percentBytes' parameter must be between 0 and 100.
-   */
-  Map<Long, List<FileDescriptor>> getFilesSample(
-      Collection<? extends FeFsPartition> inputParts,
-      long percentBytes, long minSampleBytes,
-      long randomSeed);
-
-  /**
    * @return the index of hosts that store replicas of blocks of this table.
    */
   ListMap<TNetworkAddress> getHostIndex();
@@ -231,5 +222,97 @@ public interface FeFsTable extends FeTable {
       double extrapolatedNumRows = fileBytes * rowsPerByte;
       return (long) Math.max(1, Math.round(extrapolatedNumRows));
     }
+
+    /**
+     * Selects a random sample of files from the given list of partitions such that the
+     * sum of file sizes is at least 'percentBytes' percent of the total number of bytes
+     * in those partitions and at least 'minSampleBytes'. The sample is returned as a map
+     * from partition id to a list of file descriptors selected from that partition.
+     *
+     * This function allocates memory proportional to the number of files in 'inputParts'.
+     * Its implementation tries to minimize the constant factor and object generation.
+     * The given 'randomSeed' is used for random number generation.
+     * The 'percentBytes' parameter must be between 0 and 100.
+     */
+    public static Map<Long, List<FileDescriptor>> getFilesSample(
+        FeFsTable table,
+        Collection<? extends FeFsPartition> inputParts,
+        long percentBytes, long minSampleBytes,
+        long randomSeed) {
+      Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
+      Preconditions.checkState(minSampleBytes >= 0);
+
+      long totalNumFiles = 0;
+      for (FeFsPartition part : inputParts) {
+        totalNumFiles += part.getNumFileDescriptors();
+      }
+
+      // Conservative max size for Java arrays. The actual maximum varies
+      // from JVM version and sometimes between configurations.
+      final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
+      if (totalNumFiles > JVM_MAX_ARRAY_SIZE) {
+        throw new IllegalStateException(String.format(
+            "Too many files to generate a table sample of table %s. " +
+            "Sample requested over %s files, but a maximum of %s files are supported.",
+            table.getTableName().toString(), totalNumFiles, JVM_MAX_ARRAY_SIZE));
+      }
+
+      // Ensure a consistent ordering of files for repeatable runs. The files within a
+      // partition are already ordered based on how they are loaded in the catalog.
+      List<FeFsPartition> orderedParts = Lists.newArrayList(inputParts);
+      Collections.sort(orderedParts, HdfsPartition.KV_COMPARATOR);
+
+      // fileIdxs contains indexes into the file descriptor lists of all inputParts
+      // parts[i] contains the partition corresponding to fileIdxs[i]
+      // fileIdxs[i] is an index into the file descriptor list of the partition parts[i]
+      // The purpose of these arrays is to efficiently avoid selecting the same file
+      // multiple times during the sampling, regardless of the sample percent.
+      // We purposely avoid generating objects proportional to the number of files.
+      int[] fileIdxs = new int[(int)totalNumFiles];
+      FeFsPartition[] parts = new FeFsPartition[(int)totalNumFiles];
+      int idx = 0;
+      long totalBytes = 0;
+      for (FeFsPartition part: orderedParts) {
+        totalBytes += part.getSize();
+        int numFds = part.getNumFileDescriptors();
+        for (int fileIdx = 0; fileIdx < numFds; ++fileIdx) {
+          fileIdxs[idx] = fileIdx;
+          parts[idx] = part;
+          ++idx;
+        }
+      }
+      if (idx != totalNumFiles) {
+        throw new AssertionError("partition file counts changed during iteration");
+      }
+
+      int numFilesRemaining = idx;
+      double fracPercentBytes = (double) percentBytes / 100;
+      long targetBytes = (long) Math.round(totalBytes * fracPercentBytes);
+      targetBytes = Math.max(targetBytes, minSampleBytes);
+
+      // Randomly select files until targetBytes has been reached or all files have been
+      // selected.
+      Random rnd = new Random(randomSeed);
+      long selectedBytes = 0;
+      Map<Long, List<FileDescriptor>> result = Maps.newHashMap();
+      while (selectedBytes < targetBytes && numFilesRemaining > 0) {
+        int selectedIdx = Math.abs(rnd.nextInt()) % numFilesRemaining;
+        FeFsPartition part = parts[selectedIdx];
+        Long partId = Long.valueOf(part.getId());
+        List<FileDescriptor> sampleFileIdxs = result.get(partId);
+        if (sampleFileIdxs == null) {
+          sampleFileIdxs = Lists.newArrayList();
+          result.put(partId, sampleFileIdxs);
+        }
+        FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
+        sampleFileIdxs.add(fd);
+        selectedBytes += fd.getFileLength();
+        // Avoid selecting the same file multiple times.
+        fileIdxs[selectedIdx] = fileIdxs[numFilesRemaining - 1];
+        parts[selectedIdx] = parts[numFilesRemaining - 1];
+        --numFilesRemaining;
+      }
+      return result;
+    }
   }
 }
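
The selection loop above samples files without replacement in O(1) per
draw: after a slot is picked, the last live slot is swapped into its
place and the live region shrinks by one. (Note that the production code
computes Math.abs(rnd.nextInt()) % numFilesRemaining, which is slightly
biased and, in the rare Integer.MIN_VALUE case, negative;
Random.nextInt(bound) avoids both.) A self-contained sketch of just that
mechanic:

    import java.util.Random;

    // Sample up to 'k' distinct elements from 'items' without replacement.
    static int[] sampleWithoutReplacement(int[] items, int k, long seed) {
      int[] pool = items.clone();
      Random rnd = new Random(seed);
      int remaining = pool.length;
      int[] out = new int[Math.min(k, remaining)];
      for (int n = 0; n < out.length; ++n) {
        int i = rnd.nextInt(remaining);   // pick a live slot uniformly
        out[n] = pool[i];
        pool[i] = pool[remaining - 1];    // compact: move last live slot in
        --remaining;                      // shrink the live region
      }
      return out;
    }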

http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index b704a81..0b66f25 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -2131,87 +2130,6 @@ public class HdfsTable extends Table implements FeFsTable {
     addPartition(refreshedPartition);
   }
 
-  @Override // FeFsTable
-  public Map<Long, List<FileDescriptor>> getFilesSample(
-      Collection<? extends FeFsPartition> inputParts,
-      long percentBytes, long minSampleBytes,
-      long randomSeed) {
-    Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
-    Preconditions.checkState(minSampleBytes >= 0);
-
-    long totalNumFiles = 0;
-    for (FeFsPartition part : inputParts) {
-      totalNumFiles += part.getNumFileDescriptors();
-    }
-
-    // Conservative max size for Java arrays. The actual maximum varies
-    // from JVM version and sometimes between configurations.
-    final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
-    if (totalNumFiles > JVM_MAX_ARRAY_SIZE) {
-      throw new IllegalStateException(String.format(
-          "Too many files to generate a table sample of table %s. " +
-          "Sample requested over %s files, but a maximum of %s files are supported.",
-          getTableName().toString(), totalNumFiles, JVM_MAX_ARRAY_SIZE));
-    }
-
-    // Ensure a consistent ordering of files for repeatable runs. The files within a
-    // partition are already ordered based on how they are loaded in the catalog.
-    List<FeFsPartition> orderedParts = Lists.newArrayList(inputParts);
-    Collections.sort(orderedParts, HdfsPartition.KV_COMPARATOR);
-
-    // fileIdxs contains indexes into the file descriptor lists of all inputParts
-    // parts[i] contains the partition corresponding to fileIdxs[i]
-    // fileIdxs[i] is an index into the file descriptor list of the partition parts[i]
-    // The purpose of these arrays is to efficiently avoid selecting the same file
-    // multiple times during the sampling, regardless of the sample percent. We purposely
-    // avoid generating objects proportional to the number of files.
-    int[] fileIdxs = new int[(int)totalNumFiles];
-    FeFsPartition[] parts = new FeFsPartition[(int)totalNumFiles];
-    int idx = 0;
-    long totalBytes = 0;
-    for (FeFsPartition part: orderedParts) {
-      totalBytes += part.getSize();
-      int numFds = part.getNumFileDescriptors();
-      for (int fileIdx = 0; fileIdx < numFds; ++fileIdx) {
-        fileIdxs[idx] = fileIdx;
-        parts[idx] = part;
-        ++idx;
-      }
-    }
-    if (idx != totalNumFiles) {
-      throw new AssertionError("partition file counts changed during iteration");
-    }
-
-    int numFilesRemaining = idx;
-    double fracPercentBytes = (double) percentBytes / 100;
-    long targetBytes = (long) Math.round(totalBytes * fracPercentBytes);
-    targetBytes = Math.max(targetBytes, minSampleBytes);
-
-    // Randomly select files until targetBytes has been reached or all files have been
-    // selected.
-    Random rnd = new Random(randomSeed);
-    long selectedBytes = 0;
-    Map<Long, List<FileDescriptor>> result = Maps.newHashMap();
-    while (selectedBytes < targetBytes && numFilesRemaining > 0) {
-      int selectedIdx = Math.abs(rnd.nextInt()) % numFilesRemaining;
-      FeFsPartition part = parts[selectedIdx];
-      Long partId = Long.valueOf(part.getId());
-      List<FileDescriptor> sampleFileIdxs = result.get(partId);
-      if (sampleFileIdxs == null) {
-        sampleFileIdxs = Lists.newArrayList();
-        result.put(partId, sampleFileIdxs);
-      }
-      FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
-      sampleFileIdxs.add(fd);
-      selectedBytes += fd.getFileLength();
-      // Avoid selecting the same file multiple times.
-      fileIdxs[selectedIdx] = fileIdxs[numFilesRemaining - 1];
-      parts[selectedIdx] = parts[numFilesRemaining - 1];
-      --numFilesRemaining;
-    }
-    return result;
-  }
-
   /**
    * Registers table metrics.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index a47497a..7f016b5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -43,7 +43,6 @@ import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.PrunablePartition;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.CatalogObjectsConstants;
@@ -544,14 +543,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public Map<Long, List<FileDescriptor>> getFilesSample(
-      Collection<? extends FeFsPartition> inputParts,
-      long percentBytes, long minSampleBytes, long randomSeed) {
-    // TODO(todd): implement me
-    return Collections.emptyMap();
-  }
-
-  @Override
   public ListMap<TNetworkAddress> getHostIndex() {
     return hostIndex_;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 706093c..cb33556 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -757,7 +757,8 @@ public class HdfsScanNode extends ScanNode {
       // Pass a minimum sample size of 0 because users cannot set a minimum sample size
       // for scans directly. For compute stats, a minimum sample size can be set, and
       // the sampling percent is adjusted to reflect it.
-      sampledFiles = tbl_.getFilesSample(partitions_, percentBytes, 0, randomSeed);
+      sampledFiles = FeFsTable.Utils.getFilesSample(tbl_, partitions_, percentBytes, 0,
+          randomSeed);
     }
 
     long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()


[2/5] impala git commit: IMPALA-7307 (part 1). Support stats extrapolation in LocalCatalog

Posted by to...@apache.org.
IMPALA-7307 (part 1). Support stats extrapolation in LocalCatalog

Tested by running 'run-tests.py -k stats_extrap' and the tests passed.
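
Extrapolation itself is a simple ratio: the table-level stats give
rowsPerByte = num_rows / total_file_bytes, and the estimate for a scan
is fileBytes * rowsPerByte, rounded and clamped to at least 1, with -1
meaning "unavailable". A worked example (the numbers are illustrative):

    // COMPUTE STATS recorded num_rows=1000 over total_file_bytes=10000.
    // A scan covering 2500 bytes of files extrapolates to:
    //   rowsPerByte = 1000 / 10000.0 = 0.1
    //   estimate    = max(1, round(2500 * 0.1)) = 250
    long rows = FeFsTable.Utils.getExtrapolatedNumRows(table, 2500);  // 250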

Change-Id: I479b7f517091dd558768601e1e0704a1902b78a5
Reviewed-on: http://gerrit.cloudera.org:8080/10971
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/aa26087f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/aa26087f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/aa26087f

Branch: refs/heads/master
Commit: aa26087fea6f48f3764c9ac39385221737cf4047
Parents: 4aec504
Author: Todd Lipcon <to...@cloudera.com>
Authored: Thu Jul 12 16:45:01 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 7 17:38:04 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/ComputeStatsStmt.java       |  8 ++-
 .../org/apache/impala/catalog/FeFsTable.java    | 58 ++++++++++++++++----
 .../org/apache/impala/catalog/HdfsTable.java    | 46 ++--------------
 .../impala/catalog/local/LocalFsTable.java      | 12 ----
 .../org/apache/impala/planner/HdfsScanNode.java |  4 +-
 .../impala/planner/StatsExtrapolationTest.java  |  4 +-
 6 files changed, 62 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/aa26087f/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 4b515e9..098c862 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -497,7 +497,8 @@ public class ComputeStatsStmt extends StatementBase {
       // Not computing incremental stats.
       expectAllPartitions_ = true;
       if (table_ instanceof FeFsTable) {
-        expectAllPartitions_ = !((FeFsTable) table_).isStatsExtrapolationEnabled();
+        expectAllPartitions_ = !FeFsTable.Utils.isStatsExtrapolationEnabled(
+            (FeFsTable) table_);
       }
     }
 
@@ -593,7 +594,7 @@ public class ComputeStatsStmt extends StatementBase {
       throw new AnalysisException("TABLESAMPLE is only supported on HDFS tables.");
     }
     FeFsTable hdfsTable = (FeFsTable) table_;
-    if (!hdfsTable.isStatsExtrapolationEnabled()) {
+    if (!FeFsTable.Utils.isStatsExtrapolationEnabled(hdfsTable)) {
       throw new AnalysisException(String.format(
           "COMPUTE STATS TABLESAMPLE requires stats extrapolation which is disabled.\n" +
           "Stats extrapolation can be enabled service-wide with %s=true or by altering " +
@@ -718,7 +719,8 @@ public class ComputeStatsStmt extends StatementBase {
    */
   private boolean updateTableStatsOnly() {
     if (!(table_ instanceof FeFsTable)) return true;
-    return !isIncremental_ && ((FeFsTable) table_).isStatsExtrapolationEnabled();
+    return !isIncremental_ && FeFsTable.Utils.isStatsExtrapolationEnabled(
+        (FeFsTable) table_);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/aa26087f/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 4dbbe0b..faaf5bd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -25,9 +25,11 @@ import java.util.TreeMap;
 
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.ListMap;
 
 /**
@@ -119,17 +121,6 @@ public interface FeFsTable extends FeTable {
   public String getFirstLocationWithoutWriteAccess();
 
   /**
-   * @param totalBytes_ the known number of bytes in the table
-   * @return Returns an estimated row count for the given number of file bytes
-   */
-  public long getExtrapolatedNumRows(long totalBytes);
-
-  /**
-   * @return true if stats extrapolation is enabled for this table, false otherwise.
-   */
-  boolean isStatsExtrapolationEnabled();
-
-  /**
    * @return statistics on this table as a tabular result set. Used for the
    * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
    * inside this method.
@@ -198,4 +189,47 @@ public interface FeFsTable extends FeTable {
    */
   ListMap<TNetworkAddress> getHostIndex();
 
- }
+  /**
+   * Utility functions for operating on FeFsTable. When we move fully to Java 8,
+   * these can become default methods of the interface.
+   */
+  abstract class Utils {
+    /**
+     * Returns true if stats extrapolation is enabled for this table, false otherwise.
+     * Reconciles the Impalad-wide --enable_stats_extrapolation flag and the
+     * TBL_PROP_ENABLE_STATS_EXTRAPOLATION table property
+     */
+    public static boolean isStatsExtrapolationEnabled(FeFsTable table) {
+      org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable();
+      String propVal = msTbl.getParameters().get(
+          HdfsTable.TBL_PROP_ENABLE_STATS_EXTRAPOLATION);
+      if (propVal == null) return BackendConfig.INSTANCE.isStatsExtrapolationEnabled();
+      return Boolean.parseBoolean(propVal);
+    }
+
+    /**
+     * Returns an estimated row count for the given number of file bytes. The row count is
+     * extrapolated using the table-level row count and file bytes statistics.
+     * Returns zero only if the given file bytes is zero.
+     * Returns -1 if:
+     * - stats extrapolation has been disabled
+     * - the given file bytes statistic is negative
+     * - the row count or the file byte statistic is missing
+     * - the file bytes statistic is zero or negative
+     * - the row count statistic is zero and the file bytes is non-zero
+     * Otherwise, returns a value >= 1.
+     */
+    public static long getExtrapolatedNumRows(FeFsTable table, long fileBytes) {
+      if (!isStatsExtrapolationEnabled(table)) return -1;
+      if (fileBytes == 0) return 0;
+      if (fileBytes < 0) return -1;
+
+      TTableStats tableStats = table.getTTableStats();
+      if (tableStats.num_rows < 0 || tableStats.total_file_bytes <= 0) return -1;
+      if (tableStats.num_rows == 0 && tableStats.total_file_bytes != 0) return -1;
+      double rowsPerByte = tableStats.num_rows / (double) tableStats.total_file_bytes;
+      double extrapolatedNumRows = fileBytes * rowsPerByte;
+      return (long) Math.max(1, Math.round(extrapolatedNumRows));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/aa26087f/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 3d65cbb..b704a81 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1940,43 +1940,6 @@ public class HdfsTable extends Table implements FeFsTable {
     return new Pair<String, LiteralExpr>(value, expr);
   }
 
-  /**
-   * Returns an estimated row count for the given number of file bytes. The row count is
-   * extrapolated using the table-level row count and file bytes statistics.
-   * Returns zero only if the given file bytes is zero.
-   * Returns -1 if:
-   * - stats extrapolation has been disabled
-   * - the given file bytes statistic is negative
-   * - the row count or the file byte statistic is missing
-   * - the file bytes statistic is zero or negative
-   * - the row count statistic is zero and the file bytes is non-zero
-   * Otherwise, returns a value >= 1.
-   */
-  @Override // FeFsTable
-  public long getExtrapolatedNumRows(long fileBytes) {
-    if (!isStatsExtrapolationEnabled()) return -1;
-    if (fileBytes == 0) return 0;
-    if (fileBytes < 0) return -1;
-    if (tableStats_.num_rows < 0 || tableStats_.total_file_bytes <= 0) return -1;
-    if (tableStats_.num_rows == 0 && tableStats_.total_file_bytes != 0) return -1;
-    double rowsPerByte = tableStats_.num_rows / (double) tableStats_.total_file_bytes;
-    double extrapolatedNumRows = fileBytes * rowsPerByte;
-    return (long) Math.max(1, Math.round(extrapolatedNumRows));
-  }
-
-  /**
-   * Returns true if stats extrapolation is enabled for this table, false otherwise.
-   * Reconciles the Impalad-wide --enable_stats_extrapolation flag and the
-   * TBL_PROP_ENABLE_STATS_EXTRAPOLATION table property
-   */
-  @Override // FeFsTable
-  public boolean isStatsExtrapolationEnabled() {
-    org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
-    String propVal = msTbl.getParameters().get(TBL_PROP_ENABLE_STATS_EXTRAPOLATION);
-    if (propVal == null) return BackendConfig.INSTANCE.isStatsExtrapolationEnabled();
-    return Boolean.parseBoolean(propVal);
-  }
-
   @Override // FeFsTable
   public TResultSet getTableStats() {
     return getTableStats(this);
@@ -1996,7 +1959,7 @@ public class HdfsTable extends Table implements FeFsTable {
       resultSchema.addToColumns(colDesc);
     }
 
-    boolean statsExtrap = table.isStatsExtrapolationEnabled();
+    boolean statsExtrap = Utils.isStatsExtrapolationEnabled(table);
 
     resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
     if (statsExtrap) {
@@ -2036,7 +1999,9 @@ public class HdfsTable extends Table implements FeFsTable {
       // Compute and report the extrapolated row count because the set of files could
       // have changed since we last computed stats for this partition. We also follow
       // this policy during scan-cardinality estimation.
-      if (statsExtrap) rowBuilder.add(table.getExtrapolatedNumRows(size));
+      if (statsExtrap) {
+        rowBuilder.add(Utils.getExtrapolatedNumRows(table, size));
+      }
 
       rowBuilder.add(numFiles).addBytes(size);
       if (!p.isMarkedCached()) {
@@ -2090,7 +2055,8 @@ public class HdfsTable extends Table implements FeFsTable {
       // have changed since we last computed stats for this partition. We also follow
       // this policy during scan-cardinality estimation.
       if (statsExtrap) {
-        rowBuilder.add(table.getExtrapolatedNumRows(table.getTotalHdfsBytes()));
+        rowBuilder.add(Utils.getExtrapolatedNumRows(
+            table, table.getTotalHdfsBytes()));
       }
       rowBuilder.add(totalNumFiles)
           .addBytes(totalBytes)

http://git-wip-us.apache.org/repos/asf/impala/blob/aa26087f/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 2f7ae89..a47497a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -270,18 +270,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public long getExtrapolatedNumRows(long totalBytes) {
-    // TODO Auto-generated method stub
-    return -1;
-  }
-
-  @Override
-  public boolean isStatsExtrapolationEnabled() {
-    // TODO Auto-generated method stub
-    return false;
-  }
-
-  @Override
   public TResultSet getTableStats() {
     return HdfsTable.getTableStats(this);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/aa26087f/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 151cbe0..706093c 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -998,7 +998,7 @@ public class HdfsScanNode extends ScanNode {
    */
   private void computeCardinalities() {
     // Choose between the extrapolated row count and the one based on stored stats.
-    extrapolatedNumRows_ = tbl_.getExtrapolatedNumRows(totalBytes_);
+    extrapolatedNumRows_ = FeFsTable.Utils.getExtrapolatedNumRows(tbl_, totalBytes_);
     long statsNumRows = getStatsNumRows();
     if (extrapolatedNumRows_ != -1) {
       // The extrapolated row count is based on the 'totalBytes_' which already accounts
@@ -1242,7 +1242,7 @@ public class HdfsScanNode extends ScanNode {
       output.append(getStatsExplainString(detailPrefix));
       output.append("\n");
       String extrapRows = String.valueOf(extrapolatedNumRows_);
-      if (!tbl_.isStatsExtrapolationEnabled()) {
+      if (!FeFsTable.Utils.isStatsExtrapolationEnabled(tbl_)) {
         extrapRows = "disabled";
       } else if (extrapolatedNumRows_ == -1) {
         extrapRows = "unavailable";

http://git-wip-us.apache.org/repos/asf/impala/blob/aa26087f/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java b/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
index a0a8566..2582f02 100644
--- a/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.FrontendTestBase;
@@ -70,7 +71,8 @@ public class StatsExtrapolationTest extends FrontendTestBase {
       long fileBytes, long expectedExtrapNumRows) {
     Preconditions.checkState(tbl instanceof HdfsTable);
     setStats(tbl, rowCount, totalSize);
-    long actualExtrapNumRows = ((HdfsTable)tbl).getExtrapolatedNumRows(fileBytes);
+    long actualExtrapNumRows = FeFsTable.Utils.getExtrapolatedNumRows(
+        (HdfsTable)tbl, fileBytes);
     assertEquals(expectedExtrapNumRows, actualExtrapNumRows);
   }
 


[5/5] impala git commit: IMPALA-7308. Support Avro tables in LocalCatalog

Posted by to...@apache.org.
IMPALA-7308. Support Avro tables in LocalCatalog

This adds support for loading Avro-formatted tables in LocalCatalog.
When the table properties indicate that a table is Avro-formatted, the
semantics are identical to the existing catalog implementation:

- if an explicit avro schema is specified, it overrides the schema
  provided by the HMS
- if no explicit avro schema is specified, one is inferred, and then the
  inferred schema takes the place of the one provided by the HMS (thus
  promoting columns like TINYINT to INT)
- on COMPUTE STATS, if any discrepancy is discovered between the HMS
  schema and the inferred schema, an error is emitted.

The semantics for LocalCatalog differ slightly for tables that have
not been configured with the Avro format at the table level:

In the existing implementation, when a table is loaded, all of its
partitions are inspected and, if any partition turns out to be
Avro-formatted, the above rules are applied. This has some very
unexpected results, described in an earlier email to
dev@impala.apache.org [1]. To summarize that thread, the existing
behavior was judged unintuitive and inconsistent with Hive.
Additionally, it requires loading all partitions up-front, which
conflicts with the goal of lazy/granular metadata loading in
LocalCatalog.

Thus, the LocalCatalog implementation differs as follows:

- the "schema override" behavior ONLY occurs if the Avro file format has
  been selected at a table level.

- if an Avro partition is added to a non-Avro table, and that partition
  has a schema that isn't compatible with the table's schema, an error
  will occur on read.

The thread additionally discusses adding an error message on "alter" to
prevent users from adding an Avro partition to a table with an
incompatible schema. To keep the scope of this patch minimal, that is
not yet implemented here. I filed IMPALA-7309 to change the behavior of
the existing catalog implementation to match.

A new test verifies the behavior, set to 'xfail' when running on the
existing catalog implementation.

[1] https://lists.apache.org/thread.html/fb68c54bd66a40982ee17f9f16f87a4112220a5df035a311bda310f1@%3Cdev.impala.apache.org%3E
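
The table-level check described above might look roughly like this; a
hedged sketch only, with an illustrative format test rather than the
actual LocalFsTable code:

    // Apply the Avro schema override only when the table-level storage
    // descriptor says Avro, regardless of per-partition file formats.
    // Assumption: the real code identifies Avro via HdfsFileFormat /
    // serde inspection; the string check here is purely illustrative.
    boolean usesAvroSchemaOverride(
        org.apache.hadoop.hive.metastore.api.Table msTbl) {
      String serde = msTbl.getSd().getSerdeInfo().getSerializationLib();
      return serde != null && serde.toLowerCase().contains("avro");
    }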

Change-Id: Ie4b86c8203271b773a711ed77558ec3e3070cb69
Reviewed-on: http://gerrit.cloudera.org:8080/10970
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4aec5048
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4aec5048
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4aec5048

Branch: refs/heads/master
Commit: 4aec50484a51610efdea08db7af9e9737b3bc1c2
Parents: 58191d5
Author: Todd Lipcon <to...@cloudera.com>
Authored: Thu Jul 12 15:34:23 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 7 17:38:04 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/ComputeStatsStmt.java       |   4 +-
 .../org/apache/impala/catalog/FeFsTable.java    |   5 +-
 .../org/apache/impala/catalog/HdfsTable.java    |  30 ++----
 .../impala/catalog/local/LocalFsTable.java      | 104 +++++++++++++++++--
 .../apache/impala/catalog/local/LocalTable.java |   2 +-
 .../org/apache/impala/util/AvroSchemaUtils.java |  38 ++++++-
 .../impala/catalog/local/LocalCatalogTest.java  |  24 +++++
 .../QueryTest/incompatible_avro_partition.test  |  65 ++++++++++++
 tests/common/custom_cluster_test_suite.py       |   3 +
 tests/conftest.py                               |   4 +
 tests/metadata/test_partition_metadata.py       |  25 +++++
 tests/query_test/test_avro_schema_resolution.py |   1 +
 12 files changed, 270 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 52c7a15..4b515e9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -376,7 +376,7 @@ public class ComputeStatsStmt extends StatementBase {
     FeFsTable hdfsTable = null;
     if (table_ instanceof FeFsTable) {
       hdfsTable = (FeFsTable)table_;
-      if (hdfsTable.isAvroTable()) checkIncompleteAvroSchema(hdfsTable);
+      if (hdfsTable.usesAvroSchemaOverride()) checkIncompleteAvroSchema(hdfsTable);
       if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 &&
           partitionSet_ != null) {
         throw new AnalysisException(String.format(
@@ -653,7 +653,7 @@ public class ComputeStatsStmt extends StatementBase {
    * the column definitions match the Avro schema exactly.
    */
   private void checkIncompleteAvroSchema(FeFsTable table) throws AnalysisException {
-    Preconditions.checkState(table.isAvroTable());
+    Preconditions.checkState(table.usesAvroSchemaOverride());
     org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
     // The column definitions from 'CREATE TABLE (column definitions) ...'
     Iterator<FieldSchema> colDefs = msTable.getSd().getCols().iterator();

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 891bf62..4dbbe0b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -90,9 +90,10 @@ public interface FeFsTable extends FeTable {
   long getTotalHdfsBytes();
 
   /**
-   * @return true if this table is backed by the Avro file format
+   * @return true if this table's schema as stored in the HMS has been overridden
+   * by an Avro schema.
    */
-  boolean isAvroTable();
+  boolean usesAvroSchemaOverride();
 
   /**
    * @return the set of file formats that the partitions in this table use.

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index d66ddd2..3d65cbb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.NumericLiteral;
@@ -81,7 +80,6 @@ import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.AvroSchemaConverter;
-import org.apache.impala.util.AvroSchemaParser;
 import org.apache.impala.util.AvroSchemaUtils;
 import org.apache.impala.util.FsPermissionChecker;
 import org.apache.impala.util.HdfsCachingUtil;
@@ -1558,6 +1556,9 @@ public class HdfsTable extends Table implements FeFsTable {
         Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas(
             msTbl.getSd().getCols(), getFullName());
         avroSchema_ = inferredSchema.toString();
+        // NOTE: below we reconcile this inferred schema back into the table
+        // schema in the case of Avro-formatted tables. This has the side effect
+        // of promoting types like TINYINT to INT.
       }
       String serdeLib = msTbl.getSd().getSerdeInfo().getSerializationLib();
       if (serdeLib == null ||
@@ -1568,25 +1569,12 @@ public class HdfsTable extends Table implements FeFsTable {
         // using the fields from the storage descriptor (same as Hive).
         return;
       } else {
-        // Generate new FieldSchemas from the Avro schema. This step reconciles
-        // differences in the column definitions and the Avro schema. For
-        // Impala-created tables this step is not necessary because the same
-        // resolution is done during table creation. But Hive-created tables
-        // store the original column definitions, and not the reconciled ones.
-        List<ColumnDef> colDefs =
-            ColumnDef.createFromFieldSchemas(msTbl.getSd().getCols());
-        List<ColumnDef> avroCols = AvroSchemaParser.parse(avroSchema_);
-        StringBuilder warning = new StringBuilder();
-        List<ColumnDef> reconciledColDefs =
-            AvroSchemaUtils.reconcileSchemas(colDefs, avroCols, warning);
-        if (warning.length() != 0) {
-          LOG.warn(String.format("Warning while loading table %s:\n%s",
-              getFullName(), warning.toString()));
-        }
-        AvroSchemaUtils.setFromSerdeComment(reconciledColDefs);
-        // Reset and update nonPartFieldSchemas_ to the reconcicled colDefs.
+        List<FieldSchema> reconciledFieldSchemas = AvroSchemaUtils.reconcileAvroSchema(
+            msTbl, avroSchema_);
+
+        // Reset and update nonPartFieldSchemas_ to the reconciled colDefs.
         nonPartFieldSchemas_.clear();
-        nonPartFieldSchemas_.addAll(ColumnDef.toFieldSchemas(reconciledColDefs));
+        nonPartFieldSchemas_.addAll(reconciledFieldSchemas);
         // Update the columns as per the reconciled colDefs and re-load stats.
         clearColumns();
         addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
@@ -1802,7 +1790,7 @@ public class HdfsTable extends Table implements FeFsTable {
   public String getHdfsBaseDir() { return hdfsBaseDir_; }
   public Path getHdfsBaseDirPath() { return new Path(hdfsBaseDir_); }
   @Override // FeFsTable
-  public boolean isAvroTable() { return avroSchema_ != null; }
+  public boolean usesAvroSchemaOverride() { return avroSchema_ != null; }
 
   @Override // FeFsTable
   public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
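
To make the type-promotion side effect mentioned in the NOTE above concrete: Avro's primitive types are null, boolean, int, long, float, double, bytes and string, so an inferred schema can only describe a TINYINT or SMALLINT column as "int", and reconciling that schema back into the table promotes the column to INT. A minimal standalone sketch of the inference step (record and field names are illustrative, not part of this change):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class InferredSchemaSketch {
      public static void main(String[] args) {
        // There is no 8- or 16-bit integer type in Avro, so a column
        // declared TINYINT in the HMS round-trips as "int".
        Schema inferred = SchemaBuilder.record("alltypes").fields()
            .optionalBoolean("bool_col")
            .optionalInt("tinyint_col")     // declared TINYINT in the HMS
            .optionalString("date_string_col")
            .endRecord();
        System.out.println(inferred.toString(true));
      }
    }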

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 29c9aaa..2f7ae89 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -43,6 +45,7 @@ import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
@@ -51,10 +54,13 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.AvroSchemaConverter;
+import org.apache.impala.util.AvroSchemaUtils;
 import org.apache.impala.util.ListMap;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -84,7 +90,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    */
   private ArrayList<HashSet<Long>> nullPartitionIds_;
 
-
   /**
    * The value that will be stored in a partition name to indicate NULL.
    */
@@ -96,14 +101,77 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    */
   private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<>();
 
-  public LocalFsTable(LocalDb db, Table msTbl) {
-    super(db, msTbl);
+  /**
+   * The Avro schema for this table. Non-null if this table is an Avro table.
+   * If this table is not an Avro table, this is usually null, but may be
+   * non-null in the case that an explicit external avro schema is specified
+   * as a table property. Such a schema is used when querying Avro partitions
+   * of non-Avro tables.
+   */
+  private final String avroSchema_;
+
+  public static LocalFsTable load(LocalDb db, Table msTbl) {
+    String fullName = msTbl.getDbName() + "." + msTbl.getTableName();
+
+    // Set Avro schema if necessary.
+    String avroSchema;
+    ColumnMap cmap;
+    try {
+      // Load the avro schema if it's external (explicitly specified).
+      avroSchema = loadAvroSchema(msTbl);
+
+      // If the table's format is Avro, then we should override the columns
+      // based on the schema (either inferred or explicit). Otherwise, even if
+      // there is an Avro schema set, we don't override the table-level columns:
+      // the Avro schema in that case is just used in case there is an Avro-formatted
+      // partition.
+      if (isAvroFormat(msTbl)) {
+        if (avroSchema == null) {
+          // No Avro schema was explicitly set in the table metadata, so infer the Avro
+          // schema from the column definitions.
+          Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas(
+              msTbl.getSd().getCols(), fullName);
+          avroSchema = inferredSchema.toString();
+        }
+
+        List<FieldSchema> reconciledFieldSchemas = AvroSchemaUtils.reconcileAvroSchema(
+            msTbl, avroSchema);
+        Table msTblWithExplicitAvroSchema = msTbl.deepCopy();
+        msTblWithExplicitAvroSchema.getSd().setCols(reconciledFieldSchemas);
+        cmap = ColumnMap.fromMsTable(msTblWithExplicitAvroSchema);
+      } else {
+        cmap = ColumnMap.fromMsTable(msTbl);
+      }
+
+      return new LocalFsTable(db, msTbl, cmap, avroSchema);
+    } catch (AnalysisException e) {
+      throw new LocalCatalogException("Failed to load Avro schema for table "
+          + fullName);
+    }
+  }
+
+  private LocalFsTable(LocalDb db, Table msTbl, ColumnMap cmap,
+      String explicitAvroSchema) {
+    super(db, msTbl, cmap);
 
     // set NULL indicator string from table properties
     String tableNullFormat =
         msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
     nullColumnValue_ = tableNullFormat != null ? tableNullFormat :
         FeFsTable.DEFAULT_NULL_COLUMN_VALUE;
+
+    avroSchema_ = explicitAvroSchema;
+  }
+
+  private static String loadAvroSchema(Table msTbl) throws AnalysisException {
+    List<Map<String, String>> schemaSearchLocations = ImmutableList.of(
+        msTbl.getSd().getSerdeInfo().getParameters(),
+        msTbl.getParameters());
+
+    // TODO(todd): we should consider moving this to the MetaProvider interface
+    // so that it can more easily be cached rather than re-loaded from HDFS on
+    // each table reference.
+    return AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
   }
 
   /**
@@ -116,7 +184,8 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     // so we can checkState() against it in various other methods and make
     // sure we don't try to do something like load partitions for a not-yet-created
     // table.
-    return new LocalFsTable(db, msTbl);
+    return new LocalFsTable(db, msTbl, ColumnMap.fromMsTable(msTbl),
+        /*explicitAvroSchema=*/null);
   }
 
 
@@ -172,9 +241,8 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public boolean isAvroTable() {
-    // TODO Auto-generated method stub
-    return false;
+  public boolean usesAvroSchemaOverride() {
+    return isAvroFormat(msTable_);
   }
 
   @Override
@@ -238,11 +306,19 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
         createPrototypePartition(), ThriftObjectType.DESCRIPTOR_ONLY,
         /*includeIncrementalStats=*/false);
 
-    // TODO(todd): implement avro schema support
     THdfsTable hdfsTable = new THdfsTable(getHdfsBaseDir(), getColumnNames(),
         getNullPartitionKeyValue(), nullColumnValue_, idToPartition,
         tPrototypePartition);
 
+    if (avroSchema_ != null) {
+      hdfsTable.setAvroSchema(avroSchema_);
+    } else if (hasAnyAvroPartition(partitions)) {
+      // Need to infer an Avro schema for the backend to use if any of the
+      // referenced partitions are Avro, even if the table is mixed-format.
+      hdfsTable.setAvroSchema(AvroSchemaConverter.convertFieldSchemas(
+          getMetaStoreTable().getSd().getCols(), getFullName()).toString());
+    }
+
     TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
         FeCatalogUtils.getTColumnDescriptors(this),
         getNumClusteringCols(), name_, db_.getName());
@@ -250,6 +326,18 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     return tableDesc;
   }
 
+  private static boolean isAvroFormat(Table msTbl) {
+    String inputFormat = msTbl.getSd().getInputFormat();
+    return HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO;
+  }
+
+  private static boolean hasAnyAvroPartition(List<? extends FeFsPartition> partitions) {
+    for (FeFsPartition p : partitions) {
+      if (p.getFileFormat() == HdfsFileFormat.AVRO) return true;
+    }
+    return false;
+  }
+
   private LocalFsPartition createPrototypePartition() {
     Partition protoMsPartition = new Partition();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index dd843ae..1a14831 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -81,7 +81,7 @@ abstract class LocalTable implements FeTable {
       // TODO(todd) support datasource table
     } else if (HdfsFileFormat.isHdfsInputFormatClass(
         msTbl.getSd().getInputFormat())) {
-      t = new LocalFsTable(db, msTbl);
+      t = LocalFsTable.load(db, msTbl);
     }
 
     if (t == null) {

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
index 1da466e..833204d 100644
--- a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
@@ -31,15 +31,19 @@ import org.apache.commons.io.IOUtils;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * Contains utility functions for dealing with Avro schemas.
  */
-public class AvroSchemaUtils {
+public abstract class AvroSchemaUtils {
+  private final static Logger LOG = LoggerFactory.getLogger(AvroSchemaUtils.class);
 
   /**
    * Gets an Avro table's JSON schema from the list of given table property search
@@ -104,6 +108,38 @@ public class AvroSchemaUtils {
   }
 
   /**
+   * Reconcile the schema in 'msTbl' with the Avro schema specified in 'avroSchema'.
+   *
+   * See {@link AvroSchemaUtils#reconcileSchemas(List, List, StringBuilder)} for
+   * details.
+   */
+  public static List<FieldSchema> reconcileAvroSchema(
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      String avroSchema) throws AnalysisException {
+    Preconditions.checkNotNull(msTbl);
+    Preconditions.checkNotNull(avroSchema);
+
+    // Generate new FieldSchemas from the Avro schema. This step reconciles
+    // differences in the column definitions and the Avro schema. For
+    // Impala-created tables this step is not necessary because the same
+    // resolution is done during table creation. But Hive-created tables
+    // store the original column definitions, and not the reconciled ones.
+    List<ColumnDef> colDefs =
+        ColumnDef.createFromFieldSchemas(msTbl.getSd().getCols());
+    List<ColumnDef> avroCols = AvroSchemaParser.parse(avroSchema);
+    StringBuilder warning = new StringBuilder();
+    List<ColumnDef> reconciledColDefs =
+        AvroSchemaUtils.reconcileSchemas(colDefs, avroCols, warning);
+    if (warning.length() != 0) {
+      LOG.warn(String.format("Warning while loading table %s.%s:\n%s",
+          msTbl.getDbName(), msTbl.getTableName(), warning.toString()));
+    }
+    AvroSchemaUtils.setFromSerdeComment(reconciledColDefs);
+    return ColumnDef.toFieldSchemas(reconciledColDefs);
+  }
+
+
+  /**
    * Reconciles differences in names/types between the given list of column definitions
    * and the column definitions corresponding to an Avro Schema. Populates 'warning'
    * if there are inconsistencies between the column definitions and the Avro schema,
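
For context, a sketch of the intended call pattern for the new helper, mirroring what LocalFsTable.load() does in the diff above (the null guard is an assumption here, since getAvroSchema() returns null when no explicit schema property is set, while reconcileAvroSchema() requires a non-null schema):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.impala.common.AnalysisException;
    import org.apache.impala.util.AvroSchemaUtils;

    import com.google.common.collect.ImmutableList;

    class ReconcileSketch {
      // Returns the HMS columns, overridden by an explicit Avro schema
      // when one is present in the serde or table properties.
      static List<FieldSchema> reconciledCols(Table msTbl) throws AnalysisException {
        List<Map<String, String>> searchLocations = ImmutableList.of(
            msTbl.getSd().getSerdeInfo().getParameters(),
            msTbl.getParameters());
        String avroSchema = AvroSchemaUtils.getAvroSchema(searchLocations);
        if (avroSchema == null) return msTbl.getSd().getCols();  // no explicit schema
        return AvroSchemaUtils.reconcileAvroSchema(msTbl, avroSchema);
      }
    }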

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 93ff5af..4cb2b96 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -239,4 +239,28 @@ public class LocalCatalogTest {
         "'serialization.format'='1')"
     ));
   }
+
+  /**
+   * Test loading an Avro table which has an explicit avro schema. The schema
+   * should override the columns from the HMS.
+   */
+  @Test
+  public void testAvroExplicitSchema() throws Exception {
+    FeFsTable t = (FeFsTable)catalog_.getTable("functional_avro", "zipcode_incomes");
+    assertNotNull(t.toThriftDescriptor(0, null).hdfsTable.avroSchema);
+    assertTrue(t.usesAvroSchemaOverride());
+  }
+
+  /**
+   * Test loading a table which does not have an explicit avro schema property.
+   * In this case we create an avro schema on demand from the table schema.
+   */
+  @Test
+  public void testAvroImplicitSchema() throws Exception {
+    FeFsTable t = (FeFsTable)catalog_.getTable("functional_avro_snap", "no_avro_schema");
+    assertNotNull(t.toThriftDescriptor(0, null).hdfsTable.avroSchema);
+    // The tinyint column should get promoted to INT to be Avro-compatible.
+    assertEquals(t.getColumn("tinyint_col").getType(), Type.INT);
+    assertTrue(t.usesAvroSchemaOverride());
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/testdata/workloads/functional-query/queries/QueryTest/incompatible_avro_partition.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/incompatible_avro_partition.test b/testdata/workloads/functional-query/queries/QueryTest/incompatible_avro_partition.test
new file mode 100644
index 0000000..9ca0df7
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/incompatible_avro_partition.test
@@ -0,0 +1,65 @@
+====
+---- QUERY
+create external table mixed (
+  id INT COMMENT 'int comment',
+  bool_col BOOLEAN COMMENT 'bool comment',
+  tinyint_col TINYINT COMMENT 'tinyint comment',
+  smallint_col SMALLINT COMMENT 'smallint comment',
+  int_col INT COMMENT 'int comment',
+  bigint_col BIGINT COMMENT 'bigint comment',
+  float_col FLOAT COMMENT 'float comment',
+  double_col DOUBLE COMMENT 'double comment',
+  date_string_col STRING COMMENT 'string comment',
+  char_col char(2) COMMENT 'char comment',
+  varchar_col varchar(5) COMMENT 'varchar comment'
+) partitioned by (part int) stored as $MAIN_TABLE_FORMAT;
+====
+---- QUERY
+# Add a first partition which is not avro
+insert into mixed partition (part = 1)
+values (
+  1, false, 2, 3, 4, 5, 6.0, 7.0, '1985-07-15',
+  cast('c2' as char(2)),
+  cast('my varchar' as varchar(5)));
+====
+---- QUERY
+# And a second partition which is avro
+alter table mixed add partition (part = 2);
+alter table mixed partition (part = 2) set fileformat avro;
+====
+---- QUERY
+# The query should still yield the original types, even though there is
+# now an avro partition.
+select * from mixed;
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, char, string, int
+---- RESULTS
+1,false,2,3,4,5,6,7,'1985-07-15','c2','my va',1
+====
+---- QUERY
+# invalidate should have no effect
+invalidate metadata mixed;
+select * from mixed;
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, char, string, int
+---- RESULTS
+1,false,2,3,4,5,6,7,'1985-07-15','c2','my va',1
+====
+---- QUERY
+# Add incompatible data in the avro partition.
+alter table mixed partition (part = 2) set location '/test-warehouse/alltypes_avro/year=2009/month=1';
+refresh mixed;
+====
+---- QUERY
+# Reading data from the Avro partition should fail.
+select * from mixed;
+---- CATCH
+Unresolvable types for column 'tinyint_col': declared column type: TINYINT, table's Avro schema type: int
+====
+---- QUERY
+# Reading data from the non-Avro partition should be fine, with the same types as before.
+select * from mixed where part = 1;
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, char, string, int
+---- RESULTS
+1,false,2,3,4,5,6,7,'1985-07-15','c2','my va',1

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 5947ce9..d131252 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -145,6 +145,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if use_exclusive_coordinators:
       cmd.append("--use_exclusive_coordinators")
 
+    if pytest.config.option.use_local_catalog:
+      cmd.append("--impalad_args=--use_local_catalog=1")
+
     if os.environ.get("ERASURE_CODING") == "true":
       cmd.append("--impalad_args=--default_query_options=allow_erasure_coded_files=true")
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index f01ecb2..1d64b6b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -105,6 +105,10 @@ def pytest_addoption(parser):
   parser.addoption("--use_kerberos", action="store_true", default=False,
                    help="use kerberos transport for running tests")
 
+  parser.addoption("--use_local_catalog", dest="use_local_catalog", action="store_true",
+                   default=False, help="Run all tests against Impala configured with "
+                   "LocalCatalog.")
+
   parser.addoption("--sanity", action="store_true", default=False,
                    help="Runs a single test vector from each test to provide a quick "
                    "sanity check at the cost of lower test coverage.")

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/tests/metadata/test_partition_metadata.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index d23e3f0..302b0c9 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -125,6 +125,31 @@ class TestPartitionMetadata(ImpalaTestSuite):
     self.run_stmt_in_hive("select * from %s" % FQ_TBL_IMP)
 
 
+class TestMixedPartitions(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestMixedPartitions, cls).add_test_dimensions()
+    # This test only needs to be run once.
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  @pytest.mark.parametrize('main_table_format', ['parquetfile', 'textfile'])
+  def test_incompatible_avro_partition_in_non_avro_table(
+      self, vector, unique_database, main_table_format):
+    if main_table_format == 'parquetfile' and \
+        not pytest.config.option.use_local_catalog:
+      pytest.xfail("IMPALA-7309: adding an avro partition to a parquet table "
+                   "changes its schema")
+    self.run_test_case("QueryTest/incompatible_avro_partition", vector,
+                       unique_database,
+                       test_file_vars={'$MAIN_TABLE_FORMAT': main_table_format})
+
+
 class TestPartitionMetadataUncompressedTextOnly(ImpalaTestSuite):
   @classmethod
   def get_workload(self):

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/tests/query_test/test_avro_schema_resolution.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_avro_schema_resolution.py b/tests/query_test/test_avro_schema_resolution.py
index 4972adb..e660101 100644
--- a/tests/query_test/test_avro_schema_resolution.py
+++ b/tests/query_test/test_avro_schema_resolution.py
@@ -54,4 +54,5 @@ class TestAvroSchemaResolution(ImpalaTestSuite):
     ... ADD COLUMN ...
     Test for IMPALA-3776: Fix describe formatted when changing Avro schema.
     """
+    # TODO(todd): skip the "stale metadata" tests if LocalCatalog is enabled
     self.run_test_case('QueryTest/avro-schema-changes', vector, unique_database)


[3/5] impala git commit: IMPALA-7276 (re-fix). Re-fix CTAS and INSERT for LocalCatalog

Posted by to...@apache.org.
IMPALA-7276 (re-fix). Re-fix CTAS and INSERT for LocalCatalog

A recently merged commit accidentally added a new downcast to
HdfsTable instead of FeFsTable. It also removed the implementation of
a necessary method in LocalFsTable.

This small fix re-fixes those two points so that CTAS works again.
Tested with a simple CTAS query from the shell.
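
The pattern the fix restores is sketched below: downcast only to the frontend interface, never to the catalogd-specific HdfsTable, so the same code path serves both catalog implementations (class and method names here are illustrative, not part of the commit):

    import java.util.Set;

    import org.apache.impala.catalog.FeFsTable;
    import org.apache.impala.catalog.FeTable;
    import org.apache.impala.catalog.HdfsFileFormat;

    class SinkFormatSketch {
      static Set<HdfsFileFormat> formatsOf(FeTable target) {
        // A cast to HdfsTable here would throw ClassCastException when the
        // frontend runs against LocalCatalog, whose tables are LocalFsTable.
        FeFsTable fsTable = (FeFsTable) target;
        return fsTable.getFileFormats();
      }
    }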

Change-Id: I30bfd9a02c0a8d4e0f793ed84fd1693a44f6e9ee
Reviewed-on: http://gerrit.cloudera.org:8080/11141
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/58191d54
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/58191d54
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/58191d54

Branch: refs/heads/master
Commit: 58191d546ae20eced50c955bd0051c775960bdc1
Parents: b822200
Author: Todd Lipcon <to...@cloudera.com>
Authored: Mon Aug 6 23:16:09 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 7 17:38:04 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/local/LocalFsTable.java     | 10 ++++++++--
 .../java/org/apache/impala/planner/HdfsTableSink.java     |  3 +--
 2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/58191d54/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 7753faa..29c9aaa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -179,8 +179,14 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
 
   @Override
   public Set<HdfsFileFormat> getFileFormats() {
-    // Needed by HdfsTableSink.
-    throw new UnsupportedOperationException("TODO: implement me");
+    // TODO(todd): can we avoid loading all partitions here? this is called
+    // for any INSERT query, even if the partition is specified.
+    Collection<? extends FeFsPartition> parts = FeCatalogUtils.loadAllPartitions(this);
+    // In the case that we have no partitions added to the table yet, it's
+    // important to add the "prototype" partition as a fallback.
+    Iterable<FeFsPartition> partitionsToConsider = Iterables.concat(
+        parts, Collections.singleton(createPrototypePartition()));
+    return FeCatalogUtils.getFileFormats(partitionsToConsider);
   }
 
   @Override
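
The prototype-partition fallback above matters for brand-new tables: with no partitions yet, the concatenation would otherwise be empty and the sink would have no file format to base its memory estimate on. A hypothetical standalone illustration of the same pattern, with the types simplified to String:

    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    import com.google.common.collect.ImmutableSet;
    import com.google.common.collect.Iterables;

    class PrototypeFallbackSketch {
      public static void main(String[] args) {
        List<String> partitionFormats = Collections.emptyList();  // no partitions yet
        String prototypeFormat = "PARQUET";  // format a new partition would use
        Set<String> formats = ImmutableSet.copyOf(
            Iterables.concat(partitionFormats, Collections.singleton(prototypeFormat)));
        System.out.println(formats);  // prints [PARQUET]
      }
    }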

http://git-wip-us.apache.org/repos/asf/impala/blob/58191d54/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 48e7c62..03e1348 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -25,7 +25,6 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
@@ -90,7 +89,7 @@ public class HdfsTableSink extends TableSink {
       numPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
     }
 
-    HdfsTable table = (HdfsTable) targetTable_;
+    FeFsTable table = (FeFsTable) targetTable_;
     // TODO: Estimate the memory requirements more accurately by partition type.
     Set<HdfsFileFormat> formats = table.getFileFormats();
     long perPartitionMemReq = getPerPartitionMemReq(formats);