Posted to dev@sqoop.apache.org by "Jarek Jarcec Cecho (JIRA)" <ji...@apache.org> on 2015/03/03 19:57:04 UTC

[jira] [Commented] (SQOOP-1803) JobManager and Execution Engine changes: Support for a injecting and pulling out configs and job output in connectors

    [ https://issues.apache.org/jira/browse/SQOOP-1803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14345504#comment-14345504 ] 

Jarek Jarcec Cecho commented on SQOOP-1803:
-------------------------------------------

I was thinking about this one a bit myself. I have a couple of thoughts on getting data back from the execution engine and I'm wondering what others think. Please don't hesitate to chime in if I missed any approach.

1) DistributedCache

In addition to [~gwenshap]'s comments about supportability (and/or debuggability) of {{DistributedCache}}, to the best of my knowledge it can only be used to distribute data from the launcher (Sqoop Server) to the child mapreduce tasks. I do not believe that it can be used the other way around to get files or data from individual tasks back to the launcher. Looking at the [latest javadocs|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html], this still seems to be the case, as the documentation contains a note about the immutability of the cache once the job is submitted:

{quote}
DistributedCache tracks modification timestamps of the cache files. Clearly the cache files should not be modified by the application or externally while the job is executing.
{quote}
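
For illustration, here is a minimal sketch of the one-way flow (using the {{Job}}-level cache API; the file path is made up). The launcher registers files before submission and the tasks can only read the localized copies, so there is no channel in the opposite direction:

{code:java}
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheDirectionSketch {

  // Launcher (Sqoop Server) side: cache files are snapshotted at submission time.
  public static void launcherSide(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "sqoop-job");
    job.addCacheFile(new URI("/tmp/sqoop/connector-configs.json")); // made-up path
    job.submit();
  }

  // Task side: the localized copies can only be read, never pushed back.
  public static class TaskSide extends Mapper<Object, Object, Object, Object> {
    @Override
    protected void setup(Context context) throws java.io.IOException, InterruptedException {
      URI[] cached = context.getCacheFiles();
      // ... read-only access to the cached files ...
    }
  }
}
{code}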

*Summary:* I believe that this solution is disqualified for retrieving data back from the execution engine.

2) Counters

[Counters|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/Counters.html] are a nice technique for getting insight into what a mapreduce job is doing. Multiple mappers can update the counters in parallel and it's mapreduce's responsibility to ensure that counters from all child tasks are summed up correctly. The limitation of this solution is that counters can only be {{long}} based (e.g. no {{String}}, {{Date}}, ...). Also, the counters are cumulative in nature, so it might be a bit difficult to retrieve discrete values - we would either need to ensure that only certain mappers/reducers update a given counter whereas others won't, or we would need to figure out a way to decode one single value when multiple mappers/reducers update the same counter at the same time.
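
To make the limitation concrete, a small sketch of both sides (the counter name is made up): a task can only bump a {{long}}, and on the launcher side we only ever see the value aggregated across all tasks.

{code:java}
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterSketch {
  // Made-up counter used only for illustration.
  enum SqoopOutput { LAST_EXTRACTED_ID }

  public static class ExtractorMapper extends Mapper<Object, Object, Object, Object> {
    @Override
    protected void map(Object key, Object value, Context context) {
      // Counters carry only long values and are summed across all tasks, so a
      // discrete per-job value (e.g. a "max value") can't be reported directly.
      context.getCounter(SqoopOutput.LAST_EXTRACTED_ID).increment(1L);
    }
  }

  // Launcher side: only the aggregated total is visible after the job finishes.
  public static void launcherSide(Job job) throws Exception {
    job.waitForCompletion(true);
    Counter counter = job.getCounters().findCounter(SqoopOutput.LAST_EXTRACTED_ID);
    long total = counter.getValue();
  }
}
{code}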

*Summary:* Whereas it's not impossible to use counters to retrieve data from the execution engine, it seems that this solution would impose limitations and would be "difficult" to implement and maintain.

3) HDFS Files

Looking into how others are solving this problem, [Oozie|http://oozie.apache.org] launcher tasks (= one-map mapreduce jobs) generate files on HDFS in a predefined directory from where the Oozie server picks them up to read arbitrary values. This is a neat solution as it allows us to retrieve any value of any type from any part of the workflow (all processes can create their own files if needed). The downside is that we would need to agree on a certain location where the Server and the mapreduce job will exchange files - this directory must exist and must be accessible both to Sqoop (running under a system user) and to the mapreduce job itself (most likely running as the end user). I believe that HDFS ACLs can easily be used to accomplish this.

We would need to be careful here with edge conditions - we would need to make sure that we're cleaning up old and unused files (job failures, ...) and that we are not leaking any sensitive information to HDFS.
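
A rough sketch of what the exchange could look like, assuming a made-up {{/sqoop/job-output}} directory secured via HDFS ACLs (the directory layout and file format are just placeholders):

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsExchangeSketch {
  // Made-up exchange directory; it would have to exist up front and be
  // accessible both to the Sqoop system user and to the job (end) user.
  private static final String EXCHANGE_DIR = "/sqoop/job-output";

  // Task side: write one small file per job (or per task).
  public static void writeFromTask(Configuration conf, String jobId, String maxValue) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    try (FSDataOutputStream out = fs.create(new Path(EXCHANGE_DIR + "/" + jobId + "/max-value"), true)) {
      out.write(maxValue.getBytes(StandardCharsets.UTF_8));
    }
  }

  // Server side: pick the value up after the job finishes and clean up,
  // so that old or sensitive data does not linger on HDFS.
  public static String readOnServer(Configuration conf, String jobId) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    Path jobDir = new Path(EXCHANGE_DIR + "/" + jobId);
    String value;
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(new Path(jobDir, "max-value")), StandardCharsets.UTF_8))) {
      value = reader.readLine();
    }
    fs.delete(jobDir, true);
    return value;
  }
}
{code}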

*Summary:* Possible solution that will support all our use cases, but will be a bit harder to implement.

4) Server side exchange only

I was also looking into how things work currently in the server and I've realized something that made me think about this proposal. Back when we were defining the workflow, the intention was that only the {{Initializer}} is allowed to generate state, whereas all other parts of the workflow ({{Partitioner}}, {{Extractor}}, {{Loader}} and {{Destroyer}}) should not generate any state and should only reuse the one that was pre-prepared in the initializer. The reason for that is that the {{Initializer}} is run only once, whereas all other parts of the workflow run in parallel and/or not on the Sqoop server itself; hence, by allowing state to be generated only in the {{Initializer}}, we don't have to deal with synchronizing the parallel pieces or with limitations in various execution engines. This intention is reflected in the API: the {{Initializer}} is given a {{MutableContext}} where the connector developer can set any properties that will be shared with the rest of the workflow (~ the state), whereas all other parts are given only an {{ImmutableContext}} that doesn't allow any changes to the shared properties.

I have to say that we have a small exception in the code base, because the {{Partitioner}} class generates {{Partition}} objects that can carry some context as well. However, as the {{Partition}} objects are not available in the {{Destroyer}}, the connector developer still needs to persist any state that is required throughout the entire workflow inside the {{Initializer}}.
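
To illustrate this contract ({{MutableContext}} in the {{Initializer}}, {{ImmutableContext}} everywhere else), here is a self-contained sketch; the interfaces and the {{MapContext}} class below are simplified stand-ins for the real Sqoop classes, not the actual SDK:

{code:java}
import java.util.HashMap;
import java.util.Map;

public class ContextContractSketch {
  // Simplified stand-ins for the real ImmutableContext / MutableContext.
  interface ImmutableContext {
    String getString(String key);
  }
  interface MutableContext extends ImmutableContext {
    void setString(String key, String value);
  }
  static class MapContext implements MutableContext {
    private final Map<String, String> values = new HashMap<>();
    public String getString(String key) { return values.get(key); }
    public void setString(String key, String value) { values.put(key, value); }
  }

  public static void main(String[] args) {
    // Initializer: the only place where state may be generated.
    MutableContext initializerView = new MapContext();
    initializerView.setString("incremental.max.value", "42");

    // Partitioner / Extractor / Loader / Destroyer: read-only view of the same state.
    ImmutableContext workerView = initializerView;
    System.out.println(workerView.getString("incremental.max.value"));
  }
}
{code}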

Having said that, another option seems to be to simply not retrieve anything from the execution engine and let the connector update the configuration objects based on the info that it generated in the {{Initializer}} - assuming that the job finished correctly. Looking at the current connectors this should work well, as we need to update and persist state that is 'locked' at the {{Initializer}} stage. For database-based connectors the "max value" should be determined in the initializer (it currently isn't, though), and the same goes for Kafka and other connectors. The beauty of this approach is that it's simple to implement and can easily be extended in the future to include data coming from the execution engine, should there be a need for it (using approach 3, for example).
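
In pseudo-code, the server-side flow for this option would boil down to something like the following; none of these class names are real Sqoop classes, they only illustrate the sequence of steps:

{code:java}
public class ServerSideUpdateSketch {
  // Made-up placeholder for the connector's "from" configuration object.
  static class FromJobConfiguration {
    String lastIncrementalValue;
  }

  // Made-up repository abstraction.
  interface Repository {
    void updateFromJobConfiguration(long jobId, FromJobConfiguration config);
  }

  // Called by the JobManager once the execution engine reports success.
  public static void onJobSucceeded(long jobId,
                                    String valueLockedInInitializer,
                                    FromJobConfiguration config,
                                    Repository repository) {
    // Nothing comes back from the execution engine: the server simply promotes
    // the value the connector computed in the Initializer and persists it.
    config.lastIncrementalValue = valueLockedInInitializer;
    repository.updateFromJobConfiguration(jobId, config);
  }
}
{code}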

> JobManager and Execution Engine changes: Support for a injecting and pulling out configs and job output in connectors 
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SQOOP-1803
>                 URL: https://issues.apache.org/jira/browse/SQOOP-1803
>             Project: Sqoop
>          Issue Type: Sub-task
>            Reporter: Veena Basavaraj
>            Assignee: Veena Basavaraj
>             Fix For: 1.99.6
>
>
> The details are in the design wiki; as the implementation happens, more discussions can happen here.
> https://cwiki.apache.org/confluence/display/SQOOP/Delta+Fetch+And+Merge+Design#DeltaFetchAndMergeDesign-Howtogetoutputfromconnectortosqoop?
> The goal is to dynamically inject an IncrementalConfig instance into the FromJobConfiguration. The current MFromConfig and MToConfig can already hold a list of configs, and a strong sentiment was expressed to keep it as a list, so why not for the first time actually make use of it and group the incremental-related configs in one config object.
> This task will prepare the FromJobConfiguration from the job config data, and the ExtractorContext with the relevant values from the previous job run.
> This task will prepare the ToJobConfiguration from the job config data, and the LoaderContext with the relevant values from the previous job run, if any.
> We will use DistributedCache to get state information out of the Extractor and Loader and finally persist it into the sqoop repository, depending on SQOOP-1804, once the outputcommitter commit is called.


