You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "Sanjay Dahiya (JIRA)" <ji...@apache.org> on 2006/05/19 11:29:31 UTC

[jira] Created: (HADOOP-234) Support for writing Map/Reduce functions in C++

Support for writing Map/Reduce functions in C++
-----------------------------------------------

         Key: HADOOP-234
         URL: http://issues.apache.org/jira/browse/HADOOP-234
     Project: Hadoop
        Type: New Feature

  Components: mapred  
    Reporter: Sanjay Dahiya


MapReduce C++ support

Requirements 

1. Allow users to write Map and Reduce functions in C++, rest of the 
infrastructure already present in Java should be reused. 
2. Avoid users having to write both Java and C++ for this to work. 
3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
4. Use Record IO for describing record format, both MR java framework and C++ should 
	use the same format to work seemlessly. 
5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
	simple strings. 

Implementation notes

- If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
	mapper and reducer methods in C++. 
- For composite Record IO types user starts with defining a record format using Record IO DDL. 
- User generates Java and C++ classes from the DDL using record IO. 
- Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
- User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
  makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
  generated C++ record IO classes. 
- User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 

Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 

Native(JNI) Java proxy for the Mapper : 
---------------------------------------
Without Record IO :-
--------------------
public class SimpleNativeMapper extends MapReduceBase implements Mapper {

	/**
	  * Works on simple strings. 
	  **/
	public void map(WritableComparable key, Writable value,
			OutputCollector output, Reporter reporter) throws IOException {

		mapNative(key.toString().getBytes()
				, value.toString().getBytes(), output, reporter); 
	}
	
	/**
	  * Native implementation. 
	  **/
	private native void mapNative(byte[] key, byte[] value, 
			OutputCollector output, Reporter reporter) throws IOException;

}


With Record IO :- 
------------------
public class RecordIONativeMapper extends MapReduceBase implements Mapper {

	/**
	 * Implementation of map method, this acts as a JNI proxy for actual map 
	 * method implemented in C++. Works for Record IO based records. 
	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
	 */
	public void map(WritableComparable key, Writable value,
			OutputCollector output, Reporter reporter) throws IOException {
		
		byte[] keyBytes = null ;
		byte[] valueBytes = null ;
		
		try{
			// we need to serialize the key and record and pass the serialized 
			// format to C++ / JNI methods so they can interpret it using appropriate 
			// record IO classes. 
			{
				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
				
				((Record)key).serialize(boa, "WhatIsTag");
				keyBytes = keyStream.toByteArray();
			}
			{
				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
				
				((Record)key).serialize(boa, "WhatIsTag");
				valueBytes = valueStream.toByteArray();
			}
		}catch(ClassCastException e){
			// throw better exceptions
			throw new IOException("Input record must be of Record IO Type"); 
		}
		// pass the serialized byte[] to C++ implementation. 
		mapNative(keyBytes, valueBytes, output, reporter); 
	}

	/**
	 * Implementation in C++.
	 */
	private native void mapNative(byte[] key, byte[] value, 
			OutputCollector output, Reporter reporter) throws IOException;
}

OutputCollector Proxy for C++
------------------------------
public class NativeOutputCollector implements OutputCollector {

	// standard method from interface
	public void collect(WritableComparable key, Writable value)
			throws IOException {
	}
	
	// deserializes key and value and calls collect(WritableComparable, Writable)
	public void collectFromNative(byte[]key, byte[]value){
		// deserialize key and value to java types ( as configured in JobConf )
		// call actual collect method 
	}
}



Core Native functions ( helper for user provided Mapper and Reducer ) 
---------------------------------------------------------------------
#include "org_apache_hadoop_mapred_NativeMapper.h"
#include "UserMapper.h"

/**
	* A C++ proxy method, calls actual implementation of the Mapper. This method 
	signature is generated by javah.
**/
JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
  (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
  	jobject output_collector, jobject reporter);
{
 
 	// convert char* and pass on to user defined map method. 
 	// user's map method should take care of converting it to correct record IO 
 	// type.
 	int keyLen = (*env)->GetArrayLength(env, key) ;
 	int valueLen = (*env)->GetArrayLength(env, valueLen) ;

 	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
 	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
 	
 	// Call User defined method 
 	user_map(keyBuf, valueBuf, output_collector, reporter) ;
 	
 	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
 	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
}

