You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hive.apache.org by Viral Bajaria <vi...@gmail.com> on 2011/01/18 11:04:54 UTC

partitioned column join does not work as expected

I am facing issues with a query where I am joining two fairly large tables
on the partitioned column along with other common columns. The expected
output is not in line with what I expect it to be. Since the query is very
complex, I will simplify it so that people can provide inputs if they have
faced similar issues or if I am doing something totally wrong.

TABLE A:
a_id bigint
common_id bigint
some_string string
total_count bigint
part_col string <---- this is the partitioned column

TABLE B:
b_int bigint
common_id bigint
some_string string
total_sum bigint
part_col string <---- this is the partitioned column

now the query is as follows:
SELECT /*+ STREAMTABLE(A,B) */ A.some_string, B.some_string,
sum(A.total_count), sum(B.total_sum) from A JOIN B ON (t1.part_col =
t2.part_col AND t1.common_id = t2.common_id) WHERE t1.part_col >= 'val1' AND
t2.part_col >= 'val1' GROUP BY A.some_string, B.some_string

Does HIVE not like to join on the partitioned columns ? because when i
create a join on just the partitioned column the reduce step never finishes.

I am using HIVE 0.5.0

Thanks,
Viral

Re: partitioned column join does not work as expected

Posted by Viral Bajaria <vi...@gmail.com>.
Thanks again.

I think I figured out the bug (not sure if it's a bug or whether that's
a known limitation when creating a third-level join) .... we need another
table c to re-create my scenario.

