Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/09 06:01:16 UTC

[1/2] impala git commit: IMPALA-3887: Wait for HDFS replication in data loading

Repository: impala
Updated Branches:
  refs/heads/master 6a87eb20a -> 38461c524


IMPALA-3887: Wait for HDFS replication in data loading

When the data loading finishes, it is possible for some HDFS blocks to
be under-replicated. If Impala fetches the metadata before replication
completes, some tests may fail. This patch adds a replication-waiting
step to the data loading script.
Resubmitted with a filesystem type check.
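
A minimal manual sketch of the check the new wait step performs (the warehouse
path is the one used by the patch; the exact wording of the fsck summary line
is an assumption about typical fsck output):

  # Ask the NameNode for a consistency report and look at the replication summary.
  hdfs fsck /test-warehouse | grep "Under-replicated blocks"
  # Loading is safe to continue once this reports something like:
  #   Under-replicated blocks:        0 (0.0 %)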

Change-Id: I64d9a8ea1d0a32b40047321b50a7139a8f48eac8
Reviewed-on: http://gerrit.cloudera.org:8080/8916
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c4d950b9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c4d950b9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c4d950b9

Branch: refs/heads/master
Commit: c4d950b9e96c22ca76aab288814428a775a88fa5
Parents: 6a87eb2
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Thu Dec 21 17:41:28 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 9 03:24:36 2018 +0000

----------------------------------------------------------------------
 testdata/bin/create-load-data.sh | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c4d950b9/testdata/bin/create-load-data.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index df6622a..404bdfe 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -449,6 +449,23 @@ function copy-and-load-ext-data-source {
     ${IMPALA_HOME}/testdata/bin/create-data-source-table.sql
 }
 
+function wait-hdfs-replication {
+  FAIL_COUNT=0
+  while [[ "$FAIL_COUNT" -ne "6" ]] ; do
+    FSCK_OUTPUT="$(hdfs fsck /test-warehouse)"
+    echo "$FSCK_OUTPUT"
+    if grep "Under-replicated blocks:[[:space:]]*0" <<< "$FSCK_OUTPUT"; then
+      return
+    fi
+    let FAIL_COUNT="$FAIL_COUNT"+1
+    sleep 5
+  done
+  echo "Some HDFS blocks are still under replicated after 30s."
+  echo "Some tests cannot pass without fully replicated blocks (IMPALA-3887)."
+  echo "Failing the data loading."
+  exit 1
+}
+
 # For kerberized clusters, use kerberos
 if ${CLUSTER_DIR}/admin is_kerberized; then
   LOAD_DATA_ARGS="${LOAD_DATA_ARGS} --use_kerberos --principal=${MINIKDC_PRINC_HIVE}"
@@ -534,6 +551,8 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
 
   run-step "Creating internal HBase table" create-internal-hbase-table.log \
       create-internal-hbase-table
+
+  run-step "Waiting for HDFS replication" wait-hdfs-replication.log wait-hdfs-replication
 fi
 
 # TODO: Investigate why all stats are not preserved. Theoretically, we only need to


[2/2] impala git commit: IMPALA-5052: Read and write signed integer logical types in Parquet

Posted by ta...@apache.org.
IMPALA-5052: Read and write signed integer logical types in Parquet

This patch maps a signed integer logical type in Parquet to a supported
Impala column type. The change introduces the following mapping -

  INT_8  -> TINYINT
  INT_16 -> SMALLINT
  INT_32 -> INT
  INT_64 -> BIGINT

Also added a Parquet file with the following schema for testing -

  schema {
    optional int32 id;
    optional int32 tinyint_col (INT_8);
    optional int32 smallint_col (INT_16);
    optional int32 int_col;
    optional int64 bigint_col;
  }
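
A hypothetical end-to-end check of the new mapping (the table name is made up,
and the HDFS path assumes the file above has already been copied into
/test-warehouse):

  # Derive a table schema from the annotated file, then inspect the column types.
  impala-shell -q "CREATE TABLE tmp_int_types LIKE PARQUET
      '/test-warehouse/signed_integer_logical_types.parquet' STORED AS PARQUET"
  # With this patch, DESCRIBE should show tinyint_col as TINYINT and
  # smallint_col as SMALLINT, matching the INT_8/INT_16 annotations.
  impala-shell -q "DESCRIBE tmp_int_types"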

Change-Id: I47a8371858c9597c6a440808cf6f933532468927
Reviewed-on: http://gerrit.cloudera.org:8080/8548
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/38461c52
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/38461c52
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/38461c52

Branch: refs/heads/master
Commit: 38461c524f64cc367c483f5f958c64ffd014fcaa
Parents: c4d950b
Author: aphadke <ap...@cloudera.com>
Authored: Wed Oct 11 16:07:13 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 9 04:55:59 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc        |   8 +++
 .../analysis/CreateTableLikeFileStmt.java       |  11 +++
 testdata/data/README                            |  11 +++
 .../data/signed_integer_logical_types.parquet   | Bin 0 -> 22716 bytes
 tests/query_test/test_insert_parquet.py         |  70 ++++++++++++++++++-
 5 files changed, 99 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 952c78e..2d40f14 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -877,6 +877,14 @@ Status HdfsParquetTableWriter::CreateSchema() {
         (type.type == TYPE_STRING &&
          state_->query_options().parquet_annotate_strings_utf8)) {
       node.__set_converted_type(ConvertedType::UTF8);
+    } else if (type.type == TYPE_TINYINT) {
+      node.__set_converted_type(ConvertedType::INT_8);
+    } else if (type.type == TYPE_SMALLINT) {
+      node.__set_converted_type(ConvertedType::INT_16);
+    } else if (type.type == TYPE_INT) {
+      node.__set_converted_type(ConvertedType::INT_32);
+    } else if (type.type == TYPE_BIGINT) {
+      node.__set_converted_type(ConvertedType::INT_64);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index 9f17708..5f43ec3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -281,6 +281,17 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
       return Type.STRING;
     }
 
+    if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32
+        || prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64) {
+      // Map signed integer types to a supported Impala column type
+      switch (orig) {
+        case INT_8: return Type.TINYINT;
+        case INT_16: return Type.SMALLINT;
+        case INT_32: return Type.INT;
+        case INT_64: return Type.BIGINT;
+      }
+    }
+
     if (orig == OriginalType.DECIMAL) {
       return ScalarType.createDecimalType(prim.getDecimalMetadata().getPrecision(),
                                            prim.getDecimalMetadata().getScale());

http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index 8b9db3a..25aa09b 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -132,3 +132,14 @@ null and non-null values. This is not actually a valid Parquet file because the
 bit-packed levels are written in the reverse order specified in the Parquet spec
 for BIT_PACKED. However, this is the order that Impala attempts to read the levels
 in - see IMPALA-3006.
+
+signed_integer_logical_types.parquet:
+Generated using a utility that uses the Java Parquet API.
+The file has the following schema:
+  schema {
+    optional int32 id;
+    optional int32 tinyint_col (INT_8);
+    optional int32 smallint_col (INT_16);
+    optional int32 int_col;
+    optional int64 bigint_col;
+  }

http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/testdata/data/signed_integer_logical_types.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/signed_integer_logical_types.parquet b/testdata/data/signed_integer_logical_types.parquet
new file mode 100644
index 0000000..c3bdcc4
Binary files /dev/null and b/testdata/data/signed_integer_logical_types.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/38461c52/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index dc73a2d..1e8ce6e 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -287,6 +287,75 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
         file_meta_data = get_parquet_metadata(parquet_file)
         assert file_meta_data.column_orders == expected_col_orders
 
+  def test_read_write_logical_types(self, vector, unique_database, tmpdir):
+    """IMPALA-5052: Read and write signed integer parquet logical types
+    This test creates a src_tbl like a parquet file. The parquet file was generated
+    to have columns with different signed integer logical types. The test verifies
+    that the parquet file written by the hdfs parquet table writer using the generated
+    file has the same column type metadata as the generated one."""
+    hdfs_path = (os.environ['DEFAULT_FS'] + "/test-warehouse/{0}.db/"
+                 "signed_integer_logical_types.parquet").format(unique_database)
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+                '/testdata/data/signed_integer_logical_types.parquet', hdfs_path])
+    # Create table with signed integer logical types
+    src_tbl = "{0}.{1}".format(unique_database, "read_write_logical_type_src")
+    create_tbl_stmt = """create table {0} like parquet "{1}"
+        stored as parquet""".format(src_tbl, hdfs_path)
+    result = self.execute_query_expect_success(self.client, create_tbl_stmt)
+    # Check to see if the src_tbl column types matches the schema of the parquet
+    # file from which it was generated
+    result_src = self.execute_query_expect_success(self.client, "describe %s" %src_tbl)
+    for line in result_src.data:
+      line_split = line.split()
+      if line_split[0] == "id":
+        assert line_split[1] == 'int'
+      elif line_split[0] == "tinyint_col":
+        assert line_split[1] == 'tinyint'
+      elif line_split[0] == "smallint_col":
+        assert line_split[1] == 'smallint'
+      elif line_split[0] == "int_col":
+        assert line_split[1] == 'int'
+      else:
+        assert line_split[0] == 'bigint_col' and line_split[1] == 'bigint'
+
+    # Insert values in this table
+    insert_stmt = "insert into table {0} values(1, 2, 3, 4, 5)".format(src_tbl)
+    result = self.execute_query_expect_success(self.client, insert_stmt)
+
+    # To test the integer round tripping, a new dst_tbl is created by using the parquet
+    # file written by the src_tbl and running the following tests -
+    #   1. inserting same values into src and dst table and reading it back and comparing
+    #      them.
+    #   2. Ensuring that the column types in dst_tbl matches the column types in the
+    #      schema of the parquet file that was used to generate the src_tbl
+    result = self.execute_query_expect_success(self.client, "show files in %s" %src_tbl)
+    hdfs_path = result.data[0].split("\t")[0]
+    dst_tbl = "{0}.{1}".format(unique_database, "read_write_logical_type_dst")
+    create_tbl_stmt = 'create table {0} like parquet "{1}"'.format(dst_tbl, hdfs_path)
+    result = self.execute_query_expect_success(self.client, create_tbl_stmt)
+    result_dst = self.execute_query_expect_success(self.client, "describe %s" % dst_tbl)
+    for line in result_dst.data:
+      line_split = line.split()
+      if line_split[0] == "id":
+        assert line_split[1] == 'int'
+      elif line_split[0] == "tinyint_col":
+        assert line_split[1] == 'tinyint'
+      elif line_split[0] == "smallint_col":
+        assert line_split[1] == 'smallint'
+      elif line_split[0] == "int_col":
+        assert line_split[1] == 'int'
+      else:
+        assert line_split[0] == 'bigint_col' and line_split[1] == 'bigint'
+
+    insert_stmt = "insert into table {0} values(1, 2, 3, 4, 5)".format(dst_tbl)
+    self.execute_query_expect_success(self.client, insert_stmt)
+    # Check that the values inserted are same in both src and dst tables
+    result_src = self.execute_query_expect_success(self.client, "select * from %s"
+            % src_tbl)
+    result_dst = self.execute_query_expect_success(self.client, "select * from %s"
+            % dst_tbl)
+    assert result_src.data == result_dst.data
+
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
 @SkipIfS3.hive
@@ -505,7 +574,6 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         ColumnStats('vc', 'abc banana', 'ghj xyz', 0),
         ColumnStats('st', 'abc xyz', 'lorem ipsum', 0)
     ]
-
     self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
                                       qualified_table_name, expected_min_max_values)