You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by to...@apache.org on 2018/08/07 18:10:08 UTC

[5/5] impala git commit: IMPALA-7308. Support Avro tables in LocalCatalog

IMPALA-7308. Support Avro tables in LocalCatalog

This adds support for loading Avro-formatted tables in LocalCatalog. In
the case that the table properties indicate a table is Avro-formatted,
the semantics are identical to the existing catalog implementation:

- if an explicit avro schema is specified, it overrides the schema
  provided by the HMS
- if no explicit avro schema is specified, one is inferred, and then the
  inferred schema takes the place of the one provided by the HMS (thus
  promoting columns like TINYINT to INT)
- on COMPUTE STATS, if any discrepancy is discovered between the HMS
  schema and the inferred schema, an error is emitted.

The semantics for LocalCatalog are slightly different in the case of
tables which have not been configured as Avro format on the table level:

The existing implementation has the behavior that, when a table is
loaded, all partitions are inspected, and, if any partition is
discovered with Avro format, the above rules are applied. This has some
very unexpected results, described in an earlier email to
dev@impala.apache.org [1]. To summarize that email thread, the existing
behavior was decided to be unintuitive and inconsistent with Hive.
Additionally, this behavior requires loading all partitions up-front,
which gets in the way of the goal of lazy/granular metadata loading in
LocalCatalog.

Thus, the LocalCatalog implementation differs as follows:

- the "schema override" behavior ONLY occurs if the Avro file format has
  been selected at a table level.

- if an Avro partition is added to a non-Avro table, and that partition
  has a schema that isn't compatible with the table's schema, an error
  will occur on read.

The thread additionally discusses adding an error message on "alter" to
prevent users from adding an Avro partition to a table with an
incompatible schema. To keep the scope of this patch minimal, that is
not yet implemented here. I filed IMPALA-7309 to change the behavior of
the existing catalog implementation to match.

A new test verifies the behavior, set to 'xfail' when running on the
existing catalog implementation.

[1] https://lists.apache.org/thread.html/fb68c54bd66a40982ee17f9f16f87a4112220a5df035a311bda310f1@%3Cdev.impala.apache.org%3E

Change-Id: Ie4b86c8203271b773a711ed77558ec3e3070cb69
Reviewed-on: http://gerrit.cloudera.org:8080/10970
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4aec5048
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4aec5048
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4aec5048

