Posted to issues-all@impala.apache.org by "Quanlong Huang (JIRA)" <ji...@apache.org> on 2018/10/24 06:44:00 UTC

[jira] [Created] (IMPALA-7751) Kudu insert statement should push down range partition predicates

Quanlong Huang created IMPALA-7751:
--------------------------------------

             Summary: Kudu insert statement should push down range partition predicates
                 Key: IMPALA-7751
                 URL: https://issues.apache.org/jira/browse/IMPALA-7751
             Project: IMPALA
          Issue Type: Improvement
            Reporter: Quanlong Huang


We have a job that loads newly added data from HDFS into a Kudu table to get good point-query performance. Each day we create a new Kudu range partition for that day's data. As the number of Kudu range partitions grew, we saw the performance of this job degrade.
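
As a sketch of the daily step described above, a new range partition could be added with Impala's ALTER TABLE ... ADD RANGE PARTITION statement. The table name is borrowed from the reproduction steps below, and the bounds 30000 and 40000 are illustrative assumptions, not values from the actual job.
{code:sql}
-- Hypothetical daily step: add the next range partition before loading the day's data.
-- The bounds are illustrative; a real job would derive them from the day being loaded.
alter table default.metrics_kudu_tbl
  add range partition 30000 <= values < 40000;
{code}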

The root cause is that the insert statement for Kudu does not take advantage of predicates on the Kudu range partition keys, which causes skew across the insert nodes.

Steps to reproduce:
Step 1: Launch an Impala cluster with 3 nodes.
Step 2: Create an HDFS table with more than 3 underlying files, so that it has more than 3 scan ranges.
{code:sql}
create table default.metrics_tbl (
  source_id string,
  event_timestamp bigint,
  value double
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;
{code}
Upload the three attached TSV files into the table's directory and refresh the table in Impala.
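Assuming the files have already been copied into the table's HDFS directory, the refresh and a quick sanity check might look like this (the row count is not stated in this report, so none is shown):
{code:sql}
-- Make Impala pick up the newly uploaded files.
refresh default.metrics_tbl;
-- Sanity check: confirm the rows are visible and see the timestamp span.
select count(*), min(event_timestamp), max(event_timestamp)
from default.metrics_tbl;
{code}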

Step 3: Create a Kudu table with mixed partitioning: 3 hash partitions and 3 range partitions.
{code:sql}
create table default.metrics_kudu_tbl (
  source_id string,
  event_timestamp bigint,
  value double,
  primary key(source_id, event_timestamp)
) partition by
  hash (source_id) PARTITIONS 3,
  range (event_timestamp) (
    partition 0 <= values < 10000,
    partition 10000 <= values < 20000,
    partition 20000 <= values < 30000
) stored as kudu;
{code}
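To confirm the table layout before inserting, the range partitions can be listed. This is an optional check, not part of the original reproduction:
{code:sql}
-- Lists the range partitions defined for the Kudu table.
show range partitions default.metrics_kudu_tbl;
{code}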
Step 4: Insert rows from the HDFS table into Kudu, with predicates on the range partition key in the WHERE clause.
{code:sql}
insert into table metrics_kudu_tbl
  select source_id, event_timestamp, value from metrics_tbl
  where event_timestamp >= 10000 and event_timestamp < 20000;
{code}
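One way to inspect how rows are routed to the sink before running the statement is to EXPLAIN it. This is a suggested check rather than part of the original report; the plan is expected to include an exchange ahead of the Kudu sink, but its output is not reproduced here:
{code:sql}
-- Shows the query plan for the insert without executing it.
explain insert into table metrics_kudu_tbl
  select source_id, event_timestamp, value from metrics_tbl
  where event_timestamp >= 10000 and event_timestamp < 20000;
{code}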
Step 5: In the query profile, three fragment instances contain a KuduTableSink, but only one of them received rows and produced data.
{code}
    Averaged Fragment F01:
      KuduTableSink:
         - TotalNumRows: 1.00K (1000)
    Fragment F01:
      Instance 6347506799a2966d:6e82f49200000004
        KuduTableSink:
           - TotalNumRows: 3.00K (3000)
      Instance 6347506799a2966d:6e82f49200000005
        KuduTableSink:
           - TotalNumRows: 0 (0)
      Instance 6347506799a2966d:6e82f49200000003
        KuduTableSink:
           - TotalNumRows: 0 (0)
{code}
Thus, only one fragment instance of F01 is sorting and ingesting data into Kudu.

In general, if there are N range partitions and all the inserted rows belong to one range (as determined by the partition predicates in the WHERE clause), only 1/N of the insert fragment instances produce data.
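
To check which range partitions the source rows fall into, a query along these lines could be run against the HDFS table. It assumes the fixed-width ranges of 10000 used in the reproduction steps; range_idx and num_rows are names introduced here for illustration.
{code:sql}
-- Count source rows per 10000-wide range; range_idx 1 corresponds to [10000, 20000).
select event_timestamp div 10000 as range_idx, count(*) as num_rows
from default.metrics_tbl
group by event_timestamp div 10000
order by range_idx;
{code}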


