You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "Doug Cutting (JIRA)" <ji...@apache.org> on 2006/06/28 23:12:00 UTC

[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++

     [ http://issues.apache.org/jira/browse/HADOOP-234?page=all ]

Doug Cutting updated HADOOP-234:
--------------------------------

    Fix Version: 0.5.0
                     (was: 0.4.0)

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya
>      Fix For: 0.5.0
>  Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce Developer doc.pdf
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira