Posted to commits@hama.apache.org by Apache Wiki <wi...@apache.org> on 2013/11/23 11:53:03 UTC

[Hama Wiki] Update of "HamaPipes" by MartinIllecker

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.

The "HamaPipes" page has been changed by MartinIllecker:
https://wiki.apache.org/hama/HamaPipes?action=diff&rev1=6&rev2=7

- '''Hama Pipes''' is equivalent to '''Hadoop Pipes''' and offers the possibility to use Hama with C/C++.
+ '''Hama Pipes''' is equivalent to '''Hadoop Pipes''' and offers the possibility to use Hama with C++.
  
  == Installation ==
  
- You can compile Hama Pipes by executing the following commands:
+ The native compilation is integrated into the Maven build process.
  
  {{{
- cd $HAMA_HOME/c++/utils
- ./configure
- make install
+ mvn install
- 
- cd $HAMA_HOME/c++/pipes
- ./configure
- make install
  }}}
  
  == Interface ==
  
- Hama Pipes provides the following methods for C/C++ integration: (similar to the [[BSPModel|BSPModel]])
+ Hama Pipes provides the following methods for C++ integration (similar to the functions of the [[BSPModel#Communication_Model|BSP Communication Model]]); a short usage sketch is shown below the table:
  
  ||Function||Description||
- ||`void sendMessage(const string& peerName, const string& msg)`||Send a message to another peer. Messages sent by this method are not guaranteed to be received in a sent order.||
+ ||`void sendMessage(const string& peerName, const M& msg)`||Send a message to another peer. Messages sent by this method are not guaranteed to be received in a sent order.||
- ||`string& getCurrentMessage()`||Returns a message from the peer's received messages queue (a FIFO).||
+ ||`M getCurrentMessage()`||Returns a message from the peer's received messages queue (a FIFO).||
  ||`int getNumCurrentMessages()`||Returns the number of messages in the peer's received messages queue.||
  ||`void sync()`||Starts the barrier synchronization and sends all the messages in the outgoing message queues to the corresponding remote peers.||
  ||`long getSuperstepCount()`||Returns the count of current super-step.||
@@ -32, +26 @@

  ||`vector<string> getAllPeerNames()`||Returns the names of all the peers executing tasks from the same job (including this peer).||
  ||`int getNumPeers()`||Returns the number of peers.||
  ||`void clear()`||Clears all queues entries.||
- ||`void write(const string& key, const string& value)`||Writes a key/value pair to the output collector.||
+ ||`void write(const K2& key, const V2& value)`||Writes a key/value pair to the output collector.||
- ||`bool readNext(string& key, string& value)`||Deserializes the next input key value into the given objects.||
+ ||`bool readNext(K1& key, V1& value)`||Deserializes the next input key value into the given objects.||
  ||`void reopenInput()`||Closes the input and opens it right away, so that the file pointer is at the beginning again.||
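+ 
+ The table above maps onto the templated `BSP` and `BSPContext` classes used in the PiEstimator example further down this page. As a minimal, hypothetical sketch (the class name `HelloBSP` and the all-`string` type parameters are made up for illustration and are not part of the Hama sources), a single superstep in which every peer greets every peer could look like this:
+ 
+ {{{
+ #include "hama/Pipes.hh"
+ #include "hama/TemplateFactory.hh"
+ 
+ #include <string>
+ #include <vector>
+ 
+ using std::string;
+ using std::vector;
+ using HamaPipes::BSP;
+ using HamaPipes::BSPContext;
+ 
+ // Hypothetical example: every peer greets every peer once per superstep.
+ // The five template parameters are <K1, V1, K2, V2, M>, as in the
+ // PiEstimator example below.
+ class HelloBSP: public BSP<string,string,string,string,string> {
+ public:
+   HelloBSP(BSPContext<string,string,string,string,string>& context) {}
+ 
+   void bsp(BSPContext<string,string,string,string,string>& context) {
+     // Queue one message for every peer (including this one).
+     vector<string> peers = context.getAllPeerNames();
+     for (int i = 0; i < (int) peers.size(); i++) {
+       context.sendMessage(peers[i], "Hello from " + context.getPeerName());
+     }
+     // Barrier synchronization: all queued messages are delivered.
+     context.sync();
+ 
+     // Drain the incoming message queue and write each message to the output.
+     int msg_count = context.getNumCurrentMessages();
+     for (int i = 0; i < msg_count; i++) {
+       context.write(context.getPeerName(), context.getCurrentMessage());
+     }
+   }
+ };
+ 
+ int main(int argc, char *argv[]) {
+   return HamaPipes::runTask<string,string,string,string,string>(HamaPipes::TemplateFactory<HelloBSP,string,string,string,string,string>());
+ }
+ }}}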
  
- The following additional methods support access to [[http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/SequenceFile.html|SequenceFiles]] under C/C++:
+ The following additional methods support access to [[http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/SequenceFile.html|SequenceFiles]] in C++:
  
  ||Function||Description||
  ||`sequenceFileOpen(const string& path, const string& option, const string& keyType, const string& valueType)`||Opens a SequenceFile with option "r" or "w", key/value type and returns the corresponding fileID.||
- ||`bool sequenceFileReadNext(int fileID, string& key, string& value)`||Reads the next key/value pair from the SequenceFile.||
+ ||`bool sequenceFileReadNext(int fileID, K& key, V& value)`||Reads the next key/value pair from the SequenceFile.||
- ||`bool sequenceFileAppend(int fileID, const string& key, const string& value)`||Appends the next key/value pair to the SequenceFile.||
+ ||`bool sequenceFileAppend(int fileID, const K& key, const V& value)`||Appends the next key/value pair to the SequenceFile.||
  ||`bool sequenceFileClose(int fileID)`||Closes a SequenceFile.||
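+ 
+ For illustration, here is a minimal, hypothetical sketch that copies one SequenceFile into another (it reuses the headers and `using` declarations from the sketch above; the paths and the Writable class names passed as `keyType`/`valueType` are made-up examples, and the calls are assumed to be available on the `BSPContext` instance):
+ 
+ {{{
+ // Hypothetical helper, e.g. called from within bsp(): copies all key/value
+ // pairs from one SequenceFile into another one.
+ void copySequenceFile(BSPContext<string,string,string,string,string>& context) {
+   int in_file = context.sequenceFileOpen("/tmp/input.seq", "r",
+                                          "org.apache.hadoop.io.Text",
+                                          "org.apache.hadoop.io.Text");
+   int out_file = context.sequenceFileOpen("/tmp/output.seq", "w",
+                                           "org.apache.hadoop.io.Text",
+                                           "org.apache.hadoop.io.Text");
+ 
+   string key, value;
+   // Assumes sequenceFileReadNext returns false once the end of the file is reached.
+   while (context.sequenceFileReadNext(in_file, key, value)) {
+     context.sequenceFileAppend(out_file, key, value);
+   }
+ 
+   context.sequenceFileClose(in_file);
+   context.sequenceFileClose(out_file);
+ }
+ }}}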
  
- == C++ BSP example ==
+ == Compilation ==
  
- Finally here is the [[PiEstimator|Pi Estimator]] example implemented with Hama Pipes:
+ The following command can be used to compile a Hama Pipes application:
+ 
+ {{{
+ g++ -Ic++/src/main/native/utils/api \
+     -Ic++/src/main/native/pipes/api \
+     -Lc++/target/native \
+     -lhamapipes -lhadooputils -lpthread \
+     PROGRAM.cc \
+     -o PROGRAM \
+     -g -Wall -O2
+ }}}
+ Please note that the paths have to be adjusted if you are not operating in the Hama source folder.
+ 
+ == Examples ==
+ Hama Pipes includes the following examples:
+  * Summation
+  * PiEstimator
+  * MatrixMultiplication  
+ 
+ These examples are located in `c++/src/main/native/examples` and explained in `c++/src/main/native/examples/README.txt`.
+ 
+ === PiEstimator Example ===
+ 
+ This is a Hama Pipes C++ implementation of [[PiEstimator|PiEstimator]].
  
  {{{
  #include "hama/Pipes.hh"
@@ -55, +72 @@

  
  #include <time.h>
  #include <math.h>
+ #include <stdlib.h>
  #include <string>
  #include <iostream>
- #include <cstdlib>
  
  using std::string;
  using std::cout;
@@ -66, +83 @@

  using HamaPipes::BSPContext;
  using namespace HadoopUtils;
  
- class PiCalculationBSP: public BSP {
+ class PiEstimatorBSP: public BSP<string,string,string,double,int> {
  private:
-     string masterTask;
+   string master_task_;
-     int iterations;
+   long iterations_; // iterations_per_bsp_task
  public:
-   PiCalculationBSP(BSPContext& context) {
+   PiEstimatorBSP(BSPContext<string,string,string,double,int>& context) {
-       iterations = 10000;
+     iterations_ = 1000000L;
+   }
-   }
+   
-     
    inline double closed_interval_rand(double x0, double x1) {
      return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
    }
- 
-   void bsp(BSPContext& context) {
-       
+   
+   void setup(BSPContext<string,string,string,double,int>& context) {
+     // Choose one as a master
+     master_task_ = context.getPeerName(context.getNumPeers() / 2);
+   }
+   
+   void bsp(BSPContext<string,string,string,double,int>& context) {
-     // initialize random seed
+     /* initialize random seed */
      srand(time(NULL));
      
      int in = 0;
-     for (int i = 0; i < iterations; i++) {
+     for (long i = 0; i < iterations_; i++) {
-       //rand() -> greater than or equal to 0.0 and less than 1.0. 
        double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
-       double y = 2.0 * closed_interval_rand(0, 1) - 1.0;    
+       double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
        if (sqrt(x * x + y * y) < 1.0) {
          in++;
        }
-     }      
+     }
+     
+     context.sendMessage(master_task_, in);
+     context.sync();
+   }
+   
+   void cleanup(BSPContext<string,string,string,double,int>& context) {
+     if (context.getPeerName().compare(master_task_)==0) {
        
-     double data = 4.0 * in / iterations;
+       long total_hits = 0;
+       int msg_count = context.getNumCurrentMessages();
+       for (int i=0; i < msg_count; i++) {
+         total_hits += context.getCurrentMessage();
+       }
        
+       double pi = 4.0 * total_hits / (msg_count * iterations_);
-     context.sendMessage(masterTask, toString(data));
-     context.sync();
-   }
-     
-   void setup(BSPContext& context) {
-     // Choose one as a master
-     masterTask = context.getPeerName(context.getNumPeers() / 2);
-   }
-     
-   void cleanup(BSPContext& context) {
-     if (context.getPeerName().compare(masterTask)==0) {
-       double pi = 0.0;
-       int msgCount = context.getNumCurrentMessages();
-       string received;
-       for (int i=0; i<msgCount; i++) {
-         string received = context.getCurrentMessage();
-         pi += toDouble(received);
-       }
- 
-       pi = pi / msgCount; //msgCount = numPeers
-       context.write("Estimated value of PI is", toString(pi));
+       context.write("Estimated value of PI", pi);
      }
    }
  };
  
  int main(int argc, char *argv[]) {
-   return HamaPipes::runTask(HamaPipes::TemplateFactory<PiCalculationBSP>());
+   return HamaPipes::runTask<string,string,string,double,int>(HamaPipes::TemplateFactory<PiEstimatorBSP,string,string,string,double,int>());
  }
  }}}
  
- Makefile for this example:
- {{{
- CC = g++
- CPPFLAGS = -m64 -I$(HAMA_HOME)/c++/install/include
- 
- PiCalculation: PiCalculation.cc
- 	$(CC) $(CPPFLAGS) $< -L$(HAMA_HOME)/c++/install/lib -lhamapipes -lhadooputils -lcrypto -lpthread -g -O2 -o $@
- 
- clean:
- 	rm -f PiCalculation
- }}}
- 
- The corresponding job configuration `PiCalculation_job.xml` looks like that:
+ The corresponding job configuration `piestimator.xml` looks like this:
  {{{
- <?xml version="1.0"?>
  <configuration>
    <property>
-     // Set the binary path on DFS
      <name>hama.pipes.executable</name>
-     <value>bin/PiCalculation</value>
+     <value>hdfs:/examples/bin/piestimator</value>
    </property>
    <property>
      <name>hama.pipes.java.recordreader</name>
@@ -161, +159 @@

    </property>
    <property>
      <name>bsp.output.format.class</name>
-     <value>org.apache.hama.bsp.TextOutputFormat</value>
+     <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
+   </property>
+   <property>
+     <name>bsp.output.key.class</name>
+     <value>org.apache.hadoop.io.Text</value>
+   </property>
+   <property>
+     <name>bsp.output.value.class</name>
+     <value>org.apache.hadoop.io.DoubleWritable</value>
+   </property>
+   <property>
+     <name>bsp.message.class</name>
+     <value>org.apache.hadoop.io.IntWritable</value>
    </property>
    <property>
      <name>hama.pipes.logging</name>
@@ -169, +179 @@

    </property>
    <property>
      <name>bsp.peers.num</name>
-     <value>10</value>                                            
+     <value>3</value>                                            
    </property>
  </configuration>
  }}}
  
- Finally the PiCalculation example can be submitted with these commands:
+ Finally you can run the PiEstimator example by executing the following commands:
  {{{
+ # First copy piestimator binary to dfs
+ hadoop fs -put c++/target/native/examples/piestimator \
+  /examples/bin/piestimator
+ 
+ # Run piestimator example
+ hama pipes \
+  -conf c++/src/main/native/examples/conf/piestimator.xml \
+  -output /examples/output/piestimator
+ 
+ # View output data
+ hama seqdumper -seqFile /examples/output/piestimator/part-00001
+ 
+ # You should see
+ # Input Path: /examples/output/piestimator/part-00001
+ # Key class: class org.apache.hadoop.io.Text 
+ # Value Class: class org.apache.hadoop.io.DoubleWritable
+ # Key: Estimated value of PI: Value: 3.139116
+ # Count: 1
+ 
- # delete output dir
+ # Delete output folder
+ hadoop fs -rmr /examples/output/piestimator
- hadoop dfs -rmr output/PiCalculation
- # copy piCalculation binary to HDFS
- hadoop dfs -rmr bin/PiCalculation
- hadoop dfs -put PiCalculation bin/PiCalculation
- # submit job
- hama pipes -conf PiCalculation_job.xml -output output/PiCalculation
  }}}