/**
	Helper method, acts as a proxy to OutputCollector in java. key and value 
	must be serialized forms of records as specified in JobConf. 
**/
void output_collector(const char * key, const char *value,
	jobject output_collector, jobject reporter){
	
	// invoke java NativeOutputCollector.collect with key and value. 
}


User defined Mapper ( and Reducer ) 
------------------------------------
/**
	implements user defined map operation. 
**/
void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {

	//1. deserialize key/value in the appropriate format using record IO. 
	
	//2. process key/value and generate the intermediate key/values in record IO format. 
	
	//3. Deserialize intermediate key/values to intermed_key and intermed_value
	
	//4. pass intermed_key/intermed_value using helper function - 
	// 		output_collector(intermed_key, intermed_value, collector, recorder);
	
	
}

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Commented: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Sanjay Dahiya (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-234?page=comments#action_12442121 ] 
            
Sanjay Dahiya commented on HADOOP-234:
--------------------------------------


   [[ Old comment, sent by email on Mon, 22 May 2006 17:28:16 +0530 ]]

Hi Doug -
I was also looking for a way to avoid serialization and de- 
serialization, but I am still not clear how do we use existing record  
IO with this (without modifying generated classes)
When we define a new record format and generate classes to work with  
the record, the generated classes contain Reader and Writer for the  
record. These read/write 'Record' objects from streams. One way to  
implement this would be to modify the class generation and make  
Reader extend SequenceInputFile and return(optionally) a ByetWritable  
rather than a Record. With this in place we should be able to avoid  
the need for serialization/de-serialization and users will not need  
to write extra code per type of record. Or am I missing something here ?

~Sanjay







> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: http://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce Developer doc.pdf
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Sanjay Dahiya (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-234?page=all ]

Sanjay Dahiya updated HADOOP-234:
---------------------------------

    Attachment: Hadoop MaReduce Developer doc.pdf

I am not sure if I am following correctly here, I am new to the codebase and so I took some time to document the MapReduce flow in the system. There are some things I am not clear about (bold italics in attachment). would be great if you could clarify. It may help others new to the project. 

In this specific case, RecordReader.next() is called by the framework (MapTask) and next() method knows  the boundry of the next Record in the File. In case it is a complex RecordIO type the boundry wont be known to anyone except the generated object, which contains code for reading and writing the object. This is the part which is not clear how a generic SequenceFile would know size/boundries of RecordIO objects. If the generated code contains code for reading the next record in a ByteWritable in addition to Record, this will be easy to implement. 
I think I am missing something here, can you please clarify ? 

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya
>  Attachments: Hadoop MaReduce Developer doc.pdf
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-234:
---------------------------------

    Attachment:     (was: Hadoop MaReduce Developer doc.pdf)

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.12.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12474303 ] 

Doug Cutting commented on HADOOP-234:
-------------------------------------

+1  This proposal looks good.  I note that the contexts could, with the addition of a nextKey() method, be turned into iterators, permitting alternate map and reduce control structures, like MapRunnable.  This may prove to be a feature.

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.12.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-234?page=all ]

Doug Cutting updated HADOOP-234:
--------------------------------

    Fix Version: 0.5.0
                     (was: 0.4.0)

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya
>      Fix For: 0.5.0
>  Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce Developer doc.pdf
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Commented: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-234?page=comments#action_12412836 ] 

Owen O'Malley commented on HADOOP-234:
--------------------------------------

I agree with Doug that if you are going to throw away the advantages of actually having meaningful types on the C++ that the keys and values on the Java side should be BytesWritable. 

That said, I think it would be much less error-prone for the users and easier to understand and debug if you followed the Hadoop API much closer. Define a Writable and WritableComparable interfaces in C++. The Record IO classes will support them with a minor change to the code generator. 

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya

>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Commented: (HADOOP-234) Hadoop Pipes for writing map/reduce jobs in C++ and python

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12490454 ] 

Doug Cutting commented on HADOOP-234:
-------------------------------------

A few quick comments:

- the pipes package should have a package.html file
- only Submitter.java should be public, right?
- one must 'chmod +x' the configure scripts
- the create-c++-configure task fails for me on Ubuntu w/ 'Can't exec "aclocal"'
- 'bin/hadoop pipes' should print a helpful message
- a README.txt file in src/examples/pipes telling how to run things would go a long way



