You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/13 05:36:33 UTC

git commit: updated refs/heads/trunk to 17355f5

Repository: giraph
Updated Branches:
  refs/heads/trunk 79e7f1c98 -> 17355f558


[GIRAPH-1013] Add BlockExecutionTest

Summary:
Add support for executing single blocks, and add a test for the core of the framework.

Equivalent to internal https://phabricator.fb.com/D2137589 diff.

Test Plan: mvn clean install

Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov

Reviewed By: sergey.edunov

Differential Revision: https://reviews.facebook.net/D39873


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/17355f55
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/17355f55
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/17355f55

Branch: refs/heads/trunk
Commit: 17355f55811be1b1392c3ca066fb9adf803846d3
Parents: 79e7f1c
Author: Igor Kabiljo <ik...@fb.com>
Authored: Mon Jun 8 16:24:45 2015 -0700
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Fri Jun 12 20:35:52 2015 -0700

----------------------------------------------------------------------
 .../giraph/block_app/framework/BlockUtils.java  |  52 ++++---
 .../framework/api/local/LocalBlockRunner.java   | 106 ++++++++++---
 .../framework/internal/BlockMasterLogic.java    |  23 ++-
 .../block_app/framework/BlockExecutionTest.java | 156 +++++++++++++++++++
 4 files changed, 290 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
index df260f5..3175a55 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
@@ -54,7 +54,8 @@ public class BlockUtils {
   /** Property describing BlockFactory to use for current application run */
   public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS =
       ClassConfOption.create(
-          "digraph.block_worker_context_value_class", null, Object.class,
+          "digraph.block_worker_context_value_class",
+          Object.class, Object.class,
           "block worker context value class");
 
   private static final Logger LOG = Logger.getLogger(BlockUtils.class);
@@ -136,6 +137,34 @@ public class BlockUtils {
     // Create blocks to detect issues before creating a Giraph job
     // They will not be used here
     Block executionBlock = blockFactory.createBlock(immConf);
+    checkBlockTypes(
+        executionBlock, blockFactory.createExecutionStage(immConf),
+        conf, immConf);
+
+    // check for non 'static final' fields in BlockFactories
+    Class<?> bfClass = blockFactory.getClass();
+    while (!bfClass.equals(Object.class)) {
+      for (Field field : bfClass.getDeclaredFields()) {
+        if (!Modifier.isStatic(field.getModifiers()) ||
+            !Modifier.isFinal(field.getModifiers())) {
+          throw new IllegalStateException("BlockFactory (" + bfClass +
+              ") cannot have any mutable (non 'static final') fields as a " +
+              "safety measure, as createBlock function is called from a " +
+              "different context then all other functions, use conf argument " +
+              "instead, or make it 'static final'. Field present: " + field);
+        }
+      }
+      bfClass = bfClass.getSuperclass();
+    }
+
+    // Register outputs
+    blockFactory.registerOutputs(conf);
+  }
+
+  public static void checkBlockTypes(
+      Block executionBlock, Object executionStage,
+      GiraphConfiguration conf,
+      final ImmutableClassesGiraphConfiguration immConf) {
     LOG.info("Executing application - " + executionBlock);
 
     final Class<?> vertexIdClass = GiraphConstants.VERTEX_ID_CLASS.get(conf);
@@ -146,7 +175,7 @@ public class BlockUtils {
     final Class<?> workerContextValueClass =
         BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.get(conf);
     final Class<?> executionStageClass =
-        blockFactory.createExecutionStage(conf).getClass();
+        executionStage.getClass();
 
     // Check for type inconsistencies
     executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
@@ -183,24 +212,5 @@ public class BlockUtils {
         }
       }
     });
