You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by mi...@apache.org on 2013/12/13 11:19:47 UTC

svn commit: r1550674 - in /hama/trunk: ./ c++/ c++/src/main/native/examples/ c++/src/main/native/pipes/api/hama/ c++/src/main/native/pipes/impl/ core/src/main/java/org/apache/hama/pipes/ core/src/main/java/org/apache/hama/pipes/protocol/ core/src/main/...

Author: millecker
Date: Fri Dec 13 10:19:46 2013
New Revision: 1550674

URL: http://svn.apache.org/r1550674
Log:
HAMA-829: Improve code and fix Javadoc warnings in org.apache.hama.pipes

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/c++/pom.xml
    hama/trunk/c++/src/main/native/examples/README.txt
    hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
    hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Dec 13 10:19:46 2013
@@ -17,6 +17,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-829: Improve code and fix Javadoc warnings in org.apache.hama.pipes (Martin Illecker)
    HAMA-808: Hama Pipes Testcase (Martin Illecker)
    HAMA-828: Improve code, fix typo and modify unclear comment in org.apache.hama.ml.ann package (Yexi Jiang)
    HAMA-699: Add commons module (Martin Illecker)

Modified: hama/trunk/c++/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pom.xml?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/c++/pom.xml (original)
+++ hama/trunk/c++/pom.xml Fri Dec 13 10:19:46 2013
@@ -79,7 +79,7 @@
                         </else>
                       </if>
                     </target>
-                  <exportAntProperties>true</exportAntProperties>
+                    <exportAntProperties>true</exportAntProperties>
                   </configuration>
                 </execution>
                 <!-- TODO wire here native testcases
@@ -103,7 +103,7 @@
                 </dependency>
               </dependencies>
             </plugin>
-          <plugin>
+            <plugin>
               <groupId>org.codehaus.groovy.maven</groupId>
               <artifactId>gmaven-plugin</artifactId>
               <version>1.0</version>

Modified: hama/trunk/c++/src/main/native/examples/README.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/README.txt?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/README.txt (original)
+++ hama/trunk/c++/src/main/native/examples/README.txt Fri Dec 13 10:19:46 2013
@@ -37,7 +37,7 @@ Run summation example
 View input and output data
 
 % hadoop fs -cat /examples/input/summation/input.txt
-% hama seqdumper -seqFile /examples/output/summation/part-00000
+% hama seqdumper -file /examples/output/summation/part-00000
 
 You should see
 # Input Path: /examples/output/summation/part-00000
@@ -68,7 +68,7 @@ Run piestimator example because no input
 
 View output data
 
-% hama seqdumper -seqFile /examples/output/piestimator/part-00001
+% hama seqdumper -file /examples/output/piestimator/part-00001
 
 You should see
 # Input Path: /examples/output/piestimator/part-00001
@@ -111,14 +111,14 @@ Run matrixmultiplication example
 View input and output data
 
 % hama seqdumper \
- -seqFile /examples/input/matrixmultiplication/MatrixA.seq
+ -file /examples/input/matrixmultiplication/MatrixA.seq
 
 % hama seqdumper \
- -seqFile  \
+ -file  \
  /examples/input/matrixmultiplication/MatrixB_transposed.seq
 
-% hama seqdumper -seqFile \
-  /examples/output/matrixmultiplication/part-00001
+% hama seqdumper \
+ -file /examples/output/matrixmultiplication/part-00001
 
 Delete output folder
 

