Posted to commits@hama.apache.org by mi...@apache.org on 2013/11/23 09:41:10 UTC

svn commit: r1544761 [2/3] - in /hama/trunk: ./ c++/ c++/src/ c++/src/main/native/examples/ c++/src/main/native/examples/conf/ c++/src/main/native/examples/impl/ c++/src/main/native/examples/input/ c++/src/main/native/pipes/api/hama/ c++/src/main/nativ...

Added: hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/Pipes.cc?rev=1544761&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc (added)
+++ hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc Sat Nov 23 08:41:09 2013
@@ -0,0 +1,1356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/********************************************/
+/*********** BinaryUpwardProtocol ***********/
+/********************************************/
+class BinaryUpwardProtocol: public UpwardProtocol<BinaryUpwardProtocol> {
+private:
+  FileOutStream* out_stream_;
+public:
+  BinaryUpwardProtocol(FILE* out_stream) {
+    out_stream_ = new FileOutStream();
+    HADOOP_ASSERT(out_stream_->open(out_stream), "problem opening stream");
+  }
+  
+  /* local function */
+  void sendCommand(int32_t cmd, bool flush) {
+    serializeInt(cmd, *out_stream_);
+    if (flush) {
+      out_stream_->flush();
+    }
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s\n",
+              messageTypeNames[cmd]);
+    }
+  }
+  
+  template<class T>
+  void sendCommand(int32_t cmd, T value) {
+    sendCommand(cmd, false);
+    // Write out generic value
+    serialize<T>(value, *out_stream_);
+    out_stream_->flush();
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: '%s'\n",
+              messageTypeNames[cmd], toString<T>(value).c_str());
+    }
+  }
+  
+  template<class T>
+  void sendCommand(int32_t cmd, const T values[], int size) {
+    sendCommand(cmd, false);
+    // Write out generic values
+    for (int i=0; i<size; i++) {
+      serialize<T>(values[i], *out_stream_);
+      if(logging) {
+        fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: '%s'\n",
+                messageTypeNames[cmd], i+1, toString<T>(values[i]).c_str());
+      }
+    }
+    out_stream_->flush();
+  }
+  
+  template<class T1, class T2>
+  void sendCommand(int32_t cmd, T1 value1, T2 value2) {
+    sendCommand(cmd, false);
+    // Write out generic value1
+    serialize<T1>(value1, *out_stream_);
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param1: '%s'\n",
+              messageTypeNames[cmd], toString<T1>(value1).c_str());
+    }
+    // Write out generic value2
+    serialize<T2>(value2, *out_stream_);
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param2: '%s'\n",
+              messageTypeNames[cmd], toString<T2>(value2).c_str());
+    }
+    out_stream_->flush();
+  }
+  
+  template<class T1, class T2>
+  void sendCommand(int32_t cmd, T1 value, const T2 values[], int size) {
+    sendCommand(cmd, false);
+    // Write out generic value
+    serialize<T1>(value, *out_stream_);
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: '%s'\n",
+              messageTypeNames[cmd], 0, toString<T1>(value).c_str());
+    }
+    // Write out generic values
+    for (int i=0; i<size; i++) {
+      serialize<T2>(values[i], *out_stream_);
+      if(logging) {
+        fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: '%s'\n",
+                messageTypeNames[cmd], i+1, toString<T2>(values[i]).c_str());
+      }
+    }
+    out_stream_->flush();
+  }
+  
+  virtual void sendCommand(int32_t cmd) {
+    sendCommand(cmd, true);
+  }
+  
+  /*
+   virtual void registerCounter(int id, const string& group,
+   const string& name) {
+   serializeInt(REGISTER_COUNTER, *stream);
+   serializeInt(id, *stream);
+   serializeString(group, *stream);
+   serializeString(name, *stream);
+   }
+   
+   virtual void incrementCounter(const TaskContext::Counter* counter,
+   uint64_t amount) {
+   serializeInt(INCREMENT_COUNTER, *stream);
+   serializeInt(counter->getId(), *stream);
+   serializeLong(amount, *stream);
+   }
+   */
+  
+  virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
+    serializeInt(INCREMENT_COUNTER, *out_stream_);
+    serializeString(group, *out_stream_);
+    serializeString(name, *out_stream_);
+    serializeLong(amount, *out_stream_);
+    out_stream_->flush();
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n");
+    }
+  }
+  
+  ~BinaryUpwardProtocol() {
+    delete out_stream_;
+  }
+};
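+
+/*
+ * Illustrative sketch (an editorial addition for clarity, not part of the
+ * original commit): every upward message is framed as a serialized command
+ * id followed by its serialized arguments. A two-parameter command such as
+ * SEND_MSG with a peer name and a string payload would hit the wire as:
+ *
+ *   serializeInt(SEND_MSG, out);          // command id
+ *   serializeString("host:61001", out);   // Param1: peer name (example value)
+ *   serializeString("hello", out);        // Param2: message (example value)
+ *   out.flush();
+ *
+ * which is what sendCommand<string,string>(SEND_MSG, peer, msg) expands to.
+ */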
+
+/********************************************/
+/************** BinaryProtocol **************/
+/********************************************/
+template<class K1, class V1>
+class BinaryProtocol: public Protocol< BinaryProtocol<K1,V1> > {
+private:
+  FileInStream* in_stream_;
+  DownwardProtocol<K1,V1>* handler_;
+  BinaryUpwardProtocol* uplink_;
+  
+public:
+  BinaryProtocol(FILE* in_stream, DownwardProtocol<K1,V1>* handler, FILE* uplink) {
+    in_stream_ = new FileInStream();
+    in_stream_->open(in_stream);
+    uplink_ = new BinaryUpwardProtocol(uplink);
+    handler_ = handler;
+  }
+  
+  UpwardProtocol<BinaryUpwardProtocol>* getUplink() {
+    return uplink_;
+  }
+  
+  /**
+   * Wait for next event, but don't expect a response for
+   * a previously sent command
+   */
+  void nextEvent() {
+    // read command
+    int32_t cmd;
+    cmd = deserializeInt(*in_stream_);
+    
+    switch (cmd) {
+        
+      case START_MESSAGE: {
+        int32_t protocol_version;
+        protocol_version = deserialize<int32_t>(*in_stream_);
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE protocol_version: %d\n",
+                  protocol_version);
+        }
+        handler_->start(protocol_version);
+        break;
+      }
+        // setup BSP Job Configuration
+      case SET_BSPJOB_CONF: {
+        int32_t entries;
+        entries = deserialize<int32_t>(*in_stream_);
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n",
+                  entries);
+        }
+        // reserve (not pre-size): pre-sizing would leave entries*2 empty
+        // strings in front of the push_back'ed values
+        vector<string> properties;
+        properties.reserve(entries*2);
+        for(int i=0; i < entries*2; ++i) {
+          string item;
+          item = deserialize<string>(*in_stream_);
+          properties.push_back(item);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n",
+                    item.c_str());
+          }
+        }
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n",
+                  entries);
+        }
+        handler_->setBSPJob(properties);
+        break;
+      }
+      case SET_INPUT_TYPES: {
+        string key_type;
+        string value_type;
+        key_type = deserialize<string>(*in_stream_);
+        value_type = deserialize<string>(*in_stream_);
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",
+                  key_type.c_str(), value_type.c_str());
+        }
+        handler_->setInputTypes(key_type, value_type);
+        break;
+      }
+      case RUN_SETUP: {
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n");
+        }
+        int32_t piped_input;
+        int32_t piped_output;
+        piped_input = deserialize<int32_t>(*in_stream_);
+        piped_output = deserialize<int32_t>(*in_stream_);
+        handler_->runSetup(piped_input, piped_output);
+        break;
+      }
+      case RUN_BSP: {
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n");
+        }
+        int32_t piped_input;
+        int32_t piped_output;
+        piped_input = deserialize<int32_t>(*in_stream_);
+        piped_output = deserialize<int32_t>(*in_stream_);
+        handler_->runBsp(piped_input, piped_output);
+        break;
+      }
+      case RUN_CLEANUP: {
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n");
+        }
+        int32_t piped_input;
+        int32_t piped_output;
+        piped_input = deserialize<int32_t>(*in_stream_);
+        piped_output = deserialize<int32_t>(*in_stream_);
+        handler_->runCleanup(piped_input, piped_output);
+        break;
+      }
+      case PARTITION_REQUEST: {
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n");
+        }
+        
+        K1 partition_key;
+        V1 partition_value;
+        int32_t num_tasks;
+        
+        partition_key = deserialize<K1>(*in_stream_);
+        partition_value = deserialize<V1>(*in_stream_);
+        num_tasks = deserialize<int32_t>(*in_stream_);
+        
+        handler_->runPartition(partition_key, partition_value, num_tasks);
+        break;
+      }
+      case CLOSE: {
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
+        }
+        handler_->close();
+        break;
+      }
+      case ABORT: {
+        if(logging) {
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
+        }
+        handler_->abort();
+        break;
+      }
+      default: {
+        // Log before asserting; HADOOP_ASSERT(false, ...) throws, so a
+        // trailing fprintf would be dead code.
+        fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",
+                cmd);
+        HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+      }
+    }
+  }
+  
+  /**
+   * Wait for next event, which should be a response for
+   * a previously sent command (expected_response_cmd)
+   * and return the generic result
+   */
+  template<class T>
+  T getResult(int32_t expected_response_cmd) {
+    
+    T result = T();
+    
+    // read response command
+    int32_t cmd;
+    cmd = deserializeInt(*in_stream_);
+    
+    // check if response is expected
+    if (expected_response_cmd == cmd) {
+      
+      switch (cmd) {
+          
+        case GET_MSG_COUNT: {
+          T msg_count;
+          msg_count = deserialize<T>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msg_count: '%s'\n",
+                    toString<T>(msg_count).c_str());
+          }
+          return msg_count;
+        }
+        case GET_MSG: {
+          T msg;
+          msg = deserialize<T>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: '%s'\n",
+                    toString<T>(msg).c_str());
+          }
+          return msg;
+        }
+        case GET_PEERNAME: {
+          T peername;
+          peername = deserialize<T>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",
+                    toString<T>(peername).c_str());
+          }
+          return peername;
+        }
+        case GET_PEER_INDEX: {
+          T peer_index = deserialize<T>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peer_index: '%s'\n",
+                    toString<T>(peer_index).c_str());
+          }
+          return peer_index;
+        }
+        case GET_PEER_COUNT: {
+          T peer_count = deserialize<T>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peer_count: '%s'\n",
+                    toString<T>(peer_count).c_str());
+          }
+          return peer_count;
+        }
+        case GET_SUPERSTEP_COUNT: {
+          T superstep_count = deserialize<T>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstep_count: '%s'\n",
+                    toString<T>(superstep_count).c_str());
+          }
+          return superstep_count;
+        }
+          
+        case SEQFILE_OPEN: {
+          T file_id = deserialize<T>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN file_id: '%s'\n",
+                    toString<T>(file_id).c_str());
+          }
+          return file_id;
+        }
+        case SEQFILE_APPEND: {
+          result = deserialize<T>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: '%s'\n",
+                    toString<T>(result).c_str());
+          }
+          return result;
+        }
+        case SEQFILE_CLOSE: {
+          result = deserialize<T>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: '%s'\n",
+                    toString<T>(result).c_str());
+          }
+          return result;
+        }
+      }
+      // Not expected response
+    } else {
+      
+      /*
+       case CLOSE: {
+       if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
+       handler_->close();
+       break;
+       }
+       case ABORT: {
+       if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
+       handler_->abort();
+       break;
+       }
+       */
+      HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+      fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(%d) - Unknown binary command: %d\n",
+              expected_response_cmd, cmd);
+    }
+    return result;
+  }
+  
+  /**
+   * Wait for next event, which should be a response for
+   * a previously sent command (expected_response_cmd)
+   * and return the generic vector result list
+   */
+  template<class T>
+  vector<T> getVectorResult(int32_t expected_response_cmd) {
+    
+    vector<T> results;
+    
+    // read response command
+    int32_t cmd;
+    cmd = deserializeInt(*in_stream_);
+    
+    // check if response is expected
+    if (expected_response_cmd == cmd) {
+      
+      switch (cmd) {
+        case GET_ALL_PEERNAME: {
+          vector<T> peernames;
+          T peername;
+          int32_t peername_count = deserialize<int32_t>(*in_stream_);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername_count: %d\n",
+                    peername_count);
+          }
+          for (int i=0; i<peername_count; i++)  {
+            peername = deserialize<T>(*in_stream_);
+            peernames.push_back(peername);
+            if(logging) {
+              fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: '%s'\n",
+                      toString<T>(peername).c_str());
+            }
+          }
+          return peernames;
+        }
+      }
+    } else {
+      HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+      fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(%d) - Unknown binary command: %d\n",
+              expected_response_cmd, cmd);
+    }
+    return results;
+  }
+  
+  /**
+   * Wait for next event, which should be a response for
+   * a previously sent command (expected_response_cmd)
+   * and return the generic KeyValuePair or an empty one
+   * if no data is available
+   */
+  template <class K, class V>
+  KeyValuePair<K,V> getKeyValueResult(int32_t expected_response_cmd) {
+    
+    KeyValuePair<K,V> key_value_pair;
+    
+    // read response command
+    int32_t cmd;
+    cmd = deserializeInt(*in_stream_);
+    
+    // check if response is expected or END_OF_DATA
+    if ((expected_response_cmd == cmd) || (cmd == END_OF_DATA) ) {
+      
+      switch (cmd) {
+          
+        case READ_KEYVALUE: {
+          K key = deserialize<K>(*in_stream_);
+          V value = deserialize<V>(*in_stream_);
+          
+          if(logging) {
+            string k = toString<K>(key);
+            string v = toString<V>(value);
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: '%s' value: '%s'\n",
+                    ((k.length()<10)?k.c_str():k.substr(0,9).append("...").c_str()),
+                    ((v.length()<10)?v.c_str():v.substr(0,9).append("...").c_str()) );
+          }
+          
+          key_value_pair = pair<K,V>(key, value);
+          return key_value_pair;
+        }
+        case SEQFILE_READNEXT: {
+          K key = deserialize<K>(*in_stream_);
+          V value = deserialize<V>(*in_stream_);
+          
+          if(logging) {
+            string k = toString<K>(key);
+            string v = toString<V>(value);
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: '%s' value: '%s'\n",
+                    ((k.length()<10)?k.c_str():k.substr(0,9).append("...").c_str()),
+                    ((v.length()<10)?v.c_str():v.substr(0,9).append("...").c_str()) );
+          }
+          
+          key_value_pair = pair<K,V>(key, value);
+          return key_value_pair;
+        }
+        case END_OF_DATA: {
+          key_value_pair = KeyValuePair<K,V>(true);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got END_OF_DATA\n");
+          }
+        }
+      }
+    } else {
+      key_value_pair = KeyValuePair<K,V>(true);
+      fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(expected_cmd = %d) - Unknown binary command: %d\n",
+              expected_response_cmd, cmd);
+      fprintf(stderr,"ERORR: Please verfiy serialization! The key or value type could possibly not be deserialized!\n");
+      HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+    }
+    return key_value_pair;
+  }
+  
+  virtual ~BinaryProtocol() {
+    delete in_stream_;
+    delete uplink_;
+    delete handler_;
+  }
+};
+
+/********************************************/
+/************** BSPContextImpl **************/
+/********************************************/
+template<class K1, class V1, class K2, class V2, class M>
+class BSPContextImpl: public BSPContext<K1, V1, K2, V2, M>, public DownwardProtocol<K1, V1> {
+private:
+  const Factory<K1, V1, K2, V2, M>* factory_;
+  BSPJob* job_;
+  BSP<K1, V1, K2, V2, M>* bsp_;
+  Partitioner<K1, V1, K2, V2, M>* partitioner_;
+  RecordReader<K1, V1>* reader_;
+  RecordWriter<K2, V2>* writer_;
+  Protocol< BinaryProtocol<K1,V1> >* protocol_;
+  UpwardProtocol<BinaryUpwardProtocol>* uplink_;
+  
+  bool done_;
+  bool has_task_;
+  pthread_mutex_t mutex_done_;
+  std::vector<int> registered_counter_ids_;
+  
+  pair<string, string> inputClass_;
+  //string* inputSplit_;
+  
+public:
+  
+  BSPContextImpl(const Factory<K1, V1, K2, V2, M>& factory) {
+    
+    factory_ = &factory;
+    job_ = NULL;
+    bsp_ = NULL;
+    partitioner_ = NULL;
+    reader_ = NULL;
+    writer_ = NULL;
+    protocol_ = NULL;
+    
+    done_ = false;
+    has_task_ = false;
+    pthread_mutex_init(&mutex_done_, NULL);
+    
+    //inputSplit_ = NULL;
+  }
+  
+  
+  /********************************************/
+  /*********** DownwardProtocol IMPL **********/
+  /********************************************/
+  virtual void start(int protocol_version) {
+    if (protocol_version != 0) {
+      throw Error("Protocol version " + toString(protocol_version) +
+                  " not supported");
+    }
+    partitioner_ = factory_->createPartitioner(*this);
+  }
+  
+  virtual void setBSPJob(vector<string> values) {
+    int len = values.size();
+    BSPJobImpl* result = new BSPJobImpl();
+    HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
+    for(int i=0; i < len; i += 2) {
+      result->set(values[i], values[i+1]);
+    }
+    job_ = result;
+  }
+  
+  virtual void setInputTypes(string key_type, string value_type) {
+    inputClass_ = pair<string,string>(key_type, value_type);
+  }
+  
+  /* local method */
+  void setupReaderWriter(bool piped_input, bool piped_output) {
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n",
+              (piped_input)?"true":"false", (piped_output)?"true":"false");
+    }
+    
+    if (piped_input && reader_==NULL) {
+      reader_ = factory_->createRecordReader(*this);
+      HADOOP_ASSERT((reader_ == NULL) == piped_input,
+                    piped_input ? "RecordReader defined when not needed.":
+                    "RecordReader not defined");
+      
+      //if (reader != NULL) {
+      //  value = new string();
+      //}
+    }
+    
+    if (piped_output && writer_==NULL) {
+      writer_ = factory_->createRecordWriter(*this);
+      HADOOP_ASSERT((writer_ == NULL) == piped_output,
+                    piped_output ? "RecordWriter defined when not needed.":
+                    "RecordWriter not defined");
+    }
+  }
+  
+  virtual void runSetup(bool piped_input, bool piped_output) {
+    setupReaderWriter(piped_input, piped_output);
+    
+    if (bsp_ == NULL) {
+      bsp_ = factory_->createBSP(*this);
+    }
+    
+    if (bsp_ != NULL) {
+      has_task_ = true;
+      bsp_->setup(*this);
+      has_task_ = false;
+      uplink_->sendCommand(TASK_DONE);
+    }
+  }
+  
+  virtual void runBsp(bool piped_input, bool piped_output) {
+    setupReaderWriter(piped_input, piped_output);
+    
+    if (bsp_ == NULL) {
+      bsp_ = factory_->createBSP(*this);
+    }
+    
+    if (bsp_ != NULL) {
+      has_task_ = true;
+      bsp_->bsp(*this);
+      has_task_ = false;
+      uplink_->sendCommand(TASK_DONE);
+    }
+  }
+  
+  virtual void runCleanup(bool piped_input, bool piped_output) {
+    setupReaderWriter(piped_input, piped_output);
+    
+    if (bsp_ != NULL) {
+      has_task_ = true;
+      bsp_->cleanup(*this);
+      has_task_ = false;
+      uplink_->sendCommand(TASK_DONE);
+    }
+  }
+  
+  /********************************************/
+  /*******       Partitioner            *******/
+  /********************************************/
+  virtual void runPartition(const K1& key, const V1& value, int32_t num_tasks) {
+    if (partitioner_ != NULL) {
+      int part = partitioner_->partition(key, value, num_tasks);
+      uplink_->sendCommand<int32_t>(PARTITION_RESPONSE, part);
+    } else {
+      if(logging) {
+        fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
+      }
+    }
+  }
+  
+  virtual void close() {
+    pthread_mutex_lock(&mutex_done_);
+    done_ = true;
+    has_task_ = false;
+    pthread_mutex_unlock(&mutex_done_);
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n",
+              (done_)?"true":"false",(has_task_)?"true":"false");
+    }
+  }
+  
+  virtual void abort() {
+    throw Error("Aborted by driver");
+  }
+  
+  /********************************************/
+  /************** TaskContext IMPL ************/
+  /********************************************/
+  
+  /**
+   * Get the BSPJob for the current task.
+   */
+  virtual const BSPJob* getBSPJob() {
+    return job_;
+  }
+  
+  /**
+   * Get the current key.
+   * @return the current key or NULL if called before the first map or reduce
+   */
+  //virtual const string& getInputKey() {
+  //  return key;
+  //}
+  
+  /**
+   * Get the current value.
+   * @return the current value or NULL if called before the first map or
+   *    reduce
+   */
+  //virtual const string& getInputValue() {
+  //  return *value;
+  //}
+  
+  /**
+   * Register a counter with the given group and name.
+   */
+  /*
+   virtual Counter* getCounter(const std::string& group,
+   const std::string& name) {
+   int id = registeredCounterIds.size();
+   registeredCounterIds.push_back(id);
+   uplink->registerCounter(id, group, name);
+   return new Counter(id);
+   }*/
+  
+  /**
+   * Increment the value of the counter with the given amount.
+   */
+  virtual void incrementCounter(const string& group, const string& name, uint64_t amount)  {
+    uplink_->incrementCounter(group, name, amount);
+  }
+  
+  /********************************************/
+  /************** BSPContext IMPL *************/
+  /********************************************/
+  
+  /**
+   * Access the InputSplit of the bsp.
+   */
+  //virtual const string& getInputSplit() {
+  //  return *inputSplit_;
+  //}
+  
+  /**
+   * Get the name of the key class of the input to this task.
+   */
+  virtual string getInputKeyClass() {
+    return inputClass_.first;
+  }
+  
+  /**
+   * Get the name of the value class of the input to this task.
+   */
+  virtual string getInputValueClass() {
+    return inputClass_.second;
+  }
+  
+  /**
+   * Send data with a tag to another BSPSlave corresponding to the hostname.
+   * Messages sent by this method are not guaranteed to be received in the
+   * order they were sent.
+   */
+  virtual void sendMessage(const string& peer_name, const M& msg) {
+    uplink_->sendCommand<string,M>(SEND_MSG, peer_name, msg);
+  }
+  
+  /**
+   * @return A message from the peer's received messages queue (a FIFO).
+   */
+  virtual M getCurrentMessage() {
+    uplink_->sendCommand(GET_MSG);
+    
+    M message = protocol_->template getResult<M>(GET_MSG);
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - result: %s\n",
+              toString<M>(message).c_str());
+    }
+    return message;
+  }
+  
+  /**
+   * @return The number of messages in the peer's received messages queue.
+   */
+  virtual int getNumCurrentMessages() {
+    uplink_->sendCommand(GET_MSG_COUNT);
+    
+    int result = protocol_->template getResult<int32_t>(GET_MSG_COUNT);
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::getNumCurrentMessages - result: %d\n",
+              result);
+    }
+    return result;
+  }
+  
+  /**
+   * Barrier Synchronization.
+   *
+   * Sends all the messages in the outgoing message queues to the corresponding
+   * remote peers.
+   */
+  virtual void sync() {
+    uplink_->sendCommand(SYNC);
+  }
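+  
+  /*
+   * Illustrative usage sketch (an editorial addition; someMessage is a
+   * hypothetical value): a typical superstep inside BSP::bsp() sends
+   * messages, crosses the barrier, and then drains the incoming queue:
+   *
+   *   context.sendMessage(context.getPeerName(0), someMessage);
+   *   context.sync();  // messages become visible after the barrier
+   *   int n = context.getNumCurrentMessages();
+   *   for (int i = 0; i < n; i++) {
+   *     M msg = context.getCurrentMessage();
+   *     // ... process msg ...
+   *   }
+   */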
+  
+  /**
+   * @return the name of this peer in the format "hostname:port".
+   */
+  virtual string getPeerName() {
+    // submit id=-1 to receive own peername
+    uplink_->sendCommand<int32_t>(GET_PEERNAME, -1);
+    
+    string result = protocol_->template getResult<string>(GET_PEERNAME);
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - result: %s\n",
+              result.c_str());
+    }
+    return result;
+  }
+  
+  /**
+   * @return the name of the n-th peer from the array of peers sorted by name.
+   */
+  virtual string getPeerName(int index) {
+    uplink_->sendCommand<int32_t>(GET_PEERNAME, index);
+    
+    string result = protocol_->template getResult<string>(GET_PEERNAME);
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - result: %s\n",
+              result.c_str());
+    }
+    
+    return result;
+  }
+  
+  /**
+   * @return the names of all the peers executing tasks from the same job
+   *         (including this peer).
+   */
+  virtual vector<string> getAllPeerNames() {
+    uplink_->sendCommand(GET_ALL_PEERNAME);
+    
+    vector<string> results = protocol_->template getVectorResult<string>(GET_ALL_PEERNAME);
+    
+    return results;
+  }
+  
+  /**
+   * @return the index of this peer in the array of peers sorted by name.
+   */
+  virtual int getPeerIndex() {
+    uplink_->sendCommand(GET_PEER_INDEX);
+    
+    int result = protocol_->template getResult<int32_t>(GET_PEER_INDEX);
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerIndex - result: %d\n",
+              result);
+    }
+    return result;
+  }
+  
+  /**
+   * @return the number of peers
+   */
+  virtual int getNumPeers() {
+    uplink_->sendCommand(GET_PEER_COUNT);
+    
+    int result = protocol_->template getResult<int32_t>(GET_PEER_COUNT);
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::getNumPeers - result: %d\n",
+              result);
+    }
+    return result;
+  }
+  
+  /**
+   * @return the count of the current super-step
+   */
+  virtual long getSuperstepCount() {
+    uplink_->sendCommand(GET_SUPERSTEP_COUNT);
+    
+    long result = protocol_->template getResult<int64_t>(GET_SUPERSTEP_COUNT);
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::getSuperstepCount - result: %ld\n",
+              result);
+    }
+    return result;
+  }
+  
+  /**
+   * Clears all queue entries.
+   */
+  virtual void clear() {
+    uplink_->sendCommand(CLEAR);
+  }
+  
+  /**
+   * Writes a key/value pair to the output collector
+   */
+  virtual void write(const K2& key, const V2& value) {
+    if (writer_ != NULL) {
+      writer_->emit(key, value);
+    } else {
+      uplink_->sendCommand<K2,V2>(WRITE_KEYVALUE, key, value);
+    }
+  }
+  
+  /**
+   * Deserializes the next input key/value pair into the given objects.
+   */
+  virtual bool readNext(K1& key, V1& value) {
+    
+    uplink_->sendCommand(READ_KEYVALUE);
+    
+    // TODO
+    // check if value is array [0, 1, 2, ...], and remove brackets
+    /*
+     int len = current_value_.length();
+     if ( (current_value_[0]=='[') &&
+     (current_value_[len-1]==']') ) {
+     value = current_value_.substr(1,len-2);
+     } else {
+     value = current_value_;
+     }
+     
+     if (logging && key.empty() && value.empty()) {
+     fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n");
+     }
+     
+     return (!key.empty() && !value.empty());
+     */
+    
+    KeyValuePair<K1,V1> key_value_pair;
+    key_value_pair = protocol_->template getKeyValueResult<K1,V1>(READ_KEYVALUE);
+    
+    if (!key_value_pair.is_empty) {
+      key = key_value_pair.first;
+      value = key_value_pair.second;
+    }
+    
+    if (logging && key_value_pair.is_empty) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - END_OF_DATA\n");
+    }
+    
+    return (!key_value_pair.is_empty);
+  }
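+  
+  /*
+   * Illustrative usage sketch (an editorial addition, not part of the
+   * original commit): consuming the whole input split from within
+   * BSP::bsp() until END_OF_DATA is signalled:
+   *
+   *   K1 key;
+   *   V1 value;
+   *   while (context.readNext(key, value)) {
+   *     // ... process the key/value pair ...
+   *   }
+   */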
+  
+  /**
+   * Closes the input and opens it right away, so that the file pointer is at
+   * the beginning again.
+   */
+  virtual void reopenInput() {
+    uplink_->sendCommand(REOPEN_INPUT);
+  }
+  
+  
+  /********************************************/
+  /*******  SequenceFileConnector IMPL  *******/
+  /********************************************/
+  
+  /**
+   * Open SequenceFile with option "r" (read) or "w" (write)
+   * @return the corresponding fileID
+   */
+  virtual int32_t sequenceFileOpen(const string& path, const string& option,
+                                   const string& key_type, const string& value_type) {
+    if (logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",
+              path.c_str());
+    }
+    
+    if ( (option.compare("r")==0) || (option.compare("w")==0))  {
+      
+      string values[] = {path, option, key_type, value_type};
+      uplink_->sendCommand<string>(SEQFILE_OPEN, values, 4);
+      
+      int result = protocol_->template getResult<int32_t>(SEQFILE_OPEN);
+      
+      if(logging) {
+        fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - result: %d\n",
+                result);
+      }
+      return result;
+    } else {
+      // Error: wrong option
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",
+              option.c_str());
+      return -1;
+    }
+  }
+  
+  /**
+   * Close SequenceFile
+   */
+  virtual bool sequenceFileClose(int32_t file_id) {
+    uplink_->sendCommand<int32_t>(SEQFILE_CLOSE, file_id);
+    
+    int result = protocol_->template getResult<int32_t>(SEQFILE_CLOSE);
+    
+    if (logging && result==0) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - Nothing was closed!\n");
+    } else if (logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - File was successfully closed!\n");
+    }
+    
+    return (result==1);
+  }
+  
+  /**
+   * Read the next key/value pair from the SequenceFile with the given fileID.
+   * Uses the curiously recurring template pattern (CRTP).
+   */
+  template<class K, class V>
+  bool sequenceFileReadNext(int32_t file_id, K& key, V& value) {
+    
+    // send request
+    uplink_->sendCommand<int32_t>(SEQFILE_READNEXT, file_id);
+    
+    // TODO
+    /*
+     // check if value is array [0, 1, 2, ...], and remove brackets
+     int len = current_value_.length();
+     if ( (current_value_[0]=='[') &&
+     (current_value_[len-1]==']') ) {
+     value = current_value_(1,len-2);
+     } else {
+     value = current_value_;
+     }
+     */
+    
+    // get response
+    KeyValuePair<K,V> key_value_pair;
+    key_value_pair = protocol_->template getKeyValueResult<K,V>(SEQFILE_READNEXT);
+    
+    if (!key_value_pair.is_empty) {
+      key = key_value_pair.first;
+      value = key_value_pair.second;
+    }
+    
+    if (logging && key_value_pair.is_empty) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - END_OF_DATA\n");
+    }
+    
+    return (!key_value_pair.is_empty);
+  }
+  
+  /**
+   * Append a key/value pair to the SequenceFile with the given fileID.
+   * Uses the curiously recurring template pattern (CRTP).
+   */
+  template<class K, class V>
+  bool sequenceFileAppend(int32_t file_id, const K& key, const V& value) {
+    string values[] = {key, value};
+    uplink_->sendCommand<int32_t,string>(SEQFILE_APPEND, file_id, values, 2);
+    
+    int result = protocol_->template getResult<int32_t>(SEQFILE_APPEND);
+    
+    if (logging && result==0) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileAppend - Nothing appended!\n");
+    } else if (logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileAppend - Successfully appended!\n");
+    }
+    
+    return (result==1);
+  }
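+  
+  /*
+   * Illustrative usage sketch (an editorial addition; the path and the
+   * writable type names are hypothetical): the SequenceFile calls compose
+   * as open -> readNext/append -> close:
+   *
+   *   int fid = context.sequenceFileOpen("/tmp/part-00000", "r",
+   *       "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text");
+   *   string k, v;
+   *   while (context.sequenceFileReadNext<string,string>(fid, k, v)) {
+   *     // ... consume k and v ...
+   *   }
+   *   context.sequenceFileClose(fid);
+   */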
+  
+  /********************************************/
+  /*************** Other STUFF  ***************/
+  /********************************************/
+  
+  void setProtocol(Protocol< BinaryProtocol<K1,V1> >* protocol, UpwardProtocol<BinaryUpwardProtocol>* uplink) {
+    protocol_ = protocol;
+    uplink_ = uplink;
+  }
+  
+  bool isDone() {
+    pthread_mutex_lock(&mutex_done_);
+    bool done_copy = done_;
+    pthread_mutex_unlock(&mutex_done_);
+    return done_copy;
+  }
+  
+  /**
+   * Advance to the next value.
+   */
+  /*
+   bool nextValue() {
+   if (isNewKey || done) {
+   return false;
+   }
+   isNewValue = false;
+   //progress();
+   protocol->nextEvent();
+   return isNewValue;
+   }
+   */
+  void waitForTask() {
+    while (!done_ && !has_task_) {
+      if(logging) {
+        fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n",
+                (done_)?"true":"false", (has_task_)?"true":"false");
+      }
+      protocol_->nextEvent();
+    }
+  }
+  /*
+   bool nextKey() {
+   if (reader == NULL) {
+   while (!isNewKey) {
+   nextValue();
+   if (done) {
+   return false;
+   }
+   }
+   key = *newKey;
+   } else {
+   if (!reader->next(key, const_cast<string&>(*value))) {
+   pthread_mutex_lock(&mutexDone);
+   done = true;
+   pthread_mutex_unlock(&mutexDone);
+   return false;
+   }
+   //progressFloat = reader->getProgress();
+   }
+   isNewKey = false;
+   
+   if (bsp != NULL) {
+   bsp->bsp(*this);
+   }
+   return true;
+   }
+   */
+  void closeAll() {
+    if (reader_) {
+      reader_->close();
+    }
+    
+    if (bsp_) {
+      bsp_->close();
+    }
+    
+    if (writer_) {
+      writer_->close();
+    }
+  }
+  
+  virtual ~BSPContextImpl() {
+    delete job_;
+    //delete inputSplit_;
+    //if (reader) {
+    //  delete value;
+    //}
+    delete reader_;
+    delete bsp_;
+    delete writer_;
+    pthread_mutex_destroy(&mutex_done_);
+  }
+};
+
+/**
+ * Ping the parent every 5 seconds to check that it is still alive
+ */
+template<class K1, class V1, class K2, class V2, class M>
+void* ping(void* ptr) {
+  BSPContextImpl<K1, V1, K2, V2, M>* context = (BSPContextImpl<K1, V1, K2, V2, M>*) ptr;
+  char* portStr = getenv("hama.pipes.command.port");
+  int MAX_RETRIES = 3;
+  int remaining_retries = MAX_RETRIES;
+  while (!context->isDone()) {
+    try{
+      sleep(5);
+      int sock = -1;
+      if (portStr) {
+        sock = socket(PF_INET, SOCK_STREAM, 0);
+        HADOOP_ASSERT(sock != - 1,
+                      string("problem creating socket: ") + strerror(errno));
+        sockaddr_in addr;
+        addr.sin_family = AF_INET;
+        addr.sin_port = htons(toInt(portStr));
+        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+        if(logging) {
+          fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n",
+                  portStr);
+        }
+        HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
+                      string("problem connecting command socket: ") +
+                      strerror(errno));
+        
+      }
+      if (sock != -1) {
+        int result = shutdown(sock, SHUT_RDWR);
+        HADOOP_ASSERT(result == 0, "problem shutting socket");
+        result = close(sock);
+        HADOOP_ASSERT(result == 0, "problem closing socket");
+      }
+      remaining_retries = MAX_RETRIES;
+    } catch (Error& err) {
+      if (!context->isDone()) {
+        fprintf(stderr, "Hama Pipes Exception: in ping %s\n",
+                err.getMessage().c_str());
+        remaining_retries -= 1;
+        if (remaining_retries == 0) {
+          exit(1);
+        }
+      } else {
+        return NULL;
+      }
+    }
+  }
+  return NULL;
+}
+
+/**
+ * Run the assigned task in the framework.
+ * The user's main function should set the various functions using the
+ * set* functions above and then call this.
+ * @return true, if the task succeeded.
+ */
+template<class K1, class V1, class K2, class V2, class M>
+bool runTask(const Factory<K1, V1, K2, V2, M>& factory) {
+  try {
+    HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL, "No environment found!");
+    
+    logging = (toInt(getenv("hama.pipes.logging")) != 0);
+    if (logging) {
+      fprintf(stderr,"HamaPipes::runTask - logging is: %s\n",
+              ((logging)?"true":"false"));
+    }
+    
+    BSPContextImpl<K1, V1, K2, V2, M>* context = new BSPContextImpl<K1, V1, K2, V2, M>(factory);
+    Protocol< BinaryProtocol<K1,V1> >* protocol;
+    
+    char* port_str = getenv("hama.pipes.command.port");
+    int sock = -1;
+    FILE* in_stream = NULL;
+    FILE* out_stream = NULL;
+    char *bufin = NULL;
+    char *bufout = NULL;
+    if (port_str) {
+      sock = socket(PF_INET, SOCK_STREAM, 0);
+      HADOOP_ASSERT(sock != - 1,
+                    string("problem creating socket: ") + strerror(errno));
+      sockaddr_in addr;
+      addr.sin_family = AF_INET;
+      addr.sin_port = htons(toInt(port_str));
+      addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+      HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
+                    string("problem connecting command socket: ") +
+                    strerror(errno));
+      
+      in_stream = fdopen(sock, "r");
+      out_stream = fdopen(sock, "w");
+      
+      // increase buffer size
+      int bufsize = 128*1024;
+      int setbuf;
+      bufin = new char[bufsize];
+      bufout = new char[bufsize];
+      setbuf = setvbuf(in_stream, bufin, _IOFBF, bufsize);
+      HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for in_stream: ")
+                    + strerror(errno));
+      setbuf = setvbuf(out_stream, bufout, _IOFBF, bufsize);
+      HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for out_stream: ")
+                    + strerror(errno));
+      
+      protocol = new BinaryProtocol<K1,V1>(in_stream, context, out_stream);
+      if(logging) {
+        fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n",
+                port_str);
+      }
+      
+    } else if (getenv("hama.pipes.command.file")) {
+      char* filename = getenv("hama.pipes.command.file");
+      string out_filename = filename;
+      out_filename += ".out";
+      in_stream = fopen(filename, "r");
+      out_stream = fopen(out_filename.c_str(), "w");
+      protocol = new BinaryProtocol<K1,V1>(in_stream, context, out_stream);
+    } else {
+      //protocol = new TextProtocol(stdin, context, stdout);
+      fprintf(stderr,"HamaPipes::runTask - Protocol couldn't be initialized!\n");
+      return false;
+    }
+    
+    context->setProtocol(protocol, protocol->getUplink());
+    
+    //pthread_t pingThread;
+    //pthread_create(&pingThread, NULL, ping, (void*)(context));
+    
+    context->waitForTask();
+    
+    //while (!context->isDone()) {
+    //context->nextKey();
+    //}
+    
+    context->closeAll();
+    protocol->getUplink()->sendCommand(DONE);
+    
+    //pthread_join(pingThread,NULL);
+    
+    // Cleanup
+    delete context;
+    delete protocol;
+    
+    // Note: only the output stream is flushed; fflush on an input stream
+    // is undefined behavior.
+    if (out_stream != NULL) {
+      fflush(out_stream);
+    }
+    
+    fflush(stdout);
+    
+    if (sock != -1) {
+      int result = shutdown(sock, SHUT_RDWR);
+      HADOOP_ASSERT(result == 0, "problem shutting socket");
+      result = close(sock);
+      HADOOP_ASSERT(result == 0, "problem closing socket");
+    }
+    
+    //TODO REFACTOR
+    if (in_stream != NULL) {
+      //fclose(stream);
+    }
+    if (out_stream != NULL) {
+      //fclose(outStream);
+    }
+    delete [] bufin;
+    delete [] bufout;
+    
+    return true;
+    
+  } catch (Error& err) {
+    fprintf(stderr, "Hama Pipes Exception: %s\n",
+            err.getMessage().c_str());
+    return false;
+  }
+}
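+
+/*
+ * Illustrative sketch of a user program (an editorial addition; MyBSP is a
+ * hypothetical name and TemplateFactory is assumed from the accompanying
+ * Pipes headers): main() builds a factory for the BSP implementation and
+ * hands it to runTask():
+ *
+ *   class MyBSP: public HamaPipes::BSP<string,string,string,string,string> {
+ *   public:
+ *     MyBSP(HamaPipes::BSPContext<string,string,string,string,string>& ctx) {}
+ *     void bsp(HamaPipes::BSPContext<string,string,string,string,string>& ctx) {
+ *       // ... supersteps: readNext / sendMessage / sync ...
+ *     }
+ *   };
+ *
+ *   int main(int argc, char *argv[]) {
+ *     return HamaPipes::runTask(HamaPipes::TemplateFactory<
+ *         MyBSP, string, string, string, string, string>());
+ *   }
+ */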

Modified: hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/api/hadoop/SerialUtils.hh?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh (original)
+++ hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh Sat Nov 23 08:41:09 2013
@@ -22,7 +22,7 @@
 #include <stdint.h>
 
 namespace HadoopUtils {
-
+  
   /**
    * A simple exception class that records a message for the user.
    */
@@ -30,38 +30,38 @@ namespace HadoopUtils {
   private:
     std::string error;
   public:
-
+    
     /**
      * Create an error object with the given message.
      */
     Error(const std::string& msg);
-
+    
     /**
      * Construct an error object with the given message that was created on
     * the given file, line, and function.
      */
-    Error(const std::string& msg, 
+    Error(const std::string& msg,
           const std::string& file, int line, const std::string& function);
-
+    
     /**
      * Get the error message.
      */
     const std::string& getMessage() const;
   };
-
+  
   /**
    * Check to make sure that the condition is true, and throw an exception
    * if it is not. The exception will contain the message and a description
    * of the source location.
    */
-  #define HADOOP_ASSERT(CONDITION, MESSAGE) \
-    { \
-      if (!(CONDITION)) { \
-        throw HadoopUtils::Error((MESSAGE), __FILE__, __LINE__, \
-                                    __PRETTY_FUNCTION__); \
-      } \
-    }
-
+#define HADOOP_ASSERT(CONDITION, MESSAGE) \
+{ \
+if (!(CONDITION)) { \
+throw HadoopUtils::Error((MESSAGE), __FILE__, __LINE__, \
+__PRETTY_FUNCTION__); \
+} \
+}
+  
   /**
    * An interface for an input stream.
    */
@@ -76,7 +76,7 @@ namespace HadoopUtils {
     virtual void read(void *buf, size_t len) = 0;
     virtual ~InStream() {}
   };
-
+  
   /**
    * An interface for an output stream.
    */
@@ -95,7 +95,7 @@ namespace HadoopUtils {
     virtual void flush() = 0;
     virtual ~OutStream() {}
   };
-
+  
   /**
    * A class to read a file as a stream.
    */
@@ -118,18 +118,18 @@ namespace HadoopUtils {
      */
     bool isOwned;
   };
-
+  
   /**
    * A class to write a stream to a file.
    */
   class FileOutStream: public OutStream {
   public:
-
+    
     /**
      * Create a stream that isn't bound to anything.
      */
     FileOutStream();
-
+    
     /**
      * Create the given file, potentially overwriting an existing file.
      */
@@ -144,7 +144,7 @@ namespace HadoopUtils {
     FILE *mFile;
     bool isOwned;
   };
-
+  
   /**
    * A stream that reads from a string.
    */
@@ -156,15 +156,17 @@ namespace HadoopUtils {
     const std::string& buffer;
     std::string::const_iterator itr;
   };
-
+  
   void serializeInt(int32_t t, OutStream& stream);
   int32_t deserializeInt(InStream& stream);
   void serializeLong(int64_t t, OutStream& stream);
   int64_t deserializeLong(InStream& stream);
   void serializeFloat(float t, OutStream& stream);
   float deserializeFloat(InStream& stream);
+  void serializeDouble(double t, OutStream& stream);
+  double deserializeDouble(InStream& stream);
   void serializeString(const std::string& t, OutStream& stream);
-  void deserializeString(std::string& t, InStream& stream);
+  std::string deserializeString(InStream& stream);
 }
 
 #endif

Modified: hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/impl/SerialUtils.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc (original)
+++ hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc Sat Nov 23 08:41:09 2013
@@ -27,41 +27,41 @@
 using std::string;
 
 namespace HadoopUtils {
-
+  
   Error::Error(const std::string& msg): error(msg) {
   }
-
-  Error::Error(const std::string& msg, 
-               const std::string& file, int line, 
+  
+  Error::Error(const std::string& msg,
+               const std::string& file, int line,
                const std::string& function) {
-    error = msg + " at " + file + ":" + toString(line) + 
-            " in " + function;
+    error = msg + " at " + file + ":" + toString(line) +
+    " in " + function;
   }
-
+  
   const std::string& Error::getMessage() const {
     return error;
   }
-
+  
   FileInStream::FileInStream()
   {
     mFile = NULL;
     isOwned = false;
   }
-
+  
   bool FileInStream::open(const std::string& name)
   {
     mFile = fopen(name.c_str(), "rb");
     isOwned = true;
     return (mFile != NULL);
   }
-
+  
   bool FileInStream::open(FILE* file)
   {
     mFile = file;
     isOwned = false;
     return (mFile != NULL);
   }
-
+  
   void FileInStream::read(void *buf, size_t len)
   {
     size_t result = fread(buf, len, 1, mFile);
@@ -73,12 +73,12 @@ namespace HadoopUtils {
       }
     }
   }
-
+  
   bool FileInStream::skip(size_t nbytes)
   {
     return (0==fseek(mFile, nbytes, SEEK_CUR));
   }
-
+  
   bool FileInStream::close()
   {
     int ret = 0;
@@ -88,20 +88,20 @@ namespace HadoopUtils {
     mFile = NULL;
     return (ret==0);
   }
-
+  
   FileInStream::~FileInStream()
   {
     if (mFile != NULL) {
       close();
     }
   }
-
+  
   FileOutStream::FileOutStream()
   {
     mFile = NULL;
     isOwned = false;
   }
-
+  
   bool FileOutStream::open(const std::string& name, bool overwrite)
   {
     if (!overwrite) {
@@ -115,26 +115,26 @@ namespace HadoopUtils {
     isOwned = true;
     return (mFile != NULL);
   }
-
+  
   bool FileOutStream::open(FILE* file)
   {
     mFile = file;
     isOwned = false;
     return (mFile != NULL);
   }
-
+  
   void FileOutStream::write(const void* buf, size_t len)
   {
     size_t result = fwrite(buf, len, 1, mFile);
     HADOOP_ASSERT(result == 1,
                   string("write error to file: ") + strerror(errno));
   }
-
+  
   bool FileOutStream::advance(size_t nbytes)
   {
     return (0==fseek(mFile, nbytes, SEEK_CUR));
   }
-
+  
   bool FileOutStream::close()
   {
     int ret = 0;
@@ -144,23 +144,23 @@ namespace HadoopUtils {
     mFile = NULL;
     return (ret == 0);
   }
-
+  
   void FileOutStream::flush()
   {
     fflush(mFile);
   }
-
+  
   FileOutStream::~FileOutStream()
   {
     if (mFile != NULL) {
       close();
     }
   }
-
+  
   StringInStream::StringInStream(const std::string& str): buffer(str) {
     itr = buffer.begin();
   }
-
+  
   void StringInStream::read(void *buf, size_t buflen) {
     size_t bytes = 0;
     char* output = (char*) buf;
@@ -174,11 +174,11 @@ namespace HadoopUtils {
     }
     HADOOP_ASSERT(bytes == buflen, "unexpected end of string reached");
   }
-
+  
   void serializeInt(int32_t t, OutStream& stream) {
     serializeLong(t,stream);
   }
-
+  
   void serializeLong(int64_t t, OutStream& stream)
   {
     if (t >= -112 && t <= 127) {
@@ -186,22 +186,22 @@ namespace HadoopUtils {
       stream.write(&b, 1);
       return;
     }
-        
+    
     int8_t len = -112;
     if (t < 0) {
       t ^= -1ll; // reset the sign bit
       len = -120;
     }
-        
+    
     uint64_t tmp = t;
     while (tmp != 0) {
       tmp = tmp >> 8;
       len--;
     }
-  
-    stream.write(&len, 1);      
+    
+    stream.write(&len, 1);
     len = (len < -120) ? -(len + 120) : -(len + 112);
-        
+    
     for (uint32_t idx = len; idx != 0; idx--) {
       uint32_t shiftbits = (idx - 1) * 8;
       uint64_t mask = 0xFFll << shiftbits;
@@ -209,11 +209,11 @@ namespace HadoopUtils {
       stream.write(&b, 1);
     }
   }
-
+  
   int32_t deserializeInt(InStream& stream) {
     return deserializeLong(stream);
   }
-
+  
   int64_t deserializeLong(InStream& stream)
   {
     int8_t b;
@@ -242,7 +242,7 @@ namespace HadoopUtils {
     }
     return t;
   }
-
+  
   void serializeFloat(float t, OutStream& stream)
   {
     char buf[sizeof(float)];
@@ -251,16 +251,38 @@ namespace HadoopUtils {
     xdr_float(&xdrs, &t);
     stream.write(buf, sizeof(float));
   }
-
-  void deserializeFloat(float& t, InStream& stream)
+  
+  float deserializeFloat(InStream& stream)
   {
+    float t;
     char buf[sizeof(float)];
     stream.read(buf, sizeof(float));
     XDR xdrs;
     xdrmem_create(&xdrs, buf, sizeof(float), XDR_DECODE);
     xdr_float(&xdrs, &t);
+    return t;
   }
-
+  
+  void serializeDouble(double t, OutStream& stream)
+  {
+    char buf[sizeof(double)];
+    XDR xdrs;
+    xdrmem_create(&xdrs, buf, sizeof(double), XDR_ENCODE);
+    xdr_double(&xdrs, &t);
+    stream.write(buf, sizeof(double));
+  }
+  
+  double deserializeDouble(InStream& stream)
+  {
+    double t;
+    char buf[sizeof(double)];
+    stream.read(buf, sizeof(double));
+    XDR xdrs;
+    xdrmem_create(&xdrs, buf, sizeof(double), XDR_DECODE);
+    xdr_double(&xdrs, &t);
+    return t;
+  }
+  
   void serializeString(const std::string& t, OutStream& stream)
   {
     serializeInt(t.length(), stream);
@@ -268,9 +290,10 @@ namespace HadoopUtils {
       stream.write(t.data(), t.length());
     }
   }
-
-  void deserializeString(std::string& t, InStream& stream)
+  
+  std::string deserializeString(InStream& stream)
   {
+    std::string t;
     int32_t len = deserializeInt(stream);
     if (len > 0) {
       // resize the string to the right length
@@ -289,6 +312,6 @@ namespace HadoopUtils {
     } else {
       t.clear();
     }
+    return t;
   }
-
 }

Modified: hama/trunk/c++/src/main/native/utils/impl/StringUtils.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/impl/StringUtils.cc?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/utils/impl/StringUtils.cc (original)
+++ hama/trunk/c++/src/main/native/utils/impl/StringUtils.cc Sat Nov 23 08:41:09 2013
@@ -32,14 +32,14 @@ using std::string;
 using std::vector;
 
 namespace HadoopUtils {
-
+  
   // Added by Apache Hama Pipes
   string toString(double x) {
     std::stringstream ss;
     ss << std::setprecision(16) << x;
     return ss.str();
   }
-
+  
   // Added by Apache Hama Pipes
   double toDouble(const string& val) {
     const char* begin = val.c_str();
@@ -47,36 +47,36 @@ namespace HadoopUtils {
     double result = strtod(begin, &end);
     size_t s = end - begin;
     if(s < val.size()) {
-      throw Error("Problem converting "+val+" to double. (result:"
+      throw Error("Problem converting '"+val+"' to double. (result:"
                   +toString(result)+")");
     }
     return result;
   }
-    
+  
   string toString(int32_t x) {
     char str[100];
     sprintf(str, "%d", x);
     return str;
   }
-
+  
   int toInt(const string& val) {
     int result;
     char trash;
     int num = sscanf(val.c_str(), "%d%c", &result, &trash);
     HADOOP_ASSERT(num == 1,
-                  "Problem converting " + val + " to integer.");
+                  "Problem converting '" + val + "' to integer.");
     return result;
   }
-
+  
   float toFloat(const string& val) {
     float result;
     char trash;
     int num = sscanf(val.c_str(), "%f%c", &result, &trash);
     HADOOP_ASSERT(num == 1,
-                  "Problem converting " + val + " to float.");
+                  "Problem converting '" + val + "' to float.");
     return result;
   }
-
+  
   bool toBool(const string& val) {
     if (val == "true") {
       return true;
@@ -84,10 +84,10 @@ namespace HadoopUtils {
       return false;
     } else {
       HADOOP_ASSERT(false,
-                    "Problem converting " + val + " to boolean.");
+                    "Problem converting '" + val + "' to boolean.");
     }
   }
-
+  
   /**
    * Get the current time in the number of milliseconds since 1970.
    */
@@ -98,15 +98,15 @@ namespace HadoopUtils {
     HADOOP_ASSERT(sys != -1, strerror(errno));
     return tv.tv_sec * 1000 + tv.tv_usec / 1000;
   }
-
+  
   vector<string> splitString(const std::string& str,
-			     const char* separator) {
+                             const char* separator) {
     vector<string> result;
     string::size_type prev_pos=0;
     string::size_type pos=0;
     while ((pos = str.find_first_of(separator, prev_pos)) != string::npos) {
       if (prev_pos < pos) {
-	result.push_back(str.substr(prev_pos, pos-prev_pos));
+        result.push_back(str.substr(prev_pos, pos-prev_pos));
       }
       prev_pos = pos + 1;
     }
@@ -115,7 +115,7 @@ namespace HadoopUtils {
     }
     return result;
   }
-
+  
   string quoteString(const string& str,
                      const char* deliminators) {
     
@@ -123,31 +123,31 @@ namespace HadoopUtils {
     for(int i=result.length() -1; i >= 0; --i) {
       char ch = result[i];
       if (!isprint(ch) ||
-          ch == '\\' || 
+          ch == '\\' ||
           strchr(deliminators, ch)) {
         switch (ch) {
-        case '\\':
-          result.replace(i, 1, "\\\\");
-          break;
-        case '\t':
-          result.replace(i, 1, "\\t");
-          break;
-        case '\n':
-          result.replace(i, 1, "\\n");
-          break;
-        case ' ':
-          result.replace(i, 1, "\\s");
-          break;
-        default:
-          char buff[4];
-          sprintf(buff, "\\%02x", static_cast<unsigned char>(result[i]));
-          result.replace(i, 1, buff);
+          case '\\':
+            result.replace(i, 1, "\\\\");
+            break;
+          case '\t':
+            result.replace(i, 1, "\\t");
+            break;
+          case '\n':
+            result.replace(i, 1, "\\n");
+            break;
+          case ' ':
+            result.replace(i, 1, "\\s");
+            break;
+          default:
+            char buff[4];
+            sprintf(buff, "\\%02x", static_cast<unsigned char>(result[i]));
+            result.replace(i, 1, buff);
         }
       }
     }
     return result;
   }
-
+  
   string unquoteString(const string& str) {
     string result(str);
     string::size_type current = result.find('\\');
@@ -158,8 +158,8 @@ namespace HadoopUtils {
         if (isxdigit(result[current+1])) {
           num_chars = 2;
           HADOOP_ASSERT(current + num_chars < result.size(),
-                     "escape pattern \\<hex><hex> is missing second digit in '"
-                     + str + "'");
+                        "escape pattern \\<hex><hex> is missing second digit in '"
+                        + str + "'");
           char sub_str[3];
           sub_str[0] = result[current+1];
           sub_str[1] = result[current+2];
@@ -167,27 +167,27 @@ namespace HadoopUtils {
           char* end_ptr = NULL;
           long int int_val = strtol(sub_str, &end_ptr, 16);
           HADOOP_ASSERT(*end_ptr == '\0' && int_val >= 0,
-                     "escape pattern \\<hex><hex> is broken in '" + str + "'");
+                        "escape pattern \\<hex><hex> is broken in '" + str + "'");
           new_ch = static_cast<char>(int_val);
         } else {
           num_chars = 1;
           switch(result[current+1]) {
-          case '\\':
-            new_ch = '\\';
-            break;
-          case 't':
-            new_ch = '\t';
-            break;
-          case 'n':
-            new_ch = '\n';
-            break;
-          case 's':
-            new_ch = ' ';
-            break;
-          default:
-            string msg("unknow n escape character '");
-            msg += result[current+1];
-            HADOOP_ASSERT(false, msg + "' found in '" + str + "'");
+            case '\\':
+              new_ch = '\\';
+              break;
+            case 't':
+              new_ch = '\t';
+              break;
+            case 'n':
+              new_ch = '\n';
+              break;
+            case 's':
+              new_ch = ' ';
+              break;
+            default:
+              string msg("unknown escape character '");
+              msg += result[current+1];
+              HADOOP_ASSERT(false, msg + "' found in '" + str + "'");
           }
         }
         result.replace(current, 1 + num_chars, 1, new_ch);
@@ -198,5 +198,5 @@ namespace HadoopUtils {
     }
     return result;
   }
-
+  
 }
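
The quoteString()/unquoteString() pair above defines the escaping used by the
pipes text protocol: backslash is always escaped, tab and newline are
non-printable and therefore always escaped, a space becomes \s when it is
listed as a delimiter, and any other escaped byte falls back to a two-digit
hex escape \xx. A minimal Java sketch of the same rules, for illustration only
(the class below is hypothetical and not part of this commit; isprint() is
approximated as the ASCII range 0x20-0x7e):

public class QuoteSketch {

  public static String quote(String str, String delimiters) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < str.length(); i++) {
      char ch = str.charAt(i);
      // mirrors: !isprint(ch) || ch == '\\' || strchr(deliminators, ch)
      boolean escape = ch < 0x20 || ch > 0x7e || ch == '\\'
          || delimiters.indexOf(ch) >= 0;
      if (!escape) {
        sb.append(ch);
      } else if (ch == '\\') {
        sb.append("\\\\");
      } else if (ch == '\t') {
        sb.append("\\t");
      } else if (ch == '\n') {
        sb.append("\\n");
      } else if (ch == ' ') {
        sb.append("\\s");
      } else {
        sb.append(String.format("\\%02x", (int) ch)); // hex fallback
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // With ' ' as a delimiter, spaces become \s; tabs are always escaped:
    System.out.println(quote("key\tvalue one", " ")); // prints: key\tvalue\sone
  }
}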

Added: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java?rev=1544761&view=auto
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java (added)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/KeyValueWritable.java Sat Nov 23 08:41:09 2013
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.commons.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * KeyValueWritable containing a key-value WritableComparable pair.
+ * 
+ * @param <K> the class of the key
+ * @param <V> the class of the value
+ */
+public class KeyValueWritable<K extends WritableComparable, V extends WritableComparable>
+    implements WritableComparable<KeyValueWritable<K, V>> {
+
+  protected K key = null;
+  protected V value = null;
+
+  public KeyValueWritable() {
+  }
+
+  public KeyValueWritable(K key, V value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public void setKey(K key) {
+    this.key = key;
+  }
+
+  public V getValue() {
+    return value;
+  }
+
+  public void setValue(V value) {
+    this.value = value;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    key.readFields(in);
+    value.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    key.write(out);
+    value.write(out);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((key == null) ? 0 : key.hashCode());
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+
+    KeyValueWritable<K, V> other = (KeyValueWritable<K, V>) obj;
+    if (key == null) {
+      if (other.key != null) {
+        return false;
+      }
+    } else if (!key.equals(other.key)) {
+      return false;
+    }
+
+    if (value == null) {
+      if (other.value != null) {
+        return false;
+      }
+    } else if (!value.equals(other.value)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int compareTo(KeyValueWritable<K, V> obj) {
+    int cmp = key.compareTo(obj.key);
+    if (cmp != 0) {
+      return cmp;
+    }
+    // if keys are equal, compare values
+    return value.compareTo(obj.value);
+  }
+
+}
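
A round-trip sketch for the new KeyValueWritable, for illustration only (the
demo class below is hypothetical and not part of this commit). Note that
readFields() delegates directly to key.readFields() and value.readFields(), so
the receiving instance must be constructed with non-null key and value objects:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.commons.io.KeyValueWritable;

public class KeyValueWritableRoundTrip {
  public static void main(String[] args) throws Exception {
    KeyValueWritable<Text, IntWritable> out = new KeyValueWritable<Text, IntWritable>(
        new Text("apple"), new IntWritable(3));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    out.write(new DataOutputStream(bytes));

    // The target must carry empty (non-null) key/value instances,
    // because readFields() does not create them.
    KeyValueWritable<Text, IntWritable> in = new KeyValueWritable<Text, IntWritable>(
        new Text(), new IntWritable());
    in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(in.getKey() + " -> " + in.getValue()); // prints: apple -> 3
  }
}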

Added: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java?rev=1544761&view=auto
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java (added)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesKeyValueWritable.java Sat Nov 23 08:41:09 2013
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.commons.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * PipesKeyValueWritable containing a Text key-value WritableComparable pair.
+ * Serialized format: delimiter | key | delimiter | value, e.g., ":Key:Value"
+ * with delimiter ':' or "|Key|Value" with delimiter '|'.
+ */
+public final class PipesKeyValueWritable extends KeyValueWritable<Text, Text> {
+
+  // private static final Log LOG =
+  // LogFactory.getLog(PipesKeyValueWritable.class);
+
+  /**
+   * Delimiter between key and value
+   */
+  private char keyValueDelimiter;
+
+  public PipesKeyValueWritable() {
+    super();
+  }
+
+  public PipesKeyValueWritable(Text key, Text value, char keyValueDelimiter) {
+    super(key, value);
+    this.keyValueDelimiter = keyValueDelimiter;
+  }
+
+  public char getKeyValueDelimiter() {
+    return keyValueDelimiter;
+  }
+
+  public void setKeyValueDelimiter(char keyValueDelimiter) {
+    this.keyValueDelimiter = keyValueDelimiter;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    String str = Text.readString(in);
+    // LOG.debug("readFields: '" + str + "'");
+
+    this.keyValueDelimiter = str.charAt(0);
+    str = str.substring(1);
+    String[] result = str.split(String.valueOf(this.keyValueDelimiter), 2);
+    super.setKey(new Text(result[0]));
+    super.setValue(new Text(result[1]));
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // LOG.debug("write: '" + this.toString() + "'");
+    Text.writeString(out, this.toString());
+  }
+
+  @Override
+  public String toString() {
+    return this.keyValueDelimiter + super.getKey().toString()
+        + this.keyValueDelimiter + super.getValue().toString();
+  }
+
+  @Override
+  public int compareTo(KeyValueWritable<Text, Text> obj) {
+    // if both keys are numeric, compare them as numbers
+    if ((isNumeric(key.toString())) && (isNumeric(obj.key.toString()))) {
+      double val1 = Double.parseDouble(key.toString());
+      double val2 = Double.parseDouble(obj.key.toString());
+      int cmp = Double.compare(val1, val2);
+      if (cmp != 0) {
+        return cmp;
+      }
+    } else { // otherwise compare keys as strings
+      int cmp = key.compareTo(obj.key);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+    // if keys are equal, compare values
+    return value.compareTo(obj.value);
+  }
+
+  public final static boolean isNumeric(String s) {
+    return s.matches("[-+]?\\d*\\.?\\d+");
+  }
+}
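
To make the wire format concrete: write() emits one Text string consisting of
the delimiter, the key, the delimiter again, and the value; readFields()
recovers the delimiter from the first character and splits the remainder only
once, so the value may itself contain the delimiter. Also note that
compareTo() parses keys matching the numeric pattern as doubles, so a key like
"10" sorts after "9". A sketch, for illustration only (the demo class below is
hypothetical and not part of this commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.Text;
import org.apache.hama.commons.io.PipesKeyValueWritable;

public class PipesKeyValueFormatDemo {
  public static void main(String[] args) throws Exception {
    PipesKeyValueWritable out = new PipesKeyValueWritable(new Text("Key"),
        new Text("Value"), ':');
    System.out.println(out); // prints: :Key:Value

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    out.write(new DataOutputStream(bytes));

    // readFields() takes the delimiter from the first character and
    // splits the rest once, so the value may contain the delimiter.
    PipesKeyValueWritable in = new PipesKeyValueWritable();
    in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(in.getKey() + " / " + in.getValue()); // prints: Key / Value
  }
}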

Added: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java?rev=1544761&view=auto
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java (added)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/PipesVectorWritable.java Sat Nov 23 08:41:09 2013
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.commons.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleVector;
+
+/**
+ * VectorWritable for Hama Pipes dense vectors.
+ */
+public final class PipesVectorWritable extends VectorWritable {
+
+  // private static final Log LOG =
+  // LogFactory.getLog(PipesVectorWritable.class);
+
+  public PipesVectorWritable() {
+    super();
+  }
+
+  public PipesVectorWritable(VectorWritable v) {
+    super(v);
+  }
+
+  public PipesVectorWritable(DoubleVector v) {
+    super(v);
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    writeVector(super.getVector(), out);
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    super.set(readVector(in));
+  }
+
+  public static void writeVector(DoubleVector vector, DataOutput out)
+      throws IOException {
+    String str = "";
+    for (int i = 0; i < vector.getLength(); i++) {
+      str += (i < vector.getLength() - 1) ? vector.get(i) + ", " : vector
+          .get(i);
+    }
+
+    // LOG.debug("writeVector: '" + str + "'");
+    Text.writeString(out, str);
+  }
+
+  public static DoubleVector readVector(DataInput in) throws IOException {
+    String str = Text.readString(in);
+    // LOG.debug("readVector: '" + str + "'");
+
+    String[] values = str.split(",");
+    int len = values.length;
+    DoubleVector vector = new DenseDoubleVector(len);
+    for (int i = 0; i < len; i++) {
+      vector.set(i, Double.parseDouble(values[i]));
+    }
+    return vector;
+  }
+}
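
Unlike VectorWritable's binary layout, the new class serializes a dense vector
as one comma-separated Text string such as "1.0, 2.5, -3.0", which the C++
side can parse as plain text. A round-trip sketch, for illustration only (the
demo class below is hypothetical and not part of this commit; the vector API
calls mirror those used in readVector() above):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hama.commons.io.PipesVectorWritable;
import org.apache.hama.commons.math.DenseDoubleVector;
import org.apache.hama.commons.math.DoubleVector;

public class PipesVectorFormatDemo {
  public static void main(String[] args) throws Exception {
    DoubleVector vector = new DenseDoubleVector(3);
    vector.set(0, 1.0);
    vector.set(1, 2.5);
    vector.set(2, -3.0);

    // write() emits the text form "1.0, 2.5, -3.0" via Text.writeString().
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new PipesVectorWritable(vector).write(new DataOutputStream(bytes));

    // readVector() splits on "," only; the round trip still works because
    // Double.parseDouble() tolerates the leading space of the ", " separator.
    PipesVectorWritable in = new PipesVectorWritable();
    in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(in.getVector().get(1)); // prints: 2.5
  }
}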

Modified: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java (original)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/VectorWritable.java Sat Nov 23 08:41:09 2013
@@ -28,7 +28,7 @@ import org.apache.hama.commons.math.Doub
 /**
  * Writable for dense vectors.
  */
-public final class VectorWritable implements WritableComparable<VectorWritable> {
+public class VectorWritable implements WritableComparable<VectorWritable> {
 
   private DoubleVector vector;
 
@@ -45,12 +45,12 @@ public final class VectorWritable implem
   }
 
   @Override
-  public final void write(DataOutput out) throws IOException {
+  public void write(DataOutput out) throws IOException {
     writeVector(this.vector, out);
   }
 
   @Override
-  public final void readFields(DataInput in) throws IOException {
+  public void readFields(DataInput in) throws IOException {
     this.vector = readVector(in);
   }
 

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Sat Nov 23 08:41:09 2013
@@ -209,6 +209,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.8</version>
         <executions>
           <execution>
             <id>add-source</id>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Sat Nov 23 08:41:09 2013
@@ -22,10 +22,7 @@ import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -34,10 +31,6 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
-import org.apache.hama.pipes.PipesApplicable;
-import org.apache.hama.pipes.PipesApplication;
-import org.apache.hama.pipes.PipesPartitioner;
-import org.apache.hama.pipes.util.DistributedCacheUtil;
 
 /**
  * A BSP job configuration.
@@ -54,9 +47,6 @@ public class BSPJob extends BSPJobContex
   private BSPJobClient jobClient;
   private RunningJob info;
 
-  private PipesApplication<?, ?, ?, ?, ?> pipesApp = null;
-  private static final Log LOG = LogFactory.getLog(BSPJob.class);
-
   public BSPJob() throws IOException {
     this(new HamaConfiguration());
   }
@@ -260,23 +250,6 @@ public class BSPJob extends BSPJobContex
     return conf.getBoolean(name, defaultValue);
   }
 
-  public final <K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable> PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> getPipesApplication() {
-    if (pipesApp == null)
-      pipesApp = new PipesApplication<K1, V1, K2, V2, BytesWritable>();
-
-    return pipesApp;
-  }
-
-  public void cleanup() {
-    try {
-      // Close client pipesApplication
-      if (this.getPipesApplication() != null)
-        this.getPipesApplication().cleanup();
-    } catch (IOException e) {
-      LOG.error(e);
-    }
-  }
-
   public void setNumBspTask(int tasks) {
     conf.setInt("bsp.peers.num", tasks);
   }
@@ -416,36 +389,6 @@ public class BSPJob extends BSPJobContex
   }
 
   @SuppressWarnings("rawtypes")
-  public Partitioner getPartitioner() {
-
-    Class<? extends Partitioner> partitionerClass = conf.getClass(
-        Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
-        Partitioner.class);
-
-    LOG.info("DEBUG: " + Constants.RUNTIME_PARTITIONING_CLASS + ": "
-        + partitionerClass.toString());
-
-    Partitioner partitioner = ReflectionUtils.newInstance(partitionerClass,
-        conf);
-
-    /* PipesPartitioner usage */
-    if (PipesPartitioner.class.equals(partitionerClass)) {
-      ((PipesApplicable) partitioner)
-          .setApplication(this.getPipesApplication());
-
-      try {
-        DistributedCacheUtil.moveLocalFiles(conf);
-        this.getPipesApplication().start(conf);
-      } catch (IOException e) {
-        LOG.error(e);
-      } catch (InterruptedException e) {
-        LOG.error(e);
-      }
-    }
-    return partitioner;
-  }
-
-  @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {
     return ReflectionUtils.newInstance(conf.getClass(
         Constants.OUTPUT_FORMAT_CLASS, TextOutputFormat.class,

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Sat Nov 23 08:41:09 2013
@@ -52,7 +52,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hama.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
@@ -61,6 +60,7 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.hama.ipc.RPC;
 
 /**
  * BSPJobClient is the primary interface for the user-job to interact with the
@@ -429,7 +429,8 @@ public class BSPJobClient extends Config
               job.get(Constants.RUNTIME_PARTITIONING_CLASS));
         }
         BSPJob partitioningJob = new BSPJob(conf);
-        LOG.debug("partitioningJob input: " + partitioningJob.get(Constants.JOB_INPUT_DIR));
+        LOG.debug("partitioningJob input: "
+            + partitioningJob.get(Constants.JOB_INPUT_DIR));
         partitioningJob.setInputFormat(job.getInputFormat().getClass());
         partitioningJob.setInputKeyClass(job.getInputKeyClass());
         partitioningJob.setInputValueClass(job.getInputValueClass());
@@ -768,9 +769,6 @@ public class BSPJobClient extends Config
     // TODO if error found, kill job
     // running.killJob();
     jc.close();
-
-    // Added cleanup for Client PipesApp and DistributedCache
-    job.cleanup();
   }
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Sat Nov 23 08:41:09 2013
@@ -33,8 +33,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.pipes.PipesApplicable;
-import org.apache.hama.pipes.PipesBSP;
 
 /**
  * Base class for tasks.
@@ -166,10 +164,6 @@ public final class BSPTask extends Task 
 
     LOG.debug("bsp.work.class: " + workClass.toString());
 
-    /* Setup PipesApplication if workClass is matching */
-    if (PipesBSP.class.equals(workClass))
-      ((PipesApplicable) bsp).setApplication(job.getPipesApplication());
-
     // The policy is to throw the first exception and log the remaining.
     Exception firstException = null;
     try {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Sat Nov 23 08:41:09 2013
@@ -18,6 +18,7 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -37,6 +38,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.commons.util.KeyValuePair;
+import org.apache.hama.pipes.PipesPartitioner;
 
 public class PartitioningRunner extends
     BSP<Writable, Writable, Writable, Writable, NullWritable> {
@@ -47,7 +49,8 @@ public class PartitioningRunner extends
   private FileSystem fs = null;
   private Path partitionDir;
   private RecordConverter converter;
-  private Map<Integer,LinkedList<KeyValuePair<Writable,Writable>>> values = new HashMap<Integer, LinkedList<KeyValuePair<Writable,Writable>>>();
+  private Map<Integer, LinkedList<KeyValuePair<Writable, Writable>>> values = new HashMap<Integer, LinkedList<KeyValuePair<Writable, Writable>>>();
+  private PipesPartitioner<?, ?> pipesPartitioner = null;
 
   @Override
   public final void setup(
@@ -103,7 +106,7 @@ public class PartitioningRunner extends
      *         needed.
      */
     public Map<Writable, Writable> newMap();
-    
+
     /**
      * @return a list implementation, so order will not be changed in subclasses
      */
@@ -143,7 +146,7 @@ public class PartitioningRunner extends
 
     @Override
     public List<KeyValuePair<Writable, Writable>> newList() {
-      return new LinkedList<KeyValuePair<Writable,Writable>>();
+      return new LinkedList<KeyValuePair<Writable, Writable>>();
     }
   }
 
@@ -172,17 +175,20 @@ public class PartitioningRunner extends
 
       int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
           desiredNum);
-      
+
       LinkedList<KeyValuePair<Writable, Writable>> list = values.get(index);
       if (list == null) {
-        list = (LinkedList<KeyValuePair<Writable, Writable>>) converter.newList();
+        list = (LinkedList<KeyValuePair<Writable, Writable>>) converter
+            .newList();
         values.put(index, list);
       }
-      list.add(new KeyValuePair<Writable, Writable>(pair.getKey(), pair.getValue()));
+      list.add(new KeyValuePair<Writable, Writable>(pair.getKey(), pair
+          .getValue()));
     }
 
     // Buffering values in memory reduces the number of file opens
-    for (Map.Entry<Integer, LinkedList<KeyValuePair<Writable, Writable>>> e : values.entrySet()) {
+    for (Map.Entry<Integer, LinkedList<KeyValuePair<Writable, Writable>>> e : values
+        .entrySet()) {
       Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
           + peer.getPeerIndex());
       SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
@@ -240,15 +246,43 @@ public class PartitioningRunner extends
     }
   }
 
+  @Override
+  public void cleanup(
+      BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+      throws IOException {
+    if (this.pipesPartitioner != null) {
+      this.pipesPartitioner.cleanup();
+    }
+  }
+
   public static int getMergeProcessorID(int partitionID, int peerNum) {
     return partitionID % peerNum;
   }
 
   @SuppressWarnings("rawtypes")
   public Partitioner getPartitioner() {
-    return ReflectionUtils.newInstance(conf.getClass(
+    Class<? extends Partitioner> partitionerClass = conf.getClass(
         Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
-        Partitioner.class), conf);
+        Partitioner.class);
+
+    LOG.debug(Constants.RUNTIME_PARTITIONING_CLASS + ": "
+        + partitionerClass.toString());
+
+    // Check for Hama Pipes Partitioner
+    Partitioner partitioner = null;
+    if (PipesPartitioner.class.equals(partitionerClass)) {
+      try {
+        Constructor<? extends Partitioner> ctor = partitionerClass
+            .getConstructor(Configuration.class);
+        partitioner = ctor.newInstance(conf);
+        this.pipesPartitioner = (PipesPartitioner) partitioner;
+      } catch (Exception e) {
+        LOG.error(e);
+      }
+    } else {
+      partitioner = ReflectionUtils.newInstance(partitionerClass, conf);
+    }
+    return partitioner;
   }
 
   private static String getPartitionName(int i) {

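The reworked getPartitioner() above special-cases PipesPartitioner because it
must be constructed through its Configuration-argument constructor (and later
closed in cleanup()), while any other partitioner still goes through
ReflectionUtils.newInstance(). A minimal configuration sketch, for
illustration only (the class below is hypothetical and not part of this
commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hama.Constants;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.pipes.PipesPartitioner;

public class PartitionerSelectionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Default path: a plain partitioner is instantiated via
    // ReflectionUtils.newInstance(partitionerClass, conf).
    conf.setClass(Constants.RUNTIME_PARTITIONING_CLASS,
        HashPartitioner.class, Partitioner.class);

    // Pipes path: PartitioningRunner detects this exact class and invokes
    // its Configuration constructor reflectively instead.
    conf.setClass(Constants.RUNTIME_PARTITIONING_CLASS,
        PipesPartitioner.class, Partitioner.class);
  }
}
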
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Sat Nov 23 08:41:09 2013
@@ -37,7 +37,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hama.bsp.BSPPeer;
@@ -54,10 +53,9 @@ import org.apache.hama.pipes.protocol.St
  * Adapted from Hadoop Pipes.
  * 
  */
-public class PipesApplication<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable> {
+public class PipesApplication<K1, V1, K2, V2, M extends Writable> {
 
-  private static final Log LOG = LogFactory.getLog(PipesApplication.class
-      .getName());
+  private static final Log LOG = LogFactory.getLog(PipesApplication.class);
   private ServerSocket serverSocket;
   private Process process;
   private Socket clientSocket;
@@ -72,7 +70,7 @@ public class PipesApplication<K1 extends
   }
 
   /* Build Environment based on the Configuration */
-  private Map<String, String> setupEnvironment(Configuration conf)
+  public Map<String, String> setupEnvironment(Configuration conf)
       throws IOException {
 
     Map<String, String> env = new HashMap<String, String>();
@@ -220,7 +218,7 @@ public class PipesApplication<K1 extends
         clientSocket = serverSocket.accept();
         LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
 
-        downlink = new BinaryProtocol<K1, V1, K2, V2>(conf,
+        downlink = new BinaryProtocol<K1, V1, K2, V2, M>(conf,
             clientSocket.getOutputStream(), clientSocket.getInputStream());
 
         downlink.start();
@@ -249,8 +247,8 @@ public class PipesApplication<K1 extends
    * @throws IOException
    */
   @SuppressWarnings({ "rawtypes", "unchecked" })
-  public void start(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
-      throws IOException, InterruptedException {
+  public void start(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+      InterruptedException {
 
     Map<String, String> env = setupEnvironment(peer.getConfiguration());
     List<String> cmd = setupCommand(peer.getConfiguration());
@@ -291,7 +289,7 @@ public class PipesApplication<K1 extends
         clientSocket = serverSocket.accept();
         LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
 
-        downlink = new BinaryProtocol<K1, V1, K2, V2>(peer,
+        downlink = new BinaryProtocol<K1, V1, K2, V2, M>(peer,
             clientSocket.getOutputStream(), clientSocket.getInputStream());
       }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Sat Nov 23 08:41:09 2013
@@ -32,10 +32,10 @@ import org.apache.hama.bsp.sync.SyncExce
  * runtimes.
  */
 public class PipesBSP<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable>
-    extends BSP<K1, V1, K2, V2, BytesWritable> implements PipesApplicable {
+    extends BSP<K1, V1, K2, V2, BytesWritable> {
 
   private static final Log LOG = LogFactory.getLog(PipesBSP.class);
-  private PipesApplication<K1, V1, K2, V2, BytesWritable> application;
+  private PipesApplication<K1, V1, K2, V2, BytesWritable> application = new PipesApplication<K1, V1, K2, V2, BytesWritable>();
 
   @Override
   public void setup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
@@ -97,12 +97,4 @@ public class PipesBSP<K1 extends Writabl
     }
   }
 
-  @SuppressWarnings("unchecked")
-  @Override
-  public void setApplication(
-      PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> pipesApp) {
-
-    this.application = (PipesApplication<K1, V1, K2, V2, BytesWritable>) pipesApp;
-  }
-
 }