> Hadoop Pipes for writing map/reduce jobs in C++ and python
> ----------------------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html, pipes.patch
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-234?page=comments#action_12412581 ] 

Doug Cutting commented on HADOOP-234:
-------------------------------------

It would be best to avoid deserializing & reserializing objects in Java before passing them to C.  So you might instead always use BytesWritable for the input and output keys and values in Java, and implement a subclass of SequenceFileInputFormat that uses SequenceFile.next(DataOutputBuffer) to copy the raw binary content into the BytesWritable.  Then your mapper and reducer implementations could simply get the bytes from the BytesWritable and pass it to the native method, without re-serializing.  Does this make sense?

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya

>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Commented: (HADOOP-234) Hadoop Pipes for writing map/reduce jobs in C++ and python

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12496171 ] 

Hadoop QA commented on HADOOP-234:
----------------------------------

+1

http://issues.apache.org/jira/secure/attachment/12357433/pipes-2.patch applied and successfully tested against trunk revision r538318.

Test results:   http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/146/testReport/
Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/146/console

> Hadoop Pipes for writing map/reduce jobs in C++ and python
> ----------------------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: pipes
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.14.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html, pipes-2.patch, pipes.patch
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-234:
---------------------------------

    Attachment: hadoop-pipes.html

This is my current plan of approach.

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.12.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-234) Hadoop Pipes for writing map/reduce jobs in C++ and python

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-234:
---------------------------------

    Attachment: pipes-2.patch

> Hadoop Pipes for writing map/reduce jobs in C++ and python
> ----------------------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: pipes
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.14.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html, pipes-2.patch, pipes.patch
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-234:
---------------------------------

    Attachment: pipes.patch

Here is a first pass of this patch. I still need to add unit tests.

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html, pipes.patch
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Sanjay Dahiya (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-234?page=all ]

Sanjay Dahiya updated HADOOP-234:
---------------------------------

    Attachment: Hadoop MaReduce Developer doc.pdf

Updated ... 

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya
>  Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce Developer doc.pdf
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Updated: (HADOOP-234) Hadoop Pipes for writing map/reduce jobs in C++ and python

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-234:
---------------------------------

      Component/s:     (was: mapred)
                   pipes
    Fix Version/s: 0.14.0

> Hadoop Pipes for writing map/reduce jobs in C++ and python
> ----------------------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: pipes
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.14.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html, pipes.patch
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-234) Hadoop Pipes for writing map/reduce jobs in C++ and python

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-234:
---------------------------------

    Summary: Hadoop Pipes for writing map/reduce jobs in C++ and python  (was: Support for writing Map/Reduce functions in C++)

> Hadoop Pipes for writing map/reduce jobs in C++ and python
> ----------------------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html, pipes.patch
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-234:
---------------------------------

    Fix Version/s: 0.12.0
      Description: 
MapReduce C++ support

Requirements 

1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
2. Avoid users having to write both Java and C++ for this to work. 
3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
4. The interface should be SWIG'able.

  was:
MapReduce C++ support

Requirements 

1. Allow users to write Map and Reduce functions in C++, rest of the 
infrastructure already present in Java should be reused. 
2. Avoid users having to write both Java and C++ for this to work. 
3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
4. Use Record IO for describing record format, both MR java framework and C++ should 
	use the same format to work seemlessly. 
5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
	simple strings. 

Implementation notes

- If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
	mapper and reducer methods in C++. 
- For composite Record IO types user starts with defining a record format using Record IO DDL. 
- User generates Java and C++ classes from the DDL using record IO. 
- Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
- User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
  makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
  generated C++ record IO classes. 
- User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 

Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 

Native(JNI) Java proxy for the Mapper : 
---------------------------------------
Without Record IO :-
--------------------
public class SimpleNativeMapper extends MapReduceBase implements Mapper {

	/**
	  * Works on simple strings. 
	  **/
	public void map(WritableComparable key, Writable value,
			OutputCollector output, Reporter reporter) throws IOException {

		mapNative(key.toString().getBytes()
				, value.toString().getBytes(), output, reporter); 
	}
	
	/**
	  * Native implementation. 
	  **/
	private native void mapNative(byte[] key, byte[] value, 
			OutputCollector output, Reporter reporter) throws IOException;

}


