You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/23 15:40:38 UTC

[12/17] incubator-impala git commit: IMPALA-3314/IMPALA-3513: Fix querying tables/partitions altered to Avro format

IMPALA-3314/IMPALA-3513: Fix querying tables/partitions altered to Avro format

Bug: Impalads crash if we query an Avro table with stale metadata

Cause: This happens because avroSchema_ is not set in HdfsTable,
so no schema is propagated to the Avro scanner, and the scanner
lacks checks to make sure the schema is non-null.

The patch fixes the following.

1. The Avro scanner should gracefully handle the case where the Avro schema
   is not set. Appropriate null checks and a meaningful error message have
   been added.

2. This is a special case with multi-file-format partitioned tables.
   avroSchema_ should be set in HdfsTable if any subset of the
   partitions is backed by Avro. Without this patch, we only set it
   if the base table file format is Avro.

Change-Id: I09262d3a7b85a2263c721f3beafd0cab2a1bdf4b
Reviewed-on: http://gerrit.cloudera.org:8080/3136
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/49610e2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/49610e2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/49610e2c

Branch: refs/heads/master
Commit: 49610e2cfa40aa10b626c5ae41d7f0d99d7cabc5
Parents: ca8ac35
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Sat May 7 03:01:54 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Mon May 23 08:40:20 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-avro-scanner.cc                | 13 +++++
 be/src/exec/hdfs-avro-scanner.h                 |  2 +
 .../cloudera/impala/catalog/HdfsPartition.java  |  4 ++
 .../com/cloudera/impala/catalog/HdfsTable.java  | 61 ++++++++++++++------
 .../queries/QueryTest/avro-stale-schema.test    | 36 ++++++++++++
 tests/query_test/test_avro_schema_resolution.py | 10 ++++
 6 files changed, 107 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/49610e2c/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index e7559ac..5f469b7 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -57,6 +57,15 @@ HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state)
     codegend_decode_avro_data_(NULL) {
 }
 
+Status HdfsAvroScanner::Prepare(ScannerContext* context) {
+  RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context));
+  if (scan_node_->avro_schema().schema == NULL) {
+    return Status("Missing Avro schema in scan node. This could be due to stale "
+        "metadata. Running 'invalidate metadata <tablename>' may resolve the problem.");
+  }
+  return Status::OK();
+}
+
 Function* HdfsAvroScanner::Codegen(HdfsScanNode* node,
                                    const vector<ExprContext*>& conjunct_ctxs) {
   if (!node->runtime_state()->codegen_enabled()) return NULL;
@@ -728,6 +737,10 @@ Status HdfsAvroScanner::CodegenReadRecord(
     LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before,
     BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val,
     Value* data_val) {
+  if (record.schema == NULL) {
+    return Status("Missing Avro schema in scan node. This could be due to stale "
+        "metadata. Running 'invalidate metadata <tablename>' may resolve the problem.");
+  }
   DCHECK_EQ(record.schema->type, AVRO_RECORD);
   LLVMContext& context = codegen->context();
   LlvmCodeGen::LlvmBuilder* builder =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/49610e2c/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index 682ba04..316f697 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -86,6 +86,8 @@ class HdfsAvroScanner : public BaseSequenceScanner {
 
   HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state);
 
+  virtual Status Prepare(ScannerContext* context);
+
   /// Codegen parsing records, writing tuples and evaluating predicates.
   static llvm::Function* Codegen(HdfsScanNode*,
                                  const std::vector<ExprContext*>& conjunct_ctxs);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/49610e2c/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java
index bd52682..78ac780 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsPartition.java
@@ -400,6 +400,10 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
         fileFormatDescriptor_.getFileFormat().serializationLib());
   }
 
+  public HdfsFileFormat getFileFormat() {
+    return fileFormatDescriptor_.getFileFormat();
+  }
+
   public void setLocation(String place) {
     location_ = table_.getPartitionLocationCompressor().new Location(place);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/49610e2c/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
index 2c0bae7..6b70a40 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
@@ -134,6 +134,9 @@ public class HdfsTable extends Table {
   // Avro schema of this table if this is an Avro table, otherwise null. Set in load().
   private String avroSchema_ = null;
 
+  // Set to true if any of the partitions have Avro data.
+  private boolean hasAvroData_ = false;
+
   // True if this table's metadata is marked as cached. Does not necessarily mean the
   // data is cached or that all/any partitions are cached.
   private boolean isMarkedCached_ = false;
@@ -1032,6 +1035,7 @@ public class HdfsTable extends Table {
           updatePartitionsFromHms(client, partitionsToUpdate, loadFileMetadata);
         }
       }
+      if (loadTableSchema) setAvroSchema(client, msTbl);
       updateStatsFromHmsTable(msTbl);
     } catch (TableLoadingException e) {
       throw e;
@@ -1245,25 +1249,17 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Loads table schema from Hive Metastore. It also loads column stats.
+   * Sets avroSchema_ if the table or any of the partitions in the table are stored
+   * as Avro. Additionally, this method also reconciles the schema if the column
+   * definitions from the metastore differ from the Avro schema.
    */
-  private void loadSchema(HiveMetaStoreClient client,
+  private void setAvroSchema(HiveMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
-    nonPartFieldSchemas_.clear();
-    // set nullPartitionKeyValue from the hive conf.
-    nullPartitionKeyValue_ = client.getConfigValue(
-        "hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__");
-
-    // set NULL indicator string from table properties
-    nullColumnValue_ =
-        msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
-    if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE;
-
-    // Excludes partition columns.
-    List<FieldSchema> msColDefs = msTbl.getSd().getCols();
+    Preconditions.checkState(!nonPartFieldSchemas_.isEmpty());
     String inputFormat = msTbl.getSd().getInputFormat();
-    if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO) {
-      // Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
+    if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO
+        || hasAvroData_) {
+      // Look for Avro schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
       // taking precedence.
       List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
       schemaSearchLocations.add(
@@ -1271,6 +1267,7 @@ public class HdfsTable extends Table {
       schemaSearchLocations.add(getMetaStoreTable().getParameters());
 
       avroSchema_ = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
+
       if (avroSchema_ == null) {
         // No Avro schema was explicitly set in the table metadata, so infer the Avro
         // schema from the column definitions.
@@ -1285,7 +1282,7 @@ public class HdfsTable extends Table {
         // indicates there is an issue with the table metadata since Avro table need a
         // non-native serde. Instead of failing to load the table, fall back to
         // using the fields from the storage descriptor (same as Hive).
-        nonPartFieldSchemas_.addAll(msColDefs);
+        return;
       } else {
         // Generate new FieldSchemas from the Avro schema. This step reconciles
         // differences in the column definitions and the Avro schema. For
@@ -1303,11 +1300,36 @@ public class HdfsTable extends Table {
               getFullName(), warning.toString()));
         }
         AvroSchemaUtils.setFromSerdeComment(reconciledColDefs);
+        // Reset and update nonPartFieldSchemas_ to the reconciled colDefs.
+        nonPartFieldSchemas_.clear();
         nonPartFieldSchemas_.addAll(ColumnDef.toFieldSchemas(reconciledColDefs));
+        // Update the columns as per the reconciled colDefs and re-load stats.
+        clearColumns();
+        addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
+        addColumnsFromFieldSchemas(nonPartFieldSchemas_);
+        loadAllColumnStats(client);
       }
-    } else {
-      nonPartFieldSchemas_.addAll(msColDefs);
     }
+  }
+
+  /**
+   * Loads table schema and column stats from Hive Metastore.
+   */
+  private void loadSchema(HiveMetaStoreClient client,
+      org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
+    nonPartFieldSchemas_.clear();
+    // set nullPartitionKeyValue from the hive conf.
+    nullPartitionKeyValue_ = client.getConfigValue(
+        "hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__");
+
+    // set NULL indicator string from table properties
+    nullColumnValue_ =
+        msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
+    if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE;
+
+    // Excludes partition columns.
+    nonPartFieldSchemas_.addAll(msTbl.getSd().getCols());
+
     // The number of clustering columns is the number of partition keys.
     numClusteringCols_ = msTbl.getPartitionKeys().size();
     partitionLocationCompressor_.setClusteringColumns(numClusteringCols_);
@@ -1358,6 +1380,7 @@ public class HdfsTable extends Table {
       // If the partition is null, its HDFS path does not exist, and it was not added to
       // this table's partition list. Skip the partition.
       if (partition == null) continue;
+      if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true;
       if (msPartition.getParameters() != null) {
         partition.setNumRows(getRowCount(msPartition.getParameters()));
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/49610e2c/testdata/workloads/functional-query/queries/QueryTest/avro-stale-schema.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/avro-stale-schema.test b/testdata/workloads/functional-query/queries/QueryTest/avro-stale-schema.test
new file mode 100644
index 0000000..e21a980
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/avro-stale-schema.test
@@ -0,0 +1,36 @@
+====
+---- QUERY
+CREATE EXTERNAL TABLE alltypesagg_staleschema (
+  id INT,
+  bool_col BOOLEAN,
+  tinyint_col INT,
+  smallint_col INT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col STRING
+)
+LOCATION 'hdfs://localhost:20500//test-warehouse/alltypesaggmultifilesnopart_avro_snap'
+TBLPROPERTIES ('avro.schema.url'= '/test-warehouse/avro_schemas/functional/alltypesaggmultifilesnopart.json')
+====
+---- QUERY
+alter table alltypesagg_staleschema set fileformat avro
+====
+---- QUERY
+select count(*) from alltypesagg_staleschema
+---- CATCH
+Missing Avro schema in scan node. This could be due to stale metadata.
+====
+---- QUERY
+invalidate metadata alltypesagg_staleschema
+====
+---- QUERY
+select count(*) from alltypesagg_staleschema
+---- RESULTS
+11000
+---- TYPES
+bigint
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/49610e2c/tests/query_test/test_avro_schema_resolution.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_avro_schema_resolution.py b/tests/query_test/test_avro_schema_resolution.py
index 68e5072..21d65bd 100644
--- a/tests/query_test/test_avro_schema_resolution.py
+++ b/tests/query_test/test_avro_schema_resolution.py
@@ -41,3 +41,13 @@ class TestAvroSchemaResolution(ImpalaTestSuite):
     doesn't match file schema.
     """
     self.run_test_case('QueryTest/avro-schema-resolution', vector)
+
+  def test_avro_stale_schema(self, vector, unique_database):
+    """Test for IMPALA-3314 and IMPALA-3513. Impalad shouldn't crash with stale Avro
+    metadata. Instead, it should return a meaningful error message.
+    """
+    # Create a table with the default file format and later change it to Avro using
+    # ALTER TABLE. The query runs with stale metadata, so an error should be raised.
+    # Invalidating metadata should cause the Avro schema to be properly set upon the
+    # next metadata load.
+    self.run_test_case('QueryTest/avro-stale-schema', vector, unique_database)