Modified: hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/api/hama/Pipes.hh?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh (original)
+++ hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh Fri Dec 13 10:19:46 2013
@@ -238,6 +238,7 @@ namespace HamaPipes {
      * Register a counter with the given group and name.
      */
     //virtual Counter* getCounter(const string& group, const string& name) = 0;
+    virtual long getCounter(const string& group, const string& name) = 0;
     
     /**
      * Increment the value of the counter with the given amount.
@@ -329,6 +330,7 @@ namespace HamaPipes {
     }
     
     virtual void nextEvent() = 0;
+    virtual bool verifyResult(int32_t expected_response_cmd) = 0;
     virtual UpwardProtocol<BinaryUpwardProtocol>* getUplink() = 0;
     virtual ~Protocol(){}
   };

Modified: hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/Pipes.cc?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc (original)
+++ hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc Fri Dec 13 10:19:46 2013
@@ -28,9 +28,9 @@ public:
     HADOOP_ASSERT(out_stream_->open(out_stream), "problem opening stream");
   }
   
-  /* local function */
+  /* local sendCommand function */
   void sendCommand(int32_t cmd, bool flush) {
-    serializeInt(cmd, *out_stream_);
+    serialize<int32_t>(cmd, *out_stream_);
     if (flush) {
       out_stream_->flush();
     }
@@ -111,25 +111,25 @@ public:
   /*
    virtual void registerCounter(int id, const string& group,
    const string& name) {
-   serializeInt(REGISTER_COUNTER, *stream);
-   serializeInt(id, *stream);
-   serializeString(group, *stream);
-   serializeString(name, *stream);
+   serialize<int32_t>(REGISTER_COUNTER, *stream);
+   serialize<int32_t>(id, *stream);
+   serialize<string>(group, *stream);
+   serialize<string>(name, *stream);
    }
    
    virtual void incrementCounter(const TaskContext::Counter* counter,
    uint64_t amount) {
-   serializeInt(INCREMENT_COUNTER, *stream);
-   serializeInt(counter->getId(), *stream);
-   serializeLong(amount, *stream);
+   serialize<int32_t>(INCREMENT_COUNTER, *stream);
+   serialize<int32_t>(counter->getId(), *stream);
+   serialize<int64_t>(amount, *stream);
    }
    */
   
   virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
-    serializeInt(INCREMENT_COUNTER, *out_stream_);
-    serializeString(group, *out_stream_);
-    serializeString(name, *out_stream_);
-    serializeLong(amount, *out_stream_);
+    serialize<int32_t>(INCREMENT_COUNTER, *out_stream_);
+    serialize<string>(group, *out_stream_);
+    serialize<string>(name, *out_stream_);
+    serialize<int64_t>(amount, *out_stream_);
     out_stream_->flush();
     if(logging) {
       fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n");
@@ -173,7 +173,7 @@ public:
     cmd = deserializeInt(*in_stream_);
     
     switch (cmd) {
-      
+        
       case START_MESSAGE: {
         int32_t protocol_version;
         protocol_version = deserialize<int32_t>(*in_stream_);
@@ -293,6 +293,17 @@ public:
   }
   
   /**
+   * Check for valid response command
+   */
+  bool verifyResult(int32_t expected_response_cmd) {
+    int32_t response = deserialize<int32_t>(*in_stream_);
+    if (response != expected_response_cmd) {
+      return false;
+    }
+    return true;
+  }
+  
+  /**
    * Wait for next event, which should be a response for
    * a previously sent command (expected_response_cmd)
    * and return the generic result
@@ -303,14 +314,13 @@ public:
     T result = T();
     
     // read response command
-    int32_t cmd;
-    cmd = deserializeInt(*in_stream_);
+    int32_t cmd = deserialize<int32_t>(*in_stream_);
     
     // check if response is expected
     if (expected_response_cmd == cmd) {
       
       switch (cmd) {
-        
+          
         case GET_MSG_COUNT: {
           T msg_count;
           msg_count = deserialize<T>(*in_stream_);
@@ -362,7 +372,7 @@ public:
           }
           return superstep_count;
         }
-        
+          
         case SEQFILE_OPEN: {
           T file_id = deserialize<T>(*in_stream_);
           if(logging) {
@@ -421,8 +431,7 @@ public:
     vector<T> results;
     
     // read response command
-    int32_t cmd;
-    cmd = deserializeInt(*in_stream_);
+    int32_t cmd = deserialize<int32_t>(*in_stream_);
     
     // check if response is expected
     if (expected_response_cmd == cmd) {
@@ -467,14 +476,13 @@ public:
     KeyValuePair<K,V> key_value_pair;
     
     // read response command
-    int32_t cmd;
-    cmd = deserializeInt(*in_stream_);
+    int32_t cmd = deserialize<int32_t>(*in_stream_);
     
     // check if response is expected or END_OF_DATA
     if ((expected_response_cmd == cmd) || (cmd == END_OF_DATA) ) {
       
       switch (cmd) {
-        
+          
         case READ_KEYVALUE: {
           K key = deserialize<K>(*in_stream_);
           V value = deserialize<V>(*in_stream_);
@@ -727,20 +735,27 @@ public:
   /**
    * Register a counter with the given group and name.
    */
-  /*
-   virtual Counter* getCounter(const std::string& group,
-   const std::string& name) {
-   int id = registeredCounterIds.size();
-   registeredCounterIds.push_back(id);
-   uplink->registerCounter(id, group, name);
-   return new Counter(id);
-   }*/
+  virtual long getCounter(const string& group, const string& name) {
+   // TODO
+   // int id = registeredCounterIds.size();
+   // registeredCounterIds.push_back(id);
+   // uplink->registerCounter(id, group, name);
+   // return new Counter(id);
+   return 0;
+  }
   
   /**
-   * Increment the value of the counter with the given amount.
+   * Increments the counter identified by the group and counter name by the
+   * specified amount.
    */
   virtual void incrementCounter(const string& group, const string& name, uint64_t amount)  {
     uplink_->incrementCounter(group, name, amount);
+    
+    // Verify response command
+    bool response = protocol_->verifyResult(INCREMENT_COUNTER);
+    if (response == false) {
+      throw Error("incrementCounter received wrong response!");
+    }
   }
   
   /********************************************/
@@ -775,6 +790,12 @@ public:
    */
   virtual void sendMessage(const string& peer_name, const M& msg) {
     uplink_->sendCommand<string,M>(SEND_MSG, peer_name, msg);
+    
+    // Verify response command
+    bool response = protocol_->verifyResult(SEND_MSG);
+    if (response == false) {
+      throw Error("sendMessage received wrong response!");
+    }
   }
   
   /**
@@ -815,6 +836,12 @@ public:
    */
   virtual void sync() {
     uplink_->sendCommand(SYNC);
+    
+    // Verify response command
+    bool response = protocol_->verifyResult(SYNC);
+    if (response == false) {
+      throw Error("sync received wrong response!");
+    }
   }
   
   /**
@@ -911,6 +938,12 @@ public:
    */
   virtual void clear() {
     uplink_->sendCommand(CLEAR);
+    
+    // Verify response command
+    bool response = protocol_->verifyResult(CLEAR);
+    if (response == false) {
+      throw Error("clear received wrong response!");
+    }
   }
   
   /**
@@ -918,10 +951,16 @@ public:
    */
   virtual void write(const K2& key, const V2& value) {
     if (writer_ != NULL) {
-      writer_->emit(key, value);
+      writer_->emit(key, value); // TODO writer not implemented
     } else {
       uplink_->sendCommand<K2,V2>(WRITE_KEYVALUE, key, value);
     }
+
+    // Verify response command
+    bool response = protocol_->verifyResult(WRITE_KEYVALUE);
+    if (response == false) {
+      throw Error("write received wrong response!");
+    }
   }
   
   /**
@@ -952,6 +991,12 @@ public:
    */
   virtual void reopenInput() {
     uplink_->sendCommand(REOPEN_INPUT);
+    
+    // Verify response command
+    bool response = protocol_->verifyResult(REOPEN_INPUT);
+    if (response == false) {
+      throw Error("reopenInput received wrong response!");
+    }
   }
   
   

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hama.pipes;
 
 import java.io.BufferedReader;
@@ -361,12 +360,12 @@ public class PipesApplication<K1, V1, K2
    * 
    * @throws IOException
    */
-  public void cleanup() throws IOException {
+  public void cleanup(boolean sendClose) throws IOException {
     if (serverSocket != null) {
       serverSocket.close();
     }
     try {
-      if (downlink != null) {
+      if ((downlink != null) && (sendClose)) {
         downlink.close();
       }
     } catch (InterruptedException ie) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Fri Dec 13 10:19:46 2013
@@ -93,7 +93,7 @@ public class PipesBSP<K1 extends Writabl
     } catch (InterruptedException e) {
       e.printStackTrace();
     } finally {
-      this.application.cleanup();
+      this.application.cleanup(true);
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesNonJavaInputFormat.java Fri Dec 13 10:19:46 2013
@@ -31,12 +31,11 @@ import org.apache.hama.bsp.TextInputForm
 
 /**
  * Dummy input format used when non-Java a {@link RecordReader} is used by the
- * Pipes' application.
+ * Pipes application.
  * 
- * The only useful thing this does is set up the Map-Reduce job to get the
- * {@link PipesDummyRecordReader}, everything else left for the 'actual'
- * InputFormat specified by the user which is given by
- * <i>mapred.pipes.user.inputformat</i>.
+ * The only useful thing this does is set up the BSP job to get the
+ * PipesDummyRecordReader, everything else left for the 'actual' InputFormat
+ * specified by the user which is given by <i>hama.pipes.user.inputformat</i>.
  * 
  * Adapted from Hadoop Pipes.
  * 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java Fri Dec 13 10:19:46 2013
@@ -49,7 +49,7 @@ public class PipesPartitioner<K, V> impl
 
   public void cleanup() {
     try {
-      application.cleanup();
+      application.cleanup(true);
     } catch (IOException e) {
       LOG.error(e);
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Fri Dec 13 10:19:46 2013
@@ -214,7 +214,7 @@ public class Submitter implements Tool {
    * Submit a job to the cluster. All of the necessary modifications to the job
    * to run under pipes are made to the configuration.
    * 
-   * @param conf the job to submit to the cluster (MODIFIED)
+   * @param job to submit to the cluster
    * @throws IOException
    */
   public static void runJob(BSPJob job) throws IOException {

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hama.pipes.protocol;
 
 import java.io.BufferedOutputStream;
@@ -50,8 +49,7 @@ import org.apache.hama.pipes.Submitter;
 public class BinaryProtocol<K1, V1, K2, V2, M extends Writable> implements
     DownwardProtocol<K1, V1, K2, V2> {
 
-  protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class
-      .getName());
+  protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class);
   public static final int CURRENT_PROTOCOL_VERSION = 0;
   /**
    * The buffer size for the command socket
@@ -74,7 +72,7 @@ public class BinaryProtocol<K1, V1, K2, 
    * Upward messages are passed on the specified handler and downward downward
    * messages are public methods on this object.
    * 
-   * @param jobConfig The job's configuration
+   * @param conf The job's configuration
    * @param out The output stream to communicate on.
    * @param in The input stream to communicate on.
    * @throws IOException
@@ -362,8 +360,8 @@ public class BinaryProtocol<K1, V1, K2, 
    * @throws IOException
    */
   protected void writeObject(Writable obj) throws IOException {
-    // For basic types IntWritable, LongWritable, FloatWritable, DoubleWritable,
-    // Text and BytesWritable, encode them directly, so that they end up
+    // For basic types IntWritable, LongWritable, Text and BytesWritable,
+    // encode them directly, so that they end up
     // in C++ as the natural translations.
     if (obj instanceof Text) {
       Text t = (Text) obj;
@@ -383,20 +381,8 @@ public class BinaryProtocol<K1, V1, K2, 
     } else if (obj instanceof LongWritable) {
       WritableUtils.writeVLong(this.outStream, ((LongWritable) obj).get());
 
-      // else if ((obj instanceof FloatWritable) || (obj instanceof
-      // DoubleWritable))
-
     } else {
-      // Note: other types are transfered as String which should be implemented
-      // in Writable itself
-
-      // DataOutputBuffer buffer = new DataOutputBuffer();
-      // buffer.reset();
-      // obj.write(buffer);
-      // int length = buffer.getLength();
-      // WritableUtils.writeVInt(stream, length);
-      // stream.write(buffer.getData(), 0, length);
-
+      // Note: FloatWritable and DoubleWritable are written here
       obj.write(this.outStream);
     }
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hama.pipes.protocol;
 
 import java.io.IOException;
@@ -42,6 +41,7 @@ public interface DownwardProtocol<K1, V1
   /**
    * Set the BSP Job Configuration
    * 
+   * @param conf The job's configuration
    * @throws IOException
    */
   void setBSPJobConf(Configuration conf) throws IOException;
@@ -58,6 +58,8 @@ public interface DownwardProtocol<K1, V1
   /**
    * runSetup
    * 
+   * @param pipedInput use pipedInput
+   * @param pipedOutput use pipedOutput
    * @throws IOException
    */
   void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException;
@@ -65,6 +67,8 @@ public interface DownwardProtocol<K1, V1
   /**
    * runBsp
    * 
+   * @param pipedInput use pipedInput
+   * @param pipedOutput use pipedOutput
    * @throws IOException
    */
   void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException;
@@ -72,6 +76,8 @@ public interface DownwardProtocol<K1, V1
   /**
    * runCleanup
    * 
+   * @param pipedInput use pipedInput
+   * @param pipedOutput use pipedOutput
    * @throws IOException
    */
   void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException;
@@ -79,6 +85,9 @@ public interface DownwardProtocol<K1, V1
   /**
    * getPartition
    * 
+   * @param key
+   * @param value
+   * @param numTasks number of available tasks
    * @throws IOException
    */
   int getPartition(K1 key, V1 value, int numTasks) throws IOException;

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java Fri Dec 13 10:19:46 2013
@@ -15,13 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hama.pipes.protocol;
 
 /**
  * The integer codes to represent the different messages. These must match the
  * C++ codes or massive confusion will result.
- * 
  */
 public enum MessageType {
   START(0), SET_BSPJOB_CONF(1), SET_INPUT_TYPES(2), RUN_SETUP(3), RUN_BSP(4),

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Fri Dec 13 10:19:46 2013
@@ -45,8 +45,6 @@ import org.apache.hama.commons.util.KeyV
  * 
  * @param <K1> input key.
  * @param <V1> input value.
- * @param <K2> output key.
- * @param <V2> output value.
  */
 public class StreamingProtocol<K1 extends Writable, V1 extends Writable>
     extends BinaryProtocol<K1, V1, Text, Text, BytesWritable> {

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hama.pipes.protocol;
 
 import java.io.BufferedInputStream;
@@ -100,8 +99,9 @@ public class UplinkReader<KEYIN, VALUEIN
         }
 
         int cmd = readCommand();
-        if (cmd == -1)
+        if (cmd == -1) {
           continue;
+        }
         LOG.debug("Handling uplink command: " + MessageType.values()[cmd]);
 
         if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING
@@ -114,8 +114,8 @@ public class UplinkReader<KEYIN, VALUEIN
         } else if (cmd == MessageType.REGISTER_COUNTER.code
             && isPeerAvailable()) { // INCOMING
           /*
-           * Is not used in HAMA -> Hadoop Pipes - maybe for performance, skip
-           * transferring group and name each INCREMENT
+           * Is not used in Hama. Hadoop Pipes uses it - maybe for performance
+           * issue, skip transferring group and name each INCREMENT
            */
         } else if (cmd == MessageType.TASK_DONE.code) { // INCOMING
           synchronized (binProtocol.hasTaskLock) {
@@ -149,8 +149,8 @@ public class UplinkReader<KEYIN, VALUEIN
         } else if (cmd == MessageType.REOPEN_INPUT.code && isPeerAvailable()) { // INCOMING
           reopenInput();
         } else if (cmd == MessageType.CLEAR.code && isPeerAvailable()) { // INCOMING
-          LOG.debug("Got MessageType.CLEAR");
-          peer.clear();
+          clear();
+
           /* SequenceFileConnector Implementation */
         } else if (cmd == MessageType.SEQFILE_OPEN.code) { // OUTGOING
           seqFileOpen();
@@ -161,6 +161,7 @@ public class UplinkReader<KEYIN, VALUEIN
         } else if (cmd == MessageType.SEQFILE_CLOSE.code) { // OUTGOING
           seqFileClose();
           /* SequenceFileConnector Implementation */
+
         } else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING
           partitionResponse();
         } else {
@@ -192,7 +193,22 @@ public class UplinkReader<KEYIN, VALUEIN
 
   public void reopenInput() throws IOException {
     LOG.debug("Got MessageType.REOPEN_INPUT");
+
     peer.reopenInput();
+
+    WritableUtils.writeVInt(this.outStream, MessageType.REOPEN_INPUT.code);
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.REOPEN_INPUT");
+  }
+
+  public void clear() throws IOException {
+    LOG.debug("Got MessageType.CLEAR");
+
+    peer.clear();
+
+    WritableUtils.writeVInt(this.outStream, MessageType.CLEAR.code);
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.CLEAR");
   }
 
   public void getSuperstepCount() throws IOException {
@@ -257,7 +273,12 @@ public class UplinkReader<KEYIN, VALUEIN
 
   public void sync() throws IOException, SyncException, InterruptedException {
     LOG.debug("Got MessageType.SYNC");
+
     peer.sync(); // this call blocks
+
+    WritableUtils.writeVInt(this.outStream, MessageType.SYNC.code);
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.SYNC");
   }
 
   public void getMessage() throws IOException {
@@ -282,10 +303,20 @@ public class UplinkReader<KEYIN, VALUEIN
   }
 
   public void incrementCounter() throws IOException {
+    LOG.debug("Got MessageType.INCREMENT_COUNTER");
+
     String group = Text.readString(this.inStream);
     String name = Text.readString(this.inStream);
     long amount = WritableUtils.readVLong(this.inStream);
+
+    LOG.debug("Got MessageType.INCREMENT_COUNTER group: " + group + " name: "
+        + name + " amount: " + amount);
+
     peer.incrementCounter(group, name, amount);
+
+    WritableUtils.writeVInt(this.outStream, MessageType.INCREMENT_COUNTER.code);
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.INCREMENT_COUNTER");
   }
 
   @SuppressWarnings("unchecked")
@@ -303,7 +334,11 @@ public class UplinkReader<KEYIN, VALUEIN
 
     peer.send(peerName, message);
 
-    LOG.debug("Done MessageType.SEND_MSG to peerName: "
+    WritableUtils.writeVInt(this.outStream, MessageType.SEND_MSG.code);
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.SEND_MSG");
+
+    LOG.debug("Sent message to peerName: "
         + peerName
         + " messageClass: "
         + message.getClass().getName()
@@ -362,13 +397,18 @@ public class UplinkReader<KEYIN, VALUEIN
             Object.class), conf);
 
     LOG.debug("Got MessageType.WRITE_KEYVALUE keyOutClass: "
-        + keyOut.getClass().getName() + " valueOutClass: " + valueOut.getClass().getName());
+        + keyOut.getClass().getName() + " valueOutClass: "
+        + valueOut.getClass().getName());
 
     readObject((Writable) keyOut);
     readObject((Writable) valueOut);
 
     peer.write(keyOut, valueOut);
 
+    WritableUtils.writeVInt(this.outStream, MessageType.WRITE_KEYVALUE.code);
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.WRITE_KEYVALUE");
+    
     LOG.debug("Done MessageType.WRITE_KEYVALUE -"
         + " Key: "
         + ((keyOut.toString().length() < 10) ? keyOut.toString() : keyOut
@@ -385,6 +425,7 @@ public class UplinkReader<KEYIN, VALUEIN
     // key and value class stored in the SequenceFile
     String keyClass = Text.readString(this.inStream);
     String valueClass = Text.readString(this.inStream);
+    LOG.debug("GOT MessageType.SEQFILE_OPEN - Path: " + path);
     LOG.debug("GOT MessageType.SEQFILE_OPEN - Option: " + option);
     LOG.debug("GOT MessageType.SEQFILE_OPEN - KeyClass: " + keyClass);
     LOG.debug("GOT MessageType.SEQFILE_OPEN - ValueClass: " + valueClass);
@@ -418,8 +459,10 @@ public class UplinkReader<KEYIN, VALUEIN
                         sequenceKeyWritable, sequenceValueWritable)));
 
       } catch (IOException e) {
+        LOG.error("SEQFILE_OPEN - " + e.getMessage());
         fileID = -1;
       } catch (ClassNotFoundException e) {
+        LOG.error("SEQFILE_OPEN - " + e.getMessage());
         fileID = -1;
       }
 
@@ -451,10 +494,14 @@ public class UplinkReader<KEYIN, VALUEIN
                         sequenceKeyWritable, sequenceValueWritable)));
 
       } catch (IOException e) {
+        LOG.error("SEQFILE_OPEN - " + e.getMessage());
         fileID = -1;
       } catch (ClassNotFoundException e) {
+        LOG.error("SEQFILE_OPEN - " + e.getMessage());
         fileID = -1;
       }
+    } else { // wrong option
+      LOG.error("SEQFILE_OPEN - Wrong option: '" + option + "'");
     }
 
     WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_OPEN.code);
@@ -518,9 +565,9 @@ public class UplinkReader<KEYIN, VALUEIN
     // check if fileID is available in sequenceFileWriter
     if (sequenceFileWriters.containsKey(fileID)) {
 
-      Writable sequenceKeyWritable = sequenceFileReaders.get(fileID).getValue()
+      Writable sequenceKeyWritable = sequenceFileWriters.get(fileID).getValue()
           .getKey();
-      Writable sequenceValueWritable = sequenceFileReaders.get(fileID)
+      Writable sequenceValueWritable = sequenceFileWriters.get(fileID)
           .getValue().getValue();
 
       // try to read key and value
@@ -555,6 +602,7 @@ public class UplinkReader<KEYIN, VALUEIN
 
   public void seqFileClose() throws IOException {
     int fileID = WritableUtils.readVInt(this.inStream);
+    LOG.debug("GOT MessageType.SEQFILE_CLOSE - FileID: " + fileID);
 
     boolean result = false;
 
@@ -594,11 +642,9 @@ public class UplinkReader<KEYIN, VALUEIN
    */
   protected void readObject(Writable obj) throws IOException {
     byte[] buffer;
-
     // For BytesWritable and Text, use the specified length to set the length
     // this causes the "obvious" translations to work. So that if you emit
     // a string "abc" from C++, it shows up as "abc".
-
     if (obj instanceof Text) {
       int numBytes = WritableUtils.readVInt(this.inStream);
       buffer = new byte[numBytes];
@@ -612,29 +658,22 @@ public class UplinkReader<KEYIN, VALUEIN
       ((BytesWritable) obj).set(buffer, 0, numBytes);
 
     } else if (obj instanceof IntWritable) {
-      LOG.debug("read IntWritable");
       ((IntWritable) obj).set(WritableUtils.readVInt(this.inStream));
 
     } else if (obj instanceof LongWritable) {
       ((LongWritable) obj).set(WritableUtils.readVLong(this.inStream));
 
-      // else if ((obj instanceof FloatWritable) || (obj instanceof
-      // DoubleWritable))
-
     } else if (obj instanceof NullWritable) {
       throw new IOException("Cannot read data into NullWritable!");
 
     } else {
-      // Note: other types are transfered as String which should be implemented
-      // in Writable itself
       try {
-        LOG.debug("reading other type");
+        LOG.debug("reading type: " + obj.getClass().getName());
+
         // try reading object
         obj.readFields(this.inStream);
-        // String s = Text.readString(inStream);
 
       } catch (IOException e) {
-
         throw new IOException("Hama Pipes is not able to read "
             + obj.getClass().getName(), e);
       }

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java Fri Dec 13 10:19:46 2013
@@ -43,6 +43,7 @@ public class DistributedCacheUtil {
    * symbolic links for URIs specified with a fragment if
    * DistributedCache.getSymlinks() is true.
    * 
+   * @param conf The job's configuration
    * @throws IOException If a DistributedCache file cannot be found.
    */
   public static final void moveLocalFiles(Configuration conf)
@@ -81,8 +82,8 @@ public class DistributedCacheUtil {
     }
     if (files.length() > 0) {
       // I've replaced the use of the missing setLocalFiles and
-      // addLocalFiles methods (hadoop 0.23.x) with our own DistCacheUtils methods
-      // which set the cache configurations directly.
+      // addLocalFiles methods (hadoop 0.23.x) with our own DistCacheUtils
+      // methods which set the cache configurations directly.
       DistCacheUtils.addLocalFiles(conf, files.toString());
     }
   }
@@ -90,8 +91,8 @@ public class DistributedCacheUtil {
   /**
    * Add the Files to HDFS
    * 
-   * @param conf
-   * @param paths
+   * @param conf The job's configuration
+   * @param files Paths that should be transfered to HDFS
    */
   public static String addFilesToHDFS(Configuration conf, String files) {
     if (files == null)
@@ -139,8 +140,7 @@ public class DistributedCacheUtil {
   /**
    * Add the JARs from the given HDFS paths to the Classpath
    * 
-   * @param conf
-   * @param urls
+   * @param conf The job's configuration
    */
   public static URL[] addJarsToJobClasspath(Configuration conf) {
     URL[] classLoaderURLs = ((URLClassLoader) conf.getClassLoader()).getURLs();

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java?rev=1550674&r1=1550673&r2=1550674&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java Fri Dec 13 10:19:46 2013
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hama.pipes.util;
 
 import java.io.FileWriter;
@@ -36,7 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
-//import org.apache.hama.util.GenericOptionsParser;
 
 public class SequenceFileDumper {
 
@@ -63,28 +61,23 @@ public class SequenceFileDumper {
       Option option = OptionBuilder.withArgName(name).hasArgs(1)
           .withDescription(description).isRequired(required).create();
       options.addOption(option);
-
     }
 
     Parser createParser() {
-      Parser result = new BasicParser();
-      return result;
+      return new BasicParser();
     }
 
     void printUsage() {
       // The CLI package should do this for us, but I can't figure out how
       // to make it print something reasonable.
       System.out.println("hama seqdumper");
+      System.out.println("  [-file <path>] // The sequence file to read");
       System.out
-          .println("  [-seqFile <path>] // The Sequence File containing the Clusters");
-      System.out
-          .println("  [-output <path>] // The output file.  If not specified, dumps to the console");
+          .println("  [-output <path>] // The output file. If not specified, dumps to the console");
       System.out
-          .println("  [-substring <number> // The number of chars of the FormatString() to print");
+          .println("  [-substring <number> // The number of chars of value to print");
       System.out.println("  [-count <true>] // Report the count only");
-      System.out.println("  [-help] // Print out help");
       System.out.println();
-      //GenericOptionsParser.printGenericCommandUsage(System.out);
     }
   }
 
@@ -95,36 +88,22 @@ public class SequenceFileDumper {
       return;
     }
 
-    LOG.info("DEBUG: Hama SequenceFileDumper started!");
-
-    cli.addOption("seqFile", false,
-        "The Sequence File containing the Clusters", "path");
+    // Add arguments
+    cli.addOption("file", false, "The Sequence File containing the Clusters",
+        "path");
     cli.addOption("output", false,
         "The output file.  If not specified, dumps to the console", "path");
-
     cli.addOption("substring", false,
         "The number of chars of the FormatString() to print", "number");
     cli.addOption("count", false, "Report the count only", "number");
-    cli.addOption("help", false, "Print out help", "class");
 
     Parser parser = cli.createParser();
-
     try {
       HamaConfiguration conf = new HamaConfiguration();
-
-      //GenericOptionsParser genericParser = new GenericOptionsParser(conf, args);
-
       CommandLine cmdLine = parser.parse(cli.options, args);
-      //    genericParser.getRemainingArgs());
-      LOG.debug("DEBUG: Arguments: " + args); //genericParser.getRemainingArgs());
-
-      if (cmdLine.hasOption("help")) {
-        cli.printUsage();
-        return;
-      }
 
-      if (cmdLine.hasOption("seqFile")) {
-        Path path = new Path(cmdLine.getOptionValue("seqFile"));
+      if (cmdLine.hasOption("file")) {
+        Path path = new Path(cmdLine.getOptionValue("file"));
 
         FileSystem fs = FileSystem.get(path.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
@@ -135,6 +114,7 @@ public class SequenceFileDumper {
         } else {
           writer = new OutputStreamWriter(System.out);
         }
+
         writer.append("Input Path: ").append(String.valueOf(path))
             .append(LINE_SEP);
 
@@ -143,10 +123,9 @@ public class SequenceFileDumper {
           sub = Integer.parseInt(cmdLine.getOptionValue("substring"));
         }
 
-        boolean countOnly = cmdLine.hasOption("count");
-
         Writable key = (Writable) reader.getKeyClass().newInstance();
         Writable value = (Writable) reader.getValueClass().newInstance();
+
         writer.append("Key class: ")
             .append(String.valueOf(reader.getKeyClass()))
             .append(" Value Class: ").append(String.valueOf(value.getClass()))
@@ -154,6 +133,7 @@ public class SequenceFileDumper {
         writer.flush();
 
         long count = 0;
+        boolean countOnly = cmdLine.hasOption("count");
         if (countOnly == false) {
           while (reader.next(key, value)) {
             writer.append("Key: ").append(String.valueOf(key));
@@ -166,7 +146,8 @@ public class SequenceFileDumper {
           }
           writer.append("Count: ").append(String.valueOf(count))
               .append(LINE_SEP);
-        } else {
+
+        } else { // count only
           while (reader.next(key, value)) {
             count++;
           }
@@ -179,10 +160,13 @@ public class SequenceFileDumper {
           writer.close();
         }
         reader.close();
+
+      } else {
+        cli.printUsage();
       }
 
     } catch (ParseException e) {
-      LOG.info("Error : " + e);
+      LOG.error(e.getMessage());
       cli.printUsage();
       return;
     }