Branch: refs/heads/master
Commit: 4aec50484a51610efdea08db7af9e9737b3bc1c2
Parents: 58191d5
Author: Todd Lipcon <to...@cloudera.com>
Authored: Thu Jul 12 15:34:23 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 7 17:38:04 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/ComputeStatsStmt.java       |   4 +-
 .../org/apache/impala/catalog/FeFsTable.java    |   5 +-
 .../org/apache/impala/catalog/HdfsTable.java    |  30 ++----
 .../impala/catalog/local/LocalFsTable.java      | 104 +++++++++++++++++--
 .../apache/impala/catalog/local/LocalTable.java |   2 +-
 .../org/apache/impala/util/AvroSchemaUtils.java |  38 ++++++-
 .../impala/catalog/local/LocalCatalogTest.java  |  24 +++++
 .../QueryTest/incompatible_avro_partition.test  |  65 ++++++++++++
 tests/common/custom_cluster_test_suite.py       |   3 +
 tests/conftest.py                               |   4 +
 tests/metadata/test_partition_metadata.py       |  25 +++++
 tests/query_test/test_avro_schema_resolution.py |   1 +
 12 files changed, 270 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 52c7a15..4b515e9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -376,7 +376,7 @@ public class ComputeStatsStmt extends StatementBase {
     FeFsTable hdfsTable = null;
     if (table_ instanceof FeFsTable) {
       hdfsTable = (FeFsTable)table_;
-      if (hdfsTable.isAvroTable()) checkIncompleteAvroSchema(hdfsTable);
+      if (hdfsTable.usesAvroSchemaOverride()) checkIncompleteAvroSchema(hdfsTable);
       if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 &&
           partitionSet_ != null) {
         throw new AnalysisException(String.format(
@@ -653,7 +653,7 @@ public class ComputeStatsStmt extends StatementBase {
    * the column definitions match the Avro schema exactly.
    */
   private void checkIncompleteAvroSchema(FeFsTable table) throws AnalysisException {
-    Preconditions.checkState(table.isAvroTable());
+    Preconditions.checkState(table.usesAvroSchemaOverride());
     org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
     // The column definitions from 'CREATE TABLE (column definitions) ...'
     Iterator<FieldSchema> colDefs = msTable.getSd().getCols().iterator();

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 891bf62..4dbbe0b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -90,9 +90,10 @@ public interface FeFsTable extends FeTable {
   long getTotalHdfsBytes();
 
   /**
-   * @return true if this table is backed by the Avro file format
+   * @return true if this table's schema as stored in the HMS has been overridden
+   * by an Avro schema.
    */
-  boolean isAvroTable();
+  boolean usesAvroSchemaOverride();
 
   /**
    * @return the set of file formats that the partitions in this table use.

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index d66ddd2..3d65cbb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.NumericLiteral;
@@ -81,7 +80,6 @@ import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.AvroSchemaConverter;
-import org.apache.impala.util.AvroSchemaParser;
 import org.apache.impala.util.AvroSchemaUtils;
 import org.apache.impala.util.FsPermissionChecker;
 import org.apache.impala.util.HdfsCachingUtil;
@@ -1558,6 +1556,9 @@ public class HdfsTable extends Table implements FeFsTable {
         Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas(
             msTbl.getSd().getCols(), getFullName());
         avroSchema_ = inferredSchema.toString();
+        // NOTE: below we reconcile this inferred schema back into the table
+        // schema in the case of Avro-formatted tables. This has the side effect
+        // of promoting types like TINYINT to INT.
       }
       String serdeLib = msTbl.getSd().getSerdeInfo().getSerializationLib();
       if (serdeLib == null ||
@@ -1568,25 +1569,12 @@ public class HdfsTable extends Table implements FeFsTable {
         // using the fields from the storage descriptor (same as Hive).
         return;
       } else {
-        // Generate new FieldSchemas from the Avro schema. This step reconciles
-        // differences in the column definitions and the Avro schema. For
-        // Impala-created tables this step is not necessary because the same
-        // resolution is done during table creation. But Hive-created tables
-        // store the original column definitions, and not the reconciled ones.
-        List<ColumnDef> colDefs =
-            ColumnDef.createFromFieldSchemas(msTbl.getSd().getCols());
-        List<ColumnDef> avroCols = AvroSchemaParser.parse(avroSchema_);
-        StringBuilder warning = new StringBuilder();
-        List<ColumnDef> reconciledColDefs =
-            AvroSchemaUtils.reconcileSchemas(colDefs, avroCols, warning);
-        if (warning.length() != 0) {
-          LOG.warn(String.format("Warning while loading table %s:\n%s",
-              getFullName(), warning.toString()));
-        }
-        AvroSchemaUtils.setFromSerdeComment(reconciledColDefs);
-        // Reset and update nonPartFieldSchemas_ to the reconcicled colDefs.
+        List<FieldSchema> reconciledFieldSchemas = AvroSchemaUtils.reconcileAvroSchema(
+            msTbl, avroSchema_);
+
+        // Reset and update nonPartFieldSchemas_ to the reconciled colDefs.
         nonPartFieldSchemas_.clear();
-        nonPartFieldSchemas_.addAll(ColumnDef.toFieldSchemas(reconciledColDefs));
+        nonPartFieldSchemas_.addAll(reconciledFieldSchemas);
         // Update the columns as per the reconciled colDefs and re-load stats.
         clearColumns();
         addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
@@ -1802,7 +1790,7 @@ public class HdfsTable extends Table implements FeFsTable {
   public String getHdfsBaseDir() { return hdfsBaseDir_; }
   public Path getHdfsBaseDirPath() { return new Path(hdfsBaseDir_); }
   @Override // FeFsTable
-  public boolean isAvroTable() { return avroSchema_ != null; }
+  public boolean usesAvroSchemaOverride() { return avroSchema_ != null; }
 
   @Override // FeFsTable
   public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 29c9aaa..2f7ae89 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -43,6 +45,7 @@ import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
@@ -51,10 +54,13 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.AvroSchemaConverter;
+import org.apache.impala.util.AvroSchemaUtils;
 import org.apache.impala.util.ListMap;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -84,7 +90,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    */
   private ArrayList<HashSet<Long>> nullPartitionIds_;
 
-
   /**
    * The value that will be stored in a partition name to indicate NULL.
    */
@@ -96,14 +101,77 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    */
   private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<>();
 
-  public LocalFsTable(LocalDb db, Table msTbl) {
-    super(db, msTbl);
+  /**
+   * The Avro schema for this table. Non-null if this table is an Avro table.
+   * If this table is not an Avro table, this is usually null, but may be
+   * non-null in the case that an explicit external avro schema is specified
+   * as a table property. Such a schema is used when querying Avro partitions
+   * of non-Avro tables.
+   */
+  private final String avroSchema_;
+
+  public static LocalFsTable load(LocalDb db, Table msTbl) {
+    String fullName = msTbl.getDbName() + "." + msTbl.getTableName();
+
+    // Set Avro schema if necessary.
+    String avroSchema;
+    ColumnMap cmap;
+    try {
+      // Load the avro schema if it's external (explicitly specified).
+      avroSchema = loadAvroSchema(msTbl);
+
+      // If the table's format is Avro, then we should override the columns
+      // based on the schema (either inferred or explicit). Otherwise, even if
+      // there is an Avro schema set, we don't override the table-level columns:
+      // the Avro schema in that case is just used in case there is an Avro-formatted
+      // partition.
+      if (isAvroFormat(msTbl)) {
+        if (avroSchema == null) {
+          // No Avro schema was explicitly set in the table metadata, so infer the Avro
+          // schema from the column definitions.
+          Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas(
+              msTbl.getSd().getCols(), fullName);
+          avroSchema = inferredSchema.toString();
+        }
+
+        List<FieldSchema> reconciledFieldSchemas = AvroSchemaUtils.reconcileAvroSchema(
+            msTbl, avroSchema);
+        Table msTblWithExplicitAvroSchema = msTbl.deepCopy();
+        msTblWithExplicitAvroSchema.getSd().setCols(reconciledFieldSchemas);
+        cmap = ColumnMap.fromMsTable(msTblWithExplicitAvroSchema);
+      } else {
+        cmap = ColumnMap.fromMsTable(msTbl);
+      }
+
+      return new LocalFsTable(db, msTbl, cmap, avroSchema);
+    } catch (AnalysisException e) {
+      throw new LocalCatalogException("Failed to load Avro schema for table "
+          + fullName);
+    }
+  }
+
+  private LocalFsTable(LocalDb db, Table msTbl, ColumnMap cmap,
+      String explicitAvroSchema) {
+    super(db, msTbl, cmap);
 
     // set NULL indicator string from table properties
     String tableNullFormat =
         msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
     nullColumnValue_ = tableNullFormat != null ? tableNullFormat :
         FeFsTable.DEFAULT_NULL_COLUMN_VALUE;
+
+    avroSchema_ = explicitAvroSchema;
+  }
+
+  private static String loadAvroSchema(Table msTbl) throws AnalysisException {
+    List<Map<String, String>> schemaSearchLocations = ImmutableList.of(
+        msTbl.getSd().getSerdeInfo().getParameters(),
+        msTbl.getParameters());
+
+    // TODO(todd): we should consider moving this to the MetaProvider interface
+    // so that it can more easily be cached rather than re-loaded from HDFS on
+    // each table reference.
+    return AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
   }
 
   /**
@@ -116,7 +184,8 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     // so we can checkState() against it in various other methods and make
     // sure we don't try to do something like load partitions for a not-yet-created
     // table.
-    return new LocalFsTable(db, msTbl);
+    return new LocalFsTable(db, msTbl, ColumnMap.fromMsTable(msTbl),
+        /*explicitAvroSchema=*/null);
   }
 
 
@@ -172,9 +241,8 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public boolean isAvroTable() {
-    // TODO Auto-generated method stub
-    return false;
+  public boolean usesAvroSchemaOverride() {
+    return isAvroFormat(msTable_);
   }
 
   @Override
@@ -238,11 +306,19 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
         createPrototypePartition(), ThriftObjectType.DESCRIPTOR_ONLY,
         /*includeIncrementalStats=*/false);
 
-    // TODO(todd): implement avro schema support
     THdfsTable hdfsTable = new THdfsTable(getHdfsBaseDir(), getColumnNames(),
         getNullPartitionKeyValue(), nullColumnValue_, idToPartition,
         tPrototypePartition);
 
+    if (avroSchema_ != null) {
+      hdfsTable.setAvroSchema(avroSchema_);
+    } else if (hasAnyAvroPartition(partitions)) {
+      // Need to infer an Avro schema for the backend to use if any of the
+      // referenced partitions are Avro, even if the table is mixed-format.
+      hdfsTable.setAvroSchema(AvroSchemaConverter.convertFieldSchemas(
+          getMetaStoreTable().getSd().getCols(), getFullName()).toString());
+    }
+
     TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
         FeCatalogUtils.getTColumnDescriptors(this),
         getNumClusteringCols(), name_, db_.getName());
@@ -250,6 +326,18 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     return tableDesc;
   }
 
+  private static boolean isAvroFormat(Table msTbl) {
+    String inputFormat = msTbl.getSd().getInputFormat();
+    return HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO;
+  }
+
+  private static boolean hasAnyAvroPartition(List<? extends FeFsPartition> partitions) {
+    for (FeFsPartition p : partitions) {
+      if (p.getFileFormat() == HdfsFileFormat.AVRO) return true;
+    }
+    return false;
+  }
+
   private LocalFsPartition createPrototypePartition() {
     Partition protoMsPartition = new Partition();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index dd843ae..1a14831 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -81,7 +81,7 @@ abstract class LocalTable implements FeTable {
       // TODO(todd) support datasource table
     } else if (HdfsFileFormat.isHdfsInputFormatClass(
         msTbl.getSd().getInputFormat())) {
-      t = new LocalFsTable(db, msTbl);
+      t = LocalFsTable.load(db, msTbl);
     }
 
     if (t == null) {

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
index 1da466e..833204d 100644
--- a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
@@ -31,15 +31,19 @@ import org.apache.commons.io.IOUtils;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * Contains utility functions for dealing with Avro schemas.
  */
-public class AvroSchemaUtils {
+public abstract class AvroSchemaUtils {
+  private final static Logger LOG = LoggerFactory.getLogger(AvroSchemaUtils.class);
 
   /**
    * Gets an Avro table's JSON schema from the list of given table property search
@@ -104,6 +108,38 @@ public class AvroSchemaUtils {
   }
 
   /**
+   * Reconcile the schema in 'msTbl' with the Avro schema specified in 'avroSchema'.
+   *
+   * See {@link AvroSchemaUtils#reconcileSchemas(List, List, StringBuilder)} for
+   * details.
+   */
+  public static List<FieldSchema> reconcileAvroSchema(
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      String avroSchema) throws AnalysisException {
+    Preconditions.checkNotNull(msTbl);
+    Preconditions.checkNotNull(avroSchema);
+
+    // Generate new FieldSchemas from the Avro schema. This step reconciles
+    // differences in the column definitions and the Avro schema. For
+    // Impala-created tables this step is not necessary because the same
+    // resolution is done during table creation. But Hive-created tables
+    // store the original column definitions, and not the reconciled ones.
+    List<ColumnDef> colDefs =
+        ColumnDef.createFromFieldSchemas(msTbl.getSd().getCols());
+    List<ColumnDef> avroCols = AvroSchemaParser.parse(avroSchema);
+    StringBuilder warning = new StringBuilder();
+    List<ColumnDef> reconciledColDefs =
+        AvroSchemaUtils.reconcileSchemas(colDefs, avroCols, warning);
+    if (warning.length() != 0) {
+      LOG.warn(String.format("Warning while loading table %s.%s:\n%s",
+          msTbl.getDbName(), msTbl.getTableName(), warning.toString()));
+    }
+    AvroSchemaUtils.setFromSerdeComment(reconciledColDefs);
+    return ColumnDef.toFieldSchemas(reconciledColDefs);
+  }
+
+
+  /**
    * Reconciles differences in names/types between the given list of column definitions
    * and the column definitions corresponding to an Avro Schema. Populates 'warning'
    * if there are inconsistencies between the column definitions and the Avro schema,

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 93ff5af..4cb2b96 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -239,4 +239,28 @@ public class LocalCatalogTest {
         "'serialization.format'='1')"
     ));
   }
+
+  /**
+   * Test loading an Avro table which has an explicit avro schema. The schema
+   * should override the columns from the HMS.
+   */
+  @Test
+  public void testAvroExplicitSchema() throws Exception {
+    FeFsTable t = (FeFsTable)catalog_.getTable("functional_avro", "zipcode_incomes");
+    assertNotNull(t.toThriftDescriptor(0, null).hdfsTable.avroSchema);
+    assertTrue(t.usesAvroSchemaOverride());
+  }
+
+  /**
+   * Test loading a table which does not have an explicit avro schema property.
+   * In this case we create an avro schema on demand from the table schema.
+   */
+  @Test
+  public void testAvroImplicitSchema() throws Exception {
+    FeFsTable t = (FeFsTable)catalog_.getTable("functional_avro_snap", "no_avro_schema");
+    assertNotNull(t.toThriftDescriptor(0, null).hdfsTable.avroSchema);
+    // The tinyint column should get promoted to INT to be Avro-compatible.
+    assertEquals(t.getColumn("tinyint_col").getType(), Type.INT);
+    assertTrue(t.usesAvroSchemaOverride());
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/testdata/workloads/functional-query/queries/QueryTest/incompatible_avro_partition.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/incompatible_avro_partition.test b/testdata/workloads/functional-query/queries/QueryTest/incompatible_avro_partition.test
new file mode 100644
index 0000000..9ca0df7
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/incompatible_avro_partition.test
@@ -0,0 +1,65 @@
+====
+---- QUERY
+create external table mixed (
+  id INT COMMENT 'int commnet',
+  bool_col BOOLEAN COMMENT 'bool commnet',
+  tinyint_col TINYINT COMMENT 'tinyint comment',
+  smallint_col SMALLINT COMMENT 'smallint comment',
+  int_col INT COMMENT 'int comment',
+  bigint_col BIGINT COMMENT 'bigint comment',
+  float_col FLOAT COMMENT 'float comment',
+  double_col DOUBLE COMMENT 'double comment',
+  date_string_col STRING COMMENT 'string comment',
+  char_col char(2) COMMENT 'char comment',
+  varchar_col varchar(5) COMMENT 'varchar comment'
+) partitioned by (part int) stored as $MAIN_TABLE_FORMAT;
+====
+---- QUERY
+# Add a first partition which is not avro
+insert into mixed partition (part = 1)
+values (
+  1, false, 2, 3, 4, 5, 6.0, 7.0, '1985-07-15',
+  cast('c2' as char(2)),
+  cast('my varchar' as varchar(5)));
+====
+---- QUERY
+# And a second partition which is avro
+alter table mixed add partition (part = 2);
+alter table mixed partition (part = 2) set fileformat avro;
+====
+---- QUERY
+# The query should still yield the original types, even though there is
+# now an avro partition.
+select * from mixed;
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, char, string, int
+---- RESULTS
+1,false,2,3,4,5,6,7,'1985-07-15','c2','my va',1
+====
+---- QUERY
+# invalidate should have no effect
+invalidate metadata mixed;
+select * from mixed;
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, char, string, int
+---- RESULTS
+1,false,2,3,4,5,6,7,'1985-07-15','c2','my va',1
+====
+---- QUERY
+# Add incompatible data in the avro partition.
+alter table mixed partition (part = 2) set location '/test-warehouse/alltypes_avro/year=2009/month=1';
+refresh mixed;
+====
+---- QUERY
+# Reading data from the Avro partition should fail.
+select * from mixed;
+---- CATCH
+Unresolvable types for column 'tinyint_col': declared column type: TINYINT, table's Avro schema type: int
+====
+---- QUERY
+# Reading data from the non-Avro partition should be fine, with the same types as before.
+select * from mixed where part = 1;
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, char, string, int
+---- RESULTS
+1,false,2,3,4,5,6,7,'1985-07-15','c2','my va',1

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 5947ce9..d131252 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -145,6 +145,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if use_exclusive_coordinators:
       cmd.append("--use_exclusive_coordinators")
 
+    if pytest.config.option.use_local_catalog:
+      cmd.append("--impalad_args=--use_local_catalog=1")
+
     if os.environ.get("ERASURE_CODING") == "true":
       cmd.append("--impalad_args=--default_query_options=allow_erasure_coded_files=true")
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index f01ecb2..1d64b6b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -105,6 +105,10 @@ def pytest_addoption(parser):
   parser.addoption("--use_kerberos", action="store_true", default=False,
                    help="use kerberos transport for running tests")
 
+  parser.addoption("--use_local_catalog", dest="use_local_catalog", action="store_true",
+                   default=False, help="Run all tests against Impala configured with "
+                   "LocalCatalog.")
+
   parser.addoption("--sanity", action="store_true", default=False,
                    help="Runs a single test vector from each test to provide a quick "
                    "sanity check at the cost of lower test coverage.")

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/tests/metadata/test_partition_metadata.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index d23e3f0..302b0c9 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -125,6 +125,31 @@ class TestPartitionMetadata(ImpalaTestSuite):
     self.run_stmt_in_hive("select * from %s" % FQ_TBL_IMP)
 
 
+class TestMixedPartitions(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestMixedPartitions, cls).add_test_dimensions()
+    # This test only needs to be run once.
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  @pytest.mark.parametrize('main_table_format', ['parquetfile', 'textfile'])
+  def test_incompatible_avro_partition_in_non_avro_table(
+      self, vector, unique_database, main_table_format):
+    if main_table_format == 'parquetfile' and \
+        not pytest.config.option.use_local_catalog:
+      pytest.xfail("IMPALA-7309: adding an avro partition to a parquet table "
+                   "changes its schema")
+    self.run_test_case("QueryTest/incompatible_avro_partition", vector,
+                       unique_database,
+                       test_file_vars={'$MAIN_TABLE_FORMAT': main_table_format})
+
+
 class TestPartitionMetadataUncompressedTextOnly(ImpalaTestSuite):
   @classmethod
   def get_workload(self):

http://git-wip-us.apache.org/repos/asf/impala/blob/4aec5048/tests/query_test/test_avro_schema_resolution.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_avro_schema_resolution.py b/tests/query_test/test_avro_schema_resolution.py
index 4972adb..e660101 100644
--- a/tests/query_test/test_avro_schema_resolution.py
+++ b/tests/query_test/test_avro_schema_resolution.py
@@ -54,4 +54,5 @@ class TestAvroSchemaResolution(ImpalaTestSuite):
     ... ADD COLUMN ...
     Test for IMPALA-3776: Fix describe formatted when changing Avro schema.
     """
+    # TODO(todd): skip the "stale metadata" tests if LocalCatalog is enabled
     self.run_test_case('QueryTest/avro-schema-changes', vector, unique_database)