You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/02/12 05:53:21 UTC

svn commit: r1659146 - in /hama/trunk: core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ graph/src/main/java/org/apache/hama/graph/

Author: edwardyoon
Date: Thu Feb 12 04:53:20 2015
New Revision: 1659146

URL: http://svn.apache.org/r1659146
Log:
Minor code refactor

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Thu Feb 12 04:53:20 2015
@@ -105,6 +105,7 @@ public interface Constants {
   // Job configuration related parameters.
   // /////////////////////////////////////////////
   public static final String JOB_INPUT_DIR = "bsp.input.dir";
+  public static final String JOB_OUTPUT_DIR = "bsp.output.dir";
   public static final String JOB_PEERS_COUNT = "bsp.peers.num";
   public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class";
   public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class";

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Feb 12 04:53:20 2015
@@ -331,7 +331,7 @@ public class BSPJobClient extends Config
     short replication = (short) job.getInt("bsp.submit.replication", 10);
 
     // only create the splits if we have an input
-    if ((job.get("bsp.input.dir") != null)
+    if ((job.get(Constants.JOB_INPUT_DIR) != null)
         || (job.get("bsp.join.expr") != null)) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb 12 04:53:20 2015
@@ -257,7 +257,7 @@ public final class BSPPeerImpl<K1, V1, K
       wrap.initCause(exp);
       throw wrap;
     }
-
+    
     if (inputSplit != null) {
       DataInputBuffer splitBuffer = new DataInputBuffer();
       splitBuffer.reset(split.getBytes(), 0, split.getLength());
@@ -314,12 +314,14 @@ public final class BSPPeerImpl<K1, V1, K
   @SuppressWarnings("unchecked")
   public final void initializeIO() throws Exception {
 
-    initInput();
+    if (conf.get(Constants.JOB_INPUT_DIR) != null) {
+      initInput();
+    }
 
     String outdir = null;
-    if (conf.get("bsp.output.dir") != null) {
-      Path outputDir = new Path(conf.get("bsp.output.dir",
-          "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
+    if (conf.get(Constants.JOB_OUTPUT_DIR) != null) {
+      Path outputDir = new Path(conf.get(Constants.JOB_OUTPUT_DIR, "tmp-"
+          + System.currentTimeMillis()), Task.getOutputName(partition));
       outdir = outputDir.makeQualified(fs).toString();
     }
     outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Feb 12 04:53:20 2015
@@ -17,14 +17,11 @@
  */
 package org.apache.hama.graph;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.io.DataInputBuffer;
@@ -111,29 +108,6 @@ public final class GraphJobMessage imple
     return vertexId;
   }
 
-  private ByteArrayInputStream bis = null;
-  private DataInputStream dis = null;
-  List<Writable> valuesCache = new ArrayList<Writable>();
-
-  public List<Writable> getValues() {
-    bis = new ByteArrayInputStream(byteBuffer.toByteArray());
-    dis = new DataInputStream(bis);
-
-    if (valuesCache.isEmpty()) {
-      for (int i = 0; i < numOfValues; i++) {
-        try {
-          Writable v = GraphJobRunner.createVertexValue();
-          v.readFields(dis);
-          valuesCache.add(v);
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    }
-
-    return valuesCache;
-  }
-
   public byte[] getValuesBytes() {
     return byteBuffer.toByteArray();
   }
@@ -163,6 +137,10 @@ public final class GraphJobMessage imple
       add(v);
   }
 
+  public int getNumOfValues() {
+    return this.numOfValues;
+  }
+
   public GraphJobMessage(IntWritable size) {
     this.flag = VERTICES_SIZE_FLAG;
     this.integerMessage = size;
@@ -222,7 +200,6 @@ public final class GraphJobMessage imple
       byte[] temp = new byte[bytesLength];
       in.readFully(temp);
       bufferDos.write(temp);
-      bufferDos.flush();
     } else if (isMapMessage()) {
       map = new MapWritable();
       map.readFields(in);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Feb 12 04:53:20 2015
@@ -17,11 +17,12 @@
  */
 package org.apache.hama.graph;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -230,19 +231,22 @@ public final class GraphJobRunner<V exte
     notComputedVertices = new HashSet();
     notComputedVertices.addAll(vertices.keySet());
 
-    List<M> msgs = null;
+    Iterable<Writable> msgs = null;
     Vertex<V, E, M> vertex = null;
 
     while (currentMessage != null) {
       vertex = vertices.get((V) currentMessage.getVertexId());
 
-      msgs = (List<M>) currentMessage.getValues();
+      final int numOfValues = currentMessage.getNumOfValues();
+      final byte[] serializedMsgs = currentMessage.getValuesBytes();
+      msgs = getIterableMessages(numOfValues, serializedMsgs);
+
       if (vertex.isHalted()) {
         vertex.setActive();
       }
 
       if (!vertex.isHalted()) {
-        vertex.compute(msgs);
+        vertex.compute((Iterable<M>) msgs);
         notComputedVertices.remove(vertex.getVertexID());
         activeVertices++;
       }
@@ -644,6 +648,42 @@ public final class GraphJobRunner<V exte
     return (X) ReflectionUtils.newInstance(EDGE_VALUE_CLASS);
   }
 
+  public static Iterable<Writable> getIterableMessages(final int numOfValues,
+      final byte[] msgBytes) {
+
+    return new Iterable<Writable>() {
+      @Override
+      public Iterator<Writable> iterator() {
+        return new Iterator<Writable>() {
+          ByteArrayInputStream bis = new ByteArrayInputStream(msgBytes);
+          DataInputStream dis = new DataInputStream(bis);
+          int index = 0;
+
+          @Override
+          public boolean hasNext() {
+            return (index < numOfValues) ? true : false;
+          }
+
+          @Override
+          public Writable next() {
+            Writable v = GraphJobRunner.createVertexValue();
+            try {
+              v.readFields(dis);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+            index++;
+            return v;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
+  }
+
   public int getChangedVertexCnt() {
     return changedVertexCnt;
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Thu Feb 12 04:53:20 2015
@@ -17,6 +17,11 @@
  */
 package org.apache.hama.graph;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+
 import org.apache.directmemory.DirectMemory;
 import org.apache.directmemory.cache.CacheService;
 import org.apache.directmemory.memory.Pointer;
@@ -29,11 +34,6 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.util.ReflectionUtils;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
-
 /**
  * An off heap version of a {@link org.apache.hama.graph.Vertex} storage.
  */

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Thu Feb 12 04:53:20 2015
@@ -39,7 +39,7 @@ public class OutgoingVertexMessageManage
       .getLog(OutgoingVertexMessageManager.class);
 
   private Combiner<Writable> combiner;
-
+  private Iterable<Writable> msgs;
   private HashMap<InetSocketAddress, MessagePerVertex> storage = new HashMap<InetSocketAddress, MessagePerVertex>();
 
   @SuppressWarnings("unchecked")
@@ -72,12 +72,15 @@ public class OutgoingVertexMessageManage
       msgPerVertex.add(vertexID, msg);
 
       // Combining messages
-      if (combiner != null
-          && msgPerVertex.get(vertexID).getValues().size() > 1) {
-        storage.get(targetPeerAddress).put(
-            vertexID,
-            new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
-                vertexID).getValues())));
+      if (combiner != null && msgPerVertex.get(vertexID).getNumOfValues() > 1) {
+
+        final int numOfValues = msgPerVertex.get(vertexID).getNumOfValues();
+        final byte[] msgBytes = msgPerVertex.get(vertexID).getValuesBytes();
+        msgs = GraphJobRunner.getIterableMessages(numOfValues, msgBytes);
+
+        // Overwrite
+        storage.get(targetPeerAddress).put(vertexID,
+            new GraphJobMessage(vertexID, combiner.combine(msgs)));
       }
     } else {
       outgoingBundles.get(targetPeerAddress).addMessage(msg);