table_a
create table table_a(a_id bigint, common_id bigint, int_a int, int_b int,
int_c int, int_d int, string_a string, total_count bigint) partitioned by
(part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES
TERMINATED BY '\n' STORED AS TEXTFILE;

table_b
create table table_b(b_id bigint, common_id bigint, int_a int, int_b int,
int_c int, int_d int, string_b string, total_count bigint) partitioned by
(part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES
TERMINATED BY '\n' STORED AS TEXTFILE;
table_c
create table table_c(int_c int, string_c string) ROW FORMAT DELIMITED FIELDS
TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;

query
explain select c.string_c, sum(a.total_count), sum(b.total_count) from
table_a a join table_b b on a.common_id = b.common_id and a.int_c = b.int_c
and a.int_d = b.int_d join table_c c on a.int_c = c.int_c where a.part_col
>= "blah1" and b.part_col >= "blah1" group by c.string_c;

If you look at the query plan for table_b you will see the value expression
does not project the column int_c, and if you look at the join operator in
the query plan it show a join operating between 3 columns from table_a and 2
columns from table_b which is not the intention of the query.

I think hive should output all the columns from table_b which are part of
the join conditions and not look to see if the column is going to be
consumed in the later stages.

Do you think I am not writing the hive query in a right way ? the query
would return results as expected in a mysql or sql-server environment.

Thanks,
Viral
On Wed, Jan 19, 2011 at 11:06 AM, Appan Thirumaligai <
athirumaligai@ngmoco.com> wrote:

>  EXPLAIN select
> t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from
> table_a t1 join table_b t2 on t1.part_col = t2.part_col and t1.common_id =
> t2.common_id where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group
> by t1.some_string,t2.some_string;
>
> OK
> ABSTRACT SYNTAX TREE:
>   (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF table_a t1) (TOK_TABREF
> table_b t2) (and (= (. (TOK_TABLE_OR_COL t1) part_col) (. (TOK_TABLE_OR_COL
> t2) part_col)) (= (. (TOK_TABLE_OR_COL t1) common_id) (. (TOK_TABLE_OR_COL
> t2) common_id))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
> (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL t1) some_string)) (TOK_SELEXPR
> (. (TOK_TABLE_OR_COL t2) some_string)) (TOK_SELEXPR (TOK_FUNCTION sum (.
> (TOK_TABLE_OR_COL t1) total_count))) (TOK_SELEXPR (TOK_FUNCTION sum (.
> (TOK_TABLE_OR_COL t2) total_count)))) (TOK_WHERE (and (>= (.
> (TOK_TABLE_OR_COL t1) part_col) 'mypart') (>= (. (TOK_TABLE_OR_COL t2)
> part_col) 'mypart'))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL t1) some_string) (.
> (TOK_TABLE_OR_COL t2) some_string))))
>
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-2 depends on stages: Stage-1
>   Stage-0 is a root stage
>
> STAGE PLANS:
>   Stage: Stage-1
>     Map Reduce
>       Alias -> Map Operator Tree:
>         t1
>           TableScan
>             alias: t1
>             Filter Operator
>               predicate:
>                   expr: (part_col >= 'mypart')
>                   type: boolean
>               Reduce Output Operator
>                 key expressions:
>                       expr: part_col
>                       type: string
>                       expr: common_id
>                       type: bigint
>                 sort order: ++
>                 Map-reduce partition columns:
>                       expr: part_col
>                       type: string
>                       expr: common_id
>                       type: bigint
>                 tag: 0
>                 value expressions:
>                       expr: some_string
>                       type: string
>                       expr: total_count
>                       type: bigint
>                       expr: part_col
>                       type: string
>         t2
>           TableScan
>             alias: t2
>             Filter Operator
>               predicate:
>                   expr: (part_col >= 'mypart')
>                   type: boolean
>               Reduce Output Operator
>                 key expressions:
>                       expr: part_col
>                       type: string
>                       expr: common_id
>                       type: bigint
>                 sort order: ++
>                 Map-reduce partition columns:
>                       expr: part_col
>                       type: string
>                       expr: common_id
>                       type: bigint
>                 tag: 1
>                 value expressions:
>                       expr: some_string
>                       type: string
>                       expr: total_count
>                       type: bigint
>                       expr: part_col
>                       type: string
>       Reduce Operator Tree:
>         Join Operator
>           condition map:
>                Inner Join 0 to 1
>           condition expressions:
>             0 {VALUE._col2} {VALUE._col3} {VALUE._col4}
>             1 {VALUE._col2} {VALUE._col3} {VALUE._col4}
>           handleSkewJoin: false
>           outputColumnNames: _col2, _col3, _col4, _col9, _col10, _col11
>           Filter Operator
>             predicate:
>                 expr: ((_col4 >= 'mypart') and (_col11 >= 'mypart'))
>                 type: boolean
>             Select Operator
>               expressions:
>                     expr: _col2
>                     type: string
>                     expr: _col9
>                     type: string
>                     expr: _col3
>                     type: bigint
>                     expr: _col10
>                     type: bigint
>               outputColumnNames: _col2, _col9, _col3, _col10
>               Group By Operator
>                 aggregations:
>                       expr: sum(_col3)
>                       expr: sum(_col10)
>                 bucketGroup: false
>                 keys:
>                       expr: _col2
>                       type: string
>                       expr: _col9
>                       type: string
>                 mode: hash
>                 outputColumnNames: _col0, _col1, _col2, _col3
>                 File Output Operator
>                   compressed: false
>                   GlobalTableId: 0
>                   table:
>                       input format:
> org.apache.hadoop.mapred.SequenceFileInputFormat
>                       output format:
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>
>   Stage: Stage-2
>     Map Reduce
>       Alias -> Map Operator Tree:
>
> hdfs://localhost:8022/tmp/hive-training/hive_2011-01-19_11-05-34_526_453408324928472657/-mr-10002
>
>             Reduce Output Operator
>               key expressions:
>                     expr: _col0
>                     type: string
>                     expr: _col1
>                     type: string
>               sort order: ++
>               Map-reduce partition columns:
>                     expr: _col0
>                     type: string
>                     expr: _col1
>                     type: string
>               tag: -1
>               value expressions:
>                     expr: _col2
>                     type: bigint
>                     expr: _col3
>                     type: bigint
>       Reduce Operator Tree:
>         Group By Operator
>           aggregations:
>                 expr: sum(VALUE._col0)
>                 expr: sum(VALUE._col1)
>           bucketGroup: false
>           keys:
>                 expr: KEY._col0
>                 type: string
>                 expr: KEY._col1
>                 type: string
>           mode: mergepartial
>           outputColumnNames: _col0, _col1, _col2, _col3
>           Select Operator
>             expressions:
>                   expr: _col0
>                   type: string
>                   expr: _col1
>                   type: string
>                   expr: _col2
>                   type: bigint
>                   expr: _col3
>                   type: bigint
>             outputColumnNames: _col0, _col1, _col2, _col3
>             File Output Operator
>               compressed: false
>               GlobalTableId: 0
>               table:
>                   input format: org.apache.hadoop.mapred.TextInputFormat
>                   output format:
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
>
>  On Jan 19, 2011, at 10:47 AM, Viral Bajaria wrote:
>
>  Thanks Appan for verifying. I will do some more tests on my side too and
> let you know the results.
>
> I tried a different version of the query where I join'ed two sub-queries
> for the same partitions and the data comes out to be correct.
>
> I will see if I can post the real-world example to the list, because that
> might sound like a more practical example.
>
> If you still have your example(s) do you mind sending me your query-plan
> for
>
> select
> t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from
> table_a t1 join table_b t2 on t1.part_col = t2.part_col and t1.common_id =
> t2.common_id where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group
> by t1.some_string,t2.some_string;
>
> -Viral
> On Wed, Jan 19, 2011 at 10:36 AM, Appan Thirumaligai <
> athirumaligai@ngmoco.com> wrote:
>
>> Viral,
>>
>> I tried the queries below (similar to yours) and I get the expected
>> results when I do the join. I ran my queries after building hive from the
>> latest source and hadoop 0.20+.
>>  create table table_a(a_id bigint, common_id bigint, some_string
>> string,total_count bigint) partitioned by (part_col string)  ROW FORMAT
>> DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS
>> TEXTFILE;
>> create table table_b(b_id bigint, common_id bigint, some_string
>> string,total_count bigint) partitioned by (part_col string)  ROW FORMAT
>> DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS
>> TEXTFILE;
>> dfs -mkdir /user/data/table_a;
>> dfs -mkdir /user/data/table_b;
>> dfs -put /home/training/hiveug/table_a.csv /user/data/table_a;
>> dfs -put /home/training/hiveug/table_b.csv /user/data/table_b;
>> alter table table_a add partition (part_col = 'mypart') location
>> '/user/data/table_a';
>> alter table table_b add partition (part_col = 'mypart') location
>> '/user/data/table_b';
>> select * from table_a t1 join table_b t2 on t1.part_col == t2.part_col;
>> -->> Returns expected result
>> select
>> t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from
>> table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >=
>> 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_string;
>> --->>Works fine.
>> select
>> t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from
>> table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >=
>> 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_st*
>> from table_a t1 join table_b t2 on t1.part_col = t2.part_col where
>> t1.part_col >= 'mypart' and t2.part_col >= 'mypart';
>> --->Works fine.
>>
>> I created the two files with sample data in them and copied it to hdfs
>>
>> I'll try later on your hive 0.5.0 but looks like there might be something
>> wrong in your query.
>>
>>  On Jan 18, 2011, at 8:40 PM, Ajo Fod wrote:
>>
>>  Can you try this with a dummy table with very few rows ... to see if
>> the reason the script doesn't finish is a computational issue?
>>
>> One other thing is to try with a combined partition, to see if it is a
>> problem with the partitioning.
>>
>> Also, take a look at  the results of an EXPLAIN statement, see if
>> there are any hints there.
>>
>> NOTE: I'm new to hive too.
>>
>> -Ajo
>>
>>
>> On Tue, Jan 18, 2011 at 8:08 PM, Viral Bajaria <vi...@gmail.com>
>> wrote:
>>
>> I haven't heard back from any on the list and am still struggling to join
>>
>> two tables on partitioned column
>>
>>
>> Has anyone every tried joining two tables on a paritioned column and the
>>
>> results are not as expected ?
>>
>> On Tue, Jan 18, 2011 at 2:04 AM, Viral Bajaria <vi...@gmail.com>
>>
>> wrote:
>>
>>
>>  I am facing issues with a query where I am joining two fairly large
>> tables
>>
>>  on the partitioned column along with other common columns. The expected
>>
>>  output is not in line with what I expect it to be. Since the query is
>> very
>>
>>  complex, I will simplify it so that people can provide inputs if they
>> have
>>
>>  faced similar issues or if I am doing something totally wrong.
>>
>>  TABLE A:
>>
>>  a_id bigint
>>
>>  common_id bigint
>>
>>  some_string string
>>
>>  total_count bigint
>>
>>  part_col string <---- this is the partitioned column
>>
>>  TABLE B:
>>
>>  b_int bigint
>>
>>  common_id bigint
>>
>>  some_string string
>>
>>  total_sum bigint
>>
>>  part_col string <---- this is the partitioned column
>>
>>  now the query is as follows:
>>
>>  SELECT /*+ STREAMTABLE(A,B) */ A.some_string, B.some_string,
>>
>>  sum(A.total_count), sum(B.total_sum) from A JOIN B ON (t1.part_col =
>>
>>  t2.part_col AND t1.common_id = t2.common_id) WHERE t1.part_col >= 'val1'
>> AND
>>
>>  t2.part_col >= 'val1' GROUP BY A.some_string, B.some_string
>>
>>  Does HIVE not like to join on the partitioned columns ? because when i
>>
>>  create a join on just the partitioned column the reduce step never
>> finishes.
>>
>>  I am using HIVE 0.5.0
>>
>>  Thanks,
>>
>>  Viral
>>
>>
>>
>>    Appan Thirumaligai
>> appan@ngmoco.com
>> Ph:1-818-472-8427
>> ngmoco:)
>>
>>
>
>    Appan Thirumaligai
> appan@ngmoco.com
> Ph:1-818-472-8427
> ngmoco:)
>
>

Re: partitioned column join does not work as expected

Posted by Appan Thirumaligai <at...@ngmoco.com>.
EXPLAIN select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from table_a t1 join table_b t2 on t1.part_col = t2.part_col and t1.common_id = t2.common_id where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_string;

OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF table_a t1) (TOK_TABREF table_b t2) (and (= (. (TOK_TABLE_OR_COL t1) part_col) (. (TOK_TABLE_OR_COL t2) part_col)) (= (. (TOK_TABLE_OR_COL t1) common_id) (. (TOK_TABLE_OR_COL t2) common_id))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL t1) some_string)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL t2) some_string)) (TOK_SELEXPR (TOK_FUNCTION sum (. (TOK_TABLE_OR_COL t1) total_count))) (TOK_SELEXPR (TOK_FUNCTION sum (. (TOK_TABLE_OR_COL t2) total_count)))) (TOK_WHERE (and (>= (. (TOK_TABLE_OR_COL t1) part_col) 'mypart') (>= (. (TOK_TABLE_OR_COL t2) part_col) 'mypart'))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL t1) some_string) (. (TOK_TABLE_OR_COL t2) some_string))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        t1 
          TableScan
            alias: t1
            Filter Operator
              predicate:
                  expr: (part_col >= 'mypart')
                  type: boolean
              Reduce Output Operator
                key expressions:
                      expr: part_col
                      type: string
                      expr: common_id
                      type: bigint
                sort order: ++
                Map-reduce partition columns:
                      expr: part_col
                      type: string
                      expr: common_id
                      type: bigint
                tag: 0
                value expressions:
                      expr: some_string
                      type: string
                      expr: total_count
                      type: bigint
                      expr: part_col
                      type: string
        t2 
          TableScan
            alias: t2
            Filter Operator
              predicate:
                  expr: (part_col >= 'mypart')
                  type: boolean
              Reduce Output Operator
                key expressions:
                      expr: part_col
                      type: string
                      expr: common_id
                      type: bigint
                sort order: ++
                Map-reduce partition columns:
                      expr: part_col
                      type: string
                      expr: common_id
                      type: bigint
                tag: 1
                value expressions:
                      expr: some_string
                      type: string
                      expr: total_count
                      type: bigint
                      expr: part_col
                      type: string
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col2} {VALUE._col3} {VALUE._col4}
            1 {VALUE._col2} {VALUE._col3} {VALUE._col4}
          handleSkewJoin: false
          outputColumnNames: _col2, _col3, _col4, _col9, _col10, _col11
          Filter Operator
            predicate:
                expr: ((_col4 >= 'mypart') and (_col11 >= 'mypart'))
                type: boolean
            Select Operator
              expressions:
                    expr: _col2
                    type: string
                    expr: _col9
                    type: string
                    expr: _col3
                    type: bigint
                    expr: _col10
                    type: bigint
              outputColumnNames: _col2, _col9, _col3, _col10
              Group By Operator
                aggregations:
                      expr: sum(_col3)
                      expr: sum(_col10)
                bucketGroup: false
                keys:
                      expr: _col2
                      type: string
                      expr: _col9
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3
                File Output Operator
                  compressed: false
                  GlobalTableId: 0
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://localhost:8022/tmp/hive-training/hive_2011-01-19_11-05-34_526_453408324928472657/-mr-10002 
            Reduce Output Operator
              key expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              sort order: ++
              Map-reduce partition columns:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              tag: -1
              value expressions:
                    expr: _col2
                    type: bigint
                    expr: _col3
                    type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)
                expr: sum(VALUE._col1)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
                expr: KEY._col1
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2, _col3
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: string
                  expr: _col2
                  type: bigint
                  expr: _col3
                  type: bigint
            outputColumnNames: _col0, _col1, _col2, _col3
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1

