You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Steve Loughran <st...@cloudera.com.INVALID> on 2021/07/07 21:41:03 UTC

MAPREDUCE-7341. Intermediate Manifest Committer for Azure + GCS

My little committer project, an intermediate manifest committer for Azure
and GCS, is reaching the stage where it's ready for others to look at:

https://github.com/apache/hadoop/pull/2971

Goals

   1. Correctness even on GCS, which doesn't have atomic dir rename (so v1
   isn't safe). It does use FileSystem.rename() for commits; this is not for
   S3.
   2. Performance closer to MR v2 than v1.
   3. Work around some issues with the object stores (most recently, ABFS
   timing out on delete of deep dir trees:
   https://issues.apache.org/jira/browse/HADOOP-17691 )
   4. Integrated stats collection and reporting
   5. Not going near any of the working committer code
   6. Output 100% compatible with existing directory-partition model. The
   manifests are only intermediate


architecture doc
https://github.com/steveloughran/hadoop/blob/mr/MAPREDUCE-7341-manifest-committer/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md

user guide
https://github.com/steveloughran/hadoop/blob/mr/MAPREDUCE-7341-manifest-committer/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md

1. This is really targeting Spark. It works on MR; I've just avoided the
job restart problem because Spark doesn't attempt to do that.
2. It will work on HDFS, but I'm not testing there or targeting it. Things
like mkdir() are fast there, as is dir rename.


I'm reusing the same JSON report in a _SUCCESS marker as the s3a
committers; adding

   1. collecting IOStatistics timing of all operations invoked during task
   and job commits
   2. also supporting an option to save all stats reports to a directory in
   a cluster FS, including even on job failures/abort.

Attached is what a stats report looks like on a mini terasort (byte sort).
It's collecting the task commit numbers via the intermediate manifests, and
aggregating them. No collection of actual IO during task execution; we'd
need per thread collection of stats there.

Please have a look at the PR. Yes, it's big, but it's broken up into lots
of stages for ease of understanding and testing. One of the lessons from
the S3A Committer work.

-Steve


------

{
  "name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
  "timestamp" : 1625693109922,
  "success" : true,
  "date" : "2021-07-07T22:25:09.922+01:00[Europe/London]",
  "hostname" : "stevel-mbp15-13176.local",
  "committer" :
"org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter",
  "description" : null,
  "jobId" : "job_1625693035829_0001",
  "jobIdSource" : "JobID",
  "metrics" : { },
  "diagnostics" : {
    "principal" : "stevel",
    "stage" : "committer_commit_job"
  },
  "filenames" : [
    "abfs://stevel-testing@ukwest.dfs.core.windows.net/terasort/sortin/part-m-00001",
    "abfs://stevel-testing@ukwest.dfs.core.windows.net/terasort/sortin/part-m-00000" ],
  "state" : null,
  "iostatistics" : {
    "counters" : {
      "op_job_stage_abort" : 0,
      "committer_commit_job" : 1,
      "op_load_manifest.failures" : 0,
      "op_job_stage_optional_validate_output.failures" : 0,
      "op_load_all_manifests" : 1,
      "op_rename.failures" : 0,
      "op_job_stage_create_target_dirs" : 1,
      "op_task_stage_save_manifest" : 0,
      "committer_commit_job.failures" : 0,
      "op_job_stage_cleanup.failures" : 0,
      "op_stage_task_scan_directory" : 2,
      "op_task_stage_abort_task.failures" : 0,
      "op_job_stage_rename_files.failures" : 0,
      "committer_bytes_committed" : 100000,
      "op_create_one_directory.failures" : 0,
      "op_mkdirs.failures" : 0,
      "committer_files_committed" : 2,
      "op_load_all_manifests.failures" : 0,
      "op_save_task_manifest" : 1,
      "op_job_stage_load_manifests" : 1,
      "op_task_stage_setup" : 2,
      "op_stage_task_commit" : 2,
      "op_create_directories.failures" : 0,
      "op_stage_task_scan_directory.failures" : 0,
      "op_create_one_directory" : 0,
      "op_save_task_manifest.failures" : 0,
      "op_directory_scan" : 3,
      "op_rename" : 3,
      "op_get_file_status" : 8,
      "op_job_stage_rename_files" : 1,
      "op_is_directory.failures" : 0,
      "op_job_stage_abort.failures" : 0,
      "op_job_stage_optional_validate_output" : 1,
      "op_task_stage_setup.failures" : 0,
      "op_delete" : 8,
      "op_job_stage_setup" : 1,
      "op_directory_scan.failures" : 0,
      "object_list_request.failures" : 0,
      "op_load_manifest" : 2,
      "object_continue_list_request.failures" : 0,
      "committer_tasks_completed" : 0,
      "op_task_stage_save_manifest.failures" : 0,
      "op_get_file_status.failures" : 3,
      "op_task_stage_abort_task" : 1,
      "op_delete.failures" : 0,
      "object_list_request" : 0,
      "op_job_stage_create_target_dirs.failures" : 0,
      "op_list_status" : 4,
      "committer_tasks_failed" : 0,
      "op_job_stage_cleanup" : 1,
      "op_job_stage_setup.failures" : 0,
      "op_job_stage_save_success_marker" : 1,
      "object_continue_list_request" : 0,
      "op_stage_task_commit.failures" : 0,
      "op_create_directories" : 1,
      "op_job_stage_load_manifests.failures" : 0,
      "op_mkdirs" : 3,
      "op_is_directory" : 0,
      "op_job_stage_save_success_marker.failures" : 0,
      "op_list_status.failures" : 0
    },
    "gauges" : { },
    "minimums" : {
      "op_create_directories.failures.min" : -1,
      "op_is_directory.failures.min" : -1,
      "object_list_request.min" : -1,
      "op_task_stage_setup.min" : 121,
      "op_job_stage_create_target_dirs.failures.min" : -1,
      "object_continue_list_request.failures.min" : -1,
      "object_list_request.failures.min" : -1,
      "op_job_stage_rename_files.min" : 218,
      "op_directory_scan.min" : 34,
      "op_job_stage_optional_validate_output.min" : 104,
      "op_job_stage_setup.failures.min" : -1,
      "op_list_status.min" : 23,
      "op_mkdirs.failures.min" : -1,
      "op_job_stage_abort.failures.min" : -1,
      "committer_bytes_committed" : -1,
      "committer_files_committed" : -1,
      "op_rename.failures.min" : -1,
      "op_load_all_manifests.min" : 399,
      "op_job_stage_save_success_marker.failures.min" : -1,
      "op_load_manifest.failures.min" : -1,
      "committer_commit_job.min" : 1007,
      "op_stage_task_scan_directory.failures.min" : -1,
      "committer_commit_job.failures.min" : -1,
      "op_delete.min" : 24,
      "op_job_stage_cleanup.min" : 151,
      "op_get_file_status.min" : 21,
      "op_load_all_manifests.failures.min" : -1,
      "op_get_file_status.failures.min" : 47,
      "op_create_one_directory.failures.min" : -1,
      "op_load_manifest.min" : 208,
      "op_create_one_directory.min" : -1,
      "op_mkdirs.min" : 29,
      "op_save_task_manifest.min" : 99,
      "op_task_stage_save_manifest.min" : -1,
      "op_job_stage_abort.min" : -1,
      "op_job_stage_optional_validate_output.failures.min" : -1,
      "op_list_status.failures.min" : -1,
      "op_directory_scan.failures.min" : -1,
      "op_job_stage_load_manifests.failures.min" : -1,
      "committer_tasks_completed" : -1,
      "op_job_stage_create_target_dirs.min" : 2,
      "object_continue_list_request.min" : -1,
      "op_task_stage_setup.failures.min" : -1,
      "op_job_stage_setup.min" : 693,
      "committer_tasks_failed" : -1,
      "op_stage_task_scan_directory.min" : -1,
      "op_job_stage_rename_files.failures.min" : -1,
      "op_save_task_manifest.failures.min" : -1,
      "op_delete.failures.min" : -1,
      "op_stage_task_commit.failures.min" : -1,
      "op_stage_task_commit.min" : -1,
      "op_create_directories.min" : 1,
      "op_job_stage_load_manifests.min" : 554,
      "op_task_stage_abort_task.min" : 256,
      "op_is_directory.min" : -1,
      "op_task_stage_abort_task.failures.min" : -1,
      "op_rename.min" : 51,
      "op_job_stage_save_success_marker.min" : 196,
      "op_job_stage_cleanup.failures.min" : -1,
      "op_task_stage_save_manifest.failures.min" : -1
    },
    "maximums" : {
      "op_job_stage_load_manifests.max" : 554,
      "op_load_all_manifests.failures.max" : -1,
      "op_job_stage_save_success_marker.max" : 196,
      "op_job_stage_abort.failures.max" : -1,
      "op_stage_task_commit.max" : -1,
      "op_create_one_directory.max" : -1,
      "op_is_directory.failures.max" : -1,
      "object_list_request.failures.max" : -1,
      "op_task_stage_setup.max" : 885,
      "op_job_stage_create_target_dirs.failures.max" : -1,
      "committer_bytes_committed" : -1,
      "committer_files_committed" : -1,
      "op_job_stage_optional_validate_output.max" : 104,
      "op_directory_scan.max" : 149,
      "op_mkdirs.max" : 339,
      "op_create_one_directory.failures.max" : -1,
      "op_get_file_status.failures.max" : 803,
      "op_job_stage_cleanup.max" : 151,
      "op_save_task_manifest.max" : 99,
      "op_create_directories.failures.max" : -1,
      "op_rename.failures.max" : -1,
      "op_job_stage_save_success_marker.failures.max" : -1,
      "op_job_stage_setup.failures.max" : -1,
      "op_job_stage_load_manifests.failures.max" : -1,
      "op_directory_scan.failures.max" : -1,
      "op_job_stage_rename_files.max" : 218,
      "op_delete.max" : 256,
      "object_list_request.max" : -1,
      "committer_commit_job.max" : 1007,
      "op_task_stage_abort_task.max" : 256,
      "op_get_file_status.max" : 95,
      "op_delete.failures.max" : -1,
      "object_continue_list_request.max" : -1,
      "op_job_stage_abort.max" : -1,
      "op_is_directory.max" : -1,
      "op_task_stage_save_manifest.max" : -1,
      "op_job_stage_cleanup.failures.max" : -1,
      "op_list_status.failures.max" : -1,
      "op_save_task_manifest.failures.max" : -1,
      "op_load_manifest.max" : 384,
      "committer_tasks_completed" : -1,
      "op_load_manifest.failures.max" : -1,
      "op_stage_task_scan_directory.failures.max" : -1,
      "committer_commit_job.failures.max" : -1,
      "op_task_stage_save_manifest.failures.max" : -1,
      "op_job_stage_create_target_dirs.max" : 2,
      "op_create_directories.max" : 1,
      "op_stage_task_commit.failures.max" : -1,
      "op_mkdirs.failures.max" : -1,
      "committer_tasks_failed" : -1,
      "op_list_status.max" : 97,
      "op_task_stage_setup.failures.max" : -1,
      "op_task_stage_abort_task.failures.max" : -1,
      "op_rename.max" : 165,
      "op_job_stage_optional_validate_output.failures.max" : -1,
      "op_load_all_manifests.max" : 399,
      "op_job_stage_setup.max" : 693,
      "object_continue_list_request.failures.max" : -1,
      "op_job_stage_rename_files.failures.max" : -1,
      "op_stage_task_scan_directory.max" : -1
    },
    "meanstatistics" : {
      "op_job_stage_optional_validate_output.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_rename.mean" : {
        "samples" : 3,
        "sum" : 366
      },
      "op_load_manifest.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_is_directory.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_create_one_directory.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_create_target_dirs.mean" : {
        "samples" : 1,
        "sum" : 2
      },
      "op_rename.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_abort.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_setup.mean" : {
        "samples" : 1,
        "sum" : 693
      },
      "committer_bytes_committed" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_delete.mean" : {
        "samples" : 8,
        "sum" : 531
      },
      "op_stage_task_scan_directory.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "committer_files_committed" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_abort.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_save_success_marker.mean" : {
        "samples" : 1,
        "sum" : 196
      },
      "op_load_manifest.mean" : {
        "samples" : 2,
        "sum" : 592
      },
      "op_save_task_manifest.mean" : {
        "samples" : 1,
        "sum" : 99
      },
      "op_task_stage_save_manifest.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_optional_validate_output.mean" : {
        "samples" : 1,
        "sum" : 104
      },
      "op_is_directory.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_mkdirs.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_cleanup.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "object_list_request.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_task_stage_abort_task.mean" : {
        "samples" : 1,
        "sum" : 256
      },
      "object_list_request.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_task_stage_setup.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_get_file_status.failures.mean" : {
        "samples" : 3,
        "sum" : 1149
      },
      "op_job_stage_setup.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "object_continue_list_request.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_task_stage_save_manifest.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_directory_scan.mean" : {
        "samples" : 3,
        "sum" : 247
      },
      "op_job_stage_save_success_marker.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_rename_files.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_cleanup.mean" : {
        "samples" : 1,
        "sum" : 151
      },
      "op_job_stage_load_manifests.mean" : {
        "samples" : 1,
        "sum" : 554
      },
      "op_get_file_status.mean" : {
        "samples" : 5,
        "sum" : 210
      },
      "committer_tasks_completed" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_list_status.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_create_directories.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_load_all_manifests.mean" : {
        "samples" : 1,
        "sum" : 399
      },
      "committer_commit_job.mean" : {
        "samples" : 1,
        "sum" : 1007
      },
      "op_create_one_directory.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_load_all_manifests.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_directory_scan.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_mkdirs.mean" : {
        "samples" : 3,
        "sum" : 423
      },
      "op_list_status.mean" : {
        "samples" : 4,
        "sum" : 217
      },
      "op_task_stage_setup.mean" : {
        "samples" : 2,
        "sum" : 1006
      },
      "committer_commit_job.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "committer_tasks_failed" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_create_target_dirs.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_create_directories.mean" : {
        "samples" : 1,
        "sum" : 1
      },
      "op_job_stage_rename_files.mean" : {
        "samples" : 1,
        "sum" : 218
      },
      "op_stage_task_scan_directory.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_task_stage_abort_task.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "object_continue_list_request.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_delete.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_stage_task_commit.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_job_stage_load_manifests.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_stage_task_commit.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      },
      "op_save_task_manifest.failures.mean" : {
        "samples" : 0,
        "sum" : 0
      }
    }
  }
}