You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@crunch.apache.org by "Attila Sasvari (JIRA)" <ji...@apache.org> on 2017/03/08 17:05:38 UTC

[jira] [Updated] (CRUNCH-636) Make replication factor for temporary files configurable

     [ https://issues.apache.org/jira/browse/CRUNCH-636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Attila Sasvari updated CRUNCH-636:
----------------------------------
    Attachment: test.WordCount_2017-03-08_16.31.55.737.log
                test.WordCount_2017-03-08_16.31.55.737_jobplan.dot.png

I believe I have found a way to handle the issue. Previously, I set the replication in a wrong place in {{MSCROutputHandler}}. Simply I cannot rely on target path, we need to make sure that a job has only temporary output files when we set the supplied replication factor. 

If a job has both temporary and final files, then we should use the default / initial dfs.replication in the MR job. We can make a decision in the {{build()}} method of {{JobPrototype}}. Before  https://github.com/apache/crunch/blob/7f85ee5816a19eca0e87ce503ea0b03ea294433c/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java#L155 , we create a boolean that maintains if the job at hand has only temporary files. If there is any non-temporary output we set this to false inside the loop. Based on this we can set the proper replication after the loop.

Regarding user configuration, I guess we should allow whatever they set. If MapReduce allows it, we allow it. 

I have attached an example jobplan and a debug log to show what is happening right now.

Following the execution: 
- At the beginning we see Crunch creates all the temporary directories.
{code}
7/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p1
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p2
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p3
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p4
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p5
{code}

- We remember these paths to make sure we can decide whether an output file is temporary or not.

- Then we check current replication factor set for the job and adjust it if we are dealing with a temporary job output:
{code}
Replication factor: 3
17/03/08 16:31:55 INFO plan.JobPrototype: --- target :Text(/user/asasvari/5FXm_union_count)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: Text(/user/asasvari/5FXm_union_count)
17/03/08 16:31:55 INFO plan.JobPrototype: Setting initial replication factor (3)
Replication factor: 3
17/03/08 16:31:55 INFO plan.JobPrototype: --- target :SeqFile(/tmp/crunch-1543087915/p2)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: SeqFile(/tmp/crunch-1543087915/p2)
17/03/08 16:31:55 INFO plan.JobPrototype: Setting replication factor to: 9 
Replication factor: 3
17/03/08 16:31:55 INFO plan.JobPrototype: --- target :Text(/user/asasvari/5FXm2)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: Text(/user/asasvari/5FXm2)
17/03/08 16:31:55 INFO plan.JobPrototype: --- target :SeqFile(/tmp/crunch-1543087915/p1)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: SeqFile(/tmp/crunch-1543087915/p1)
17/03/08 16:31:55 INFO plan.JobPrototype: Setting initial replication factor (3)
{code}

* Crunch Job 3 creates only a temporary file {{ /tmp/crunch-1543087915/p2}} so we set replication factor to 9 (user specified value of {{crunch.tmp.dir.replication}}). 
{code}
$ hdfs dfs -ls  /tmp/crunch-1543087915/p2        
17/03/08 17:44:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 asasvari supergroup          0 2017-03-08 16:32 /tmp/crunch-1543087915/p2/_SUCCESS
-rw-r--r--   9 asasvari supergroup       1183 2017-03-08 16:32 /tmp/crunch-1543087915/p2/part-r-00000
{code}
* Crunch Job 2 produces a temporary file and a non-temporary file, so we must replication factor to the original {{dfs.replication}}.
* Crunch Job 1 has 1 non-temporary output file, so {{dfs.replication}} is used.
{code}
hdfs dfs -ls  /user/asasvari/iwTV_union_count 
17/03/08 17:45:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 asasvari supergroup          0 2017-03-08 17:06 /user/asasvari/iwTV_union_count/_SUCCESS
-rw-r--r--   3 asasvari supergroup        921 2017-03-08 17:06 /user/asasvari/iwTV_union_count/part-r-00000
{code}

I had some other questions regarding failure of a big pipeline. 
- Let's assume replication factor is set to 1 for temporary files, and we have a huge pipeline with hundreds of nodes. Node fails that stores temporary data very near to the end of pipeline execution, so the last job cannot finish. So the pipeline failed. There are no rollback actions are performed that delete outputs by earlier jobs. With checkpointing enabled, previous successful jobs are not necessary to perform.

- Does it make sense to allow setting replication higher than initial for temporary files? Probably not, but we can allow it. 



> Make replication factor for temporary files configurable
> --------------------------------------------------------
>
>                 Key: CRUNCH-636
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-636
>             Project: Crunch
>          Issue Type: New Feature
>            Reporter: Attila Sasvari
>            Assignee: Attila Sasvari
>         Attachments: test.WordCount_2017-03-08_16.31.55.737_jobplan.dot.png, test.WordCount_2017-03-08_16.31.55.737.log
>
>
> As of now, Crunch does not allow having different replication factor for temporary files and non-temporary files (e.g. final output data of leaf nodes) at the same time. If a user has a large amount of data (say hundreds a of gigabytes) to process, they might want to have lower replication factor for large temporary files between Crunch jobs. 
> We could make this configurable via a new setting (e.g. {{crunch.tmp.dir.replication}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)