On Jan 19, 2011, at 10:47 AM, Viral Bajaria wrote:

> Thanks Appan for verifying. I will do some more tests on my side too and let you know the results.
>  
> I tried a different version of the query where I join'ed two sub-queries for the same partitions and the data comes out to be correct.
>  
> I will see if I can post the real-world example to the list, because that might sound like a more practical example.
>  
> If you still have your example(s) do you mind sending me your query-plan for
>  
> select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from table_a t1 join table_b t2 on t1.part_col = t2.part_col and t1.common_id = t2.common_id where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_string;
>  
> -Viral
> On Wed, Jan 19, 2011 at 10:36 AM, Appan Thirumaligai <at...@ngmoco.com> wrote:
> Viral,
> 
> I tried the queries below (similar to yours) and I get the expected results when I do the join. I ran my queries after building hive from the latest source and hadoop 0.20+.
> create table table_a(a_id bigint, common_id bigint, some_string string,total_count bigint) partitioned by (part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
> create table table_b(b_id bigint, common_id bigint, some_string string,total_count bigint) partitioned by (part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
> dfs -mkdir /user/data/table_a;
> dfs -mkdir /user/data/table_b;
> dfs -put /home/training/hiveug/table_a.csv /user/data/table_a;
> dfs -put /home/training/hiveug/table_b.csv /user/data/table_b;
> alter table table_a add partition (part_col = 'mypart') location '/user/data/table_a';
> alter table table_b add partition (part_col = 'mypart') location '/user/data/table_b';
> select * from table_a t1 join table_b t2 on t1.part_col == t2.part_col; 
> -->> Returns expected result
> select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_string;
> --->>Works fine.
> select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_st* from table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >= 'mypart' and t2.part_col >= 'mypart';
> --->Works fine.
> 
> I created the two files with sample data in them and copied it to hdfs
> 
> I'll try later on your hive 0.5.0 but looks like there might be something wrong in your query.
> 
> On Jan 18, 2011, at 8:40 PM, Ajo Fod wrote:
> 
>> Can you try this with a dummy table with very few rows ... to see if
>> the reason the script doesn't finish is a computational issue?
>> 
>> One other thing is to try with a combined partition, to see if it is a
>> problem with the partitioning.
>> 
>> Also, take a look at  the results of an EXPLAIN statement, see if
>> there are any hints there.
>> 
>> NOTE: I'm new to hive too.
>> 
>> -Ajo
>> 
>> 
>> On Tue, Jan 18, 2011 at 8:08 PM, Viral Bajaria <vi...@gmail.com> wrote:
>>> I haven't heard back from any on the list and am still struggling to join
>>> two tables on partitioned column
>>> 
>>> Has anyone every tried joining two tables on a paritioned column and the
>>> results are not as expected ?
>>> On Tue, Jan 18, 2011 at 2:04 AM, Viral Bajaria <vi...@gmail.com>
>>> wrote:
>>>> 
>>>> I am facing issues with a query where I am joining two fairly large tables
>>>> on the partitioned column along with other common columns. The expected
>>>> output is not in line with what I expect it to be. Since the query is very
>>>> complex, I will simplify it so that people can provide inputs if they have
>>>> faced similar issues or if I am doing something totally wrong.
>>>> TABLE A:
>>>> a_id bigint
>>>> common_id bigint
>>>> some_string string
>>>> total_count bigint
>>>> part_col string <---- this is the partitioned column
>>>> TABLE B:
>>>> b_int bigint
>>>> common_id bigint
>>>> some_string string
>>>> total_sum bigint
>>>> part_col string <---- this is the partitioned column
>>>> now the query is as follows:
>>>> SELECT /*+ STREAMTABLE(A,B) */ A.some_string, B.some_string,
>>>> sum(A.total_count), sum(B.total_sum) from A JOIN B ON (t1.part_col =
>>>> t2.part_col AND t1.common_id = t2.common_id) WHERE t1.part_col >= 'val1' AND
>>>> t2.part_col >= 'val1' GROUP BY A.some_string, B.some_string
>>>> Does HIVE not like to join on the partitioned columns ? because when i
>>>> create a join on just the partitioned column the reduce step never finishes.
>>>> I am using HIVE 0.5.0
>>>> Thanks,
>>>> Viral
>>> 
> 
> Appan Thirumaligai
> appan@ngmoco.com
> Ph:1-818-472-8427
> ngmoco:)
> 
> 

Appan Thirumaligai
appan@ngmoco.com
Ph:1-818-472-8427
ngmoco:)


