Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/09 06:01:17 UTC

[2/2] impala git commit: IMPALA-5052: Read and write signed integer logical types in Parquet

IMPALA-5052: Read and write signed integer logical types in Parquet

This patch maps signed integer logical types in Parquet to the corresponding
supported Impala column types. It introduces the following mapping:

  INT_8  -> TINYINT
  INT_16 -> SMALLINT
  INT_32 -> INT
  INT_64 -> BIGINT
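
As a rough illustration of the writer-side effect, the annotations on a file
written by Impala can be inspected with the get_parquet_metadata() helper that
the updated test below already uses (the import path and the file path here
are assumptions for the sketch, not part of this patch):

  from tests.util.get_parquet_metadata import get_parquet_metadata

  file_meta_data = get_parquet_metadata('/tmp/impala_written.parquet')
  # schema[0] is the root SchemaElement; the remaining entries are columns.
  for element in file_meta_data.schema[1:]:
      # converted_type carries the parquet-format ConvertedType enum value,
      # e.g. INT_8/INT_16/INT_32/INT_64 for the newly annotated columns.
      print("%s converted_type=%s" % (element.name, element.converted_type))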

Also added a Parquet file with the following schema for testing:

  schema {
    optional int32 id;
    optional int32 tinyint_col (INT_8);
    optional int32 smallint_col (INT_16);
    optional int32 int_col;
    optional int64 bigint_col;
  }
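
For reference, these are the Impala column types the new test expects when a
table is created from this file with CREATE TABLE ... LIKE PARQUET (a small
summary of the assertions made in the test below):

  # Expected DESCRIBE output per column; unannotated int32/int64 columns
  # continue to map to INT/BIGINT as before.
  expected_types = {
      'id': 'int',                 # plain int32
      'tinyint_col': 'tinyint',    # int32 annotated INT_8
      'smallint_col': 'smallint',  # int32 annotated INT_16
      'int_col': 'int',            # plain int32
      'bigint_col': 'bigint',      # plain int64
  }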

Change-Id: I47a8371858c9597c6a440808cf6f933532468927
Reviewed-on: http://gerrit.cloudera.org:8080/8548
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/38461c52
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/38461c52
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/38461c52

Branch: refs/heads/master
Commit: 38461c524f64cc367c483f5f958c64ffd014fcaa
Parents: c4d950b
Author: aphadke <ap...@cloudera.com>
Authored: Wed Oct 11 16:07:13 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 9 04:55:59 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc        |   8 +++
 .../analysis/CreateTableLikeFileStmt.java       |  11 +++
 testdata/data/README                            |  11 +++
 .../data/signed_integer_logical_types.parquet   | Bin 0 -> 22716 bytes
 tests/query_test/test_insert_parquet.py         |  70 ++++++++++++++++++-
 5 files changed, 99 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 952c78e..2d40f14 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -877,6 +877,14 @@ Status HdfsParquetTableWriter::CreateSchema() {
         (type.type == TYPE_STRING &&
          state_->query_options().parquet_annotate_strings_utf8)) {
       node.__set_converted_type(ConvertedType::UTF8);
+    } else if (type.type == TYPE_TINYINT) {
+      node.__set_converted_type(ConvertedType::INT_8);
+    } else if (type.type == TYPE_SMALLINT) {
+      node.__set_converted_type(ConvertedType::INT_16);
+    } else if (type.type == TYPE_INT) {
+      node.__set_converted_type(ConvertedType::INT_32);
+    } else if (type.type == TYPE_BIGINT) {
+      node.__set_converted_type(ConvertedType::INT_64);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index 9f17708..5f43ec3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -281,6 +281,17 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
       return Type.STRING;
     }
 
+    if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32
+        || prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64) {
+      // Map signed integer types to a supported Impala column type
+      switch (orig) {
+        case INT_8: return Type.TINYINT;
+        case INT_16: return Type.SMALLINT;
+        case INT_32: return Type.INT;
+        case INT_64: return Type.BIGINT;
+      }
+    }
+
     if (orig == OriginalType.DECIMAL) {
       return ScalarType.createDecimalType(prim.getDecimalMetadata().getPrecision(),
                                            prim.getDecimalMetadata().getScale());

http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index 8b9db3a..25aa09b 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -132,3 +132,14 @@ null and non-null values. This is not actually a valid Parquet file because the
 bit-packed levels are written in the reverse order specified in the Parquet spec
 for BIT_PACKED. However, this is the order that Impala attempts to read the levels
 in - see IMPALA-3006.
+
+signed_integer_logical_types.parquet:
+Generated using a utility that uses the Java Parquet API.
+The file has the following schema:
+  schema {
+    optional int32 id;
+    optional int32 tinyint_col (INT_8);
+    optional int32 smallint_col (INT_16);
+    optional int32 int_col;
+    optional int64 bigint_col;
+  }
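
The README entry above says the file was generated with a utility based on the
Java Parquet API. As an illustrative alternative only (not the utility that was
actually used), a file with equivalent signed integer annotations could be
produced from Python with pyarrow, which annotates int8/int16 columns as
INT_8/INT_16 automatically:

  import pyarrow as pa
  import pyarrow.parquet as pq

  schema = pa.schema([
      ('id', pa.int32()),
      ('tinyint_col', pa.int8()),    # stored as int32 annotated INT_8
      ('smallint_col', pa.int16()),  # stored as int32 annotated INT_16
      ('int_col', pa.int32()),
      ('bigint_col', pa.int64()),
  ])
  table = pa.Table.from_pydict(
      {'id': [1], 'tinyint_col': [2], 'smallint_col': [3],
       'int_col': [4], 'bigint_col': [5]},
      schema=schema)
  pq.write_table(table, 'signed_integer_logical_types.parquet')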

http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/testdata/data/signed_integer_logical_types.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/signed_integer_logical_types.parquet b/testdata/data/signed_integer_logical_types.parquet
new file mode 100644
index 0000000..c3bdcc4
Binary files /dev/null and b/testdata/data/signed_integer_logical_types.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index dc73a2d..1e8ce6e 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -287,6 +287,75 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
         file_meta_data = get_parquet_metadata(parquet_file)
         assert file_meta_data.column_orders == expected_col_orders
 
+  def test_read_write_logical_types(self, vector, unique_database, tmpdir):
+    """IMPALA-5052: Read and write signed integer parquet logical types
+    This test creates a src_tbl like a parquet file. The parquet file was generated
+    to have columns with different signed integer logical types. The test verifies
+    that parquet file written by the hdfs parquet table writer using the genererated
+    file has the same column type metadata as the generated one."""
+    hdfs_path = (os.environ['DEFAULT_FS'] + "/test-warehouse/{0}.db/"
+                 "signed_integer_logical_types.parquet").format(unique_database)
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+                '/testdata/data/signed_integer_logical_types.parquet', hdfs_path])
+    # Create table with signed integer logical types
+    src_tbl = "{0}.{1}".format(unique_database, "read_write_logical_type_src")
+    create_tbl_stmt = """create table {0} like parquet "{1}"
+        stored as parquet""".format(src_tbl, hdfs_path)
+    result = self.execute_query_expect_success(self.client, create_tbl_stmt)
+    # Check that the src_tbl column types match the schema of the parquet
+    # file from which the table was generated
+    result_src = self.execute_query_expect_success(self.client, "describe %s" %src_tbl)
+    for line in result_src.data:
+      line_split = line.split()
+      if line_split[0] == "id":
+        assert line_split[1] == 'int'
+      elif line_split[0] == "tinyint_col":
+        assert line_split[1] == 'tinyint'
+      elif line_split[0] == "smallint_col":
+        assert line_split[1] == 'smallint'
+      elif line_split[0] == "int_col":
+        assert line_split[1] == 'int'
+      else:
+        assert line_split[0] == 'bigint_col' and line_split[1] == 'bigint'
+
+    # Insert values in this table
+    insert_stmt = "insert into table {0} values(1, 2, 3, 4, 5)".format(src_tbl)
+    result = self.execute_query_expect_success(self.client, insert_stmt)
+
+    # To test integer round tripping, a new dst_tbl is created from the parquet
+    # file written for src_tbl, and the following checks are run:
+    #   1. Insert the same values into the src and dst tables, read them back and
+    #      compare them.
+    #   2. Ensure that the column types in dst_tbl match the column types in the
+    #      schema of the parquet file that was used to generate src_tbl.
+    result = self.execute_query_expect_success(self.client, "show files in %s" %src_tbl)
+    hdfs_path = result.data[0].split("\t")[0]
+    dst_tbl = "{0}.{1}".format(unique_database, "read_write_logical_type_dst")
+    create_tbl_stmt = 'create table {0} like parquet "{1}"'.format(dst_tbl, hdfs_path)
+    result = self.execute_query_expect_success(self.client, create_tbl_stmt)
+    result_dst = self.execute_query_expect_success(self.client, "describe %s" % dst_tbl)
+    for line in result_dst.data:
+      line_split = line.split()
+      if line_split[0] == "id":
+        assert line_split[1] == 'int'
+      elif line_split[0] == "tinyint_col":
+        assert line_split[1] == 'tinyint'
+      elif line_split[0] == "smallint_col":
+        assert line_split[1] == 'smallint'
+      elif line_split[0] == "int_col":
+        assert line_split[1] == 'int'
+      else:
+        assert line_split[0] == 'bigint_col' and line_split[1] == 'bigint'
+
+    insert_stmt = "insert into table {0} values(1, 2, 3, 4, 5)".format(dst_tbl)
+    self.execute_query_expect_success(self.client, insert_stmt)
+    # Check that the inserted values are the same in both the src and dst tables
+    result_src = self.execute_query_expect_success(self.client, "select * from %s"
+            % src_tbl)
+    result_dst = self.execute_query_expect_success(self.client, "select * from %s"
+            % dst_tbl)
+    assert result_src.data == result_dst.data
+
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
 @SkipIfS3.hive
@@ -505,7 +574,6 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         ColumnStats('vc', 'abc banana', 'ghj xyz', 0),
         ColumnStats('st', 'abc xyz', 'lorem ipsum', 0)
     ]
-
     self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
                                       qualified_table_name, expected_min_max_values)