-
-    // check for non 'static final' fields in BlockFactories
-    Class<?> bfClass = blockFactory.getClass();
-    while (!bfClass.equals(Object.class)) {
-      for (Field field : bfClass.getDeclaredFields()) {
-        if (!Modifier.isStatic(field.getModifiers()) ||
-            !Modifier.isFinal(field.getModifiers())) {
-          throw new IllegalStateException("BlockFactory (" + bfClass +
-              ") cannot have any mutable (non 'static final') fields as a " +
-              "safety measure, as createBlock function is called from a " +
-              "different context then all other functions, use conf argument " +
-              "instead, or make it 'static final'. Field present: " + field);
-        }
-      }
-      bfClass = bfClass.getSuperclass();
-    }
-
-    // Register outputs
-    blockFactory.registerOutputs(conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
index bdf3233..ea6817f 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
@@ -27,7 +27,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.giraph.block_app.framework.BlockFactory;
+import org.apache.giraph.block_app.framework.BlockUtils;
 import org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi;
+import org.apache.giraph.block_app.framework.block.Block;
 import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
 import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
 import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic;
@@ -50,12 +53,20 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 
 /**
- * Local in-memory Block application job runner, used for testing.
+ * Local in-memory Block application job runner.
+ * Implementation should be faster than using InternalVertexRunner.
+ *
+ * Useful for fast testing.
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class LocalBlockRunner {
-  public static final IntConfOption NUM_WORKERS = new IntConfOption(
-      "test.LocalBlockRunner.NUM_WORKERS", 3, "");
+  /** Number of threads to use */
+  public static final IntConfOption NUM_THREADS = new IntConfOption(
+      "test.LocalBlockRunner.NUM_THREADS", 3, "");
+  /**
+   * Whether to run all supported checks. Disable if you are running this
+   * outside of a unit test on a large graph, where performance matters.
+   */
   public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption(
       "test.LocalBlockRunner.RUN_ALL_CHECKS", true, "");
   // merge into RUN_ALL_CHECKS, after SERIALIZE_MASTER starts working
@@ -66,41 +77,77 @@ public class LocalBlockRunner {
   private LocalBlockRunner() { }
 
   /**
+   * Run Block Application specified within the conf, on a given graph,
+   * locally, in-memory.
+   *
    * With a boolean flag, you can switch between LocalBlockRunner and
-   * InternalVertexRunner for running the unit test.
+   * InternalVertexRunner implementations of local in-memory computation.
    */
   public static
   <I extends WritableComparable, V extends Writable, E extends Writable>
-  TestGraph<I, V, E> runWithInMemoryOutput(
+  TestGraph<I, V, E> runApp(
       TestGraph<I, V, E> graph, GiraphConfiguration conf,
       boolean useFullDigraphTests) throws Exception {
     if (useFullDigraphTests) {
       return InternalVertexRunner.runWithInMemoryOutput(conf, graph);
     } else {
-      runWithInMemoryOutput(graph, conf);
+      runApp(graph, conf);
       return graph;
     }
   }
 
+  /**
+   * Run Block Application specified within the conf, on a given graph,
+   * locally, in-memory.
+   */
   public static
   <I extends WritableComparable, V extends Writable, E extends Writable>
-  void runWithInMemoryOutput(
-      TestGraph<I, V, E> graph, GiraphConfiguration conf) throws Exception {
-    VertexSaver<I, V, E> noOpVertexSaver = new VertexSaver<I, V, E>() {
-      @Override
-      public void saveVertex(Vertex<I, V, E> vertex) {
-        // No-op
-      }
-    };
-    runWithVertexSaverOutput(graph, noOpVertexSaver, conf);
+  void runApp(TestGraph<I, V, E> graph, GiraphConfiguration conf) {
+    VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver();
+    runAppWithVertexOutput(graph, noOpVertexSaver, conf);
   }
 
+  /**
+   * Run Block from a specified execution stage on a given graph,
+   * locally, in-memory.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  void runBlock(
+      TestGraph<I, V, E> graph, Block block, Object executionStage,
+      GiraphConfiguration conf) {
+    VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver();
+    runBlockWithVertexOutput(
+        block, executionStage, graph, noOpVertexSaver, conf);
+  }
+
+
+  /**
+   * Run Block Application specified within the conf, on a given graph,
+   * locally, in-memory, with a given vertexSaver.
+   */
   public static
   <I extends WritableComparable, V extends Writable, E extends Writable>
-  void runWithVertexSaverOutput(
+  void runAppWithVertexOutput(
       TestGraph<I, V, E> graph, final VertexSaver<I, V, E> vertexSaver,
-      GiraphConfiguration conf) throws Exception {
-    int numWorkers = NUM_WORKERS.get(conf);
+      GiraphConfiguration conf) {
+    BlockFactory<?> factory = BlockUtils.createBlockFactory(conf);
+    runBlockWithVertexOutput(
+        factory.createBlock(conf), factory.createExecutionStage(conf),
+        graph, vertexSaver, conf);
+  }
+
+  /**
+   * Run Block from a specified execution stage on a given graph,
+   * locally, in-memory, with a given vertexSaver.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  void runBlockWithVertexOutput(
+      Block block, Object executionStage, TestGraph<I, V, E> graph,
+      final VertexSaver<I, V, E> vertexSaver, GiraphConfiguration conf
+  ) {
+    int numWorkers = NUM_THREADS.get(conf);
     boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
     boolean serializeMaster = SERIALIZE_MASTER.get(conf);
     final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
@@ -111,8 +158,10 @@ public class LocalBlockRunner {
         new InternalApi(graph, immConf, runAllChecks);
     final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi();
 
+    BlockUtils.checkBlockTypes(block, executionStage, conf, immConf);
+
     BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>();
-    blockMasterLogic.initialize(immConf, internalApi);
+    blockMasterLogic.initialize(block, executionStage, internalApi);
 
     BlockWorkerContextLogic workerContextLogic =
         internalApi.getWorkerContextLogic();
@@ -231,7 +280,12 @@ public class LocalBlockRunner {
           });
         }
 
-        latch.await();
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Thread intentionally interrupted", e);
+        }
+
         if (exception.get() != null) {
           throw new RuntimeException("Worker failed", exception.get());
         }
@@ -244,4 +298,16 @@ public class LocalBlockRunner {
 
     workerContextLogic.postApplication();
   }
+
+  private static
+  <I extends WritableComparable, E extends Writable, V extends Writable>
+  VertexSaver<I, V, E> noOpVertexSaver() {
+    return new VertexSaver<I, V, E>() {
+      @Override
+      public void saveVertex(Vertex<I, V, E> vertex) {
+        // No-op
+      }
+    };
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
index 3b87372..4892a33 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
@@ -49,14 +49,26 @@ public class BlockMasterLogic<S> {
   private BlockWorkerPieces previousWorkerPieces;
   private boolean computationDone;
 
+  /**
+   * Initialize master logic to execute BlockFactory defined in
+   * the configuration.
+   */
   public void initialize(
-      GiraphConfiguration conf, final BlockMasterApi masterApi)
-    throws InstantiationException, IllegalAccessException {
+      GiraphConfiguration conf, final BlockMasterApi masterApi) {
+    BlockFactory<S> factory = BlockUtils.createBlockFactory(conf);
+    initialize(factory.createBlock(conf), factory.createExecutionStage(conf),
+        masterApi);
+  }
+
+  /**
+   * Initialize Master Logic to execute given block, starting
+   * with given executionStage.
+   */
+  public void initialize(
+      Block executionBlock, S executionStage, final BlockMasterApi masterApi) {
     this.masterApi = masterApi;
     this.computationDone = false;
 
-    BlockFactory<S> factory = BlockUtils.createBlockFactory(conf);
-    Block executionBlock = factory.createBlock(conf);
     LOG.info("Executing application - " + executionBlock);
 
     // We register all possible aggregators at the beginning
@@ -82,8 +94,7 @@ public class BlockMasterLogic<S> {
     // iterating. So passing piece as null, and initial state as current state,
     // so that nothing get's executed in first half, and calculateNextState
     // returns initial state.
-    previousPiece = new PairedPieceAndStage<>(
-        null, factory.createExecutionStage(conf));
+    previousPiece = new PairedPieceAndStage<>(null, executionStage);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
new file mode 100644
index 0000000..b77c797
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.giraph.utils.TestGraph;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.python.google.common.collect.Iterables;
+
+/**
+ * Test of barebones of Blocks Framework.
+ *
+ * Do not use this as an example of a unit test, or to learn about the
+ * Framework; there are utilities that do these things more simply, which
+ * we are not trying to test here.
+ */
+public class BlockExecutionTest {
+
+  private static GiraphConfiguration createConf() {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
+    return conf;
+  }
+
+  private static TestGraph<LongWritable, LongWritable, NullWritable> createTestGraph(
+      GiraphConfiguration conf) {
+    TestGraph<LongWritable, LongWritable, NullWritable> graph =
+        new TestGraph<LongWritable, LongWritable, NullWritable>(conf);
+    graph.addVertex(new LongWritable(1), new LongWritable());
+    graph.addVertex(new LongWritable(2), new LongWritable());
+    graph.addVertex(new LongWritable(3), new LongWritable());
+    graph.addVertex(new LongWritable(4), new LongWritable());
+
+    graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
+    graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get());
+    graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get());
+    graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get());
+    return graph;
+  }
+
+  @Test
+  public void testMessageSending() {
+    GiraphConfiguration conf = createConf();
+    TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(conf);
+
+    LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, LongWritable, Writable, BooleanWritable, Object>() {
+      @Override
+      public VertexSender<WritableComparable, LongWritable, Writable> getVertexSender(
+          final BlockWorkerSendApi<WritableComparable, LongWritable, Writable, BooleanWritable> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<WritableComparable, LongWritable, Writable> vertex) {
+            workerApi.sendMessageToAllEdges(vertex, new BooleanWritable());
+          }
+        };
+      }
+
+      @Override
+      public VertexReceiver<WritableComparable, LongWritable, Writable, BooleanWritable>
+          getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi,
+              Object executionStage) {
+        return new InnerVertexReceiver() {
+          @Override
+          public void vertexReceive(Vertex<WritableComparable, LongWritable, Writable> vertex,
+              Iterable<BooleanWritable> messages) {
+            vertex.getValue().set(Iterables.size(messages));
+          }
+        };
+      }
+
+      @Override
+      protected Class<BooleanWritable> getMessageClass() {
+        return BooleanWritable.class;
+      }
+    }, new Object(), conf);
+
+    Assert.assertEquals(1, graph.getVertex(new LongWritable(1)).getValue().get());
+    Assert.assertEquals(2, graph.getVertex(new LongWritable(2)).getValue().get());
+    Assert.assertEquals(1, graph.getVertex(new LongWritable(3)).getValue().get());
+    Assert.assertEquals(0, graph.getVertex(new LongWritable(4)).getValue().get());
+  }
+
+  @Test
+  public void testReducing() {
+    GiraphConfiguration conf = createConf();
+    TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(conf);
+
+    final LongWritable value = new LongWritable();
+
+    LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, Writable, Writable, NoMessage, Object>() {
+      private ReducerHandle<LongWritable, LongWritable> numVertices;
+
+      @Override
+      public void registerReducers(CreateReducersApi reduceApi, Object executionStage) {
+        numVertices = reduceApi.createLocalReducer(SumReduce.LONG);
+      }
+
+      @Override
+      public VertexSender<WritableComparable, Writable, Writable> getVertexSender(
+          BlockWorkerSendApi<WritableComparable, Writable, Writable, NoMessage> workerApi,
+          Object executionStage) {
+
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<WritableComparable, Writable, Writable> vertex) {
+            numVertices.reduce(new LongWritable(1));
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
+        value.set(numVertices.getReducedValue(masterApi).get());
+      }
+    }, new Object(), conf);
+
+    Assert.assertEquals(4, value.get());
+  }
+}