You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/11/23 09:41:10 UTC
svn commit: r1544761 [1/3] - in /hama/trunk: ./ c++/ c++/src/
c++/src/main/native/examples/ c++/src/main/native/examples/conf/
c++/src/main/native/examples/impl/ c++/src/main/native/examples/input/
c++/src/main/native/pipes/api/hama/ c++/src/main/nativ...
Author: millecker
Date: Sat Nov 23 08:41:09 2013
New Revision: 1544761
URL: http://svn.apache.org/r1544761
Log:
HAMA-815: Hama Pipes uses C++ templates
Added:
hama/trunk/c++/README.txt
hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java
hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java
hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VectorWritableMatrixGen.java
Removed:
hama/trunk/c++/src/main/native/examples/input/
hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc
hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplicable.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/c++/src/CMakeLists.txt
hama/trunk/c++/src/main/native/examples/README.txt
hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
hama/trunk/c++/src/main/native/examples/conf/piestimator.xml
hama/trunk/c++/src/main/native/examples/conf/summation.xml
hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.cc
hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.hh
hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc
hama/trunk/c++/src/main/native/examples/impl/piestimator.cc
hama/trunk/c++/src/main/native/examples/impl/summation.cc
hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh
hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh
hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc
hama/trunk/c++/src/main/native/utils/impl/StringUtils.cc
hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java
hama/trunk/core/pom.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/Generator.java
hama/trunk/src/assemble/bin.xml
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sat Nov 23 08:41:09 2013
@@ -2,6 +2,10 @@ Hama Change Log
Release 0.7.0 (unreleased changes)
+ NEW FEATURES
+
+ HAMA-815: Hama Pipes uses C++ templates (Martin Illecker)
+
IMPROVEMENTS
HAMA-699: Add commons module (Martin Illecker)
@@ -9,7 +13,7 @@ Release 0.7.0 (unreleased changes)
Release 0.6.3 - October 11, 2013
NEW FEATURES
-
+
HAMA-807: Make aggregators skip supersteps (Anastasis Andronidis)
HAMA-800: Hama Pipes Examples (Martin Illecker)
HAMA-804: Create NeuralNetwork Example (Yexi Jiang)
Added: hama/trunk/c++/README.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/README.txt?rev=1544761&view=auto
==============================================================================
--- hama/trunk/c++/README.txt (added)
+++ hama/trunk/c++/README.txt Sat Nov 23 08:41:09 2013
@@ -0,0 +1,25 @@
+####################################################################
+# Hama Pipes README #
+####################################################################
+# Hama Pipes includes the following three examples: #
+# - 1) Summation #
+# - 2) PiEstimator #
+# - 3) MatrixMultiplication #
+# in c++/src/main/native/examples #
+# Please see c++/src/main/native/examples/README.txt #
+####################################################################
+
+Please use the following command to compile:
+
+% g++ -m64 -Ic++/src/main/native/utils/api \
+ -Ic++/src/main/native/pipes/api \
+ -Lc++/target/native \
+ PROGRAM.cc \
+ -o PROGRAM \
+ -lhadooputils -lpthread \
+ -g -Wall -O2
+
+Note: Adjust the paths above if you are not working
+ from the Hama source folder.
+
+####################################################################
Modified: hama/trunk/c++/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/CMakeLists.txt?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/CMakeLists.txt (original)
+++ hama/trunk/c++/src/CMakeLists.txt Sat Nov 23 08:41:09 2013
@@ -34,7 +34,7 @@ include(JNIFlags.cmake NO_POLICY_SCOPE)
function(output_directory TGT DIR)
SET_TARGET_PROPERTIES(${TGT} PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
- SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+ SET_TARGET_PROPERTIES(${TGT} PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
SET_TARGET_PROPERTIES(${TGT} PROPERTIES
LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
@@ -49,15 +49,15 @@ include_directories(
# Example programs
add_executable(summation main/native/examples/impl/summation.cc)
-target_link_libraries(summation hamapipes hadooputils)
+target_link_libraries(summation hadooputils)
output_directory(summation examples)
add_executable(piestimator main/native/examples/impl/piestimator.cc)
-target_link_libraries(piestimator hamapipes hadooputils)
+target_link_libraries(piestimator hadooputils)
output_directory(piestimator examples)
add_executable(matrixmultiplication main/native/examples/impl/matrixmultiplication.cc)
-target_link_libraries(matrixmultiplication DenseDoubleVector hamapipes hadooputils)
+target_link_libraries(matrixmultiplication DenseDoubleVector hadooputils)
output_directory(matrixmultiplication examples)
add_library(DenseDoubleVector
@@ -70,11 +70,11 @@ add_library(hadooputils STATIC
main/native/utils/impl/SerialUtils.cc
)
-add_library(hamapipes STATIC
- main/native/pipes/impl/HamaPipes.cc
-)
+#add_library(hamapipes STATIC
+# main/native/pipes/impl/Pipes.cc
+#)
-target_link_libraries(hamapipes
+#target_link_libraries(hamapipes
# ${OPENSSL_LIBRARIES}
- pthread
-)
+# pthread
+#)
Modified: hama/trunk/c++/src/main/native/examples/README.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/README.txt?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/README.txt (original)
+++ hama/trunk/c++/src/main/native/examples/README.txt Sat Nov 23 08:41:09 2013
@@ -1,113 +1,127 @@
####################################################################
-# Hama Pipes Examples #
+# Hama Pipes Examples README #
####################################################################
-# - Summation #
-# - PiEstimator #
-# - MatrixMultiplication #
+# Hama Pipes includes the following three examples: #
+# - 1) Summation #
+# - 2) PiEstimator #
+# - 3) MatrixMultiplication #
####################################################################
To run the examples, first compile them:
-% mvn install
-
-and then copy the binaries to dfs:
-
-% hadoop fs -put c++/target/native/examples/summation \
- /examples/bin/summation
-
-create an input directory with text files:
-
-% hadoop fs -put my-data in-dir
-
-and run the word count example:
-
-% hama pipes \
- -conf c++/src/main/native/examples/conf/summation.xml \
- -input in-dir -output out-dir
+% mvn install
####################################################################
+# 1) Summation Example #
+####################################################################
-# Summation Example
+First, copy the summation binary to DFS:
% hadoop fs -put c++/target/native/examples/summation \
/examples/bin/summation
+Generate input data and copy it to DFS:
+
% echo -e "key1\t1.0\nkey2\t2.0\nkey3\t3.0\nkey4\t4.0\n\
key5\t5.0\nkey6\t6.0\nkey7\t7.0\nkey8\t8.0\nkey9\t9.0\n\
key10\t10.0" > summation.txt && hadoop fs -put summation.txt \
/examples/input/summation/input.txt && rm summation.txt
+Run summation example
+
% hama pipes \
-conf c++/src/main/native/examples/conf/summation.xml \
-input /examples/input/summation \
-output /examples/output/summation
+View input and output data
+
% hadoop fs -cat /examples/input/summation/input.txt
-% hadoop fs -cat /examples/output/summation/part-00000
+% hama seqdumper -seqFile /examples/output/summation/part-00000
+
+You should see
+# Input Path: /examples/output/summation/part-00000
+# Key class: class org.apache.hadoop.io.Text
+# Value Class: class org.apache.hadoop.io.DoubleWritable
+# Key: Sum: Value: 55.0
+# Count: 1
+
+Delete the input and output folders:
% hadoop fs -rmr /examples/input/summation
% hadoop fs -rmr /examples/output/summation
####################################################################
+# 2) PiEstimator Example #
+####################################################################
-# PiEstimator Example
+First, copy the piestimator binary to DFS:
% hadoop fs -put c++/target/native/examples/piestimator \
/examples/bin/piestimator
+No input data is needed, so run the piestimator example directly:
+
% hama pipes \
-conf c++/src/main/native/examples/conf/piestimator.xml \
-output /examples/output/piestimator
-% hadoop fs -cat /examples/output/piestimator/part-00001
+View output data
+
+% hama seqdumper -seqFile /examples/output/piestimator/part-00001
+
+You should see something like this (the estimate varies between runs):
+# Input Path: /examples/output/piestimator/part-00001
+# Key class: class org.apache.hadoop.io.Text
+# Value Class: class org.apache.hadoop.io.DoubleWritable
+# Key: Estimated value of PI: Value: 3.139116
+# Count: 1
+
+Delete output folder
% hadoop fs -rmr /examples/output/piestimator
####################################################################
+# 3) MatrixMultiplication Example #
+####################################################################
-# MatrixMultiplication Example
+First, copy the matrixmultiplication binary to DFS:
% hadoop fs -put c++/target/native/examples/matrixmultiplication \
/examples/bin/matrixmultiplication
-% hadoop fs -put c++/src/main/native/examples/input/MatrixA.seq \
- /examples/input/matrixmultiplication/MatrixA.seq
+Generate input data (matrix A and transposed matrix B) in DFS:
-% hadoop fs -put \
- c++/src/main/native/examples/input/MatrixB_transposed.seq \
- /examples/input/matrixmultiplication/MatrixB_transposed.seq
+% hama jar dist/target/hama-*/hama-*/hama-examples-*.jar \
+ gen vectorwritablematrix 4 4 \
+ /examples/input/matrixmultiplication/MatrixA.seq \
+ false true 0 10 0
+
+% hama jar dist/target/hama-*/hama-*/hama-examples-*.jar \
+ gen vectorwritablematrix 4 4 \
+ /examples/input/matrixmultiplication/MatrixB_transposed.seq \
+ false true 0 10 0
+
+Run matrixmultiplication example
% hama pipes \
-conf c++/src/main/native/examples/conf/matrixmultiplication.xml \
-output /examples/output/matrixmultiplication
+View input and output data
+
% hama seqdumper \
-seqFile /examples/input/matrixmultiplication/MatrixA.seq
% hama seqdumper \
- -seqFile /examples/input/matrixmultiplication/MatrixB_transposed.seq
+ -seqFile \
+ /examples/input/matrixmultiplication/MatrixB_transposed.seq
-% hadoop fs -cat /examples/output/matrixmultiplication/part-00000
+% hama seqdumper -seqFile \
+ /examples/output/matrixmultiplication/part-00001
-# Matrix A
-# 9 4 1 9
-# 1 8 6 3
-# 8 3 3 9
-# 7 1 9 6
-
-# Matrix B (not transposed)
-# 2 1 6 5
-# 7 8 9 5
-# 2 1 5 8
-# 7 4 4 9
-
-# Resulting Matrix C
-# 111.0 78.0 131.0 154.0
-# 91.0 83.0 120.0 120.0
-# 106.0 71.0 126.0 160.0
-# 81.0 48.0 120.0 166.0
+Delete output folder
% hadoop fs -rmr /examples/output/matrixmultiplication
####################################################################
-
Modified: hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/matrixmultiplication.xml?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml Sat Nov 23 08:41:09 2013
@@ -30,10 +30,6 @@
<value>true</value>
</property>
<property>
- <name>bsp.input.format.class</name>
- <value>org.apache.hama.bsp.SequenceFileInputFormat</value>
- </property>
- <property>
<name>bsp.input.dir</name>
<value>/examples/input/matrixmultiplication/MatrixA.seq</value>
</property>
@@ -42,15 +38,47 @@
<value>/examples/input/matrixmultiplication/MatrixB_transposed.seq</value>
</property>
<property>
- <name>hama.pipes.logging</name>
- <value>false</value>
+ <name>bsp.input.format.class</name>
+ <value>org.apache.hama.bsp.SequenceFileInputFormat</value>
+ </property>
+ <property>
+ <name>bsp.input.key.class</name>
+ <value>org.apache.hadoop.io.IntWritable</value>
+ </property>
+ <property>
+ <name>bsp.input.value.class</name>
+ <value>org.apache.hama.commons.io.PipesVectorWritable</value>
+ </property>
+ <property>
+ <name>bsp.output.format.class</name>
+ <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
+ </property>
+ <property>
+ <name>bsp.output.key.class</name>
+ <value>org.apache.hadoop.io.IntWritable</value>
+ </property>
+ <property>
+ <name>bsp.output.value.class</name>
+ <value>org.apache.hama.commons.io.PipesVectorWritable</value>
+ </property>
+ <property>
+ <name>bsp.message.class</name>
+ <value>org.apache.hama.commons.io.PipesKeyValueWritable</value>
</property>
<property>
- <name>hama.messenger.queue.class</name>
- <value>org.apache.hama.bsp.message.queue.SortedMessageQueue</value>
+ <name>hama.messenger.xfer.queue.class</name>
+ <value>org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol</value>
</property>
<property>
<name>bsp.input.partitioner.class</name>
<value>org.apache.hama.pipes.PipesPartitioner</value>
</property>
+ <property>
+ <name>hama.pipes.logging</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>bsp.peers.num</name>
+ <value>3</value>
+ </property>
</configuration>
Modified: hama/trunk/c++/src/main/native/examples/conf/piestimator.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/piestimator.xml?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/piestimator.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/piestimator.xml Sat Nov 23 08:41:09 2013
@@ -35,7 +35,19 @@
</property>
<property>
<name>bsp.output.format.class</name>
- <value>org.apache.hama.bsp.TextOutputFormat</value>
+ <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
+ </property>
+ <property>
+ <name>bsp.output.key.class</name>
+ <value>org.apache.hadoop.io.Text</value>
+ </property>
+ <property>
+ <name>bsp.output.value.class</name>
+ <value>org.apache.hadoop.io.DoubleWritable</value>
+ </property>
+ <property>
+ <name>bsp.message.class</name>
+ <value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>hama.pipes.logging</name>
Modified: hama/trunk/c++/src/main/native/examples/conf/summation.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/summation.xml?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/summation.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/summation.xml Sat Nov 23 08:41:09 2013
@@ -34,8 +34,28 @@
<value>org.apache.hama.bsp.KeyValueTextInputFormat</value>
</property>
<property>
+ <name>bsp.input.key.class</name>
+ <value>org.apache.hadoop.io.Text</value>
+ </property>
+ <property>
+ <name>bsp.input.value.class</name>
+ <value>org.apache.hadoop.io.Text</value>
+ </property>
+ <property>
<name>bsp.output.format.class</name>
- <value>org.apache.hama.bsp.TextOutputFormat</value>
+ <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
+ </property>
+ <property>
+ <name>bsp.output.key.class</name>
+ <value>org.apache.hadoop.io.Text</value>
+ </property>
+ <property>
+ <name>bsp.output.value.class</name>
+ <value>org.apache.hadoop.io.DoubleWritable</value>
+ </property>
+ <property>
+ <name>bsp.message.class</name>
+ <value>org.apache.hadoop.io.DoubleWritable</value>
</property>
<property>
<name>hama.pipes.logging</name>
Modified: hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/DenseDoubleVector.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.cc Sat Nov 23 08:41:09 2013
@@ -32,25 +32,26 @@ using HadoopUtils::Splitter;
namespace math {
- DenseDoubleVector::DenseDoubleVector(int len) : size(len), vector(new double[len]) {
+ DenseDoubleVector::DenseDoubleVector(int len) : vector(new double[len]), size(len) {
}
- DenseDoubleVector::DenseDoubleVector(int len, double val) : size(len), vector(new double[len]) {
+ DenseDoubleVector::DenseDoubleVector(int len, double val) : vector(new double[len]), size(len) {
for (int i=0; i<len; i++)
vector[i] = val;
}
- DenseDoubleVector::DenseDoubleVector(double arr[]) : vector(arr) {
+ DenseDoubleVector::DenseDoubleVector(int len, double arr[]) : vector(arr), size(len) {
}
-
+
DenseDoubleVector::DenseDoubleVector(const string values) {
Splitter split ( values, "," );
size = split.size();
vector = new double[size];
- for ( Splitter::size_type i = 0; i < split.size(); i++ )
+ for ( Splitter::size_type i = 0; i < split.size(); i++ ) {
vector[i] = HadoopUtils::toDouble(split[i]);
+ }
}
DenseDoubleVector::~DenseDoubleVector() {
Modified: hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/DenseDoubleVector.hh?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.hh (original)
+++ hama/trunk/c++/src/main/native/examples/impl/DenseDoubleVector.hh Sat Nov 23 08:41:09 2013
@@ -32,7 +32,7 @@ namespace math {
/// Creates a new vector with the given length and default value.
DenseDoubleVector(int length, double val);
// Creates a new vector with the given array.
- DenseDoubleVector(double arr[]);
+ DenseDoubleVector(int length, double arr[]);
DenseDoubleVector(const string values);
~DenseDoubleVector(); // Destructor
Modified: hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/matrixmultiplication.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc Sat Nov 23 08:41:09 2013
@@ -24,119 +24,121 @@
#include <time.h>
#include <math.h>
#include <string>
-#include <string>
#include <iostream>
#include <sstream>
using std::string;
-using std::cout;
+using std::vector;
using HamaPipes::BSP;
using HamaPipes::BSPJob;
using HamaPipes::Partitioner;
using HamaPipes::BSPContext;
-using namespace HadoopUtils;
using math::DenseDoubleVector;
-class MatrixMultiplicationBSP: public BSP {
+class MatrixMultiplicationBSP: public BSP<int,string,int,string,string> {
private:
- string masterTask;
- int seqFileID;
- string HAMA_MAT_MULT_B_PATH;
+ string master_task_;
+ int seq_file_id_;
+ string HAMA_MAT_MULT_B_PATH_;
+
public:
- MatrixMultiplicationBSP(BSPContext& context) {
- seqFileID = 0;
- HAMA_MAT_MULT_B_PATH = "hama.mat.mult.B.path";
+ MatrixMultiplicationBSP(BSPContext<int,string,int,string,string>& context) {
+ seq_file_id_ = 0;
+ HAMA_MAT_MULT_B_PATH_ = "hama.mat.mult.B.path";
}
- void setup(BSPContext& context) {
+ void setup(BSPContext<int,string,int,string,string>& context) {
// Choose one as a master
- masterTask = context.getPeerName(context.getNumPeers() / 2);
+ master_task_ = context.getPeerName(context.getNumPeers() / 2);
reopenMatrixB(context);
}
- void bsp(BSPContext& context) {
+ void bsp(BSPContext<int,string,int,string,string>& context) {
- string aRowKey;
- string aRowVectorStr;
+ int a_row_key = 0;
+ string a_row_vector_str;
// while for each row of matrix A
- while(context.readNext(aRowKey, aRowVectorStr)) {
+ while(context.readNext(a_row_key, a_row_vector_str)) {
+
+ DenseDoubleVector *a_row_vector = new DenseDoubleVector(a_row_vector_str);
- DenseDoubleVector *aRowVector = new DenseDoubleVector(aRowVectorStr);
- DenseDoubleVector *colValues = NULL;
+ int b_col_key = 0;
+ string b_col_vector_str;
- string bColKey;
- string bColVectorStr;
+ // dynamic column values, depend on matrix B cols
+ vector<double> col_values;
// while for each col of matrix B
- while (context.sequenceFileReadNext(seqFileID,bColKey,bColVectorStr)) {
+ while (context.sequenceFileReadNext<int,string>(seq_file_id_, b_col_key, b_col_vector_str)) {
- DenseDoubleVector *bColVector = new DenseDoubleVector(bColVectorStr);
+ DenseDoubleVector *b_col_vector = new DenseDoubleVector(b_col_vector_str);
- if (colValues == NULL)
- colValues = new DenseDoubleVector(bColVector->getDimension());
+ double dot = a_row_vector->dot(b_col_vector);
- double dot = aRowVector->dot(bColVector);
-
- colValues->set(toInt(bColKey), dot);
+ col_values.push_back(dot);
}
+ DenseDoubleVector *col_values_vector = new DenseDoubleVector(col_values.size(), col_values.data());
+
// Submit one calculated row
+ // :key:value1,value2,value3
std::stringstream message;
- message << aRowKey << ":" << colValues->toString();
- context.sendMessage(masterTask, message.str());
+ message << ":" << a_row_key << ":" << col_values_vector->toString();
+ context.sendMessage(master_task_, message.str());
reopenMatrixB(context);
}
- context.sequenceFileClose(seqFileID);
+ context.sequenceFileClose(seq_file_id_);
context.sync();
}
- void cleanup(BSPContext& context) {
- if (context.getPeerName().compare(masterTask)==0) {
+ void cleanup(BSPContext<int,string,int,string,string>& context) {
+ if (context.getPeerName().compare(master_task_)==0) {
- int msgCount = context.getNumCurrentMessages();
+ int msg_count = context.getNumCurrentMessages();
- for (int i=0; i<msgCount; i++) {
+ for (int i=0; i < msg_count; i++) {
+ // :key:value1,value2,value3
string received = context.getCurrentMessage();
- //key:value1,value2,value3
- int pos = (int)received.find(":");
- string key = received.substr(0,pos);
- string values = received.substr(pos+1,received.length());
+ string key_value_str = received.substr(1);
+ int pos = (int)key_value_str.find(received.substr(0,1));
+ int key = HadoopUtils::toInt(key_value_str.substr(0,pos));
+ string values = key_value_str.substr(pos+1);
context.write(key, values);
}
}
}
- void reopenMatrixB(BSPContext& context) {
- if (seqFileID!=0) {
- context.sequenceFileClose(seqFileID);
+ void reopenMatrixB(BSPContext<int,string,int,string,string>& context) {
+ if (seq_file_id_!=0) {
+ context.sequenceFileClose(seq_file_id_);
}
const BSPJob* job = context.getBSPJob();
- string path = job->get(HAMA_MAT_MULT_B_PATH);
+ string path = job->get(HAMA_MAT_MULT_B_PATH_);
- seqFileID = context.sequenceFileOpen(path,"r",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hama.ml.writable.VectorWritable");
+ seq_file_id_ = context.sequenceFileOpen(path,"r",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hama.commons.io.PipesVectorWritable");
}
};
-class MatrixRowPartitioner: public Partitioner {
+class MatrixRowPartitioner: public Partitioner<int,string,int,string,string> {
public:
- MatrixRowPartitioner(BSPContext& context) { }
+ MatrixRowPartitioner(BSPContext<int,string,int,string,string>& context) { }
- int partition(const string& key,const string& value, int32_t numTasks) {
- return toInt(key) % numTasks;
+ int partition(const int& key,const string& value, int32_t num_tasks) {
+ return key % num_tasks;
}
};
int main(int argc, char *argv[]) {
- return HamaPipes::runTask(HamaPipes::TemplateFactory<MatrixMultiplicationBSP,MatrixRowPartitioner>());
+ return HamaPipes::runTask<int,string,int,string,string>(HamaPipes::TemplateFactory<MatrixMultiplicationBSP,int,string,int,string,string,MatrixRowPartitioner>());
}
Modified: hama/trunk/c++/src/main/native/examples/impl/piestimator.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/piestimator.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/piestimator.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/piestimator.cc Sat Nov 23 08:41:09 2013
@@ -33,31 +33,32 @@ using HamaPipes::BSP;
using HamaPipes::BSPContext;
using namespace HadoopUtils;
-class PiEstimatorBSP: public BSP {
-private:
- string masterTask;
- long iterations; // iterations_per_bsp_task
-public:
- PiEstimatorBSP(BSPContext& context) {
- iterations = 1000000L;
+class PiEstimatorBSP: public BSP<string,string,string,double,int> {
+ private:
+ string master_task_;
+ long iterations_; // iterations_per_bsp_task
+
+ public:
+ PiEstimatorBSP(BSPContext<string,string,string,double,int>& context) {
+ iterations_ = 1000000L;
}
inline double closed_interval_rand(double x0, double x1) {
return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
}
- void setup(BSPContext& context) {
+ void setup(BSPContext<string,string,string,double,int>& context) {
// Choose one as a master
- masterTask = context.getPeerName(context.getNumPeers() / 2);
+ master_task_ = context.getPeerName(context.getNumPeers() / 2);
}
- void bsp(BSPContext& context) {
+ void bsp(BSPContext<string,string,string,double,int>& context) {
/* initialize random seed */
srand(time(NULL));
int in = 0;
- for (long i = 0; i < iterations; i++) {
+ for (long i = 0; i < iterations_; i++) {
double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
if (sqrt(x * x + y * y) < 1.0) {
@@ -65,28 +66,26 @@ public:
}
}
- context.sendMessage(masterTask, toString(in));
+ context.sendMessage(master_task_, in);
context.sync();
}
- void cleanup(BSPContext& context) {
- if (context.getPeerName().compare(masterTask)==0) {
+ void cleanup(BSPContext<string,string,string,double,int>& context) {
+ if (context.getPeerName().compare(master_task_)==0) {
- long totalHits = 0;
- int msgCount = context.getNumCurrentMessages();
- string received;
- for (int i=0; i<msgCount; i++) {
- string received = context.getCurrentMessage();
- totalHits += toInt(received);
+ long total_hits = 0;
+ int msg_count = context.getNumCurrentMessages();
+ for (int i=0; i < msg_count; i++) {
+ total_hits += context.getCurrentMessage();
}
- double pi = 4.0 * totalHits / (msgCount * iterations);
- context.write("Estimated value of PI is", toString(pi));
+ double pi = 4.0 * total_hits / (msg_count * iterations_);
+ context.write("Estimated value of PI", pi);
}
}
};
int main(int argc, char *argv[]) {
- return HamaPipes::runTask(HamaPipes::TemplateFactory<PiEstimatorBSP>());
+ return HamaPipes::runTask<string,string,string,double,int>(HamaPipes::TemplateFactory<PiEstimatorBSP,string,string,string,double,int>());
}
Modified: hama/trunk/c++/src/main/native/examples/impl/summation.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/summation.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/summation.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/summation.cc Sat Nov 23 08:41:09 2013
@@ -29,46 +29,46 @@ using std::string;
using HamaPipes::BSP;
using HamaPipes::BSPContext;
-class SummationBSP: public BSP {
-private:
- string masterTask;
-public:
- SummationBSP(BSPContext& context) { }
+class SummationBSP: public BSP<string,string,string,double,double> {
+ private:
+ string master_task_;
- void setup(BSPContext& context) {
+ public:
+ SummationBSP(BSPContext<string,string,string,double,double>& context) { }
+
+ void setup(BSPContext<string,string,string,double,double>& context) {
// Choose one as a master
- masterTask = context.getPeerName(context.getNumPeers() / 2);
+ master_task_ = context.getPeerName(context.getNumPeers() / 2);
}
- void bsp(BSPContext& context) {
+ void bsp(BSPContext<string,string,string,double,double>& context) {
- double intermediateSum = 0.0;
+ double intermediate_sum = 0.0;
string key;
string value;
while(context.readNext(key,value)) {
- intermediateSum += HadoopUtils::toDouble(value);
+ intermediate_sum += HadoopUtils::toDouble(value);
}
- context.sendMessage(masterTask, HadoopUtils::toString(intermediateSum));
+ context.sendMessage(master_task_, intermediate_sum);
context.sync();
}
- void cleanup(BSPContext& context) {
- if (context.getPeerName().compare(masterTask)==0) {
+ void cleanup(BSPContext<string,string,string,double,double>& context) {
+ if (context.getPeerName().compare(master_task_)==0) {
double sum = 0.0;
- int msgCount = context.getNumCurrentMessages();
- for (int i=0; i<msgCount; i++) {
- string received = context.getCurrentMessage();
- sum += HadoopUtils::toDouble(received);
+ int msg_count = context.getNumCurrentMessages();
+ for (int i=0; i < msg_count; i++) {
+ sum += context.getCurrentMessage();
}
- context.write("Sum", HadoopUtils::toString(sum));
+ context.write("Sum", sum);
}
}
};
int main(int argc, char *argv[]) {
- return HamaPipes::runTask(HamaPipes::TemplateFactory<SummationBSP>());
+ return HamaPipes::runTask<string,string,string,double,double>(HamaPipes::TemplateFactory<SummationBSP,string,string,string,double,double>());
}
Modified: hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/api/hama/Pipes.hh?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh (original)
+++ hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh Sat Nov 23 08:41:09 2013
@@ -18,357 +18,709 @@
#ifndef HAMA_PIPES_HH
#define HAMA_PIPES_HH
-#ifdef SWIG
-%module (directors="1") HamaPipes
-%include "std_string.i"
-%feature("director") BSP;
-%feature("director") Partitioner;
-%feature("director") RecordReader;
-%feature("director") RecordWriter;
-%feature("director") Factory;
-#else
+#include <errno.h>
+#include <map>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <sstream> /* ostringstream */
+#include <stdint.h>
+#include <stdio.h> /* printf */
+#include <stdlib.h> /* getenv */
#include <string>
+#include <string.h>
+#include <strings.h>
+#include <sys/socket.h>
+#include <typeinfo> /* typeid */
+#include <unistd.h> /* sleep */
#include <vector>
-#endif
-#include <stdint.h>
+#include "hadoop/SerialUtils.hh"
+#include "hadoop/StringUtils.hh"
+#define stringify( name ) # name
+
+using std::map;
using std::string;
using std::vector;
+using std::pair;
-namespace HamaPipes {
+using namespace HadoopUtils;
-/**
- * This interface defines the interface between application code and the
- * foreign code interface to Hadoop Map/Reduce.
- */
-
-/**
- * A BSPJob defines the properties for a job.
- */
-class BSPJob {
-public:
- virtual bool hasKey(const string& key) const = 0;
- virtual const string& get(const string& key) const = 0;
- virtual int getInt(const string& key) const = 0;
- virtual float getFloat(const string& key) const = 0;
- virtual bool getBoolean(const string&key) const = 0;
- virtual ~BSPJob() {}
-};
-
-/**
- * Task context provides the information about the task and job.
- */
-class TaskContext {
-public:
- /**
- * Counter to keep track of a property and its value.
- */
- class Counter {
- private:
- int id;
- public:
- Counter(int counterId) : id(counterId) {}
- Counter(const Counter& counter) : id(counter.id) {}
-
- int getId() const { return id; }
+namespace HamaPipes {
+
+ // global variables (NOTE: non-inline definitions in a header; including Pipes.hh from several translation units gives duplicate symbols)
+ bool logging;
+
+ /********************************************/
+ /*************** MESSAGE_TYPE ***************/
+ /********************************************/
+ enum MESSAGE_TYPE {
+ START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES,
+ RUN_SETUP, RUN_BSP, RUN_CLEANUP,
+ READ_KEYVALUE, WRITE_KEYVALUE,
+ GET_MSG, GET_MSG_COUNT,
+ SEND_MSG, SYNC,
+ GET_ALL_PEERNAME, GET_PEERNAME,
+ GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT,
+ REOPEN_INPUT, CLEAR,
+ CLOSE, ABORT,
+ DONE, TASK_DONE,
+ REGISTER_COUNTER, INCREMENT_COUNTER,
+ SEQFILE_OPEN, SEQFILE_READNEXT,
+ SEQFILE_APPEND, SEQFILE_CLOSE,
+ PARTITION_REQUEST, PARTITION_RESPONSE,
+ LOG, END_OF_DATA
+ };
+
+ /* Only needed for debugging output; entries must stay in MESSAGE_TYPE order. const => read-only, internal linkage */
+ const char* const messageTypeNames[] = {
+ stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ),
+ stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ),
+ stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ),
+ stringify( GET_MSG ), stringify( GET_MSG_COUNT ),
+ stringify( SEND_MSG ), stringify( SYNC ),
+ stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ),
+ stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ),
+ stringify( REOPEN_INPUT ), stringify( CLEAR ),
+ stringify( CLOSE ), stringify( ABORT ),
+ stringify( DONE ), stringify( TASK_DONE ),
+ stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ),
+ stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ),
+ stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ),
+ stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE ),
+ stringify( LOG ), stringify( END_OF_DATA )
};
/**
- * Get the BSPJob for the current task.
- */
- virtual const BSPJob* getBSPJob() = 0;
-
- /**
- * Get the current key.
- * @return the current key
- */
- //virtual const string& getInputKey() = 0;
-
- /**
- * Get the current value.
- * @return the current value
- */
- //virtual const string& getInputValue() = 0;
-
- /**
- * Generate an output record
- */
- //virtual void emit(const string& key, const string& value) = 0;
-
- /**
- * Mark your task as having made progress without changing the status
- * message.
+ * Generic KeyValuePair including is_empty
*/
- //virtual void progress() = 0;
-
+ template <typename K, typename V>
+ struct KeyValuePair : pair<K, V> {
+ typedef pair<K, V> base_t;
+ bool is_empty;
+
+ KeyValuePair() : is_empty(false) {}
+ explicit KeyValuePair(bool x) : is_empty(x) {}
+ KeyValuePair(const K& k, const V& v) : base_t(k, v), is_empty(false) {}
+
+ template <class X, class Y>
+ KeyValuePair(const pair<X,Y> &p) : base_t(p), is_empty(false) {}
+
+ template <class X, class Y>
+ KeyValuePair(const KeyValuePair<X,Y> &p) : base_t(p), is_empty(p.is_empty) {}
+ };
+
/**
- * Set the status message and call progress.
+ * Override Generic KeyValuePair << operator
*/
- //virtual void setStatus(const string& status) = 0;
-
+ template <typename OS, typename K, typename V>
+ OS &operator<<(OS &os, const KeyValuePair<K, V>& p) {
+ os << "<KeyValuePair: ";
+ if (!p.is_empty) {
+ os << p.first << ", " << p.second;
+ } else {
+ os << "empty";
+ }
+ os << ">";
+ return os;
+ }
+
+ /********************************************/
+ /****************** BSPJob ******************/
+ /********************************************/
/**
- * Register a counter with the given group and name.
+ * A BSPJob defines the properties for a job.
*/
- //virtual Counter* getCounter(const string& group, const string& name) = 0;
-
+ class BSPJob {
+ public:
+ virtual bool hasKey(const string& key) const = 0;
+ virtual const string& get(const string& key) const = 0;
+ virtual int getInt(const string& key) const = 0;
+ virtual float getFloat(const string& key) const = 0;
+ virtual bool getBoolean(const string& key) const = 0;
+ virtual ~BSPJob() {}
+ };
+
+ /********************************************/
+ /**************** BSPJobImpl ****************/
+ /********************************************/
+ class BSPJobImpl: public BSPJob {
+ private:
+ map<string, string> values_;
+ public:
+ void set(const string& key, const string& value) {
+ values_[key] = value;
+ }
+
+ virtual bool hasKey(const string& key) const {
+ return values_.find(key) != values_.end();
+ }
+
+ virtual const string& get(const string& key) const {
+ map<string,string>::const_iterator itr = values_.find(key);
+ if (itr == values_.end()) {
+ throw Error("Key " + key + " not found in BSPJob");
+ }
+ return itr->second;
+ }
+
+ virtual int getInt(const string& key) const {
+ const string& val = get(key);
+ return toInt(val);
+ }
+
+ virtual float getFloat(const string& key) const {
+ const string& val = get(key);
+ return toFloat(val);
+ }
+
+ virtual bool getBoolean(const string& key) const {
+ const string& val = get(key);
+ return toBool(val);
+ }
+ };
+
+ /********************************************/
+ /**************** TaskContext ***************/
+ /********************************************/
/**
- * Increment the value of the counter with the given amount.
+ * Task context provides the information about the task and job.
*/
- //virtual void incrementCounter(const Counter* counter, uint64_t amount) = 0;
- virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
+ class TaskContext {
+ public:
+ /**
+ * Counter to keep track of a property and its value.
+ */
+ class Counter {
+ private:
+ int id_;
+ public:
+ Counter(int counter_id) : id_(counter_id) {}
+ Counter(const Counter& counter) : id_(counter.id_) {}
+
+ int getId() const { return id_; }
+ };
+
+ /**
+ * Get the BSPJob for the current task.
+ */
+ virtual const BSPJob* getBSPJob() = 0;
+
+ /**
+ * Get the current key.
+ * @return the current key
+ */
+ //virtual const string& getInputKey() = 0;
+
+ /**
+ * Get the current value.
+ * @return the current value
+ */
+ //virtual const string& getInputValue() = 0;
+
+ /**
+ * Generate an output record
+ */
+ //virtual void emit(const string& key, const string& value) = 0;
+
+ /**
+ * Mark your task as having made progress without changing the status
+ * message.
+ */
+ //virtual void progress() = 0;
+
+ /**
+ * Set the status message and call progress.
+ */
+ //virtual void setStatus(const string& status) = 0;
+
+ /**
+ * Register a counter with the given group and name.
+ */
+ //virtual Counter* getCounter(const string& group, const string& name) = 0;
+
+ /**
+ * Increment the value of the counter with the given amount.
+ */
+ //virtual void incrementCounter(const Counter* counter, uint64_t amount) = 0;
+ virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
+
+ virtual ~TaskContext() {}
+ };
+
+ /********************************************/
+ /************* DownwardProtocol *************/
+ /********************************************/
+ template<class K1, class V1>
+ class DownwardProtocol {
+ public:
+ virtual void start(int protocol_version) = 0;
+ virtual void setBSPJob(vector<string> values) = 0;
+ virtual void setInputTypes(string key_type, string value_type) = 0;
+
+ virtual void runBsp(bool piped_input, bool piped_output) = 0;
+ virtual void runCleanup(bool piped_input, bool piped_output) = 0;
+ virtual void runSetup(bool piped_input, bool piped_output) = 0;
+
+ virtual void runPartition(const K1& key, const V1& value, int32_t num_tasks) = 0;
+
+ virtual void close() = 0;
+ virtual void abort() = 0;
+ virtual ~DownwardProtocol() {}
+ };
- virtual ~TaskContext() {}
-};
+ /********************************************/
+ /************** UpwardProtocol **************/
+ /********************************************/
+ template<typename D>
+ class UpwardProtocol {
+ public:
+ virtual void sendCommand(int32_t cmd) = 0;
+ template<class T>
+ void sendCommand(int32_t cmd, T value) {
+ static_cast<D*>(this)->template sendCommand<T>(cmd, value);
+ }
+
+ template<class T>
+ void sendCommand(int32_t cmd, const T values[], int size) {
+ static_cast<D*>(this)->template sendCommand<T>(cmd, values, size);
+ }
+
+ template<class T1, class T2>
+ void sendCommand(int32_t cmd, T1 value1, T2 value2) {
+ static_cast<D*>(this)->template sendCommand<T1,T2>(cmd, value1, value2);
+ }
+
+ template<class T1, class T2>
+ void sendCommand(int32_t cmd, T1 value, const T2 values[], int size) {
+ static_cast<D*>(this)->template sendCommand<T1,T2>(cmd, value, values, size);
+ }
+
+ //virtual void registerCounter(int id, const string& group, const string& name) = 0;
+ //virtual void incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
+ virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
+ virtual ~UpwardProtocol() {}
+ };
+
+ /* Forward definition of BinaryUpwardProtocol to pass to UpwardProtocol */
+ class BinaryUpwardProtocol;
+
+ /********************************************/
+ /***************** Protocol *****************/
+ /********************************************/
+ template<typename D>
+ class Protocol {
+ public:
-/**
- * SequenceFile Connector
- */
-class SequenceFileConnector {
-public:
- /**
- * Open SequenceFile with option "r" or "w"
- * key and value type of the values stored in the SequenceFile
- * @return the corresponding fileID
- */
- virtual int sequenceFileOpen(const string& path, const string& option, const string& keyType, const string& valueType) = 0;
+ template<class T>
+ T getResult(int32_t expected_response_cmd) {
+ return static_cast<D*>(this)->template getResult<T>(expected_response_cmd);
+ }
+
+ template<class T>
+ vector<T> getVectorResult(int32_t expected_response_cmd) {
+ return static_cast<D*>(this)->template getVectorResult<T>(expected_response_cmd);
+ }
+
+ template<class K, class V>
+ KeyValuePair<K,V> getKeyValueResult(int32_t expected_response_cmd) {
+ return static_cast<D*>(this)->template getKeyValueResult<K,V>(expected_response_cmd);
+ }
+
+ virtual void nextEvent() = 0;
+ virtual UpwardProtocol<BinaryUpwardProtocol>* getUplink() = 0;
+ virtual ~Protocol(){}
+ };
+
+ /********************************************/
+ /*********** SequenceFileConnector **********/
+ /********************************************/
+ /**
+ * SequenceFile Connector
+ */
+ /* Uses the Curiously Recurring Template Pattern (CRTP) because member function templates cannot be virtual */
+ template<typename D>
+ class SequenceFileConnector {
+ public:
+ /**
+ * Open SequenceFile with option "r" or "w"
+ * key and value type of the values stored in the SequenceFile
+ * @return the corresponding fileID
+ */
+ virtual int32_t sequenceFileOpen(const string& path, const string& option, const string& key_type, const string& value_type) = 0;
- /**
- * Read next key/value pair from the SequenceFile with fileID
- */
- virtual bool sequenceFileReadNext(int fileID, string& key, string& value) = 0;
-
- /**
- * Append the next key/value pair to the SequenceFile with fileID
- */
- virtual bool sequenceFileAppend(int fileID, const string& key, const string& value) = 0;
+ /**
+ * Close SequenceFile
+ */
+ virtual bool sequenceFileClose(int32_t file_id) = 0;
+ virtual ~SequenceFileConnector() {} /* has virtual functions: allow safe deletion through a base pointer */
- /**
- * Close SequenceFile
- */
- virtual bool sequenceFileClose(int fileID) = 0;
-};
-
-
-class BSPContext: public TaskContext, public SequenceFileConnector {
-public:
-
- /**
- * Access the InputSplit of the mapper.
- */
- //virtual const string& getInputSplit() = 0;
-
- /**
- * Get the name of the key class of the input to this task.
- */
- virtual const string& getInputKeyClass() = 0;
-
- /**
- * Get the name of the value class of the input to this task.
- */
- virtual const string& getInputValueClass() = 0;
+ /**
+ * Read next key/value pair from the SequenceFile with fileID
+ * Dispatches statically to D::sequenceFileReadNext (CRTP); D must define it or this recurses.
+ */
+ template<class K, class V>
+ bool sequenceFileReadNext(int32_t file_id, K& key, V& value) {
+ return static_cast<D*>(this)->template sequenceFileReadNext<K,V>(file_id, key, value);
+ }
+ /**
+ * Append the next key/value pair to the SequenceFile with fileID
+ * Dispatches statically to D::sequenceFileAppend (CRTP); D must define it or this recurses.
+ */
+ template<class K, class V>
+ bool sequenceFileAppend(int32_t file_id, const K& key, const V& value) {
+ return static_cast<D*>(this)->template sequenceFileAppend<K,V>(file_id, key, value);
+ }
+ };
+
+ /* Forward definition of BSPContextImpl to pass to SequenceFileConnector */
+ template<class K1, class V1, class K2, class V2, class M>
+ class BSPContextImpl;
+
+
+ template<class K1, class V1, class K2, class V2, class M>
+ class BSPContext: public TaskContext, public SequenceFileConnector<BSPContextImpl<K1, V1, K2, V2, M> > {
+ public:
+ /**
+ * Access the InputSplit of the mapper.
+ */
+ //virtual const string& getInputSplit() = 0;
- /**
- * Send a data with a tag to another BSPSlave corresponding to hostname.
- * Messages sent by this method are not guaranteed to be received in a sent
- * order.
- */
- virtual void sendMessage(const string& peerName, const string& msg) = 0;
+ /**
+ * Get the name of the key class of the input to this task.
+ */
+ virtual string getInputKeyClass() = 0;
- /**
- * @return A message from the peer's received messages queue (a FIFO).
- */
- virtual const string& getCurrentMessage() = 0;
+ /**
+ * Get the name of the value class of the input to this task.
+ */
+ virtual string getInputValueClass() = 0;
- /**
- * @return The number of messages in the peer's received messages queue.
- */
- virtual int getNumCurrentMessages() = 0;
+ /**
+ * Send a data with a tag to another BSPSlave corresponding to hostname.
+ * Messages sent by this method are not guaranteed to be received in a sent
+ * order.
+ */
+ virtual void sendMessage(const string& peer_name, const M& msg) = 0;
- /**
- * Barrier Synchronization.
- *
- * Sends all the messages in the outgoing message queues to the corresponding
- * remote peers.
- */
- virtual void sync() = 0;
+ /**
+ * @return A message from the peer's received messages queue (a FIFO).
+ */
+ virtual M getCurrentMessage() = 0;
- /**
- * @return the count of current super-step
- */
- virtual long getSuperstepCount() = 0;
-
- /**
- * @return the name of this peer in the format "hostname:port".
- */
- virtual const string& getPeerName() = 0;
+ /**
+ * @return The number of messages in the peer's received messages queue.
+ */
+ virtual int getNumCurrentMessages() = 0;
- /**
- * @return the name of n-th peer from sorted array by name.
- */
- virtual const string& getPeerName(int index) = 0;
+ /**
+ * Barrier Synchronization.
+ *
+ * Sends all the messages in the outgoing message queues to the corresponding
+ * remote peers.
+ */
+ virtual void sync() = 0;
- /**
- * @return the index of this peer from sorted array by name.
- */
- virtual int getPeerIndex() = 0;
+ /**
+ * @return the count of current super-step
+ */
+ virtual long getSuperstepCount() = 0;
- /**
- * @return the names of all the peers executing tasks from the same job
- * (including this peer).
- */
- virtual vector<string> getAllPeerNames() = 0;
+ /**
+ * @return the name of this peer in the format "hostname:port".
+ */
+ virtual string getPeerName() = 0;
- /**
- * @return the number of peers
- */
- virtual int getNumPeers() = 0;
+ /**
+ * @return the name of n-th peer from sorted array by name.
+ */
+ virtual string getPeerName(int index) = 0;
- /**
- * Clears all queues entries.
- */
- virtual void clear() = 0;
+ /**
+ * @return the index of this peer from sorted array by name.
+ */
+ virtual int getPeerIndex() = 0;
- /**
- * Writes a key/value pair to the output collector
- */
- virtual void write(const string& key, const string& value) = 0;
+ /**
+ * @return the names of all the peers executing tasks from the same job
+ * (including this peer).
+ */
+ virtual vector<string> getAllPeerNames() = 0;
- /**
- * Deserializes the next input key value into the given objects;
- */
- virtual bool readNext(string& key, string& value) = 0;
+ /**
+ * @return the number of peers
+ */
+ virtual int getNumPeers() = 0;
+
+ /**
+ * Clears all queues entries.
+ */
+ virtual void clear() = 0;
+
+ /**
+ * Writes a key/value pair to the output collector
+ */
+ virtual void write(const K2& key, const V2& value) = 0;
+
+ /**
+ * Deserializes the next input key value into the given objects;
+ */
+ virtual bool readNext(K1& key, V1& value) = 0;
/**
* Reads the next key value pair and returns it as a pair. It may reuse a
* {@link KeyValuePair} instance to save garbage collection time.
- *
+ *
* @return null if there are no records left.
* @throws IOException
*/
//public KeyValuePair<K1, V1> readNext() throws IOException;
+ /**
+ * Closes the input and opens it right away, so that the file pointer is at
+ * the beginning again.
+ */
+ virtual void reopenInput() = 0;
+
+ };
+
+ class Closable {
+ public:
+ virtual void close() {}
+ virtual ~Closable() {}
+ };
+
/**
- * Closes the input and opens it right away, so that the file pointer is at
- * the beginning again.
+ * The application's BSP computation: implement setup(), bsp() and cleanup().
*/
- virtual void reopenInput() = 0;
+ template<class K1, class V1, class K2, class V2, class M>
+ class BSP: public Closable {
+ public:
+ /**
+ * This method is called before the BSP method. It can be used for setup
+ * purposes.
+ */
+ virtual void setup(BSPContext<K1, V1, K2, V2, M>& context) = 0;
-};
+ /**
+ * This method is your computation method, the main work of your BSP should be
+ * done here.
+ */
+ virtual void bsp(BSPContext<K1, V1, K2, V2, M>& context) = 0;
-class Closable {
-public:
- virtual void close() {}
- virtual ~Closable() {}
-};
-
-/**
- * The application's BSP class to do bsp.
- */
-class BSP: public Closable {
-public:
+ /**
+ * This method is called after the BSP method. It can be used for cleanup
+ * purposes. Cleanup is guaranteed to be called after the BSP runs, even in
+ * case of exceptions.
+ */
+ virtual void cleanup(BSPContext<K1, V1, K2, V2, M>& context) = 0;
+ };
+
/**
- * This method is called before the BSP method. It can be used for setup
- * purposes.
+ * User code to decide where each key should be sent.
*/
- virtual void setup(BSPContext& context) = 0;
-
+ template<class K1, class V1, class K2, class V2, class M>
+ class Partitioner {
+ public:
+
+ virtual int partition(const K1& key, const V1& value, int32_t num_tasks) = 0;
+ virtual ~Partitioner() {}
+ };
+
/**
- * This method is your computation method, the main work of your BSP should be
- * done here.
+ * For applications that want to read the input directly for the map function
+ * they can define RecordReaders in C++.
*/
- virtual void bsp(BSPContext& context) = 0;
+ template<class K, class V>
+ class RecordReader: public Closable {
+ public:
+ virtual bool next(K& key, V& value) = 0;
+ /**
+ * The progress of the record reader through the split as a value between
+ * 0.0 and 1.0.
+ */
+ virtual float getProgress() = 0;
+ };
+
/**
- * This method is called after the BSP method. It can be used for cleanup
- * purposes. Cleanup is guranteed to be called after the BSP runs, even in
- * case of exceptions.
+ * An object to write key/value pairs as they are emitted from the BSP task.
*/
- virtual void cleanup(BSPContext& context) = 0;
-};
-
-/**
- * User code to decide where each key should be sent.
- */
-class Partitioner {
-public:
+ template<class K, class V>
+ class RecordWriter: public Closable {
+ public:
+ virtual void emit(const K& key, const V& value) = 0;
+ };
+
+ /**
+ * A factory to create the necessary application objects.
+ */
+ template<class K1, class V1, class K2, class V2, class M>
+ class Factory {
+ public:
+ virtual BSP<K1, V1, K2, V2, M>* createBSP(BSPContext<K1, V1, K2, V2, M>& context) const = 0;
- virtual int partition(const string& key,const string& value, int32_t numTasks) = 0;
- virtual ~Partitioner() {}
-};
+ /**
+ * Create an application partitioner object.
+ * @return the new partitioner or NULL, if the default partitioner should be
+ * used.
+ */
+ virtual Partitioner<K1, V1, K2, V2, M>* createPartitioner(BSPContext<K1, V1, K2, V2, M>& context) const {
+ return NULL;
+ }
-/**
- * For applications that want to read the input directly for the map function
- * they can define RecordReaders in C++.
- */
-class RecordReader: public Closable {
-public:
- virtual bool next(string& key, string& value) = 0;
-
+ /**
+ * Create an application record reader.
+ * @return the new RecordReader or NULL, if the Java RecordReader should be
+ * used.
+ */
+ virtual RecordReader<K1,V1>* createRecordReader(BSPContext<K1, V1, K2, V2, M>& context) const {
+ return NULL;
+ }
+
+ /**
+ * Create an application record writer.
+ * @return the new RecordWriter or NULL, if the Java RecordWriter should be
+ * used.
+ */
+ virtual RecordWriter<K2,V2>* createRecordWriter(BSPContext<K1, V1, K2, V2, M>& context) const {
+ return NULL;
+ }
+
+ virtual ~Factory() {}
+ };
+
/**
- * The progress of the record reader through the split as a value between
- * 0.0 and 1.0.
+ * Generic toString
*/
- virtual float getProgress() = 0;
-};
-
-/**
- * An object to write key/value pairs as they are emited from the reduce.
- */
-class RecordWriter: public Closable {
-public:
- virtual void emit(const string& key,
- const string& value) = 0;
-};
-
-/**
- * A factory to create the necessary application objects.
- */
-class Factory {
-public:
- virtual BSP* createBSP(BSPContext& context) const = 0;
-
+ template <class T>
+ string toString(const T& t)
+ {
+ std::ostringstream oss;
+ oss << t;
+ return oss.str();
+ }
/**
- * Create an application partitioner object.
- * @return the new partitioner or NULL, if the default partitioner should be
- * used.
+ * Generic toString template specializations (declared inline: full specializations defined in a header are not implicitly inline)
*/
- virtual Partitioner* createPartitioner(BSPContext& context) const {
- return NULL;
+ template <> inline string toString<string>(const string& t) {
+ return t;
}
-
+
/**
- * Create an application record reader.
- * @return the new RecordReader or NULL, if the Java RecordReader should be
- * used.
+ * Generic serialization
*/
- virtual RecordReader* createRecordReader(BSPContext& context) const {
- return NULL;
+ template<class T>
+ void serialize(T t, OutStream& stream) {
+ serializeString(toString<T>(t), stream);
}
-
+
/**
- * Create an application record writer.
- * @return the new RecordWriter or NULL, if the Java RecordWriter should be
- * used.
+ * Generic serialization template specializations (declared inline: full specializations defined in a header are not implicitly inline)
*/
- virtual RecordWriter* createRecordWriter(BSPContext& context) const {
- return NULL;
+ template <> inline void serialize<int32_t>(int32_t t, OutStream& stream) {
+ serializeInt(t, stream);
+ if (logging) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::serializeInt '%d'\n", t);
+ }
}
-
- virtual ~Factory() {}
-};
-
-/**
- * Run the assigned task in the framework.
- * The user's main function should set the various functions using the
- * set* functions above and then call this.
- * @return true, if the task succeeded.
- */
-bool runTask(const Factory& factory);
-
+ template <> inline void serialize<int64_t>(int64_t t, OutStream& stream) {
+ serializeLong(t, stream);
+ if (logging) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::serializeLong '%ld'\n", (long)t);
+ }
+ }
+ template <> inline void serialize<float>(float t, OutStream& stream) {
+ serializeFloat(t, stream);
+ if (logging) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::serializeFloat '%f'\n", t);
+ }
+ }
+ template <> inline void serialize<double>(double t, OutStream& stream) {
+ serializeDouble(t, stream);
+ if (logging) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::serializeDouble '%f'\n", t);
+ }
+ }
+ template <> inline void serialize<string>(string t, OutStream& stream) {
+ serializeString(t, stream);
+ if (logging) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::serializeString '%s'\n", t.c_str());
+ }
+ }
+
+ /**
+ * Generic deserialization
+ */
+ template<class T>
+ T deserialize(InStream& stream) {
+ string str = "Not able to deserialize type: ";
+ throw Error(str.append(typeid(T).name()));
+ }
+
+ /**
+ * Generic deserialization template specializations (declared inline: full specializations defined in a header are not implicitly inline)
+ */
+ template <> inline int32_t deserialize<int32_t>(InStream& stream) {
+ int32_t result = deserializeInt(stream);
+ if (logging) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeInt result: '%d'\n",
+ result);
+ }
+ return result;
+ }
+ template <> inline int64_t deserialize<int64_t>(InStream& stream) {
+ int64_t result = deserializeLong(stream);
+ if (logging) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeLong result: '%ld'\n",
+ (long)result);
+ }
+ return result;
+ }
+ template <> inline float deserialize<float>(InStream& stream) {
+ float result = deserializeFloat(stream);
+ if (logging) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeFloat result: '%f'\n",
+ result);
+ }
+ return result;
+ }
+ template <> inline double deserialize<double>(InStream& stream) {
+ double result = deserializeDouble(stream);
+ if (logging) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeDouble result: '%f'\n",
+ result);
+ }
+ return result;
+ }
+ template <> inline string deserialize<string>(InStream& stream) {
+ string result = deserializeString(stream);
+
+ if (logging) {
+ if (result.empty()) {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeString returns EMPTY string! Maybe wrong Serialization?\n");
+ } else {
+ fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeString result: '%s'\n",
+ ((result.length()<10)?result.c_str():result.substr(0,9).append("...").c_str()));
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Run the assigned task in the framework (K1/V1: input, K2/V2: output, M: message types).
+ * The user's main function should construct a Factory (e.g. a
+ * TemplateFactory for its BSP class) and pass it to this function.
+ * @return true, if the task succeeded.
+ */
+ template<class K1, class V1, class K2, class V2, class M>
+ bool runTask(const Factory<K1, V1, K2, V2, M>& factory);
+
+ // Include implementation in header because of templates
+ #include "../../impl/Pipes.cc"
}
#endif
Modified: hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/api/hama/TemplateFactory.hh?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh (original)
+++ hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh Sat Nov 23 08:41:09 2013
@@ -19,27 +19,31 @@
#define HAMA_PIPES_TEMPLATE_FACTORY_HH
namespace HamaPipes {
-
- template <class BSP>
- class TemplateFactory2: public Factory {
+
+ /* Generic template factory specification */
+ template <class BSP, class K1, class V1, class K2, class V2, class M>
+ class TemplateFactory2: public Factory<K1, V1, K2, V2, M> {
public:
- BSP* createBSP(BSPContext& context) const {
+ BSP* createBSP(BSPContext<K1, V1, K2, V2, M>& context) const {
return new BSP(context);
}
};
-
- template <class BSP, class partitioner=void>
- class TemplateFactory: public TemplateFactory2<BSP> {
+
+ /* Template factory including partitioner specification */
+ template <class BSP, class K1, class V1, class K2, class V2, class M, class partitioner=void>
+ class TemplateFactory: public TemplateFactory2<BSP, K1, V1, K2, V2, M> {
public:
- partitioner* createPartitioner(BSPContext& context) const {
- return new partitioner(context);
- }
+ partitioner* createPartitioner(BSPContext<K1, V1, K2, V2, M>& context) const {
+ return new partitioner(context);
+ }
};
- template <class BSP>
- class TemplateFactory<BSP,void>
- : public TemplateFactory2<BSP> {
+
+ /* Template factory without partitioner specification */
+ template <class BSP, class K1, class V1, class K2, class V2, class M>
+ class TemplateFactory<BSP, K1, V1, K2, V2, M, void>
+ : public TemplateFactory2<BSP, K1, V1, K2, V2, M> {
};
-
+
}
#endif