You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/11/23 09:41:10 UTC

svn commit: r1544761 [1/3] - in /hama/trunk: ./ c++/ c++/src/ c++/src/main/native/examples/ c++/src/main/native/examples/conf/ c++/src/main/native/examples/impl/ c++/src/main/native/examples/input/ c++/src/main/native/pipes/api/hama/ c++/src/main/nativ...

Author: millecker
Date: Sat Nov 23 08:41:09 2013
New Revision: 1544761

URL: http://svn.apache.org/r1544761
Log:
HAMA-815: Hama Pipes uses C++ templates

Added:
    hama/trunk/c++/README.txt
    hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
    hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java
    hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java
    hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VectorWritableMatrixGen.java
Removed:
    hama/trunk/c++/src/main/native/examples/input/
    hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/c++/src/CMakeLists.txt
    hama/trunk/c++/src/main/native/examples/README.txt
    hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
    hama/trunk/c++/src/main/native/examples/conf/piestimator.xml
    hama/trunk/c++/src/main/native/examples/conf/summation.xml
    hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.cc
    hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.hh
    hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc
    hama/trunk/c++/src/main/native/examples/impl/piestimator.cc
    hama/trunk/c++/src/main/native/examples/impl/summation.cc
    hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
    hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh
    hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh
    hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc
    hama/trunk/c++/src/main/native/utils/impl/StringUtils.cc
    hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java
    hama/trunk/core/pom.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/Generator.java
    hama/trunk/src/assemble/bin.xml

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sat Nov 23 08:41:09 2013
@@ -2,6 +2,10 @@ Hama Change Log
 
 Release 0.7.0 (unreleased changes)
 
+  NEW FEATURES
+
+   HAMA-815: Hama Pipes uses C++ templates (Martin Illecker)  
+
   IMPROVEMENTS
 
    HAMA-699: Add commons module (Martin Illecker)
@@ -9,7 +13,7 @@ Release 0.7.0 (unreleased changes)
 Release 0.6.3 - October 11, 2013
 
   NEW FEATURES
-  
+
    HAMA-807: Make aggregators skip supersteps (Anastasis Andronidis)
    HAMA-800: Hama Pipes Examples (Martin Illecker)
    HAMA-804: Create NeuralNetwork Example (Yexi Jiang)

Added: hama/trunk/c++/README.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/README.txt?rev=1544761&view=auto
==============================================================================
--- hama/trunk/c++/README.txt (added)
+++ hama/trunk/c++/README.txt Sat Nov 23 08:41:09 2013
@@ -0,0 +1,25 @@
+####################################################################
+# Hama Pipes README                                                #
+####################################################################
+# Hama Pipes includes the following three examples:                #
+# - 1) Summation                                                   #
+# - 2) PiEstimator                                                 #
+# - 3) MatrixMultiplication                                        #
+# in c++/src/main/native/examples                                  #
+# Please see c++/src/main/native/examples/README.txt               #
+####################################################################
+
+Please use the following command to compile:
+
+% g++ -m64 -Ic++/src/main/native/utils/api \
+      -Ic++/src/main/native/pipes/api \
+      -Lc++/target/native \
+      -lhadooputils -lpthread \
+      PROGRAM.cc \
+      -o PROGRAM \
+      -g -Wall -O2
+
+Attention: The paths have to be adjusted, if you are not operating
+           in the Hama source folder.
+
+####################################################################

Modified: hama/trunk/c++/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/CMakeLists.txt?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/CMakeLists.txt (original)
+++ hama/trunk/c++/src/CMakeLists.txt Sat Nov 23 08:41:09 2013
@@ -34,7 +34,7 @@ include(JNIFlags.cmake NO_POLICY_SCOPE)
 function(output_directory TGT DIR)
     SET_TARGET_PROPERTIES(${TGT} PROPERTIES
         RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
-   SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+    SET_TARGET_PROPERTIES(${TGT} PROPERTIES
         ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
     SET_TARGET_PROPERTIES(${TGT} PROPERTIES
         LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
@@ -49,15 +49,15 @@ include_directories(
 
 # Example programs
 add_executable(summation main/native/examples/impl/summation.cc)
-target_link_libraries(summation hamapipes hadooputils)
+target_link_libraries(summation hadooputils)
 output_directory(summation examples)
 
 add_executable(piestimator main/native/examples/impl/piestimator.cc)
-target_link_libraries(piestimator hamapipes hadooputils)
+target_link_libraries(piestimator hadooputils)
 output_directory(piestimator examples)
 
 add_executable(matrixmultiplication main/native/examples/impl/matrixmultiplication.cc)
-target_link_libraries(matrixmultiplication DenseDoubleVector hamapipes hadooputils)
+target_link_libraries(matrixmultiplication DenseDoubleVector hadooputils)
 output_directory(matrixmultiplication examples)
 
 add_library(DenseDoubleVector 
@@ -70,11 +70,11 @@ add_library(hadooputils STATIC
     main/native/utils/impl/SerialUtils.cc
 )
 
-add_library(hamapipes STATIC
-    main/native/pipes/impl/HamaPipes.cc
-)
+#add_library(hamapipes STATIC
+#    main/native/pipes/impl/Pipes.cc
+#)
 
-target_link_libraries(hamapipes
+#target_link_libraries(hamapipes
 #    ${OPENSSL_LIBRARIES}
-    pthread
-)
+#    pthread
+#)

Modified: hama/trunk/c++/src/main/native/examples/README.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/README.txt?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/README.txt (original)
+++ hama/trunk/c++/src/main/native/examples/README.txt Sat Nov 23 08:41:09 2013
@@ -1,113 +1,127 @@
 ####################################################################
-# Hama Pipes Examples                                              #
+# Hama Pipes Examples README                                       #
 ####################################################################
-# - Summation                                                      #
-# - PiEstimator                                                    #
-# - MatrixMultiplication                                           #
+# Hama Pipes includes the following three examples:                #
+# - 1) Summation                                                   #
+# - 2) PiEstimator                                                 #
+# - 3) MatrixMultiplication                                        #
 ####################################################################
 
 To run the examples, first compile them:
 
-% mvn install 
-
-and then copy the binaries to dfs:
-
-% hadoop fs -put c++/target/native/examples/summation \
- /examples/bin/summation
-
-create an input directory with text files:
-
-% hadoop fs -put my-data in-dir
-
-and run the word count example:
-
-% hama pipes \
- -conf c++/src/main/native/examples/conf/summation.xml \
- -input in-dir -output out-dir
+% mvn install
 
 ####################################################################
+# 1) Summation Example                                             #
+####################################################################
 
-# Summation Example
+First copy summation binary to dfs
 
 % hadoop fs -put c++/target/native/examples/summation \
  /examples/bin/summation
 
+Generate input data and copy to dfs
+
 % echo -e "key1\t1.0\nkey2\t2.0\nkey3\t3.0\nkey4\t4.0\n\
 key5\t5.0\nkey6\t6.0\nkey7\t7.0\nkey8\t8.0\nkey9\t9.0\n\
 key10\t10.0" > summation.txt && hadoop fs -put summation.txt \
  /examples/input/summation/input.txt && rm summation.txt
 
+Run summation example
+
 % hama pipes \
  -conf c++/src/main/native/examples/conf/summation.xml \
  -input /examples/input/summation \
  -output /examples/output/summation
 
+View input and output data
+
 % hadoop fs -cat /examples/input/summation/input.txt
-% hadoop fs -cat /examples/output/summation/part-00000
+% hama seqdumper -seqFile /examples/output/summation/part-00000
+
+You should see
+# Input Path: /examples/output/summation/part-00000
+# Key class: class org.apache.hadoop.io.Text 
+# Value Class: class org.apache.hadoop.io.DoubleWritable
+# Key: Sum: Value: 55.0
+# Count: 1
+
+Delete input and output folder
 
 % hadoop fs -rmr /examples/input/summation
 % hadoop fs -rmr /examples/output/summation
 
 ####################################################################
+# 2) PiEstimator Example                                           #
+####################################################################
 
-# PiEstimator Example
+First copy piestimator binary to dfs
 
 % hadoop fs -put c++/target/native/examples/piestimator \
  /examples/bin/piestimator
 
+Run piestimator example because no input data is needed
+
 % hama pipes \
  -conf c++/src/main/native/examples/conf/piestimator.xml \
  -output /examples/output/piestimator
 
-% hadoop fs -cat /examples/output/piestimator/part-00001
+View output data
+
+% hama seqdumper -seqFile /examples/output/piestimator/part-00001
+
+You should see
+# Input Path: /examples/output/piestimator/part-00001
+# Key class: class org.apache.hadoop.io.Text 
+# Value Class: class org.apache.hadoop.io.DoubleWritable
+# Key: Estimated value of PI: Value: 3.139116
+# Count: 1
+
+Delete output folder
 
 % hadoop fs -rmr /examples/output/piestimator
 
 ####################################################################
+# 3) MatrixMultiplication Example                                  #
+####################################################################
 
-# MatrixMultiplication Example
+First copy matrixmultiplication binary to dfs
 
 % hadoop fs -put c++/target/native/examples/matrixmultiplication \
  /examples/bin/matrixmultiplication
 
-% hadoop fs -put c++/src/main/native/examples/input/MatrixA.seq \
- /examples/input/matrixmultiplication/MatrixA.seq
+Generate input data
 
-% hadoop fs -put \
- c++/src/main/native/examples/input/MatrixB_transposed.seq \
- /examples/input/matrixmultiplication/MatrixB_transposed.seq
+% hama jar dist/target/hama-*/hama-*/hama-examples-*.jar \
+  gen vectorwritablematrix 4 4 \
+  /examples/input/matrixmultiplication/MatrixA.seq \
+  false true 0 10 0
+
+% hama jar dist/target/hama-*/hama-*/hama-examples-*.jar \
+  gen vectorwritablematrix 4 4 \
+  /examples/input/matrixmultiplication/MatrixB_transposed.seq \
+  false true 0 10 0
+
+Run matrixmultiplication example
 
 % hama pipes \
  -conf c++/src/main/native/examples/conf/matrixmultiplication.xml \
  -output /examples/output/matrixmultiplication
 
+View input and output data
+
 % hama seqdumper \
  -seqFile /examples/input/matrixmultiplication/MatrixA.seq
 
 % hama seqdumper \
- -seqFile /examples/input/matrixmultiplication/MatrixB_transposed.seq
+ -seqFile  \
+ /examples/input/matrixmultiplication/MatrixB_transposed.seq
 
-% hadoop fs -cat /examples/output/matrixmultiplication/part-00000
+% hama seqdumper -seqFile \
+  /examples/output/matrixmultiplication/part-00001
 
-# Matrix A
-#    9     4     1     9
-#    1     8     6     3
-#    8     3     3     9
-#    7     1     9     6
-
-# Matrix B (not transposed)
-#    2     1     6     5
-#    7     8     9     5
-#    2     1     5     8
-#    7     4     4     9
-
-# Resulting Matrix C
-#  111.0 78.0 131.0 154.0
-#   91.0 83.0 120.0 120.0
-#  106.0 71.0 126.0 160.0
-#   81.0 48.0 120.0 166.0
+Delete output folder
 
 % hadoop fs -rmr /examples/output/matrixmultiplication
 
 ####################################################################
-

Modified: hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/matrixmultiplication.xml?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml Sat Nov 23 08:41:09 2013
@@ -30,10 +30,6 @@
     <value>true</value>
   </property>
   <property>
-    <name>bsp.input.format.class</name>
-    <value>org.apache.hama.bsp.SequenceFileInputFormat</value>
-  </property>
-  <property>
     <name>bsp.input.dir</name>
     <value>/examples/input/matrixmultiplication/MatrixA.seq</value>                                            
   </property>
@@ -42,15 +38,47 @@
     <value>/examples/input/matrixmultiplication/MatrixB_transposed.seq</value>
   </property>
   <property>
-    <name>hama.pipes.logging</name>
-    <value>false</value>
+    <name>bsp.input.format.class</name>
+    <value>org.apache.hama.bsp.SequenceFileInputFormat</value>
+  </property>
+  <property>
+    <name>bsp.input.key.class</name>
+    <value>org.apache.hadoop.io.IntWritable</value>
+  </property>
+  <property>
+    <name>bsp.input.value.class</name>
+    <value>org.apache.hama.commons.io.PipesVectorWritable</value>
+  </property>
+  <property>
+    <name>bsp.output.format.class</name>
+    <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
+  </property>
+  <property>
+    <name>bsp.output.key.class</name>
+    <value>org.apache.hadoop.io.IntWritable</value>
+  </property>
+  <property>
+    <name>bsp.output.value.class</name>
+    <value>org.apache.hama.commons.io.PipesVectorWritable</value>
+  </property>
+  <property>
+    <name>bsp.message.class</name>
+    <value>org.apache.hama.commons.io.PipesKeyValueWritable</value>
   </property>
   <property>
-    <name>hama.messenger.queue.class</name>
-    <value>org.apache.hama.bsp.message.queue.SortedMessageQueue</value>                                            
+    <name>hama.messenger.xfer.queue.class</name>
+    <value>org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol</value>
   </property>
   <property>
     <name>bsp.input.partitioner.class</name>
     <value>org.apache.hama.pipes.PipesPartitioner</value>                                            
   </property>
+  <property>
+    <name>hama.pipes.logging</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>bsp.peers.num</name>
+    <value>3</value>
+  </property>
 </configuration>

Modified: hama/trunk/c++/src/main/native/examples/conf/piestimator.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/piestimator.xml?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/piestimator.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/piestimator.xml Sat Nov 23 08:41:09 2013
@@ -35,7 +35,19 @@
   </property>
   <property>
     <name>bsp.output.format.class</name>
-    <value>org.apache.hama.bsp.TextOutputFormat</value>
+    <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
+  </property>
+  <property>
+    <name>bsp.output.key.class</name>
+    <value>org.apache.hadoop.io.Text</value>
+  </property>
+  <property>
+    <name>bsp.output.value.class</name>
+    <value>org.apache.hadoop.io.DoubleWritable</value>
+  </property>
+  <property>
+    <name>bsp.message.class</name>
+    <value>org.apache.hadoop.io.IntWritable</value>
   </property>
   <property>
     <name>hama.pipes.logging</name>

Modified: hama/trunk/c++/src/main/native/examples/conf/summation.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/summation.xml?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/summation.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/summation.xml Sat Nov 23 08:41:09 2013
@@ -34,8 +34,28 @@
     <value>org.apache.hama.bsp.KeyValueTextInputFormat</value>
   </property>
   <property>
+    <name>bsp.input.key.class</name>
+    <value>org.apache.hadoop.io.Text</value>
+  </property>
+  <property>
+    <name>bsp.input.value.class</name>
+    <value>org.apache.hadoop.io.Text</value>
+  </property>
+  <property>
     <name>bsp.output.format.class</name>
-    <value>org.apache.hama.bsp.TextOutputFormat</value>
+    <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
+  </property>
+  <property>
+    <name>bsp.output.key.class</name>
+    <value>org.apache.hadoop.io.Text</value>
+  </property>
+  <property>
+    <name>bsp.output.value.class</name>
+    <value>org.apache.hadoop.io.DoubleWritable</value>
+  </property>
+  <property>
+    <name>bsp.message.class</name>
+    <value>org.apache.hadoop.io.DoubleWritable</value>
   </property>
   <property>
     <name>hama.pipes.logging</name>

Modified: hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/DenseDoubleVector.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.cc Sat Nov 23 08:41:09 2013
@@ -32,25 +32,26 @@ using HadoopUtils::Splitter;
 
 namespace math {
   
-  DenseDoubleVector::DenseDoubleVector(int len) : size(len), vector(new double[len]) {
+  DenseDoubleVector::DenseDoubleVector(int len) : vector(new double[len]), size(len) {
   }
   
-  DenseDoubleVector::DenseDoubleVector(int len, double val) : size(len), vector(new double[len]) {
+  DenseDoubleVector::DenseDoubleVector(int len, double val) : vector(new double[len]), size(len) {
     for (int i=0; i<len; i++)
       vector[i] = val;
   }
   
-  DenseDoubleVector::DenseDoubleVector(double arr[]) : vector(arr) {
+  DenseDoubleVector::DenseDoubleVector(int len, double arr[]) : vector(arr), size(len) {
   }
-  
+
   DenseDoubleVector::DenseDoubleVector(const string values) {
     
     Splitter split ( values, "," );
     size = split.size();
     
     vector = new double[size];
-    for ( Splitter::size_type i = 0; i < split.size(); i++ )
+    for ( Splitter::size_type i = 0; i < split.size(); i++ ) {
       vector[i] = HadoopUtils::toDouble(split[i]);
+    }
   }
   
   DenseDoubleVector::~DenseDoubleVector() {

Modified: hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/DenseDoubleVector.hh?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.hh (original)
+++ hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.hh Sat Nov 23 08:41:09 2013
@@ -32,7 +32,7 @@ namespace math {
     /// Creates a new vector with the given length and default value.
     DenseDoubleVector(int length, double val);
     // Creates a new vector with the given array.
-    DenseDoubleVector(double arr[]);
+    DenseDoubleVector(int length, double arr[]);
     DenseDoubleVector(const string values);
     ~DenseDoubleVector();  // Destructor
     

Modified: hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/matrixmultiplication.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc Sat Nov 23 08:41:09 2013
@@ -24,119 +24,121 @@
 #include <time.h>
 #include <math.h>
 #include <string>
-#include <string>
 #include <iostream>
 #include <sstream>
 
 using std::string;
-using std::cout;
+using std::vector;
 
 using HamaPipes::BSP;
 using HamaPipes::BSPJob;
 using HamaPipes::Partitioner;
 using HamaPipes::BSPContext;
-using namespace HadoopUtils;
 
 using math::DenseDoubleVector;
 
-class MatrixMultiplicationBSP: public BSP {
+class MatrixMultiplicationBSP: public BSP<int,string,int,string,string> {
 private:
-  string masterTask;
-  int seqFileID;
-  string HAMA_MAT_MULT_B_PATH;
+  string master_task_;
+  int seq_file_id_;
+  string HAMA_MAT_MULT_B_PATH_;
+  
 public:
-  MatrixMultiplicationBSP(BSPContext& context) {
-    seqFileID = 0;
-    HAMA_MAT_MULT_B_PATH = "hama.mat.mult.B.path";
+  MatrixMultiplicationBSP(BSPContext<int,string,int,string,string>& context) {
+    seq_file_id_ = 0;
+    HAMA_MAT_MULT_B_PATH_ = "hama.mat.mult.B.path";
   }
   
-  void setup(BSPContext& context) {
+  void setup(BSPContext<int,string,int,string,string>& context) {
     // Choose one as a master
-    masterTask = context.getPeerName(context.getNumPeers() / 2);
+    master_task_ = context.getPeerName(context.getNumPeers() / 2);
     
     reopenMatrixB(context);
   }
   
-  void bsp(BSPContext& context) {
+  void bsp(BSPContext<int,string,int,string,string>& context) {
     
-    string aRowKey;
-    string aRowVectorStr;
+    int a_row_key = 0;
+    string a_row_vector_str;
     // while for each row of matrix A
-    while(context.readNext(aRowKey, aRowVectorStr)) {
+    while(context.readNext(a_row_key, a_row_vector_str)) {
+      
+      DenseDoubleVector *a_row_vector = new DenseDoubleVector(a_row_vector_str);
       
-      DenseDoubleVector *aRowVector = new DenseDoubleVector(aRowVectorStr);
-      DenseDoubleVector *colValues = NULL;
+      int b_col_key = 0;
+      string b_col_vector_str;
       
-      string bColKey;
-      string bColVectorStr;
+      // dynamic column values, depend on matrix B cols
+      vector<double> col_values;
       
       // while for each col of matrix B
-      while (context.sequenceFileReadNext(seqFileID,bColKey,bColVectorStr)) {
+      while (context.sequenceFileReadNext<int,string>(seq_file_id_, b_col_key, b_col_vector_str)) {
         
-        DenseDoubleVector *bColVector = new DenseDoubleVector(bColVectorStr);
+        DenseDoubleVector *b_col_vector = new DenseDoubleVector(b_col_vector_str);
         
-        if (colValues == NULL)
-          colValues = new DenseDoubleVector(bColVector->getDimension());
+        double dot = a_row_vector->dot(b_col_vector);
         
-        double dot = aRowVector->dot(bColVector);
-        
-        colValues->set(toInt(bColKey), dot);
+        col_values.push_back(dot);
       }
       
+      DenseDoubleVector *col_values_vector = new DenseDoubleVector(col_values.size(), col_values.data());
+      
       // Submit one calculated row
+      // :key:value1,value2,value3
       std::stringstream message;
-      message << aRowKey << ":" << colValues->toString();
-      context.sendMessage(masterTask, message.str());
+      message << ":" << a_row_key << ":" << col_values_vector->toString();
+      context.sendMessage(master_task_, message.str());
       
       reopenMatrixB(context);
     }
     
-    context.sequenceFileClose(seqFileID);
+    context.sequenceFileClose(seq_file_id_);
     context.sync();
   }
   
-  void cleanup(BSPContext& context) {
-    if (context.getPeerName().compare(masterTask)==0) {
+  void cleanup(BSPContext<int,string,int,string,string>& context) {
+    if (context.getPeerName().compare(master_task_)==0) {
       
-      int msgCount = context.getNumCurrentMessages();
+      int msg_count = context.getNumCurrentMessages();
       
-      for (int i=0; i<msgCount; i++) {
+      for (int i=0; i < msg_count; i++) {
         
+        // :key:value1,value2,value3
         string received = context.getCurrentMessage();
-        //key:value1,value2,value3
-        int pos = (int)received.find(":");
-        string key = received.substr(0,pos);
-        string values = received.substr(pos+1,received.length());
+        string key_value_str = received.substr(1);
+        int pos = (int)key_value_str.find(received.substr(0,1));
+        int key = HadoopUtils::toInt(key_value_str.substr(0,pos));
+        string values = key_value_str.substr(pos+1);
         
         context.write(key, values);
       }
     }
   }
   
-  void reopenMatrixB(BSPContext& context) {
-    if (seqFileID!=0) {
-      context.sequenceFileClose(seqFileID);
+  void reopenMatrixB(BSPContext<int,string,int,string,string>& context) {
+    if (seq_file_id_!=0) {
+      context.sequenceFileClose(seq_file_id_);
     }
     
     const BSPJob* job = context.getBSPJob();
-    string path = job->get(HAMA_MAT_MULT_B_PATH);
+    string path = job->get(HAMA_MAT_MULT_B_PATH_);
     
-    seqFileID = context.sequenceFileOpen(path,"r",
-                                         "org.apache.hadoop.io.IntWritable",
-                                         "org.apache.hama.ml.writable.VectorWritable");
+    seq_file_id_ = context.sequenceFileOpen(path,"r",
+                                            "org.apache.hadoop.io.IntWritable",
+                                            "org.apache.hama.commons.io.PipesVectorWritable");
   }
   
 };
 
-class MatrixRowPartitioner: public Partitioner {
+class MatrixRowPartitioner: public Partitioner<int,string,int,string,string> {
 public:
-  MatrixRowPartitioner(BSPContext& context) { }
+  MatrixRowPartitioner(BSPContext<int,string,int,string,string>& context) { }
   
-  int partition(const string& key,const string& value, int32_t numTasks) {
-    return toInt(key) % numTasks;
+  int partition(const int& key,const string& value, int32_t num_tasks) {
+    return key % num_tasks;
   }
 };
 
 int main(int argc, char *argv[]) {
-  return HamaPipes::runTask(HamaPipes::TemplateFactory<MatrixMultiplicationBSP,MatrixRowPartitioner>());
+  return HamaPipes::runTask<int,string,int,string,string>(HamaPipes::TemplateFactory<MatrixMultiplicationBSP,int,string,int,string,string,MatrixRowPartitioner>());
 }

Modified: hama/trunk/c++/src/main/native/examples/impl/piestimator.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/piestimator.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/piestimator.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/piestimator.cc Sat Nov 23 08:41:09 2013
@@ -33,31 +33,32 @@ using HamaPipes::BSP;
 using HamaPipes::BSPContext;
 using namespace HadoopUtils;
 
-class PiEstimatorBSP: public BSP {
-private:
-  string masterTask;
-  long iterations; // iterations_per_bsp_task
-public:
-  PiEstimatorBSP(BSPContext& context) {
-    iterations = 1000000L;
+class PiEstimatorBSP: public BSP<string,string,string,double,int> {
+  private:
+  string master_task_;
+  long iterations_; // iterations_per_bsp_task
+  
+  public:
+  PiEstimatorBSP(BSPContext<string,string,string,double,int>& context) {
+    iterations_ = 1000000L;
   }
   
   inline double closed_interval_rand(double x0, double x1) {
     return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
   }
   
-  void setup(BSPContext& context) {
+  void setup(BSPContext<string,string,string,double,int>& context) {
     // Choose one as a master
-    masterTask = context.getPeerName(context.getNumPeers() / 2);
+    master_task_ = context.getPeerName(context.getNumPeers() / 2);
   }
   
-  void bsp(BSPContext& context) {
+  void bsp(BSPContext<string,string,string,double,int>& context) {
     
     /* initialize random seed */
     srand(time(NULL));
     
     int in = 0;
-    for (long i = 0; i < iterations; i++) {
+    for (long i = 0; i < iterations_; i++) {
       double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
       double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
       if (sqrt(x * x + y * y) < 1.0) {
@@ -65,28 +66,26 @@ public:
       }
     }
     
-    context.sendMessage(masterTask, toString(in));
+    context.sendMessage(master_task_, in);
     context.sync();
   }
   
-  void cleanup(BSPContext& context) {
-    if (context.getPeerName().compare(masterTask)==0) {
+  void cleanup(BSPContext<string,string,string,double,int>& context) {
+    if (context.getPeerName().compare(master_task_)==0) {
       
-      long totalHits = 0;
-      int msgCount = context.getNumCurrentMessages();
-      string received;
-      for (int i=0; i<msgCount; i++) {
-        string received = context.getCurrentMessage();
-        totalHits += toInt(received);
+      long total_hits = 0;
+      int msg_count = context.getNumCurrentMessages();
+      for (int i=0; i < msg_count; i++) {
+        total_hits += context.getCurrentMessage();
       }
       
-      double pi = 4.0 * totalHits / (msgCount * iterations);
-      context.write("Estimated value of PI is", toString(pi));
+      double pi = 4.0 * total_hits / (msg_count * iterations_);
+      context.write("Estimated value of PI", pi);
     }
   }
 };
 
 int main(int argc, char *argv[]) {
-  return HamaPipes::runTask(HamaPipes::TemplateFactory<PiEstimatorBSP>());
+  return HamaPipes::runTask<string,string,string,double,int>(HamaPipes::TemplateFactory<PiEstimatorBSP,string,string,string,double,int>());
 }
 

Modified: hama/trunk/c++/src/main/native/examples/impl/summation.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/summation.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/summation.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/summation.cc Sat Nov 23 08:41:09 2013
@@ -29,46 +29,46 @@ using std::string;
 using HamaPipes::BSP;
 using HamaPipes::BSPContext;
 
-class SummationBSP: public BSP {
-private:
-  string masterTask;
-public:
-  SummationBSP(BSPContext& context) {  }
+class SummationBSP: public BSP<string,string,string,double,double> {
+  private:
+  string master_task_;
   
-  void setup(BSPContext& context) {
+  public:
+  SummationBSP(BSPContext<string,string,string,double,double>& context) {  }
+  
+  void setup(BSPContext<string,string,string,double,double>& context) {
     // Choose one as a master
-    masterTask = context.getPeerName(context.getNumPeers() / 2);
+    master_task_ = context.getPeerName(context.getNumPeers() / 2);
   }
   
-  void bsp(BSPContext& context) {
+  void bsp(BSPContext<string,string,string,double,double>& context) {
     
-    double intermediateSum = 0.0;
+    double intermediate_sum = 0.0;
     string key;
     string value;
     
     while(context.readNext(key,value)) {
-      intermediateSum += HadoopUtils::toDouble(value);
+      intermediate_sum += HadoopUtils::toDouble(value);
     }
     
-    context.sendMessage(masterTask, HadoopUtils::toString(intermediateSum));
+    context.sendMessage(master_task_, intermediate_sum);
     context.sync();
   }
   
-  void cleanup(BSPContext& context) {
-    if (context.getPeerName().compare(masterTask)==0) {
+  void cleanup(BSPContext<string,string,string,double,double>& context) {
+    if (context.getPeerName().compare(master_task_)==0) {
       
       double sum = 0.0;
-      int msgCount = context.getNumCurrentMessages();
-      for (int i=0; i<msgCount; i++) {
-        string received = context.getCurrentMessage();
-        sum += HadoopUtils::toDouble(received);
+      int msg_count = context.getNumCurrentMessages();
+      for (int i=0; i < msg_count; i++) {
+        sum += context.getCurrentMessage();
       }
-      context.write("Sum", HadoopUtils::toString(sum));
+      context.write("Sum", sum);
     }
   }
 };
 
 int main(int argc, char *argv[]) {
-  return HamaPipes::runTask(HamaPipes::TemplateFactory<SummationBSP>());
+  return HamaPipes::runTask<string,string,string,double,double>(HamaPipes::TemplateFactory<SummationBSP,string,string,string,double,double>());
 }
 

Modified: hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/api/hama/Pipes.hh?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh (original)
+++ hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh Sat Nov 23 08:41:09 2013
@@ -18,357 +18,709 @@
 #ifndef HAMA_PIPES_HH
 #define HAMA_PIPES_HH
 
-#ifdef SWIG
-%module (directors="1") HamaPipes
-%include "std_string.i"
-%feature("director") BSP;
-%feature("director") Partitioner;
-%feature("director") RecordReader;
-%feature("director") RecordWriter;
-%feature("director") Factory;
-#else
+#include <errno.h>
+#include <map>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <sstream> /* ostringstream */
+#include <stdint.h>
+#include <stdio.h> /* printf */
+#include <stdlib.h> /* getenv */
 #include <string>
+#include <string.h>
+#include <strings.h>
+#include <sys/socket.h>
+#include <typeinfo> /* typeid */
+#include <unistd.h> /* sleep */
 #include <vector>
-#endif
 
-#include <stdint.h>
+#include "hadoop/SerialUtils.hh"
+#include "hadoop/StringUtils.hh"
 
+#define stringify( name ) # name
+
+using std::map;
 using std::string;
 using std::vector;
+using std::pair;
 
-namespace HamaPipes {
+using namespace HadoopUtils;
 
-/**
- * This interface defines the interface between application code and the 
- * foreign code interface to Hadoop Map/Reduce.
- */
-
-/**
- * A BSPJob defines the properties for a job.
- */
-class BSPJob {
-public:
-  virtual bool hasKey(const string& key) const = 0;
-  virtual const string& get(const string& key) const = 0;
-  virtual int getInt(const string& key) const = 0;
-  virtual float getFloat(const string& key) const = 0;
-  virtual bool getBoolean(const string&key) const = 0;
-  virtual ~BSPJob() {}
-};
-
-/**
- * Task context provides the information about the task and job.
- */
-class TaskContext {
-public:
-  /**
-   * Counter to keep track of a property and its value.
-   */
-  class Counter {
-  private:
-    int id;
-  public:
-    Counter(int counterId) : id(counterId) {}
-    Counter(const Counter& counter) : id(counter.id) {}
-
-    int getId() const { return id; }
+namespace HamaPipes {
+  
+  // global varibales
+  bool logging;
+  
+  /********************************************/
+  /*************** MESSAGE_TYPE ***************/
+  /********************************************/
+  enum MESSAGE_TYPE {
+    START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES,
+    RUN_SETUP, RUN_BSP, RUN_CLEANUP,
+    READ_KEYVALUE, WRITE_KEYVALUE,
+    GET_MSG, GET_MSG_COUNT,
+    SEND_MSG, SYNC,
+    GET_ALL_PEERNAME, GET_PEERNAME,
+    GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT,
+    REOPEN_INPUT, CLEAR,
+    CLOSE, ABORT,
+    DONE, TASK_DONE,
+    REGISTER_COUNTER, INCREMENT_COUNTER,
+    SEQFILE_OPEN, SEQFILE_READNEXT,
+    SEQFILE_APPEND, SEQFILE_CLOSE,
+    PARTITION_REQUEST, PARTITION_RESPONSE,
+    LOG, END_OF_DATA
+  };
+  
+  /* Only needed for debugging output */
+  const char* messageTypeNames[] = {
+    stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ),
+    stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ),
+    stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ),
+    stringify( GET_MSG ), stringify( GET_MSG_COUNT ),
+    stringify( SEND_MSG ), stringify( SYNC ),
+    stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ),
+    stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ),
+    stringify( REOPEN_INPUT ), stringify( CLEAR ),
+    stringify( CLOSE ), stringify( ABORT ),
+    stringify( DONE ), stringify( TASK_DONE ),
+    stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ),
+    stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ),
+    stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ),
+    stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE ),
+    stringify( LOG ), stringify( END_OF_DATA )
   };
   
   /**
-   * Get the BSPJob for the current task.
-   */
-  virtual const BSPJob* getBSPJob() = 0;
-
-  /**
-   * Get the current key. 
-   * @return the current key
-   */
-  //virtual const string& getInputKey() = 0;
-
-  /**
-   * Get the current value. 
-   * @return the current value
-   */
-  //virtual const string& getInputValue() = 0;
-
-  /**
-   * Generate an output record
-   */
-  //virtual void emit(const string& key, const string& value) = 0;
-
-  /**
-   * Mark your task as having made progress without changing the status 
-   * message.
+   * Generic KeyValuePair including is_empty
    */
-  //virtual void progress() = 0;
-
+  template <typename K, typename V>
+  struct KeyValuePair : pair<K, V> {
+    typedef pair<K, V> base_t;
+    bool is_empty;
+    
+    KeyValuePair() : is_empty(false) {}
+    explicit KeyValuePair(bool x) : is_empty(x) {}
+    KeyValuePair(const K& k, const V& v) : base_t(k, v), is_empty(false) {}
+    
+    template <class X, class Y>
+    KeyValuePair(const pair<X,Y> &p) : base_t(p), is_empty(false) {}
+    
+    template <class X, class Y>
+    KeyValuePair(const KeyValuePair<X,Y> &p) : base_t(p), is_empty(p.is_empty) {}
+  };
+  
   /**
-   * Set the status message and call progress.
+   * Override Generic KeyValuePair << operator
    */
-  //virtual void setStatus(const string& status) = 0;
-
+  template <typename OS, typename K, typename V>
+  OS &operator<<(OS &os, const KeyValuePair<K, V>& p) {
+    os << "<KeyValuePair: ";
+    if (!p.is_empty) {
+      os << p.first << ", " << p.second;
+    } else {
+      os << "empty";
+    }
+    os << ">";
+    return os;
+  }
+  
+  /********************************************/
+  /****************** BSPJob ******************/
+  /********************************************/
   /**
-   * Register a counter with the given group and name.
+   * A BSPJob defines the properties for a job.
    */
-  //virtual Counter* getCounter(const string& group, const string& name) = 0;
-
+  class BSPJob {
+  public:
+    virtual bool hasKey(const string& key) const = 0;
+    virtual const string& get(const string& key) const = 0;
+    virtual int getInt(const string& key) const = 0;
+    virtual float getFloat(const string& key) const = 0;
+    virtual bool getBoolean(const string& key) const = 0;
+    virtual ~BSPJob() {}
+  };
+  
+  /********************************************/
+  /**************** BSPJobImpl ****************/
+  /********************************************/
+  class BSPJobImpl: public BSPJob {
+  private:
+    map<string, string> values_;
+  public:
+    void set(const string& key, const string& value) {
+      values_[key] = value;
+    }
+    
+    virtual bool hasKey(const string& key) const {
+      return values_.find(key) != values_.end();
+    }
+    
+    virtual const string& get(const string& key) const {
+      map<string,string>::const_iterator itr = values_.find(key);
+      if (itr == values_.end()) {
+        throw Error("Key " + key + " not found in BSPJob");
+      }
+      return itr->second;
+    }
+    
+    virtual int getInt(const string& key) const {
+      const string& val = get(key);
+      return toInt(val);
+    }
+    
+    virtual float getFloat(const string& key) const {
+      const string& val = get(key);
+      return toFloat(val);
+    }
+    
+    virtual bool getBoolean(const string& key) const {
+      const string& val = get(key);
+      return toBool(val);
+    }
+  };
+  
+  /********************************************/
+  /**************** TaskContext ***************/
+  /********************************************/
   /**
-   * Increment the value of the counter with the given amount.
+   * Task context provides the information about the task and job.
    */
-  //virtual void incrementCounter(const Counter* counter, uint64_t amount) = 0;
-  virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
+  class TaskContext {
+  public:
+    /**
+     * Counter to keep track of a property and its value.
+     */
+    class Counter {
+    private:
+      int id_;
+    public:
+      Counter(int counter_id) : id_(counter_id) {}
+      Counter(const Counter& counter) : id_(counter.id_) {}
+      
+      int getId() const { return id_; }
+    };
+    
+    /**
+     * Get the BSPJob for the current task.
+     */
+    virtual const BSPJob* getBSPJob() = 0;
+    
+    /**
+     * Get the current key.
+     * @return the current key
+     */
+    //virtual const string& getInputKey() = 0;
+    
+    /**
+     * Get the current value.
+     * @return the current value
+     */
+    //virtual const string& getInputValue() = 0;
+    
+    /**
+     * Generate an output record
+     */
+    //virtual void emit(const string& key, const string& value) = 0;
+    
+    /**
+     * Mark your task as having made progress without changing the status
+     * message.
+     */
+    //virtual void progress() = 0;
+    
+    /**
+     * Set the status message and call progress.
+     */
+    //virtual void setStatus(const string& status) = 0;
+    
+    /**
+     * Register a counter with the given group and name.
+     */
+    //virtual Counter* getCounter(const string& group, const string& name) = 0;
+    
+    /**
+     * Increment the value of the counter with the given amount.
+     */
+    //virtual void incrementCounter(const Counter* counter, uint64_t amount) = 0;
+    virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
+    
+    virtual ~TaskContext() {}
+  };
+  
+  /********************************************/
+  /************* DownwardProtocol *************/
+  /********************************************/
+  template<class K1, class V1>
+  class DownwardProtocol {
+  public:
+    virtual void start(int protocol_version) = 0;
+    virtual void setBSPJob(vector<string> values) = 0;
+    virtual void setInputTypes(string key_type, string value_type) = 0;
+    
+    virtual void runBsp(bool piped_input, bool piped_output) = 0;
+    virtual void runCleanup(bool piped_input, bool piped_output) = 0;
+    virtual void runSetup(bool piped_input, bool piped_output) = 0;
+    
+    virtual void runPartition(const K1& key, const V1& value, int32_t num_tasks) = 0;
+    
+    virtual void close() = 0;
+    virtual void abort() = 0;
+    virtual ~DownwardProtocol() {}
+  };
   
-  virtual ~TaskContext() {}
-};
+  /********************************************/
+  /************** UpwardProtocol **************/
+  /********************************************/
+  template<typename D>
+  class UpwardProtocol {
+  public:
+    virtual void sendCommand(int32_t cmd) = 0;
     
+    template<class T>
+    void sendCommand(int32_t cmd, T value) {
+      static_cast<D*>(this)->template sendCommand<T>(cmd, value);
+    }
+    
+    template<class T>
+    void sendCommand(int32_t cmd, const T values[], int size) {
+      static_cast<D*>(this)->template sendCommand<T>(cmd, values, size);
+    }
+    
+    template<class T1, class T2>
+    void sendCommand(int32_t cmd, T1 value1, T2 value2) {
+      static_cast<D*>(this)->template sendCommand<T1,T2>(cmd, value1, value2);
+    }
+    
+    template<class T1, class T2>
+    void sendCommand(int32_t cmd, T1 value, const T2 values[], int size) {
+      static_cast<D*>(this)->template sendCommand<T1,T2>(cmd, value, values, size);
+    }
+    
+    //virtual void registerCounter(int id, const string& group, const string& name) = 0;
+    //virtual void incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
+    virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
+    virtual ~UpwardProtocol() {}
+  };
+  
+  /* Forward definition of BinaryUpwardProtocol to pass to UpwardProtocol */
+  class BinaryUpwardProtocol;
+  
+  /********************************************/
+  /***************** Protocol *****************/
+  /********************************************/
+  template<typename D>
+  class Protocol {
+  public:
     
-/**
- * SequenceFile Connector
- */
-class SequenceFileConnector {
-public:
-  /**
-   * Open SequenceFile with option "r" or "w"
-   * key and value type of the values stored in the SequenceFile
-   * @return the corresponding fileID
-   */
-  virtual int sequenceFileOpen(const string& path, const string& option, const string& keyType, const string& valueType) = 0;
+    template<class T>
+    T getResult(int32_t expected_response_cmd) {
+      return static_cast<D*>(this)->template getResult<T>(expected_response_cmd);
+    }
+    
+    template<class T>
+    vector<T> getVectorResult(int32_t expected_response_cmd) {
+      return static_cast<D*>(this)->template getVectorResult<T>(expected_response_cmd);
+    }
+    
+    template<class K, class V>
+    KeyValuePair<K,V> getKeyValueResult(int32_t expected_response_cmd) {
+      return static_cast<D*>(this)->template getKeyValueResult<K,V>(expected_response_cmd);
+    }
+    
+    virtual void nextEvent() = 0;
+    virtual UpwardProtocol<BinaryUpwardProtocol>* getUplink() = 0;
+    virtual ~Protocol(){}
+  };
+  
+  /********************************************/
+  /*********** SequenceFileConnector **********/
+  /********************************************/
+  /**
+   * SequenceFile Connector
+   */
+  /* Using Curiously recurring template pattern(CTRP) */
+  /* because virtual template function not possible */
+  template<typename D>
+  class SequenceFileConnector {
+  public:
+    /**
+     * Open SequenceFile with option "r" or "w"
+     * key and value type of the values stored in the SequenceFile
+     * @return the corresponding fileID
+     */
+    virtual int32_t sequenceFileOpen(const string& path, const string& option, const string& key_type, const string& value_type) = 0;
     
-  /**
-   * Read next key/value pair from the SequenceFile with fileID
-   */
-  virtual bool sequenceFileReadNext(int fileID, string& key, string& value) = 0;
-
-  /**
-   * Append the next key/value pair to the SequenceFile with fileID
-   */
-  virtual bool sequenceFileAppend(int fileID, const string& key, const string& value) = 0;
+    /**
+     * Close SequenceFile
+     */
+    virtual bool sequenceFileClose(int32_t file_id) = 0;
     
-  /**
-   * Close SequenceFile
-   */
-  virtual bool sequenceFileClose(int fileID) = 0;
-};    
-
-
-class BSPContext: public TaskContext, public SequenceFileConnector {
-public:
-
-  /**
-   * Access the InputSplit of the mapper.
-   */
-  //virtual const string& getInputSplit() = 0;
-
-  /**
-   * Get the name of the key class of the input to this task.
-   */
-  virtual const string& getInputKeyClass() = 0;
-
-  /**
-   * Get the name of the value class of the input to this task.
-   */
-  virtual const string& getInputValueClass() = 0;
+    /**
+     * Read next key/value pair from the SequenceFile with fileID
+     * Using Curiously recurring template pattern(CTRP)
+     */
+    template<class K, class V>
+    bool sequenceFileReadNext(int32_t file_id, K& key, V& value) {
+      return static_cast<D*>(this)->template sequenceFileReadNext<K,V>(file_id, key, value);
+    }
     
+    /**
+     * Append the next key/value pair to the SequenceFile with fileID
+     * Using Curiously recurring template pattern(CTRP)
+     */
+    template<class K, class V>
+    bool sequenceFileAppend(int32_t file_id, const K& key, const V& value) {
+      return static_cast<D*>(this)->template sequenceFileAppend<K,V>(file_id, key, value);
+    }
+  };
+  
+  /* Forward definition of BSPContextImpl to pass to SequenceFileConnector */
+  template<class K1, class V1, class K2, class V2, class M>
+  class BSPContextImpl;
+  
+  
+  template<class K1, class V1, class K2, class V2, class M>
+  class BSPContext: public TaskContext, public SequenceFileConnector<BSPContextImpl<K1, V1, K2, V2, M> > {
+  public:
     
+    /**
+     * Access the InputSplit of the mapper.
+     */
+    //virtual const string& getInputSplit() = 0;
     
-  /**
-   * Send a data with a tag to another BSPSlave corresponding to hostname.
-   * Messages sent by this method are not guaranteed to be received in a sent
-   * order.
-   */
-  virtual void sendMessage(const string& peerName, const string& msg) = 0;
+    /**
+     * Get the name of the key class of the input to this task.
+     */
+    virtual string getInputKeyClass() = 0;
     
-  /**
-   * @return A message from the peer's received messages queue (a FIFO).
-   */
-  virtual const string& getCurrentMessage() = 0;
+    /**
+     * Get the name of the value class of the input to this task.
+     */
+    virtual string getInputValueClass() = 0;
     
-  /**
-   * @return The number of messages in the peer's received messages queue.
-   */
-  virtual int getNumCurrentMessages() = 0;
+    /**
+     * Send a data with a tag to another BSPSlave corresponding to hostname.
+     * Messages sent by this method are not guaranteed to be received in a sent
+     * order.
+     */
+    virtual void sendMessage(const string& peer_name, const M& msg) = 0;
     
-  /**
-   * Barrier Synchronization.
-   * 
-   * Sends all the messages in the outgoing message queues to the corresponding
-   * remote peers.
-   */
-  virtual void sync() = 0;
+    /**
+     * @return A message from the peer's received messages queue (a FIFO).
+     */
+    virtual M getCurrentMessage() = 0;
     
-  /**
-   * @return the count of current super-step
-   */
-  virtual long getSuperstepCount() = 0;
-     
-  /**
-   * @return the name of this peer in the format "hostname:port".
-   */ 
-  virtual const string& getPeerName() = 0;
+    /**
+     * @return The number of messages in the peer's received messages queue.
+     */
+    virtual int getNumCurrentMessages() = 0;
     
-  /**
-   * @return the name of n-th peer from sorted array by name.
-   */
-  virtual const string& getPeerName(int index) = 0;
+    /**
+     * Barrier Synchronization.
+     *
+     * Sends all the messages in the outgoing message queues to the corresponding
+     * remote peers.
+     */
+    virtual void sync() = 0;
     
-  /**
-   * @return the index of this peer from sorted array by name.
-   */
-  virtual int getPeerIndex() = 0;
+    /**
+     * @return the count of current super-step
+     */
+    virtual long getSuperstepCount() = 0;
     
-  /**
-   * @return the names of all the peers executing tasks from the same job
-   *         (including this peer).
-   */
-  virtual vector<string> getAllPeerNames() = 0;
+    /**
+     * @return the name of this peer in the format "hostname:port".
+     */
+    virtual string getPeerName() = 0;
     
-  /**
-   * @return the number of peers
-   */
-  virtual int getNumPeers() = 0;
+    /**
+     * @return the name of n-th peer from sorted array by name.
+     */
+    virtual string getPeerName(int index) = 0;
     
-  /**
-   * Clears all queues entries.
-   */
-  virtual void clear() = 0;
+    /**
+     * @return the index of this peer from sorted array by name.
+     */
+    virtual int getPeerIndex() = 0;
     
-  /**
-   * Writes a key/value pair to the output collector
-   */
-  virtual void write(const string& key, const string& value) = 0;
+    /**
+     * @return the names of all the peers executing tasks from the same job
+     *         (including this peer).
+     */
+    virtual vector<string> getAllPeerNames() = 0;
     
-  /**
-   * Deserializes the next input key value into the given objects;
-   */
-  virtual bool readNext(string& key, string& value) = 0;
+    /**
+     * @return the number of peers
+     */
+    virtual int getNumPeers() = 0;
+    
+    /**
+     * Clears all queues entries.
+     */
+    virtual void clear() = 0;
+    
+    /**
+     * Writes a key/value pair to the output collector
+     */
+    virtual void write(const K2& key, const V2& value) = 0;
+    
+    /**
+     * Deserializes the next input key value into the given objects;
+     */
+    virtual bool readNext(K1& key, V1& value) = 0;
     
     /**
      * Reads the next key value pair and returns it as a pair. It may reuse a
      * {@link KeyValuePair} instance to save garbage collection time.
-     * 
+     *
      * @return null if there are no records left.
      * @throws IOException
      */
     //public KeyValuePair<K1, V1> readNext() throws IOException;
     
+    /**
+     * Closes the input and opens it right away, so that the file pointer is at
+     * the beginning again.
+     */
+    virtual void reopenInput() = 0;
+    
+  };
+  
+  class Closable {
+  public:
+    virtual void close() {}
+    virtual ~Closable() {}
+  };
+  
   /**
-   * Closes the input and opens it right away, so that the file pointer is at
-   * the beginning again.
+   * The application's BSP class to do bsp.
    */
-  virtual void reopenInput() = 0;
+  template<class K1, class V1, class K2, class V2, class M>
+  class BSP: public Closable {
+  public:
+    /**
+     * This method is called before the BSP method. It can be used for setup
+     * purposes.
+     */
+    virtual void setup(BSPContext<K1, V1, K2, V2, M>& context) = 0;
     
-};
+    /**
+     * This method is your computation method, the main work of your BSP should be
+     * done here.
+     */
+    virtual void bsp(BSPContext<K1, V1, K2, V2, M>& context) = 0;
     
-class Closable {
-public:
-  virtual void close() {}
-  virtual ~Closable() {}
-};
-
-/**
- * The application's BSP class to do bsp.
- */
-class BSP: public Closable {
-public:
+    /**
+     * This method is called after the BSP method. It can be used for cleanup
+     * purposes. Cleanup is guranteed to be called after the BSP runs, even in
+     * case of exceptions.
+     */
+    virtual void cleanup(BSPContext<K1, V1, K2, V2, M>& context) = 0;
+  };
+  
   /**
-   * This method is called before the BSP method. It can be used for setup
-   * purposes.
+   * User code to decide where each key should be sent.
    */
-  virtual void setup(BSPContext& context) = 0;
-   
+  template<class K1, class V1, class K2, class V2, class M>
+  class Partitioner {
+  public:
+    
+    virtual int partition(const K1& key, const V1& value, int32_t num_tasks) = 0;
+    virtual ~Partitioner() {}
+  };
+  
   /**
-   * This method is your computation method, the main work of your BSP should be
-   * done here.
+   * For applications that want to read the input directly for the map function
+   * they can define RecordReaders in C++.
    */
-  virtual void bsp(BSPContext& context) = 0;
+  template<class K, class V>
+  class RecordReader: public Closable {
+  public:
+    virtual bool next(K& key, V& value) = 0;
     
+    /**
+     * The progress of the record reader through the split as a value between
+     * 0.0 and 1.0.
+     */
+    virtual float getProgress() = 0;
+  };
+  
   /**
-   * This method is called after the BSP method. It can be used for cleanup
-   * purposes. Cleanup is guranteed to be called after the BSP runs, even in
-   * case of exceptions.
+   * An object to write key/value pairs as they are emited from the reduce.
    */
-  virtual void cleanup(BSPContext& context) = 0;
-};
-
-/**
- * User code to decide where each key should be sent.
- */
-class Partitioner {
-public:
+  template<class K, class V>
+  class RecordWriter: public Closable {
+  public:
+    virtual void emit(const K& key, const V& value) = 0;
+  };
+  
+  /**
+   * A factory to create the necessary application objects.
+   */
+  template<class K1, class V1, class K2, class V2, class M>
+  class Factory {
+  public:
+    virtual BSP<K1, V1, K2, V2, M>* createBSP(BSPContext<K1, V1, K2, V2, M>& context) const = 0;
     
-    virtual int partition(const string& key,const string& value, int32_t numTasks) = 0;
-    virtual ~Partitioner() {}
-};
+    /**
+     * Create an application partitioner object.
+     * @return the new partitioner or NULL, if the default partitioner should be
+     * used.
+     */
+    virtual Partitioner<K1, V1, K2, V2, M>* createPartitioner(BSPContext<K1, V1, K2, V2, M>& context) const {
+      return NULL;
+    }
     
-/**
- * For applications that want to read the input directly for the map function
- * they can define RecordReaders in C++.
- */
-class RecordReader: public Closable {
-public:
-  virtual bool next(string& key, string& value) = 0;
-
+    /**
+     * Create an application record reader.
+     * @return the new RecordReader or NULL, if the Java RecordReader should be
+     *    used.
+     */
+    virtual RecordReader<K1,V1>* createRecordReader(BSPContext<K1, V1, K2, V2, M>& context) const {
+      return NULL;
+    }
+    
+    /**
+     * Create an application record writer.
+     * @return the new RecordWriter or NULL, if the Java RecordWriter should be
+     *    used.
+     */
+    virtual RecordWriter<K2,V2>* createRecordWriter(BSPContext<K1, V1, K2, V2, M>& context) const {
+      return NULL;
+    }
+    
+    virtual ~Factory() {}
+  };
+  
   /**
-   * The progress of the record reader through the split as a value between
-   * 0.0 and 1.0.
+   * Generic toString
    */
-  virtual float getProgress() = 0;
-};
-
-/**
- * An object to write key/value pairs as they are emited from the reduce.
- */
-class RecordWriter: public Closable {
-public:
-  virtual void emit(const string& key,
-                    const string& value) = 0;
-};
-
-/**
- * A factory to create the necessary application objects.
- */
-class Factory {
-public:
-  virtual BSP* createBSP(BSPContext& context) const = 0;
-
+  template <class T>
+  string toString(const T& t)
+  {
+    std::ostringstream oss;
+    oss << t;
+    return oss.str();
+  }
   /**
-   * Create an application partitioner object.
-   * @return the new partitioner or NULL, if the default partitioner should be 
-   * used.
+   * Generic toString template specializations
    */
-  virtual Partitioner* createPartitioner(BSPContext& context) const {
-    return NULL;
+  template <> string toString<string>(const string& t) {
+    return t;
   }
-    
+  
   /**
-   * Create an application record reader.
-   * @return the new RecordReader or NULL, if the Java RecordReader should be
-   *    used.
+   * Generic serialization
    */
-  virtual RecordReader* createRecordReader(BSPContext& context) const {
-    return NULL; 
+  template<class T>
+  void serialize(T t, OutStream& stream) {
+    serializeString(toString<T>(t), stream);
   }
-
+  
   /**
-   * Create an application record writer.
-   * @return the new RecordWriter or NULL, if the Java RecordWriter should be
-   *    used.
+   * Generic serialization template specializations
    */
-  virtual RecordWriter* createRecordWriter(BSPContext& context) const {
-    return NULL;
+  template <> void serialize<int32_t>(int32_t t, OutStream& stream) {
+    serializeInt(t, stream);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BinaryProtocol::serializeInt '%d'\n", t);
+    }
   }
-
-  virtual ~Factory() {}
-};
-
-/**
- * Run the assigned task in the framework.
- * The user's main function should set the various functions using the 
- * set* functions above and then call this.
- * @return true, if the task succeeded.
- */
-bool runTask(const Factory& factory);
-
+  template <> void serialize<int64_t>(int64_t t, OutStream& stream) {
+    serializeLong(t, stream);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BinaryProtocol::serializeLong '%ld'\n", (long)t);
+    }
+  }
+  template <> void serialize<float>(float t, OutStream& stream) {
+    serializeFloat(t, stream);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BinaryProtocol::serializeFloat '%f'\n", t);
+    }
+  }
+  template <> void serialize<double>(double t, OutStream& stream) {
+    serializeDouble(t, stream);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BinaryProtocol::serializeDouble '%f'\n", t);
+    }
+  }
+  template <> void serialize<string>(string t, OutStream& stream) {
+    serializeString(t, stream);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BinaryProtocol::serializeString '%s'\n", t.c_str());
+    }
+  }
+  
+  /**
+   * Generic deserialization
+   */
+  template<class T>
+  T deserialize(InStream& stream) {
+    string str = "Not able to deserialize type: ";
+    throw Error(str.append(typeid(T).name()));
+  }
+  
+  /**
+   * Generic deserialization template specializations
+   */
+  template <> int32_t deserialize<int32_t>(InStream& stream) {
+    int32_t result = deserializeInt(stream);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeInt result: '%d'\n",
+              result);
+    }
+    return result;
+  }
+  template <> int64_t deserialize<int64_t>(InStream& stream) {
+    int64_t result = deserializeLong(stream);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeLong result: '%ld'\n",
+              (long)result);
+    }
+    return result;
+  }
+  template <> float deserialize<float>(InStream& stream) {
+    float result = deserializeFloat(stream);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeFloat result: '%f'\n",
+              result);
+    }
+    return result;
+  }
+  template <> double deserialize<double>(InStream& stream) {
+    double result = deserializeDouble(stream);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeDouble result: '%f'\n",
+              result);
+    }
+    return result;
+  }
+  template <> string deserialize<string>(InStream& stream) {
+    string result = deserializeString(stream);
+    
+    if (logging) {
+      if (result.empty()) {
+        fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeString returns EMPTY string! Maybe wrong Serialization?\n");
+      } else {
+        fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeString result: '%s'\n",
+                ((result.length()<10)?result.c_str():result.substr(0,9).append("...").c_str()));
+      }
+    }
+    return result;
+  }
+  
+  /**
+   * Run the assigned task in the framework.
+   * The user's main function should set the various functions using the
+   * set* functions above and then call this.
+   * @return true, if the task succeeded.
+   */
+  template<class K1, class V1, class K2, class V2, class M>
+  bool runTask(const Factory<K1, V1, K2, V2, M>& factory);
+  
+  // Include implementation in header because of templates
+  #include "../../impl/Pipes.cc"
 }
 
 #endif

Modified: hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/api/hama/TemplateFactory.hh?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh (original)
+++ hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh Sat Nov 23 08:41:09 2013
@@ -19,27 +19,31 @@
 #define HAMA_PIPES_TEMPLATE_FACTORY_HH
 
 namespace HamaPipes {
-
-  template <class BSP>
-  class TemplateFactory2: public Factory {
+  
+  /* Generic template factory specification */
+  template <class BSP, class K1, class V1, class K2, class V2, class M>
+  class TemplateFactory2: public Factory<K1, V1, K2, V2, M> {
   public:
-    BSP* createBSP(BSPContext& context) const {
+    BSP* createBSP(BSPContext<K1, V1, K2, V2, M>& context) const {
       return new BSP(context);
     }
   };
-
-  template <class BSP, class partitioner=void>
-  class TemplateFactory: public TemplateFactory2<BSP> {
+  
+  /* Template factory including partitioner specification */
+  template <class BSP, class K1, class V1, class K2, class V2, class M, class partitioner=void>
+  class TemplateFactory: public TemplateFactory2<BSP, K1, V1, K2, V2, M> {
   public:
-      partitioner* createPartitioner(BSPContext& context) const {
-          return new partitioner(context);
-      }
+    partitioner* createPartitioner(BSPContext<K1, V1, K2, V2, M>& context) const {
+      return new partitioner(context);
+    }
   };
-  template <class BSP>
-  class TemplateFactory<BSP,void>
-      : public TemplateFactory2<BSP> {
+  
+  /* Template factory without partitioner specification */
+  template <class BSP, class K1, class V1, class K2, class V2, class M>
+  class TemplateFactory<BSP, K1, V1, K2, V2, M, void>
+  : public TemplateFactory2<BSP, K1, V1, K2, V2, M> {
   };
-    
+  
 }
 
 #endif