Posted to issues@spark.apache.org by "Shashank Pedamallu (Jira)" <ji...@apache.org> on 2021/08/10 00:06:00 UTC

[jira] [Commented] (SPARK-32709) Write Hive ORC/Parquet bucketed table with hivehash (for Hive 1,2)

    [ https://issues.apache.org/jira/browse/SPARK-32709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396330#comment-17396330 ] 

Shashank Pedamallu commented on SPARK-32709:
--------------------------------------------

Issue observed at Lyft. When we applied the patch to a production query, the query eventually failed with S3 throttling, because bucketed writes from Spark generate far more files than Hive does. To work around the small-files problem we first tried reducing the number of reducers, which caused OOM issues, and then enabled adaptive query execution, which coalesced the shuffle down to 44 reducers. But even with 44 reducers and 1024 buckets, the job produced 45,057 output files (roughly 44 reducers x 1024 buckets), compared to 1024 final files in Hive. This approach also did not scale to larger tables; even with AQE we were hit by S3 throttling.
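
For reference, a rough back-of-the-envelope sketch of where the 45,057 files come from (illustrative only; the constants are the numbers from the run above, and the one-file-per-bucket-per-task behaviour is my understanding of Spark's FileFormatWriter, while Hive aligns reducers with buckets):
{noformat}
// Rough sketch only; constants are the numbers from the run described above.
object BucketedFileCountEstimate {
  def main(args: Array[String]): Unit = {
    val reducers = 44    // shuffle partitions after AQE coalescing
    val buckets  = 1024  // bucket count of the destination table

    // If every write task sees rows for (almost) every bucket, it opens one
    // output file per bucket id, so the total is roughly reducers * buckets.
    val sparkFiles = reducers * buckets // 45,056 -- matches the 45,057 observed

    // Hive assigns one reducer per bucket, ending up with ~one file per bucket.
    val hiveFiles = buckets             // 1,024

    println(s"Spark: ~$sparkFiles files, Hive: ~$hiveFiles files")
  }
}
{noformat}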

*Query (names anonymized for privacy; please let me know if that's a concern)*:

 
{noformat}
-- Default configurations
SET hive.exec.compress.output=true;
SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
SET mapred.output.compress=true;
SET parquet.compression=SNAPPY;
SET mapreduce.input.fileinputformat.split.maxsize=256000000;
SET mapreduce.input.fileinputformat.split.minsize=64000000;
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=32;
SET hive.hadoop.supports.splittable.combineinputformat=true;
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions=1000;
SET hive.exec.max.dynamic.partitions.pernode=1000;
-- User configurations
SET hive.enforce.bucketing = true;
SET hive.mapred.mode = nonstrict;
SET hive.exec.max.created.files=1800000;
SET hive.execution.engine=tez;
-- spark configs
SET spark.executor.memory=8g;
SET spark.driver.memoryOverhead=4g;
SET spark.driver.memory=12g;
SET spark.sql.adaptive.advisoryPartitionSizeInBytes=1536MB;
SET spark.sql.adaptive.coalescePartitions.minPartitionNum=16;
DROP TABLE IF EXISTS anon_table_a;

WITH anon_table_a
AS (
SELECT col_a,
col_b,
col_c,
RANK () OVER (PARTITION BY col_b ORDER BY col_c) AS alias_a
FROM schema_a.src_table_a
WHERE col_d IS NOT NULL
DISTRIBUTE BY col_b SORT BY col_c
),
anon_table_b AS (
    SELECT
        col_e
    FROM
        (
            SELECT
                col_e,
                ROW_NUMBER() OVER (PARTITION BY col_e) AS rn
            FROM
                schema_b.src_table_b
            WHERE
                1 = 1
        ) v
    WHERE
        rn = 1
)
INSERT OVERWRITE TABLE personal_schema.temp_dest_table SELECT
    data.*
FROM
    (
        
        SELECT col_a,
        col_b,
        alias_a
        FROM anon_table_a
    ) data
    LEFT OUTER JOIN
    anon_table_b
        ON data.col_b = anon_table_b.col_e
WHERE
    (anon_table_b.col_e IS NULL OR data.col_b is null );
DROP TABLE IF EXISTS personal_schema.dest_table;
ALTER TABLE personal_schema.temp_dest_table RENAME TO personal_schema.dest_table
{noformat}
 

*Spark shuffle metrics:*
 !91275701_stage6_metrics.png|width=300,height=160!

The average file size in the final path is ~500 KB when writing from Spark, compared to 29 MB in Hive.

*Question:*
So, I just wanted to raise the question again: is there any active or planned effort to support one file per reducer for bucketed tables?

> Write Hive ORC/Parquet bucketed table with hivehash (for Hive 1,2)
> ------------------------------------------------------------------
>
>                 Key: SPARK-32709
>                 URL: https://issues.apache.org/jira/browse/SPARK-32709
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Priority: Minor
>         Attachments: 91275701_stage6_metrics.png
>
>
> The Hive ORC/Parquet write code path is the same as the data source v1 code path (FileFormatWriter). This JIRA is to add support for writing Hive ORC/Parquet bucketed tables with hivehash. The change is to customize `bucketIdExpression` to use hivehash when the table is a Hive bucketed table and the Hive version is 1.x.y or 2.x.y.
>  
> This will allow us to write Hive/Presto-compatible bucketed tables for Hive 1 and 2.
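
For readers skimming the thread, a minimal sketch of what the proposed change amounts to, assuming the catalyst HiveHash, Pmod, and Literal expressions (illustrative only, not the actual patch): Spark's native bucketing derives the bucket id from a Murmur3 hash, whereas Hive 1.x/2.x computes it as hivehash(bucket columns) mod numBuckets, so the write path needs to swap in a hivehash-based `bucketIdExpression` for Hive bucketed tables.
{noformat}
// Illustrative sketch only (not the actual patch): a hivehash-based bucket id
// expression built from catalyst expressions, mirroring Hive's
// hivehash(bucket columns) pmod numBuckets rule.
import org.apache.spark.sql.catalyst.expressions.{Expression, HiveHash, Literal, Pmod}

def hiveCompatibleBucketIdExpression(
    bucketColumns: Seq[Expression],
    numBuckets: Int): Expression = {
  // Spark's native bucketing would use a Murmur3 hash here; Hive 1.x/2.x uses hivehash.
  Pmod(HiveHash(bucketColumns), Literal(numBuckets))
}
{noformat}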