With Record IO :- 
------------------
public class RecordIONativeMapper extends MapReduceBase implements Mapper {

	/**
	 * Implementation of map method, this acts as a JNI proxy for actual map 
	 * method implemented in C++. Works for Record IO based records. 
	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
	 */
	public void map(WritableComparable key, Writable value,
			OutputCollector output, Reporter reporter) throws IOException {
		
		byte[] keyBytes = null ;
		byte[] valueBytes = null ;
		
		try{
			// we need to serialize the key and record and pass the serialized 
			// format to C++ / JNI methods so they can interpret it using appropriate 
			// record IO classes. 
			{
				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
				
				((Record)key).serialize(boa, "WhatIsTag");
				keyBytes = keyStream.toByteArray();
			}
			{
				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
				
				((Record)key).serialize(boa, "WhatIsTag");
				valueBytes = valueStream.toByteArray();
			}
		}catch(ClassCastException e){
			// throw better exceptions
			throw new IOException("Input record must be of Record IO Type"); 
		}
		// pass the serialized byte[] to C++ implementation. 
		mapNative(keyBytes, valueBytes, output, reporter); 
	}

	/**
	 * Implementation in C++.
	 */
	private native void mapNative(byte[] key, byte[] value, 
			OutputCollector output, Reporter reporter) throws IOException;
}

OutputCollector Proxy for C++
------------------------------
public class NativeOutputCollector implements OutputCollector {

	// standard method from interface
	public void collect(WritableComparable key, Writable value)
			throws IOException {
	}
	
	// deserializes key and value and calls collect(WritableComparable, Writable)
	public void collectFromNative(byte[]key, byte[]value){
		// deserialize key and value to java types ( as configured in JobConf )
		// call actual collect method 
	}
}



Core Native functions ( helper for user provided Mapper and Reducer ) 
---------------------------------------------------------------------
#include "org_apache_hadoop_mapred_NativeMapper.h"
#include "UserMapper.h"

/**
	* A C++ proxy method, calls actual implementation of the Mapper. This method 
	signature is generated by javah.
**/
JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
  (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
  	jobject output_collector, jobject reporter);
{
 
 	// convert char* and pass on to user defined map method. 
 	// user's map method should take care of converting it to correct record IO 
 	// type.
 	int keyLen = (*env)->GetArrayLength(env, key) ;
 	int valueLen = (*env)->GetArrayLength(env, valueLen) ;

 	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
 	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
 	
 	// Call User defined method 
 	user_map(keyBuf, valueBuf, output_collector, reporter) ;
 	
 	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
 	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
}

/**
	Helper method, acts as a proxy to OutputCollector in java. key and value 
	must be serialized forms of records as specified in JobConf. 
**/
void output_collector(const char * key, const char *value,
	jobject output_collector, jobject reporter){
	
	// invoke java NativeOutputCollector.collect with key and value. 
}


User defined Mapper ( and Reducer ) 
------------------------------------
/**
	implements user defined map operation. 
**/
void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {

	//1. deserialize key/value in the appropriate format using record IO. 
	
	//2. process key/value and generate the intermediate key/values in record IO format. 
	
	//3. Deserialize intermediate key/values to intermed_key and intermed_value
	
	//4. pass intermed_key/intermed_value using helper function - 
	// 		output_collector(intermed_key, intermed_value, collector, recorder);
	
	
}


> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.12.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce Developer doc.pdf
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Work started: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on HADOOP-234 started by Owen O'Malley.

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.12.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-234?page=comments#action_12412954 ] 

Doug Cutting commented on HADOOP-234:
-------------------------------------

I think we can mostly achieve what we want by using the SequenceFile.next(DataOutputBuffer) method to read the raw bytes of each entry, regardless of the declared types.  Thus the SequenceFile can still be created with the correct Java key and value classes, but when we actually read entries in Java we won't use instances of those classes, but rather just read the raw content of the entry into a byte-array container like BytesWritable that is passed to C.  Then the C code can deserialize the Record instance from the byte-array container.  So a C-based mapper should specify the real input key and value classes, but its InputFormat implementation will ignore that when entries are read, passing raw bytes to C.

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya

>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Updated: (HADOOP-234) Hadoop Pipes for writing map/reduce jobs in C++ and python

