You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/23 01:24:05 UTC

svn commit: r1675515 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Author: edwardyoon
Date: Wed Apr 22 23:24:05 2015
New Revision: 1675515

URL: http://svn.apache.org/r1675515
Log:
minor improvements

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675515&r1=1675514&r2=1675515&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Apr 22 23:24:05 2015
@@ -264,7 +264,7 @@ public final class GraphJobRunner<V exte
 
     executor.shutdown();
     try {
-      executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+      executor.awaitTermination(60, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       LOG.error(e);
     }
@@ -313,7 +313,7 @@ public final class GraphJobRunner<V exte
 
     executor.shutdown();
     try {
-      executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+      executor.awaitTermination(60, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       LOG.error(e);
     }
@@ -412,8 +412,7 @@ public final class GraphJobRunner<V exte
     EDGE_VALUE_CLASS = edgeValueClass;
   }
 
-  private final ConcurrentHashMap<String, GraphJobMessage> messages = new ConcurrentHashMap<String, GraphJobMessage>();
-  private VertexInputReader<Writable, Writable, V, E, M> reader;
+  private final ConcurrentHashMap<Integer, GraphJobMessage> messages = new ConcurrentHashMap<Integer, GraphJobMessage>();
 
   /**
    * Loads vertices into memory of each peer.
@@ -423,7 +422,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
-    reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
+    VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
         .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
             VertexInputReader.class));
 
@@ -435,17 +434,8 @@ public final class GraphJobRunner<V exte
     try {
       KeyValuePair<Writable, Writable> next = null;
       while ((next = peer.readNext()) != null) {
-        Vertex<V, E, M> vertex = GraphJobRunner
-            .<V, E, M> newVertexInstance(VERTEX_CLASS);
-
-        boolean vertexFinished = reader.parseVertex(next.getKey(),
-            next.getValue(), vertex);
 
-        if (!vertexFinished) {
-          continue;
-        }
-
-        Runnable worker = new LoadWorker(vertex);
+        Runnable worker = new Parser(next.getKey(), next.getValue(), reader);
         executor.execute(worker);
 
       }
@@ -454,58 +444,95 @@ public final class GraphJobRunner<V exte
     }
 
     executor.shutdown();
-    executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+    executor.awaitTermination(60, TimeUnit.SECONDS);
 
-    Iterator<Entry<String, GraphJobMessage>> it;
+    Iterator<Entry<Integer, GraphJobMessage>> it;
     it = messages.entrySet().iterator();
     while (it.hasNext()) {
-      Entry<String, GraphJobMessage> e = it.next();
+      Entry<Integer, GraphJobMessage> e = it.next();
       it.remove();
       GraphJobMessage msg = e.getValue();
       msg.setFlag(GraphJobMessage.PARTITION_FLAG);
-      peer.send(e.getKey(), msg);
+      peer.send(getHostName(e.getKey()), msg);
     }
 
     peer.sync();
 
+    executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
+    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+    executor.setRejectedExecutionHandler(retryHandler);
+
     GraphJobMessage msg;
     while ((msg = peer.getCurrentMessage()) != null) {
+      executor.execute(new AddVertex(msg));
+    }
+
+    executor.shutdown();
+    executor.awaitTermination(60, TimeUnit.SECONDS);
+
+    LOG.info(vertices.size() + " vertices are loaded into "
+        + peer.getPeerName());
+  }
+
+  class AddVertex implements Runnable {
+    GraphJobMessage msg;
+
+    public AddVertex(GraphJobMessage msg) {
+      this.msg = msg;
+    }
+
+    @Override
+    public void run() {
       ByteArrayInputStream bis = new ByteArrayInputStream(msg.getValuesBytes());
       DataInputStream dis = new DataInputStream(bis);
 
       for (int i = 0; i < msg.getNumOfValues(); i++) {
-        Vertex<V, E, M> vertex = newVertexInstance(VERTEX_CLASS);
-        vertex.readFields(dis);
+        try {
+          Vertex<V, E, M> vertex = newVertexInstance(VERTEX_CLASS);
+          vertex.readFields(dis);
 
-        addVertex(vertex);
+          addVertex(vertex);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
       }
     }
 
-    LOG.info(vertices.size() + " vertices are loaded into "
-        + peer.getPeerName());
   }
 
-  class LoadWorker implements Runnable {
-    Vertex<V, E, M> vertex;
-
-    public LoadWorker(Vertex<V, E, M> vertex) {
-      this.vertex = vertex;
+  class Parser implements Runnable {
+    Writable key;
+    Writable value;
+    VertexInputReader<Writable, Writable, V, E, M> reader;
+
+    public Parser(Writable key, Writable value,
+        VertexInputReader<Writable, Writable, V, E, M> reader) {
+      this.key = key;
+      this.value = value;
+      this.reader = reader;
     }
 
     @Override
     public void run() {
       try {
-        String dstHost = getHostName(vertex.getVertexID());
-        if (peer.getPeerName().equals(dstHost)) {
-          addVertex(vertex);
-        } else {
-          if (!messages.containsKey(dstHost)) {
-            messages.putIfAbsent(dstHost, new GraphJobMessage());
+        Vertex<V, E, M> vertex = GraphJobRunner
+            .<V, E, M> newVertexInstance(VERTEX_CLASS);
+
+        boolean vertexFinished = reader.parseVertex(key, value, vertex);
+
+        if (vertexFinished) {
+          int partition = getPartitionID(vertex.getVertexID());
+
+          if (peer.getPeerIndex() == partition) {
+            addVertex(vertex);
+          } else {
+            if (!messages.containsKey(partition)) {
+              messages.putIfAbsent(partition, new GraphJobMessage());
+            }
+            messages.get(partition).add(serialize(vertex));
           }
-          messages.get(dstHost).add(serialize(vertex));
         }
-
-      } catch (IOException e) {
+      } catch (Exception e) {
         e.printStackTrace();
       }
     }
@@ -751,6 +778,14 @@ public final class GraphJobRunner<V exte
         peer.getNumPeers()));
   }
 
+  public int getPartitionID(V vertexID) {
+    return partitioner.getPartition(vertexID, null, peer.getNumPeers());
+  }
+
+  public String getHostName(int partitionID) {
+    return peer.getPeerName(partitionID);
+  }
+
   /**
    * @return the number of vertices, globally accumulated.
    */