You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2017/06/02 17:14:31 UTC

git commit: updated refs/heads/trunk to e2f82b2

Repository: giraph
Updated Branches:
  refs/heads/trunk 02e73b917 -> e2f82b25d


JIRA-1148

closes #39


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e2f82b25
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e2f82b25
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e2f82b25

Branch: refs/heads/trunk
Commit: e2f82b25d19faa263bbdbdf074948c6537d85e3a
Parents: 02e73b9
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Jun 2 10:14:18 2017 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Jun 2 10:14:18 2017 -0700

----------------------------------------------------------------------
 .../UndirectedConnectedComponents.java          | 11 ++-
 .../apache/giraph/block_app/library/Pieces.java | 90 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/e2f82b25/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
index fb04fa8..4398c06 100644
--- a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
+++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
@@ -352,10 +352,15 @@ public class UndirectedConnectedComponents {
     Pair<LongWritable, LongWritable> componentToReducePair = Pair.of(
         new LongWritable(), new LongWritable(1));
     LongWritable reusableLong = new LongWritable();
-    return Pieces.reduceAndBroadcast(
-        "CalcConnectedComponentSizes",
+    // This reduce operation is stateless so we can use a single instance
+    BasicMapReduce<LongWritable, LongWritable, LongWritable> reduceOperation =
         new BasicMapReduce<>(
-            LongTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG),
+            LongTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG);
+    return Pieces.reduceAndBroadcastWithArrayOfHandles(
+        "CalcConnectedComponentSizes",
+        3137, /* Just using some large prime number */
+        () -> reduceOperation,
+        vertex -> getComponent.get(vertex).get(),
         (Vertex<LongWritable, V, Writable> vertex) -> {
           componentToReducePair.getLeft().set(getComponent.get(vertex).get());
           return componentToReducePair;

http://git-wip-us.apache.org/repos/asf/giraph/blob/e2f82b25/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
index 614f4ba..587ae65 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
@@ -17,7 +17,9 @@
  */
 package org.apache.giraph.block_app.library;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
@@ -26,13 +28,16 @@ import org.apache.giraph.block_app.framework.api.CreateReducersApi;
 import org.apache.giraph.block_app.framework.piece.Piece;
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle;
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
 import org.apache.giraph.block_app.library.internal.SendMessagePiece;
 import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
+import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.function.Consumer;
 import org.apache.giraph.function.PairConsumer;
+import org.apache.giraph.function.Supplier;
 import org.apache.giraph.function.vertex.ConsumerWithVertex;
 import org.apache.giraph.function.vertex.SupplierFromVertex;
 import org.apache.giraph.graph.Vertex;
@@ -320,6 +325,91 @@ public class Pieces {
   }
 
   /**
+   * Like reduceAndBroadcast, but uses an array of numHandles reducer and
+   * broadcast handles, to stay feasible and performant when values are large.
+   * A vertex's value goes to handle |handleHashSupplier(vertex) % numHandles|;
+   * vertices sharing a handle are reduced together and see the same result.
+   *
+   * @param <S> Single value type, objects passed on workers
+   * @param <R> Reduced value type
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   */
+  public static
+  <S, R extends Writable, I extends WritableComparable, V extends Writable,
+      E extends Writable>
+  Piece<I, V, E, NoMessage, Object> reduceAndBroadcastWithArrayOfHandles(
+      final String name, // also returned by toString()
+      final int numHandles, // number of reducer/broadcast handles
+      final Supplier<ReduceOperation<S, R>> reduceOp, // get() per new reducer
+      final SupplierFromVertex<I, V, E, Long> handleHashSupplier,
+      final SupplierFromVertex<I, V, E, S> valueSupplier, // value to reduce
+      final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      protected ArrayOfHandles.ArrayOfReducers<S, R> reducers;
+      protected BroadcastArrayHandle<R> broadcasts;
+
+      private int getHandleIndex(Vertex<I, V, E> vertex) { // in [0, numHandles)
+        return (int) Math.abs(handleHashSupplier.get(vertex) % numHandles);
+      }
+
+      @Override
+      public void registerReducers(
+          final CreateReducersApi reduceApi, Object executionStage) {
+        reducers = new ArrayOfHandles.ArrayOfReducers<>(
+            numHandles,
+            new Supplier<ReducerHandle<S, R>>() {
+              @Override
+              public ReducerHandle<S, R> get() {
+                return reduceApi.createLocalReducer(reduceOp.get());
+              }
+            });
+      }
+
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            reducers.get(getHandleIndex(vertex)).reduce(
+                valueSupplier.get(vertex));
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        broadcasts = reducers.broadcastValue(master); // per-handle values
+      }
+
+      @Override
+      public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
+          BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        final List<R> values = new ArrayList<>(); // indexed by handle
+        for (int i = 0; i < numHandles; i++) {
+          values.add(broadcasts.get(i).getBroadcast(workerApi));
+        }
+        return new InnerVertexReceiver() {
+          @Override
+          public void vertexReceive(
+              Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
+            reducedValueConsumer.apply(
+                vertex, values.get(getHandleIndex(vertex)));
+          }
+        };
+      }
+
+      @Override
+      public String toString() {
+        return name;
+      }
+    };
+  }
+
+  /**
    * Creates Piece that for each vertex, sends message provided by
    * messageSupplier to all targets provided by targetsSupplier.
    * Received messages are then passed to and processed by provided