You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/02/17 23:17:12 UTC

[9/9] incubator-impala git commit: IMPALA-4828: Alter Kudu schema outside Impala may crash on read

IMPALA-4828: Alter Kudu schema outside Impala may crash on read

Creating a table in Impala, changing the column schema
outside of Impala, and then reading again in Impala may
result in a crash: Impala may attempt to dereference
pointers that aren't there. This happens if a string column
is dropped and then a new, non-string column is added with
the old string column's name.

The Kudu scan token contains the projection schema, which
is validated when opening the Kudu scanner (with the
exception of KUDU-1881). However, during planning Impala
assumes that the types/nullability of columns haven't
changed when creating the scan tokens. This is fixed by
adding a check when creating the scan token, and failing
the query if a column's type or nullability has changed.
Impala then relies on the Kudu client to properly validate
that the underlying schema is still represented by the scan
token, and that deserialization will fail if it no longer
matches. Test cases were added for this particular crash
scenario, which now fails during planning as expected. These
tests do not exercise the Kudu client's own validation at
deserialization time, though that would be valuable coverage
to add in the future.

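Columns being removed don't produce a crash; the query fails
gracefully with a "column not found" error, which now also
suggests that the table metadata may need to be refreshed.
A test was added for this case.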

Columns being added should not affect this scenario, but a
test was added anyway.

Change-Id: I6d43f5bb9811e728ad592933066d006c8fb4553a
Reviewed-on: http://gerrit.cloudera.org:8080/5840
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/815c76f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/815c76f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/815c76f9

Branch: refs/heads/master
Commit: 815c76f9cbbe6585ebed961da506fc54ce2ef4e3
Parents: d845413
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Fri Jan 27 16:02:49 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 17 23:09:39 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/planner/KuduScanNode.java |  30 ++-
 tests/common/kudu_test_suite.py                 |   7 +
 tests/query_test/test_kudu.py                   | 196 ++++++++++++++++++-
 3 files changed, 227 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/815c76f9/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index cdb620c..02506e5 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -35,6 +35,7 @@ import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TKuduScanNode;
@@ -146,11 +147,36 @@ public class KuduScanNode extends ScanNode {
     Schema tableSchema = rpcTable.getSchema();
     for (SlotDescriptor desc: getTupleDesc().getSlots()) {
       String colName = desc.getColumn().getName();
+      Type colType = desc.getColumn().getType();
+      ColumnSchema kuduCol = null;
       try {
-        tableSchema.getColumn(colName);
+        kuduCol = tableSchema.getColumn(colName);
       } catch (Exception e) {
         throw new ImpalaRuntimeException("Column '" + colName + "' not found in kudu " +
-            "table " + rpcTable.getName());
+            "table " + rpcTable.getName() + ". The table metadata in Impala may be " +
+            "outdated and need to be refreshed.");
+      }
+
+      Type kuduColType = KuduUtil.toImpalaType(kuduCol.getType());
+      if (!colType.equals(kuduColType)) {
+        throw new ImpalaRuntimeException("Column '" + colName + "' is type " +
+            kuduColType.toSql() + " but Impala expected " + colType.toSql() +
+            ". The table metadata in Impala may be outdated and need to be refreshed.");
+      }
+
+      if (desc.getIsNullable() != kuduCol.isNullable()) {
+        String expected;
+        String actual;
+        if (desc.getIsNullable()) {
+          expected = "nullable";
+          actual = "not nullable";
+        } else {
+          expected = "not nullable";
+          actual = "nullable";
+        }
+        throw new ImpalaRuntimeException("Column '" + colName + "' is " + actual +
+            " but Impala expected it to be " + expected +
+            ". The table metadata in Impala may be outdated and need to be refreshed.");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/815c76f9/tests/common/kudu_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/kudu_test_suite.py b/tests/common/kudu_test_suite.py
index e9763ed..3e438d0 100644
--- a/tests/common/kudu_test_suite.py
+++ b/tests/common/kudu_test_suite.py
@@ -83,6 +83,13 @@ class KuduTestSuite(ImpalaTestSuite):
     return "".join(choice(string.lowercase) for _ in xrange(10))
 
   @classmethod
+  def to_kudu_table_name(cls, db_name, tbl_name):
+    """Return the name of the underlying Kudu table, from the Impala database and table
+    name. This must be kept in sync with KuduUtil.getDefaultCreateKuduTableName() in the
+    FE."""
+    return "impala::%s.%s" % (db_name, tbl_name)
+
+  @classmethod
   def get_kudu_table_base_name(cls, name):
     return name.split(".")[-1]
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/815c76f9/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index b7235e8..50db7ee 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -95,14 +95,202 @@ class TestKuduOperations(KuduTestSuite):
                   encoding, compression, default, blocksize, nullable, encoding,
                   compression, default, blocksize))
               indx = indx + 1
-              kudu_tbl_name = "impala::%s.%s" % (unique_database, impala_tbl_name)
-              assert kudu_client.table_exists(kudu_tbl_name)
+              assert kudu_client.table_exists(
+                  KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name))
+
+  def test_kudu_col_changed(self, cursor, kudu_client, unique_database):
+    """Test changing a Kudu column outside of Impala results in a failure on read with
+       outdated metadata (IMPALA-4828)."""
+    cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
+        PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
+    assert kudu_client.table_exists(
+        KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+
+    # Force metadata to be loaded on impalads
+    cursor.execute("select * from %s.foo" % (unique_database))
+
+    # Load the table via the Kudu client and change col 's' to be a different type.
+    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+    alterer = kudu_client.new_table_alterer(table)
+    alterer.drop_column("s")
+    table = alterer.alter()
+    alterer = kudu_client.new_table_alterer(table)
+    alterer.add_column("s", "int32")
+    table = alterer.alter()
+
+    # Add some rows
+    session = kudu_client.new_session()
+    for i in range(100):
+      op = table.new_insert((i, i))
+      session.apply(op)
+    session.flush()
+
+    # Scanning should result in an error
+    try:
+      cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+      assert False
+    except Exception as e:
+      expected_error = "Column 's' is type INT but Impala expected STRING. The table "\
+          "metadata in Impala may be outdated and need to be refreshed."
+      assert expected_error in str(e)
+
+    # After a REFRESH the scan should succeed
+    cursor.execute("REFRESH %s.foo" % (unique_database))
+    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+    assert len(cursor.fetchall()) == 100
+
+  def test_kudu_col_not_null_changed(self, cursor, kudu_client, unique_database):
+    """Test changing a NOT NULL Kudu column outside of Impala results in a failure
+       on read with outdated metadata (IMPALA-4828)."""
+    cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING NOT NULL)
+        PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
+    assert kudu_client.table_exists(
+        KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+
+    # Force metadata to be loaded on impalads
+    cursor.execute("select * from %s.foo" % (unique_database))
+
+    # Load the table via the Kudu client and change col 's' to be a different type.
+    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+    alterer = kudu_client.new_table_alterer(table)
+    alterer.drop_column("s")
+    table = alterer.alter()
+    alterer = kudu_client.new_table_alterer(table)
+    alterer.add_column("s", "string", nullable=True)
+    table = alterer.alter()
+
+    # Add some rows
+    session = kudu_client.new_session()
+    for i in range(100):
+      op = table.new_insert((i, None))
+      session.apply(op)
+    session.flush()
+
+    # Scanning should result in an error
+    try:
+      cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+      assert False
+    except Exception as e:
+      expected_error = "Column 's' is nullable but Impala expected it to be "\
+          "not nullable. The table metadata in Impala may be outdated and need to be "\
+          "refreshed."
+      assert expected_error in str(e)
+
+    # After a REFRESH the scan should succeed
+    cursor.execute("REFRESH %s.foo" % (unique_database))
+    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+    assert len(cursor.fetchall()) == 100
+
+  def test_kudu_col_null_changed(self, cursor, kudu_client, unique_database):
+    """Test changing a NULL Kudu column outside of Impala results in a failure
+       on read with outdated metadata (IMPALA-4828)."""
+    cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING NULL)
+        PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
+    assert kudu_client.table_exists(
+        KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+
+    # Force metadata to be loaded on impalads
+    cursor.execute("select * from %s.foo" % (unique_database))
+
+    # Load the table via the Kudu client and change col 's' to be a different type.
+    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+    alterer = kudu_client.new_table_alterer(table)
+    alterer.drop_column("s")
+    table = alterer.alter()
+    alterer = kudu_client.new_table_alterer(table)
+    alterer.add_column("s", "string", nullable=False, default="bar")
+    table = alterer.alter()
+
+    # Add some rows
+    session = kudu_client.new_session()
+    for i in range(100):
+      op = table.new_insert((i, None))
+      session.apply(op)
+    session.flush()
+
+    # Scanning should result in an error
+    try:
+      cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+      assert False
+    except Exception as e:
+      expected_error = "Column 's' is not nullable but Impala expected it to be "\
+          "nullable. The table metadata in Impala may be outdated and need to be "\
+          "refreshed."
+      assert expected_error in str(e)
+
+    # After a REFRESH the scan should succeed
+    cursor.execute("REFRESH %s.foo" % (unique_database))
+    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+    assert len(cursor.fetchall()) == 100
+
+  def test_kudu_col_added(self, cursor, kudu_client, unique_database):
+    """Test adding a Kudu column outside of Impala."""
+    cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY)
+        PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
+    assert kudu_client.table_exists(
+        KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+
+    # Force metadata to be loaded on impalads
+    cursor.execute("select * from %s.foo" % (unique_database))
+
+    # Load the table via the Kudu client and add a new col
+    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+    alterer = kudu_client.new_table_alterer(table)
+    alterer.add_column("b", "int32")
+    table = alterer.alter()
+
+    # Add some rows
+    session = kudu_client.new_session()
+    op = table.new_insert((0, 0))
+    session.apply(op)
+    session.flush()
+
+    # Only the first col is visible to Impala. Impala will not know about the missing
+    # column, so '*' is expanded to known columns. This doesn't have a separate check
+    # because the query can proceed and checking would need to fetch metadata from the
+    # Kudu master, which is what REFRESH is for.
+    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+    assert cursor.fetchall() == [(0, )]
+
+    # After a REFRESH both cols should be visible
+    cursor.execute("REFRESH %s.foo" % (unique_database))
+    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+    assert cursor.fetchall() == [(0, 0)]
+
+  def test_kudu_col_removed(self, cursor, kudu_client, unique_database):
+    """Test removing a Kudu column outside of Impala."""
+    cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
+        PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
+    assert kudu_client.table_exists(
+        KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+
+    # Force metadata to be loaded on impalads
+    cursor.execute("select * from %s.foo" % (unique_database))
+    cursor.execute("insert into %s.foo values (0, 'foo')" % (unique_database))
+
+    # Load the table via the Kudu client and change col 's' to be a different type.
+    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+    alterer = kudu_client.new_table_alterer(table)
+    alterer.drop_column("s")
+    table = alterer.alter()
+
+    # Scanning should result in an error
+    try:
+      cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+    except Exception as e:
+      expected_error = "Column 's' not found in kudu table impala::test_kudu_col_removed"
+      assert expected_error in str(e)
+
+    # After a REFRESH the scan should succeed
+    cursor.execute("REFRESH %s.foo" % (unique_database))
+    cursor.execute("SELECT * FROM %s.foo" % (unique_database))
+    assert cursor.fetchall() == [(0, )]
 
   def test_kudu_rename_table(self, cursor, kudu_client, unique_database):
     """Test Kudu table rename"""
     cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY) PARTITION BY HASH(a)
         PARTITIONS 3 STORED AS KUDU""" % unique_database)
-    kudu_tbl_name = "impala::%s.foo" % unique_database
+    kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, "foo")
     assert kudu_client.table_exists(kudu_tbl_name)
     new_kudu_tbl_name = "blah"
     cursor.execute("ALTER TABLE %s.foo SET TBLPROPERTIES('kudu.table_name'='%s')" % (
@@ -546,7 +734,7 @@ class TestImpalaKuduIntegration(KuduTestSuite):
     impala_tbl_name = "foo"
     cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a)
         PARTITIONS 3 STORED AS KUDU""" % (unique_database, impala_tbl_name))
-    kudu_tbl_name = "impala::%s.%s" % (unique_database, impala_tbl_name)
+    kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name)
     assert kudu_client.table_exists(kudu_tbl_name)
     kudu_client.delete_table(kudu_tbl_name)
     assert not kudu_client.table_exists(kudu_tbl_name)