You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by Apache Wiki <wi...@apache.org> on 2012/08/25 17:02:53 UTC
[Hama Wiki] Update of "HamaPipes" by MartinIllecker
Dear Wiki user,
You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.
The "HamaPipes" page has been changed by MartinIllecker:
http://wiki.apache.org/hama/HamaPipes?action=diff&rev1=1&rev2=2
- Describe HamaPipes here.
+ '''Hama Pipes''' is equivalent to '''Hadoop Pipes''' and offers the possibility to use Hama with C/C++.
+ The current status of Hama Pipes is experimental and can be found here: [[https://issues.apache.org/jira/browse/HAMA-619|HAMA-619]]
+
+ Hama Pipes will be part of Hama 0.6.0.
+
+ Hama Pipes provides the following methods for C/C++ integration: (similar to the [[http://wiki.apache.org/hama/BSPModel|BSPModel]])
+
+ ||Function||Description||
+ ||`void sendMessage(const string& peerName, const string& msg)`||Sends a message to another peer. Messages sent by this method are not guaranteed to be received in the order they were sent.||
+ ||`string& getCurrentMessage()`||Returns a message from the peer's received messages queue (a FIFO).||
+ ||`int getNumCurrentMessages()`||Returns the number of messages in the peer's received messages queue.||
+ ||`void sync()`||Starts the barrier synchronization and sends all the messages in the outgoing message queues to the corresponding remote peers.||
+ ||`long getSuperstepCount()`||Returns the current superstep count.||
+ ||`string& getPeerName()`||Returns the name of this peer in the format "hostname:port".||
+ ||`string& getPeerName(int index)`||Returns the name of n-th peer from sorted array by name.||
+ ||`int getPeerIndex()`||Returns the index of this peer from sorted array by name.||
+ ||`vector<string> getAllPeerNames()`||Returns the names of all the peers executing tasks from the same job (including this peer).||
+ ||`int getNumPeers()`||Returns the number of peers.||
+ ||`void clear()`||Clears all queues entries.||
+ ||`void write(const string& key, const string& value)`||Writes a key/value pair to the output collector.||
+ ||`bool readNext(string& key, string& value)`||Deserializes the next input key value into the given objects.||
+ ||`void reopenInput()`||Closes the input and opens it right away, so that the file pointer is at the beginning again.||
+
+ The following additional methods support access to [[http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/SequenceFile.html|SequenceFiles]] under C/C++:
+
+ ||Function||Description||
+ ||`int sequenceFileOpen(const string& path, const string& option)`||Opens a SequenceFile with option "r" (read) or "w" (write) and returns the corresponding fileID.||
+ ||`bool sequenceFileReadNext(int fileID, string& key, string& value)`||Reads the next key/value pair from the SequenceFile.||
+ ||`bool sequenceFileAppend(int fileID, const string& key, const string& value)`||Appends the next key/value pair to the SequenceFile.||
+ ||`bool sequenceFileClose(int fileID)`||Closes a SequenceFile.||
+
+ Finally here is the [[http://wiki.apache.org/hama/PiEstimator|Pi Estimator]] example implemented with Hama Pipes:
+
+ {{{
+ #include "hama/Pipes.hh"
+ #include "hama/TemplateFactory.hh"
+ #include "hadoop/StringUtils.hh"
+
+ #include <time.h>
+ #include <math.h>
+ #include <string>
+ #include <iostream>
+
+ using std::string;
+ using std::cout;
+
+ using HamaPipes::BSP;
+ using HamaPipes::BSPContext;
+ using namespace HadoopUtils;
+
+ class PiCalculationBSP: public BSP {
+ private:
+ string masterTask;
+ int iterations;
+ public:
+ PiCalculationBSP(BSPContext& context) {
+ iterations = 10000;
+ }
+
+ inline double closed_interval_rand(double x0, double x1) {
+ return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
+ }
+
+ void bsp(BSPContext& context) {
+
+ // initialize random seed
+ srand(time(NULL));
+
+ int in = 0;
+ for (int i = 0; i < iterations; i++) {
+ //rand() -> greater than or equal to 0.0 and less than 1.0.
+ double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
+ double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
+ if (sqrt(x * x + y * y) < 1.0) {
+ in++;
+ }
+ }
+
+ double data = 4.0 * in / iterations;
+
+ context.sendMessage(masterTask, toString(data));
+ context.sync();
+ }
+
+ void setup(BSPContext& context) {
+ // Choose one as a master
+ masterTask = context.getPeerName(context.getNumPeers() / 2);
+ }
+
+ void cleanup(BSPContext& context) {
+ if (context.getPeerName().compare(masterTask)==0) {
+ double pi = 0.0;
+ int msgCount = context.getNumCurrentMessages();
+ string received;
+ for (int i=0; i<msgCount; i++) {
+ string received = context.getCurrentMessage();
+ pi += toDouble(received);
+ }
+
+ pi = pi / msgCount; //msgCount = numPeers
+ context.write("Estimated value of PI is", toString(pi));
+ }
+ }
+ };
+
+ int main(int argc, char *argv[]) {
+ return HamaPipes::runTask(HamaPipes::TemplateFactory<PiCalculationBSP>());
+ }
+ }}}
+
+ Makefile for this example:
+ {{{
+ CC = g++
+ CPPFLAGS = -m64 -I$(HAMA_HOME)/c++/install/include
+
+ PiCalculation: PiCalculation.cc
+ $(CC) $(CPPFLAGS) $< -L$(HAMA_HOME)/c++/install/lib -lhamapipes -lhadooputils -lcrypto -lpthread -g -O2 -o $@
+
+ clean:
+ rm -f PiCalculation
+ }}}
+
+ The corresponding job configuration `PiCalculation_job.xml` looks like that:
+ {{{
+ <?xml version="1.0"?>
+ <configuration>
+ <property>
+ <!-- Set the binary path on DFS -->
+ <name>hama.pipes.executable</name>
+ <value>bin/PiCalculation</value>
+ </property>
+ <property>
+ <name>hama.pipes.java.recordreader</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hama.pipes.java.recordwriter</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>bsp.input.format.class</name>
+ <value>org.apache.hama.bsp.NullInputFormat</value>
+ </property>
+ <property>
+ <name>bsp.output.format.class</name>
+ <value>org.apache.hama.bsp.TextOutputFormat</value>
+ </property>
+ <property>
+ <name>hama.pipes.logging</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>bsp.peers.num</name>
+ <value>10</value>
+ </property>
+ </configuration>
+ }}}
+
+ Finally the PiCalculation example can be submitted with these commands:
+ {{{
+ # delete output dir
+ hadoop dfs -rmr output/PiCalculation
+ # remove the old binary (if any) and copy the new PiCalculation binary to HDFS
+ hadoop dfs -rmr bin/PiCalculation
+ hadoop dfs -put PiCalculation bin/PiCalculation
+ # submit job
+ hama pipes -conf PiCalculation_job.xml -output output/PiCalculation
+ }}}
+