Re: partitioned column join does not work as expected

Posted by Viral Bajaria <vi...@gmail.com>.
Thanks Appan for verifying. I will do some more tests on my side too and let
you know the results.

I tried a different version of the query where I join'ed two sub-queries for
the same partitions and the data comes out to be correct.

I will see if I can post the real-world example to the list, because that
might sound like a more practical example.

If you still have your example(s) do you mind sending me your query-plan for


select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count)
from table_a t1 join table_b t2 on t1.part_col = t2.part_col and
t1.common_id = t2.common_id where t1.part_col >= 'mypart' and t2.part_col >=
'mypart' group by t1.some_string,t2.some_string;

-Viral
On Wed, Jan 19, 2011 at 10:36 AM, Appan Thirumaligai <
athirumaligai@ngmoco.com> wrote:

> Viral,
>
> I tried the queries below (similar to yours) and I get the expected results
> when I do the join. I ran my queries after building hive from the latest
> source and hadoop 0.20+.
>  create table table_a(a_id bigint, common_id bigint, some_string
> string,total_count bigint) partitioned by (part_col string)  ROW FORMAT
> DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS
> TEXTFILE;
> create table table_b(b_id bigint, common_id bigint, some_string
> string,total_count bigint) partitioned by (part_col string)  ROW FORMAT
> DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS
> TEXTFILE;
> dfs -mkdir /user/data/table_a;
> dfs -mkdir /user/data/table_b;
> dfs -put /home/training/hiveug/table_a.csv /user/data/table_a;
> dfs -put /home/training/hiveug/table_b.csv /user/data/table_b;
> alter table table_a add partition (part_col = 'mypart') location
> '/user/data/table_a';
> alter table table_b add partition (part_col = 'mypart') location
> '/user/data/table_b';
> select * from table_a t1 join table_b t2 on t1.part_col == t2.part_col;
> -->> Returns expected result
> select
> t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from
> table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >=
> 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_string;
> --->>Works fine.
> select
> t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from
> table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >=
> 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_st*
> from table_a t1 join table_b t2 on t1.part_col = t2.part_col where
> t1.part_col >= 'mypart' and t2.part_col >= 'mypart';
> --->Works fine.
>
> I created the two files with sample data in them and copied it to hdfs
>
> I'll try later on your hive 0.5.0 but looks like there might be something
> wrong in your query.
>
>  On Jan 18, 2011, at 8:40 PM, Ajo Fod wrote:
>
>  Can you try this with a dummy table with very few rows ... to see if
> the reason the script doesn't finish is a computational issue?
>
> One other thing is to try with a combined partition, to see if it is a
> problem with the partitioning.
>
> Also, take a look at  the results of an EXPLAIN statement, see if
> there are any hints there.
>
> NOTE: I'm new to hive too.
>
> -Ajo
>
>
> On Tue, Jan 18, 2011 at 8:08 PM, Viral Bajaria <vi...@gmail.com>
> wrote:
>
> I haven't heard back from any on the list and am still struggling to join
>
> two tables on partitioned column
>
>
> Has anyone every tried joining two tables on a paritioned column and the
>
> results are not as expected ?
>
> On Tue, Jan 18, 2011 at 2:04 AM, Viral Bajaria <vi...@gmail.com>
>
> wrote:
>
>
>  I am facing issues with a query where I am joining two fairly large
> tables
>
>  on the partitioned column along with other common columns. The expected
>
>  output is not in line with what I expect it to be. Since the query is
> very
>
>  complex, I will simplify it so that people can provide inputs if they
> have
>
>  faced similar issues or if I am doing something totally wrong.
>
>  TABLE A:
>
>  a_id bigint
>
>  common_id bigint
>
>  some_string string
>
>  total_count bigint
>
>  part_col string <---- this is the partitioned column
>
>  TABLE B:
>
>  b_int bigint
>
>  common_id bigint
>
>  some_string string
>
>  total_sum bigint
>
>  part_col string <---- this is the partitioned column
>
>  now the query is as follows:
>
>  SELECT /*+ STREAMTABLE(A,B) */ A.some_string, B.some_string,
>
>  sum(A.total_count), sum(B.total_sum) from A JOIN B ON (t1.part_col =
>
>  t2.part_col AND t1.common_id = t2.common_id) WHERE t1.part_col >= 'val1'
> AND
>
>  t2.part_col >= 'val1' GROUP BY A.some_string, B.some_string
>
>  Does HIVE not like to join on the partitioned columns ? because when i
>
>  create a join on just the partitioned column the reduce step never
> finishes.
>
>  I am using HIVE 0.5.0
>
>  Thanks,
>
>  Viral
>
>
>
>    Appan Thirumaligai
> appan@ngmoco.com
> Ph:1-818-472-8427
> ngmoco:)
>
>

