Posted to common-dev@hadoop.apache.org by "Doug Cutting (JIRA)" <ji...@apache.org> on 2006/06/28 23:12:00 UTC
[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++
[ http://issues.apache.org/jira/browse/HADOOP-234?page=all ]
Doug Cutting updated HADOOP-234:
--------------------------------
Fix Version: 0.5.0
(was: 0.4.0)
> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
> Key: HADOOP-234
> URL: http://issues.apache.org/jira/browse/HADOOP-234
> Project: Hadoop
> Type: New Feature
> Components: mapred
> Reporter: Sanjay Dahiya
> Fix For: 0.5.0
> Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce Developer doc.pdf
>
> MapReduce C++ support
> Requirements
> 1. Allow users to write Map and Reduce functions in C++; the rest of the
> infrastructure already present in Java should be reused.
> 2. Users should not need to write both Java and C++ code for this to work.
> 3. Users should not have to work with JNI methods directly; wrap them in helper functions.
> 4. Use Record IO to describe the record format; both the Java MR framework and C++ should
> use the same format so that they work together seamlessly.
> 5. Allow users to write simple map/reduce tasks without learning Record IO when keys and values are
> simple strings.
> Implementation notes
> - If keys and values are simple strings, the user sets SimpleNativeMapper as the mapper in JobConf and implements
> the mapper and reducer methods in C++.
> - For composite Record IO types, the user starts by defining a record format in the Record IO DDL.
> - The user generates Java and C++ classes from the DDL using Record IO.
> - The user configures JobConf to use the generated Java classes as the MR input/output key/value classes.
> - The user writes Map and Reduce functions in C++ against a standard interface (given below). This interface
> makes the serialized Record IO form available to the C++ function, which should deserialize it into the corresponding
> generated C++ Record IO classes.
> - The user calls the helper functions to pass the serialized form of the generated output key/value pairs to the output collector.
> The following is pseudocode for the Mapper (the Reducer can be implemented similarly):
> Native (JNI) Java proxy for the Mapper:
> ---------------------------------------
> Without Record IO:
> --------------------
> package org.apache.hadoop.mapred;
>
> import java.io.IOException;
> import org.apache.hadoop.io.Writable;
> import org.apache.hadoop.io.WritableComparable;
>
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
>   /**
>    * Works on simple strings: key and value are passed to C++ as UTF-8 bytes,
>    * so the C++ side does not depend on the JVM's default charset.
>    */
>   public void map(WritableComparable key, Writable value,
>                   OutputCollector output, Reporter reporter) throws IOException {
>     mapNative(key.toString().getBytes("UTF-8"),
>               value.toString().getBytes("UTF-8"), output, reporter);
>   }
>
>   /**
>    * Native implementation.
>    */
>   private native void mapNative(byte[] key, byte[] value,
>                                 OutputCollector output, Reporter reporter) throws IOException;
> }
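On the simple-string path the C++ code sees nothing but two byte arrays coming in and byte-array pairs going out. As a rough illustration of that contract, here is a Java stand-in for a word-count `user_map` (in the proposal this logic would be written in C++). `SimpleStringWordCount`, `ByteCollector` and the UTF-8 choice are assumptions made for the sketch, not part of the design above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical Java stand-in for a C++ user_map on the simple-string path:
 * input arrives as raw bytes and output pairs are emitted as raw bytes.
 * Assumes both sides agree on UTF-8.
 */
public class SimpleStringWordCount {

    /** Minimal stand-in for the collector callback the native side would invoke. */
    public interface ByteCollector {
        void collect(byte[] key, byte[] value);
    }

    /** Emits (word, "1") for every whitespace-separated word in the value. */
    public static void userMap(byte[] key, byte[] value, ByteCollector out) {
        String line = new String(value, StandardCharsets.UTF_8);
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) { // an all-blank line splits into [""]
                out.collect(word.getBytes(StandardCharsets.UTF_8),
                            "1".getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        userMap("0".getBytes(StandardCharsets.UTF_8),
                "to be or not to be".getBytes(StandardCharsets.UTF_8),
                (k, v) -> emitted.add(new String(k, StandardCharsets.UTF_8)
                        + "=" + new String(v, StandardCharsets.UTF_8)));
        System.out.println(emitted); // prints [to=1, be=1, or=1, not=1, to=1, be=1]
    }
}
```

The summing of counts would then happen in the reducer, exactly as in a Java word count; only the per-record logic moves across the JNI boundary.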
> With Record IO:
> ------------------
> package org.apache.hadoop.mapred;
>
> import java.io.ByteArrayOutputStream;
> import java.io.DataOutputStream;
> import java.io.IOException;
> import org.apache.hadoop.io.Writable;
> import org.apache.hadoop.io.WritableComparable;
> import org.apache.hadoop.record.BinaryOutputArchive;
> import org.apache.hadoop.record.Record;
>
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
>   /**
>    * Implementation of the map method; acts as a JNI proxy for the actual map
>    * method implemented in C++. Works for Record IO based records.
>    * @see #mapNative(byte[], byte[], OutputCollector, Reporter)
>    */
>   public void map(WritableComparable key, Writable value,
>                   OutputCollector output, Reporter reporter) throws IOException {
>     // Serialize the key and the value separately and pass the serialized form
>     // to the C++/JNI method, which interprets it using the generated record IO classes.
>     byte[] keyBytes = toBytes(key, "key");
>     byte[] valueBytes = toBytes(value, "value");
>     mapNative(keyBytes, valueBytes, output, reporter);
>   }
>
>   /**
>    * Serializes a Record IO record into a byte[] using the binary archive.
>    */
>   private static byte[] toBytes(Object record, String tag) throws IOException {
>     if (!(record instanceof Record)) {
>       throw new IOException("Input " + tag + " must be of a Record IO type, got "
>                             + record.getClass().getName());
>     }
>     ByteArrayOutputStream stream = new ByteArrayOutputStream();
>     BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(stream));
>     ((Record) record).serialize(boa, tag);
>     return stream.toByteArray();
>   }
>
>   /**
>    * Implementation in C++.
>    */
>   private native void mapNative(byte[] key, byte[] value,
>                                 OutputCollector output, Reporter reporter) throws IOException;
> }
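Whatever the exact Record IO wire format, the Java proxy and the generated C++ classes must agree on field order and encoding, and the key and the value travel as separate byte arrays. The sketch below shows that round trip with a hypothetical `Point` record, using `java.io.DataOutputStream` as a stand-in for `BinaryOutputArchive` (whose real encoding may differ).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Sketch of the byte-level contract between the Java proxy and the native side.
 * Point is a hypothetical stand-in for a class generated from the Record IO DDL;
 * DataOutputStream stands in for BinaryOutputArchive.
 */
public class RecordBytesSketch {

    /** Hypothetical generated record with a fixed field order. */
    public static final class Point {
        public final int x, y;
        public final String label;

        public Point(int x, int y, String label) { this.x = x; this.y = y; this.label = label; }

        void serialize(DataOutputStream out) throws IOException {
            out.writeInt(x);     // fields are written in declaration order...
            out.writeInt(y);
            out.writeUTF(label); // ...so the reader must read them in the same order
        }

        static Point deserialize(DataInputStream in) throws IOException {
            // Java evaluates constructor arguments left to right
            return new Point(in.readInt(), in.readInt(), in.readUTF());
        }
    }

    /** Serializes one record into its own byte[], as map() does for key and value. */
    public static byte[] toBytes(Point p) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        p.serialize(new DataOutputStream(buf));
        return buf.toByteArray();
    }

    public static Point fromBytes(byte[] bytes) throws IOException {
        return Point.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
    }

    public static void main(String[] args) throws IOException {
        byte[] keyBytes = toBytes(new Point(1, 2, "k"));
        byte[] valueBytes = toBytes(new Point(3, 4, "v")); // serialize the value, not the key
        Point decoded = fromBytes(valueBytes);
        System.out.println(keyBytes.length + " " + decoded.x + " " + decoded.y + " " + decoded.label);
        // prints: 11 3 4 v
    }
}
```

The 11 bytes are two 4-byte ints plus a 2-byte length prefix and one byte for `"k"`; the C++ reader has to consume exactly that layout.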
> OutputCollector proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
>   // standard method from the interface
>   public void collect(WritableComparable key, Writable value)
>       throws IOException {
>   }
>
>   // called from C++: deserializes key and value, then calls
>   // collect(WritableComparable, Writable)
>   public void collectFromNative(byte[] key, byte[] value) throws IOException {
>     // deserialize key and value into the Java types configured in JobConf
>     // call the actual collect method
>   }
> }
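The intent of `NativeOutputCollector` is a thin adapter: native code hands over raw bytes, and the proxy decodes them into the configured Java types before forwarding to the real collector. Below is a self-contained sketch of that adapter; `TypedCollector` and the decoder functions are hypothetical stand-ins for Hadoop's `OutputCollector` and for the Record IO deserialization of the classes named in JobConf.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Sketch of the proxy-collector idea: decode bytes from the native side into
 * typed objects, then forward them to the real collector.
 */
public class ProxyCollectorSketch {

    /** Hypothetical stand-in for the framework's typed collector. */
    public interface TypedCollector<K, V> {
        void collect(K key, V value);
    }

    public static final class NativeCollector<K, V> {
        private final TypedCollector<K, V> delegate;
        private final Function<byte[], K> keyDecoder;   // e.g. Record IO deserialization
        private final Function<byte[], V> valueDecoder;

        public NativeCollector(TypedCollector<K, V> delegate,
                               Function<byte[], K> keyDecoder,
                               Function<byte[], V> valueDecoder) {
            this.delegate = delegate;
            this.keyDecoder = keyDecoder;
            this.valueDecoder = valueDecoder;
        }

        /** Entry point the native side would call with serialized bytes. */
        public void collectFromNative(byte[] key, byte[] value) {
            delegate.collect(keyDecoder.apply(key), valueDecoder.apply(value));
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        NativeCollector<String, Integer> c = new NativeCollector<>(
                (k, v) -> seen.add(k + ":" + v),
                b -> new String(b, StandardCharsets.UTF_8),
                b -> Integer.parseInt(new String(b, StandardCharsets.UTF_8)));
        c.collectFromNative("hadoop".getBytes(StandardCharsets.UTF_8),
                            "42".getBytes(StandardCharsets.UTF_8));
        System.out.println(seen); // prints [hadoop:42]
    }
}
```

Keeping decoding on the Java side means the native code never needs to know the Java output classes; it only has to produce bytes in the agreed format.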
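> Core native functions (helpers for the user-provided Mapper and Reducer)
> ---------------------------------------------------------------------
> /* Written in C: (*env)->Fn(env, ...) is the C form of a JNI call;
>  * C++ code would write env->Fn(...) instead. */
> #include <jni.h>
> #include "org_apache_hadoop_mapred_RecordIONativeMapper.h"
> #include "UserMapper.h"
>
> /**
>  * A C proxy method that calls the actual implementation of the Mapper. The
>  * function name and signature are generated by javah from the fully qualified
>  * name of the Java proxy class (assumed here: org.apache.hadoop.mapred.RecordIONativeMapper).
>  **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_RecordIONativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value,
>    jobject output_collector, jobject reporter)
> {
>   // Pass the raw bytes and their lengths on to the user-defined map method.
>   // Serialized records may contain NUL bytes, so the lengths are required;
>   // the user's map method converts the bytes to the correct record IO type.
>   jsize keyLen = (*env)->GetArrayLength(env, key);
>   jsize valueLen = (*env)->GetArrayLength(env, value);
>   jbyte *keyBuf = (*env)->GetByteArrayElements(env, key, NULL);
>   if (keyBuf == NULL) return;  // OutOfMemoryError is pending
>   jbyte *valueBuf = (*env)->GetByteArrayElements(env, value, NULL);
>   if (valueBuf == NULL) {
>     (*env)->ReleaseByteArrayElements(env, key, keyBuf, JNI_ABORT);
>     return;
>   }
>
>   // Call the user-defined method
>   user_map(env, (const char *)keyBuf, keyLen, (const char *)valueBuf, valueLen,
>            output_collector, reporter);
>
>   // JNI_ABORT: the buffers were only read, so nothing needs to be copied back
>   (*env)->ReleaseByteArrayElements(env, key, keyBuf, JNI_ABORT);
>   (*env)->ReleaseByteArrayElements(env, value, valueBuf, JNI_ABORT);
> }
> /**
>  Helper method, acts as a proxy to the OutputCollector in Java. key and value
>  must be serialized forms of records of the types specified in JobConf, and
>  collector must be a NativeOutputCollector.
>  **/
> void output_collector(JNIEnv *env, const char *key, jsize keyLen,
>                       const char *value, jsize valueLen, jobject collector)
> {
>   // invoke the Java NativeOutputCollector.collectFromNative(byte[], byte[])
>   jclass cls = (*env)->GetObjectClass(env, collector);
>   jmethodID mid = (*env)->GetMethodID(env, cls, "collectFromNative", "([B[B)V");
>   if (mid == NULL) return;  // NoSuchMethodError is pending
>   jbyteArray jkey = (*env)->NewByteArray(env, keyLen);
>   if (jkey == NULL) return;  // OutOfMemoryError is pending
>   jbyteArray jvalue = (*env)->NewByteArray(env, valueLen);
>   if (jvalue == NULL) return;
>   (*env)->SetByteArrayRegion(env, jkey, 0, keyLen, (const jbyte *)key);
>   (*env)->SetByteArrayRegion(env, jvalue, 0, valueLen, (const jbyte *)value);
>   (*env)->CallVoidMethod(env, collector, mid, jkey, jvalue);
>   (*env)->DeleteLocalRef(env, jkey);
>   (*env)->DeleteLocalRef(env, jvalue);
>   (*env)->DeleteLocalRef(env, cls);
> }
> User-defined Mapper (and Reducer)
> ------------------------------------
> /**
>  Implements the user-defined map operation.
>  **/
> void user_map(JNIEnv *env, const char *key, jsize keyLen,
>               const char *value, jsize valueLen,
>               jobject collector, jobject reporter) {
>   //1. Deserialize key/value into the appropriate generated record IO types.
>
>   //2. Process key/value and generate the intermediate key/values.
>
>   //3. Serialize the intermediate key/values into intermed_key and intermed_value
>   //   (of lengths intermed_key_len and intermed_value_len).
>
>   //4. Pass intermed_key/intermed_value on using the helper function:
>   //   output_collector(env, intermed_key, intermed_key_len,
>   //                    intermed_value, intermed_value_len, collector);
> }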
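The name of the `Java_..._mapNative` function in the core native functions is not arbitrary: the JVM resolves a `native` method by a mangled name derived from the fully qualified Java class name, so each Java proxy class (SimpleNativeMapper, RecordIONativeMapper) needs its own entry point, and the class's package (assumed in this proposal to be org.apache.hadoop.mapred, as the header name suggests) becomes part of the symbol. The sketch below reproduces the JNI short-name rule in Java for illustration; it does not handle the long form used for overloaded native methods.

```java
/**
 * Sketch of the JNI short-name rule that javah applies:
 * "Java_" + mangled class name + "_" + mangled method name.
 * Overloaded natives additionally get a "__" + mangled-signature suffix (not handled).
 */
public class JniNames {

    static String mangle(String s) {
        StringBuilder sb = new StringBuilder();
        for (char ch : s.toCharArray()) {
            if (ch == '.' || ch == '/') sb.append('_');    // package separator
            else if (ch == '_') sb.append("_1");           // escape literal underscore
            else if (ch == ';') sb.append("_2");
            else if (ch == '[') sb.append("_3");
            else if ((ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z')
                    || (ch >= '0' && ch <= '9')) sb.append(ch);
            else sb.append(String.format("_0%04x", (int) ch)); // e.g. '$' -> _00024
        }
        return sb.toString();
    }

    public static String shortName(String className, String methodName) {
        return "Java_" + mangle(className) + "_" + mangle(methodName);
    }

    public static void main(String[] args) {
        System.out.println(shortName("org.apache.hadoop.mapred.RecordIONativeMapper", "mapNative"));
        // prints Java_org_apache_hadoop_mapred_RecordIONativeMapper_mapNative
    }
}
```

In practice javah (or `javac -h` on newer JDKs) generates the header, so the rule mostly matters when a symbol fails to link with an `UnsatisfiedLinkError`.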
--
This message is automatically generated by JIRA.