Posted to issues@spark.apache.org by "feiwang (Jira)" <ji...@apache.org> on 2019/09/12 14:40:00 UTC

[jira] [Comment Edited] (SPARK-29037) [Core] Spark gives duplicate result when an application was killed and rerun

    [ https://issues.apache.org/jira/browse/SPARK-29037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928596#comment-16928596 ] 

feiwang edited comment on SPARK-29037 at 9/12/19 2:39 PM:
----------------------------------------------------------

[~advancedxy]  
1. We re-submit the same application.

We hit this issue when running INSERT OVERWRITE on a table, so it is feasible to resolve it on the user side.

Yes, this issue can be resolved on the Hadoop side, but that requires a new Hadoop release. We can do it on the Spark side instead.

As for an output check, I think it is not appropriate, because when several applications (each doing INSERT OVERWRITE on a partition of the same table) run at the same time, they may use the same committedTaskPath.

So, I think we could implement a Spark FileCommitProtocol following the implementation of `InsertIntoHiveTable` (org.apache.spark.sql.hive.execution.InsertIntoHiveTable).
For InsertIntoHiveTable, saveAsHiveFile first commits all tasks' output to a hive-staging dir, as shown in the log below.

{code:java}
19/09/12 02:47:46 INFO FileOutputCommitter: Saved output of task 'attempt_20190912024744_0004_m_000000_0' to hdfs://hercules-sub/user/b_hive_dba/fwang12_test/test_merge/.hive-staging_hive_2019-09-12_02-47-44_798_6385324183561649436-1/-ext-10000/_temporary/0/task_20190912024744_0004_m_000000
{code}

Then it loads that output into the Hive table.
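To make the idea concrete, here is a minimal simulation sketch (plain Python, not Spark or Hive code) of the staging-dir pattern described above. All names (run_insert_overwrite, the staging-dir naming) are hypothetical; the point is only that each run stages output under its own unique directory before loading it, so a killed run's leftovers can never leak into a rerun's result.

{code:python}
# Hypothetical sketch: each run writes task output to its OWN unique staging
# dir (like .hive-staging_hive_<timestamp>_<id>), then moves it into the table
# dir on job commit. A killed run's staging dir is never reused by a rerun.
import shutil
import tempfile
import uuid
from pathlib import Path

def run_insert_overwrite(table_dir: Path, task_outputs: dict,
                         kill_before_load: bool = False) -> None:
    """Commit all task outputs to a unique staging dir, then load the table."""
    # Unique per application run, unlike a shared committedTaskPath.
    staging = table_dir.parent / f".hive-staging_{uuid.uuid4().hex}"
    staging.mkdir(parents=True)
    for name, data in task_outputs.items():
        (staging / name).write_text(data)      # task commit into staging
    if kill_before_load:
        return                                  # killed: staging dir left behind
    # Job commit: INSERT OVERWRITE replaces the table with the staging contents.
    if table_dir.exists():
        shutil.rmtree(table_dir)
    staging.rename(table_dir)                   # load staged output into the table

base = Path(tempfile.mkdtemp())
table = base / "test_merge"
# First run is killed while committing: its files stay in its own staging dir.
run_insert_overwrite(table, {"part-00000": "old"}, kill_before_load=True)
# The rerun uses a fresh staging dir, so the table holds only the new output.
run_insert_overwrite(table, {"part-00000": "new"})
print(sorted(p.name for p in table.iterdir()), (table / "part-00000").read_text())
{code}

The leftover staging dir from the killed run still needs garbage collection, but it can no longer duplicate the rerun's result.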

> [Core] Spark gives duplicate result when an application was killed and rerun
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-29037
>                 URL: https://issues.apache.org/jira/browse/SPARK-29037
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.3.3
>            Reporter: feiwang
>            Priority: Major
>         Attachments: screenshot-1.png
>
>
> When we insert overwrite a partition of table.
> For a stage whose tasks commit output, each task first saves its output to a staging dir; when the task completes, it saves that output to a committedTaskPath. When all tasks of the stage succeed, all task output under the committedTaskPath is moved to the destination dir.
> However, when we kill an application while it is committing tasks' output, some tasks' results remain under the committedTaskPath and are not cleaned up gracefully.
> Then, when we rerun the application, the new application reuses this committedTaskPath dir.
> When the task-commit stage of the new application succeeds, all task output under the committedTaskPath, which includes part of the old application's task output, is moved to the destination dir, and the result is duplicated.
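The failure mode in the issue description can be sketched with a minimal simulation (plain Python, not Hadoop code; commit_task, commit_job, and the paths are hypothetical names standing in for the FileOutputCommitter-style layout): two runs share one committedTaskPath, so output committed by the killed run survives and is swept into the destination by the rerun's job commit.

{code:python}
# Hypothetical sketch of the bug: a committedTaskPath shared across runs
# (like a _temporary/0 layout under a fixed output path) lets a killed run's
# committed task output ride along with the rerun's job commit.
import shutil
import tempfile
from pathlib import Path

def commit_task(committed_task_path: Path, task_id: str, data: str) -> None:
    """Task commit: place this task's output under the shared committedTaskPath."""
    task_dir = committed_task_path / task_id
    task_dir.mkdir(parents=True, exist_ok=True)
    (task_dir / f"part-{task_id}").write_text(data)

def commit_job(committed_task_path: Path, dest: Path) -> None:
    """Job commit: move EVERYTHING under committedTaskPath to the destination."""
    dest.mkdir(parents=True, exist_ok=True)
    for task_dir in committed_task_path.iterdir():
        for f in task_dir.iterdir():
            shutil.move(str(f), str(dest / f.name))
    shutil.rmtree(committed_task_path)

base = Path(tempfile.mkdtemp())
committed = base / "_temporary" / "0"   # shared across runs: the root cause
dest = base / "warehouse" / "partition=p1"
# Run 1 commits one task's output, then the application is killed before job
# commit, so the file stays under committedTaskPath uncleaned.
commit_task(committed, "task_000000", "old-run")
# Run 2 reuses the same committedTaskPath, commits its own task, and its job
# commit moves BOTH runs' files to the destination: duplicated results.
commit_task(committed, "task_000001", "new-run")
commit_job(committed, dest)
print(sorted(p.name for p in dest.iterdir()))
{code}

The destination ends up with files from both runs, which is exactly the duplication reported.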



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org