Re: partitioned column join does not work as expected

Posted by Appan Thirumaligai <at...@ngmoco.com>.
Viral,

I tried the queries below (similar to yours) and I get the expected results when I do the join. I ran my queries after building hive from the latest source and hadoop 0.20+.
	
create table table_a(a_id bigint, common_id bigint, some_string string,total_count bigint) partitioned by (part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
create table table_b(b_id bigint, common_id bigint, some_string string,total_count bigint) partitioned by (part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
dfs -mkdir /user/data/table_a;
dfs -mkdir /user/data/table_b;
dfs -put /home/training/hiveug/table_a.csv /user/data/table_a;
dfs -put /home/training/hiveug/table_b.csv /user/data/table_b;
alter table table_a add partition (part_col = 'mypart') location '/user/data/table_a';
alter table table_b add partition (part_col = 'mypart') location '/user/data/table_b';
select * from table_a t1 join table_b t2 on t1.part_col == t2.part_col; 
-->> Returns expected result
select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_string;
--->>Works fine.
select t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_st* from table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >= 'mypart' and t2.part_col >= 'mypart';
--->Works fine.

I created the two files with sample data in them and copied it to hdfs

I'll try later on your hive 0.5.0 but looks like there might be something wrong in your query.

On Jan 18, 2011, at 8:40 PM, Ajo Fod wrote:

> Can you try this with a dummy table with very few rows ... to see if
> the reason the script doesn't finish is a computational issue?
> 
> One other thing is to try with a combined partition, to see if it is a
> problem with the partitioning.
> 
> Also, take a look at  the results of an EXPLAIN statement, see if
> there are any hints there.
> 
> NOTE: I'm new to hive too.
> 
> -Ajo
> 
> 
> On Tue, Jan 18, 2011 at 8:08 PM, Viral Bajaria <vi...@gmail.com> wrote:
>> I haven't heard back from any on the list and am still struggling to join
>> two tables on partitioned column
>> 
>> Has anyone every tried joining two tables on a paritioned column and the
>> results are not as expected ?
>> On Tue, Jan 18, 2011 at 2:04 AM, Viral Bajaria <vi...@gmail.com>
>> wrote:
>>> 
>>> I am facing issues with a query where I am joining two fairly large tables
>>> on the partitioned column along with other common columns. The expected
>>> output is not in line with what I expect it to be. Since the query is very
>>> complex, I will simplify it so that people can provide inputs if they have
>>> faced similar issues or if I am doing something totally wrong.
>>> TABLE A:
>>> a_id bigint
>>> common_id bigint
>>> some_string string
>>> total_count bigint
>>> part_col string <---- this is the partitioned column
>>> TABLE B:
>>> b_int bigint
>>> common_id bigint
>>> some_string string
>>> total_sum bigint
>>> part_col string <---- this is the partitioned column
>>> now the query is as follows:
>>> SELECT /*+ STREAMTABLE(A,B) */ A.some_string, B.some_string,
>>> sum(A.total_count), sum(B.total_sum) from A JOIN B ON (t1.part_col =
>>> t2.part_col AND t1.common_id = t2.common_id) WHERE t1.part_col >= 'val1' AND
>>> t2.part_col >= 'val1' GROUP BY A.some_string, B.some_string
>>> Does HIVE not like to join on the partitioned columns ? because when i
>>> create a join on just the partitioned column the reduce step never finishes.
>>> I am using HIVE 0.5.0
>>> Thanks,
>>> Viral
>> 

Appan Thirumaligai
appan@ngmoco.com
Ph:1-818-472-8427
ngmoco:)


Re: partitioned column join does not work as expected

Posted by Ajo Fod <aj...@gmail.com>.
Can you try this with a dummy table with very few rows ... to see if
the reason the script doesn't finish is a computational issue?

One other thing is to try with a combined partition, to see if it is a
problem with the partitioning.

Also, take a look at  the results of an EXPLAIN statement, see if
there are any hints there.

NOTE: I'm new to hive too.

-Ajo


On Tue, Jan 18, 2011 at 8:08 PM, Viral Bajaria <vi...@gmail.com> wrote:
> I haven't heard back from any on the list and am still struggling to join
> two tables on partitioned column
>
> Has anyone every tried joining two tables on a paritioned column and the
> results are not as expected ?
> On Tue, Jan 18, 2011 at 2:04 AM, Viral Bajaria <vi...@gmail.com>
> wrote:
>>
>> I am facing issues with a query where I am joining two fairly large tables
>> on the partitioned column along with other common columns. The expected
>> output is not in line with what I expect it to be. Since the query is very
>> complex, I will simplify it so that people can provide inputs if they have
>> faced similar issues or if I am doing something totally wrong.
>> TABLE A:
>> a_id bigint
>> common_id bigint
>> some_string string
>> total_count bigint
>> part_col string <---- this is the partitioned column
>> TABLE B:
>> b_int bigint
>> common_id bigint
>> some_string string
>> total_sum bigint
>> part_col string <---- this is the partitioned column
>> now the query is as follows:
>> SELECT /*+ STREAMTABLE(A,B) */ A.some_string, B.some_string,
>> sum(A.total_count), sum(B.total_sum) from A JOIN B ON (t1.part_col =
>> t2.part_col AND t1.common_id = t2.common_id) WHERE t1.part_col >= 'val1' AND
>> t2.part_col >= 'val1' GROUP BY A.some_string, B.some_string
>> Does HIVE not like to join on the partitioned columns ? because when i
>> create a join on just the partitioned column the reduce step never finishes.
>> I am using HIVE 0.5.0
>> Thanks,
>> Viral
>

Re: partitioned column join does not work as expected

Posted by Viral Bajaria <vi...@gmail.com>.
I haven't heard back from any on the list and am still struggling to join
two tables on partitioned column

Has anyone every tried joining two tables on a paritioned column and the
results are not as expected ?
On Tue, Jan 18, 2011 at 2:04 AM, Viral Bajaria <vi...@gmail.com>wrote:

> I am facing issues with a query where I am joining two fairly large tables
> on the partitioned column along with other common columns. The expected
> output is not in line with what I expect it to be. Since the query is very
> complex, I will simplify it so that people can provide inputs if they have
> faced similar issues or if I am doing something totally wrong.
>
> TABLE A:
> a_id bigint
> common_id bigint
> some_string string
> total_count bigint
> part_col string <---- this is the partitioned column
>
> TABLE B:
> b_int bigint
> common_id bigint
> some_string string
> total_sum bigint
> part_col string <---- this is the partitioned column
>
> now the query is as follows:
> SELECT /*+ STREAMTABLE(A,B) */ A.some_string, B.some_string,
> sum(A.total_count), sum(B.total_sum) from A JOIN B ON (t1.part_col =
> t2.part_col AND t1.common_id = t2.common_id) WHERE t1.part_col >= 'val1' AND
> t2.part_col >= 'val1' GROUP BY A.some_string, B.some_string
>
> Does HIVE not like to join on the partitioned columns ? because when i
> create a join on just the partitioned column the reduce step never finishes.
>
> I am using HIVE 0.5.0
>
> Thanks,
> Viral
>