Posted by "Doug Cutting (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Doug Cutting updated HADOOP-234:
--------------------------------

    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

I just committed this.  Thanks, Owen!

> Hadoop Pipes for writing map/reduce jobs in C++ and python
> ----------------------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: pipes
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.14.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html, pipes-2.patch, pipes.patch
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-234) Hadoop Pipes for writing map/reduce jobs in C++ and python

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12496534 ] 

Hadoop QA commented on HADOOP-234:
----------------------------------

Integrated in Hadoop-Nightly #91 (See http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Nightly/91/)

> Hadoop Pipes for writing map/reduce jobs in C++ and python
> ----------------------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: pipes
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.14.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html, pipes-2.patch, pipes.patch
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Sameer Paranjpye (JIRA)" <ji...@apache.org>.
     [ http://issues.apache.org/jira/browse/HADOOP-234?page=all ]

Sameer Paranjpye updated HADOOP-234:
------------------------------------

    Fix Version: 0.4

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya
>      Fix For: 0.4
>  Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce Developer doc.pdf
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Updated: (HADOOP-234) Hadoop Pipes for writing map/reduce jobs in C++ and python

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-234:
---------------------------------

    Status: Patch Available  (was: In Progress)

> Hadoop Pipes for writing map/reduce jobs in C++ and python
> ----------------------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: https://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: pipes
>            Reporter: Sanjay Dahiya
>         Assigned To: Owen O'Malley
>             Fix For: 0.14.0
>
>         Attachments: Hadoop MaReduce Developer doc.pdf, hadoop-pipes.html, pipes-2.patch, pipes.patch
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map, Reduce, RecordReader, and RecordWriter functions in C++, rest of the infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. The interface should be SWIG'able.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Milind Bhandarkar (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-234?page=comments#action_12412962 ] 

Milind Bhandarkar commented on HADOOP-234:
------------------------------------------

This can be done by providing just one C++ class in record IO. Currently, C++ version for record IO generates methods for each class that read from and write to InStream and OutStream interfaces, that contain only read and write methods. Creation of concrete classes that implement these interfaces is outside of the code generation. One could proovide a byteStream class in C++ that provides these interfaces. The construction of BytesOutStream happens in C++ and after serializing C++ record, this stream goes to Java via JNI, which is then converted to BytesWritable and written to the sequencefile. BytesInStream is created in Java tied with the sequencefile, and supplies bytes to the C++ record.

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya

>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira


[jira] Commented: (HADOOP-234) Support for writing Map/Reduce functions in C++

Posted by "Sanjay Dahiya (JIRA)" <ji...@apache.org>.
    [ http://issues.apache.org/jira/browse/HADOOP-234?page=comments#action_12412870 ] 

Sanjay Dahiya commented on HADOOP-234:
--------------------------------------

I agree that  keys/values should be BytesWritable in case we are not using typed data (Records). But I am trying to understand how to to avoid serialization/de-serialization multiple times between Java and C++ and still use Record IO. 
So when we define a new record format and generate classes to work with the record, the generated classes contain Reader and Writer for the record. These read/write 'Record' objects from streams. One way to implement this would be to modify the class generation and make Reader extend SequenceInputFile and return(optionally) a ByetWritable rather than a Record. The idea is that the Record know how to read itself and its size etc. so we let it read from the input but it reads in BytesWritable rather than the record type in Java. we pass this BytesWritable to C++ and do deserialization only once to get the right type. 
With this in place we should be able to avoid the need for serialization/de-serialization and users will not need to write extra code per type of record. 

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature

>   Components: mapred
>     Reporter: Sanjay Dahiya

>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++, rest of the 
> infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in helper functions. 
> 4. Use Record IO for describing record format, both MR java framework and C++ should 
> 	use the same format to work seemlessly. 
> 5. Allow users to write simple map reduce tasks without learning record IO if keys and values are 
> 	simple strings. 
> Implementation notes
> - If keys and values are simple strings then user passes SimpleNativeMapper in JobConf and implements 
> 	mapper and reducer methods in C++. 
> - For composite Record IO types user starts with defining a record format using Record IO DDL. 
> - User generates Java and C++ classes from the DDL using record IO. 
> - Users configures JobConf to use the generated Java classes as the MR input/output, key/value classes. 
> - User writes Map and Reduce functions in C++ using a standard interface ( given below ) , this interface 
>   makes a serialized record IO format available to the C++ function which should be deserialized in corrosponding
>   generated C++ record IO classes. 
> - User uses the helper functions to pass the serialized format of generated output key/value pairs to output collector. 
> Following is a pseudocode for the Mapper ( Reducer can be implemented similarly ) - 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	  * Works on simple strings. 
> 	  **/
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		mapNative(key.toString().getBytes()
> 				, value.toString().getBytes(), output, reporter); 
> 	}
> 	
> 	/**
> 	  * Native implementation. 
> 	  **/
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
> 	/**
> 	 * Implementation of map method, this acts as a JNI proxy for actual map 
> 	 * method implemented in C++. Works for Record IO based records. 
> 	 * @see map(byte[] , byte[],	OutputCollector, Reporter)
> 	 */
> 	public void map(WritableComparable key, Writable value,
> 			OutputCollector output, Reporter reporter) throws IOException {
> 		
> 		byte[] keyBytes = null ;
> 		byte[] valueBytes = null ;
> 		
> 		try{
> 			// we need to serialize the key and record and pass the serialized 
> 			// format to C++ / JNI methods so they can interpret it using appropriate 
> 			// record IO classes. 
> 			{
> 				ByteArrayOutputStream keyStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(keyStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				keyBytes = keyStream.toByteArray();
> 			}
> 			{
> 				ByteArrayOutputStream valueStream = new ByteArrayOutputStream() ; 
> 				BinaryOutputArchive boa = new BinaryOutputArchive(new DataOutputStream(valueStream)) ;
> 				
> 				((Record)key).serialize(boa, "WhatIsTag");
> 				valueBytes = valueStream.toByteArray();
> 			}
> 		}catch(ClassCastException e){
> 			// throw better exceptions
> 			throw new IOException("Input record must be of Record IO Type"); 
> 		}
> 		// pass the serialized byte[] to C++ implementation. 
> 		mapNative(keyBytes, valueBytes, output, reporter); 
> 	}
> 	/**
> 	 * Implementation in C++.
> 	 */
> 	private native void mapNative(byte[] key, byte[] value, 
> 			OutputCollector output, Reporter reporter) throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
> 	// standard method from interface
> 	public void collect(WritableComparable key, Writable value)
> 			throws IOException {
> 	}
> 	
> 	// deserializes key and value and calls collect(WritableComparable, Writable)
> 	public void collectFromNative(byte[]key, byte[]value){
> 		// deserialize key and value to java types ( as configured in JobConf )
> 		// call actual collect method 
> 	}
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
> 	* A C++ proxy method, calls actual implementation of the Mapper. This method 
> 	signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>   	jobject output_collector, jobject reporter);
> {
>  
>  	// convert char* and pass on to user defined map method. 
>  	// user's map method should take care of converting it to correct record IO 
>  	// type.
>  	int keyLen = (*env)->GetArrayLength(env, key) ;
>  	int valueLen = (*env)->GetArrayLength(env, valueLen) ;
>  	const char *keyBuf = (*env)->GetByteArrayElements(env,key, keyLen, JNI_FALSE) ;
>  	const char *valueuf = (*env)->GetByteArrayElements(env,value, valueLen, JNI_FALSE) ;
>  	
>  	// Call User defined method 
>  	user_map(keyBuf, valueBuf, output_collector, reporter) ;
>  	
>  	(*env)->ReleaseByteArrayElements(env, key, keyBuf,  JNI_ABORT) ; 
>  	(*env)->ReleaseByteArrayElements(env, value, ValueBuf, JNI_ABORT) ; 
> }
> /**
> 	Helper method, acts as a proxy to OutputCollector in java. key and value 
> 	must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
> 	jobject output_collector, jobject reporter){
> 	
> 	// invoke java NativeOutputCollector.collect with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
> 	implements user defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, jobject recorder) {
> 	//1. deserialize key/value in the appropriate format using record IO. 
> 	
> 	//2. process key/value and generate the intermediate key/values in record IO format. 
> 	
> 	//3. Deserialize intermediate key/values to intermed_key and intermed_value
> 	
> 	//4. pass intermed_key/intermed_value using helper function - 
> 	// 		output_collector(intermed_key, intermed_value, collector, recorder);
> 	
> 	
> }

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira