You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by Apache Wiki <wi...@apache.org> on 2013/11/23 11:53:03 UTC
[Hama Wiki] Update of "HamaPipes" by MartinIllecker
Dear Wiki user,
You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.
The "HamaPipes" page has been changed by MartinIllecker:
https://wiki.apache.org/hama/HamaPipes?action=diff&rev1=6&rev2=7
- '''Hama Pipes''' is equivalent to '''Hadoop Pipes''' and offers the possibility to use Hama with C/C++.
+ '''Hama Pipes''' is equivalent to '''Hadoop Pipes''' and offers the possibility to use Hama with C++.
== Installation ==
- You can compile Hama Pipes by executing the following commands:
+ The native compilation is integrated into the Maven build process.
{{{
- cd $HAMA_HOME/c++/utils
- ./configure
- make install
+ mvn install
-
- cd $HAMA_HOME/c++/pipes
- ./configure
- make install
}}}
== Interface ==
- Hama Pipes provides the following methods for C/C++ integration: (similar to the [[BSPModel|BSPModel]])
+ Hama Pipes provides the following methods for C++ integration: (similar to the functions of [[BSPModel#Communication_Model|BSP Communication Model]])
||Function||Description||
- ||`void sendMessage(const string& peerName, const string& msg)`||Send a message to another peer. Messages sent by this method are not guaranteed to be received in a sent order.||
+ ||`void sendMessage(const string& peerName, const M& msg)`||Send a message to another peer. Messages sent by this method are not guaranteed to be received in the order in which they were sent.||
- ||`string& getCurrentMessage()`||Returns a message from the peer's received messages queue (a FIFO).||
+ ||`M getCurrentMessage()`||Returns a message from the peer's received messages queue (a FIFO).||
||`int getNumCurrentMessages()`||Returns the number of messages in the peer's received messages queue.||
||`void sync()`||Starts the barrier synchronization and sends all the messages in the outgoing message queues to the corresponding remote peers.||
||`long getSuperstepCount()`||Returns the current super-step count.||
@@ -32, +26 @@
||`vector<string> getAllPeerNames()`||Returns the names of all the peers executing tasks from the same job (including this peer).||
||`int getNumPeers()`||Returns the number of peers.||
||`void clear()`||Clears all queue entries.||
- ||`void write(const string& key, const string& value)`||Writes a key/value pair to the output collector.||
+ ||`void write(const K2& key, const V2& value)`||Writes a key/value pair to the output collector.||
- ||`bool readNext(string& key, string& value)`||Deserializes the next input key value into the given objects.||
+ ||`bool readNext(K1& key, V1& value)`||Deserializes the next input key value into the given objects.||
||`void reopenInput()`||Closes the input and opens it right away, so that the file pointer is at the beginning again.||
- The following additional methods support access to [[http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/SequenceFile.html|SequenceFiles]] under C/C++:
+ The following additional methods support access to [[http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/SequenceFile.html|SequenceFiles]] in C++:
||Function||Description||
||`sequenceFileOpen(const string& path, const string& option, const string& keyType, const string& valueType)`||Opens a SequenceFile with the option "r" or "w" and the given key/value types, and returns the corresponding fileID.||
- ||`bool sequenceFileReadNext(int fileID, string& key, string& value)`||Reads the next key/value pair from the SequenceFile.||
+ ||`bool sequenceFileReadNext(int fileID, K& key, V& value)`||Reads the next key/value pair from the SequenceFile.||
- ||`bool sequenceFileAppend(int fileID, const string& key, const string& value)`||Appends the next key/value pair to the SequenceFile.||
+ ||`bool sequenceFileAppend(int fileID, const K& key, const V& value)`||Appends the next key/value pair to the SequenceFile.||
||`bool sequenceFileClose(int fileID)`||Closes a SequenceFile.||
- == C++ BSP example ==
+ == Compilation ==
- Finally here is the [[PiEstimator|Pi Estimator]] example implemented with Hama Pipes:
+ The following command can be used to compile a Hama Pipes application:
+
+ {{{
+ g++ -Ic++/src/main/native/utils/api \
+ -Ic++/src/main/native/pipes/api \
+ -Lc++/target/native \
+ -lhadooputils -lpthread \
+ PROGRAM.cc \
+ -o PROGRAM \
+ -g -Wall -O2
+ }}}
+ Please note that the paths have to be adjusted if you are not working in the Hama source folder.
+
+ == Examples ==
+ Hama Pipes includes the following examples:
+ * Summation
+ * PiEstimator
+ * MatrixMultiplication
+
+ These examples are located in `c++/src/main/native/examples` and explained in `c++/src/main/native/examples/README.txt`.
+
+ === PiEstimator Example ===
+
+ This is a Hama Pipes C++ implementation of [[PiEstimator|PiEstimator]].
{{{
#include "hama/Pipes.hh"
@@ -55, +72 @@
#include <time.h>
#include <math.h>
+ #include <stdlib.h>
#include <string>
#include <iostream>
- #include <cstdlib>
using std::string;
using std::cout;
@@ -66, +83 @@
using HamaPipes::BSPContext;
using namespace HadoopUtils;
- class PiCalculationBSP: public BSP {
+ class PiEstimatorBSP: public BSP<string,string,string,double,int> {
private:
- string masterTask;
+ string master_task_;
- int iterations;
+ long iterations_; // iterations_per_bsp_task
public:
- PiCalculationBSP(BSPContext& context) {
+ PiEstimatorBSP(BSPContext<string,string,string,double,int>& context) {
- iterations = 10000;
+ iterations_ = 1000000L;
+ }
- }
+
-
inline double closed_interval_rand(double x0, double x1) {
return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
}
-
- void bsp(BSPContext& context) {
-
+
+ void setup(BSPContext<string,string,string,double,int>& context) {
+ // Choose one as a master
+ master_task_ = context.getPeerName(context.getNumPeers() / 2);
+ }
+
+ void bsp(BSPContext<string,string,string,double,int>& context) {
- // initialize random seed
+ /* initialize random seed */
srand(time(NULL));
int in = 0;
- for (int i = 0; i < iterations; i++) {
+ for (long i = 0; i < iterations_; i++) {
- //rand() -> greater than or equal to 0.0 and less than 1.0.
double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
- double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
+ double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
if (sqrt(x * x + y * y) < 1.0) {
in++;
}
- }
+ }
+
+ context.sendMessage(master_task_, in);
+ context.sync();
+ }
+
+ void cleanup(BSPContext<string,string,string,double,int>& context) {
+ if (context.getPeerName().compare(master_task_)==0) {
- double data = 4.0 * in / iterations;
+ long total_hits = 0;
+ int msg_count = context.getNumCurrentMessages();
+ for (int i=0; i < msg_count; i++) {
+ total_hits += context.getCurrentMessage();
+ }
+ double pi = 4.0 * total_hits / (msg_count * iterations_);
- context.sendMessage(masterTask, toString(data));
- context.sync();
- }
-
- void setup(BSPContext& context) {
- // Choose one as a master
- masterTask = context.getPeerName(context.getNumPeers() / 2);
- }
-
- void cleanup(BSPContext& context) {
- if (context.getPeerName().compare(masterTask)==0) {
- double pi = 0.0;
- int msgCount = context.getNumCurrentMessages();
- string received;
- for (int i=0; i<msgCount; i++) {
- string received = context.getCurrentMessage();
- pi += toDouble(received);
- }
-
- pi = pi / msgCount; //msgCount = numPeers
- context.write("Estimated value of PI is", toString(pi));
+ context.write("Estimated value of PI", pi);
}
}
};
int main(int argc, char *argv[]) {
- return HamaPipes::runTask(HamaPipes::TemplateFactory<PiCalculationBSP>());
+ return HamaPipes::runTask<string,string,string,double,int>(HamaPipes::TemplateFactory<PiEstimatorBSP,string,string,string,double,int>());
}
}}}
- Makefile for this example:
- {{{
- CC = g++
- CPPFLAGS = -m64 -I$(HAMA_HOME)/c++/install/include
-
- PiCalculation: PiCalculation.cc
- $(CC) $(CPPFLAGS) $< -L$(HAMA_HOME)/c++/install/lib -lhamapipes -lhadooputils -lcrypto -lpthread -g -O2 -o $@
-
- clean:
- rm -f PiCalculation
- }}}
-
- The corresponding job configuration `PiCalculation_job.xml` looks like that:
+ The corresponding job configuration `piestimator.xml` looks like this:
{{{
- <?xml version="1.0"?>
<configuration>
<property>
- // Set the binary path on DFS
<name>hama.pipes.executable</name>
- <value>bin/PiCalculation</value>
+ <value>hdfs:/examples/bin/piestimator</value>
</property>
<property>
<name>hama.pipes.java.recordreader</name>
@@ -161, +159 @@
</property>
<property>
<name>bsp.output.format.class</name>
- <value>org.apache.hama.bsp.TextOutputFormat</value>
+ <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
+ </property>
+ <property>
+ <name>bsp.output.key.class</name>
+ <value>org.apache.hadoop.io.Text</value>
+ </property>
+ <property>
+ <name>bsp.output.value.class</name>
+ <value>org.apache.hadoop.io.DoubleWritable</value>
+ </property>
+ <property>
+ <name>bsp.message.class</name>
+ <value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>hama.pipes.logging</name>
@@ -169, +179 @@
</property>
<property>
<name>bsp.peers.num</name>
- <value>10</value>
+ <value>3</value>
</property>
</configuration>
}}}
- Finally the PiCalculation example can be submitted with these commands:
+ Finally, you can run the PiEstimator example by executing the following commands:
{{{
+ # First copy piestimator binary to dfs
+ hadoop fs -put c++/target/native/examples/piestimator \
+ /examples/bin/piestimator
+
+ # Run piestimator example
+ hama pipes \
+ -conf c++/src/main/native/examples/conf/piestimator.xml \
+ -output /examples/output/piestimator
+
+ # View output data
+ hama seqdumper -seqFile /examples/output/piestimator/part-00001
+
+ # You should see
+ # Input Path: /examples/output/piestimator/part-00001
+ # Key class: class org.apache.hadoop.io.Text
+ # Value Class: class org.apache.hadoop.io.DoubleWritable
+ # Key: Estimated value of PI: Value: 3.139116
+ # Count: 1
+
- # delete output dir
+ # Delete output folder
+ hadoop fs -rmr /examples/output/piestimator
- hadoop dfs -rmr output/PiCalculation
- # copy piCalculation binary to HDFS
- hadoop dfs -rmr bin/PiCalculation
- hadoop dfs -put PiCalculation bin/PiCalculation
- # submit job
- hama pipes -conf PiCalculation_job.xml -output output/PiCalculation
}}}