You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/13 05:36:33 UTC
git commit: updated refs/heads/trunk to 17355f5
Repository: giraph
Updated Branches:
refs/heads/trunk 79e7f1c98 -> 17355f558
[GIRAPH-1013] Add BlockExecutionTest
Summary:
Add support for executing single blocks, as well as adding a test for core of the framework
Equivalent to internal https://phabricator.fb.com/D2137589 diff.
Test Plan: mvn clean install
Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov
Reviewed By: sergey.edunov
Differential Revision: https://reviews.facebook.net/D39873
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/17355f55
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/17355f55
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/17355f55
Branch: refs/heads/trunk
Commit: 17355f55811be1b1392c3ca066fb9adf803846d3
Parents: 79e7f1c
Author: Igor Kabiljo <ik...@fb.com>
Authored: Mon Jun 8 16:24:45 2015 -0700
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Fri Jun 12 20:35:52 2015 -0700
----------------------------------------------------------------------
.../giraph/block_app/framework/BlockUtils.java | 52 ++++---
.../framework/api/local/LocalBlockRunner.java | 106 ++++++++++---
.../framework/internal/BlockMasterLogic.java | 23 ++-
.../block_app/framework/BlockExecutionTest.java | 156 +++++++++++++++++++
4 files changed, 290 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
index df260f5..3175a55 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
@@ -54,7 +54,8 @@ public class BlockUtils {
/** Property describing BlockFactory to use for current application run */
public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS =
ClassConfOption.create(
- "digraph.block_worker_context_value_class", null, Object.class,
+ "digraph.block_worker_context_value_class",
+ Object.class, Object.class,
"block worker context value class");
private static final Logger LOG = Logger.getLogger(BlockUtils.class);
@@ -136,6 +137,34 @@ public class BlockUtils {
// Create blocks to detect issues before creating a Giraph job
// They will not be used here
Block executionBlock = blockFactory.createBlock(immConf);
+ checkBlockTypes(
+ executionBlock, blockFactory.createExecutionStage(immConf),
+ conf, immConf);
+
+ // check for non 'static final' fields in BlockFactories
+ Class<?> bfClass = blockFactory.getClass();
+ while (!bfClass.equals(Object.class)) {
+ for (Field field : bfClass.getDeclaredFields()) {
+ if (!Modifier.isStatic(field.getModifiers()) ||
+ !Modifier.isFinal(field.getModifiers())) {
+ throw new IllegalStateException("BlockFactory (" + bfClass +
+ ") cannot have any mutable (non 'static final') fields as a " +
+ "safety measure, as createBlock function is called from a " +
+ "different context then all other functions, use conf argument " +
+ "instead, or make it 'static final'. Field present: " + field);
+ }
+ }
+ bfClass = bfClass.getSuperclass();
+ }
+
+ // Register outputs
+ blockFactory.registerOutputs(conf);
+ }
+
+ public static void checkBlockTypes(
+ Block executionBlock, Object executionStage,
+ GiraphConfiguration conf,
+ final ImmutableClassesGiraphConfiguration immConf) {
LOG.info("Executing application - " + executionBlock);
final Class<?> vertexIdClass = GiraphConstants.VERTEX_ID_CLASS.get(conf);
@@ -146,7 +175,7 @@ public class BlockUtils {
final Class<?> workerContextValueClass =
BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.get(conf);
final Class<?> executionStageClass =
- blockFactory.createExecutionStage(conf).getClass();
+ executionStage.getClass();
// Check for type inconsistencies
executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
@@ -183,24 +212,5 @@ public class BlockUtils {
}
}
});
-
- // check for non 'static final' fields in BlockFactories
- Class<?> bfClass = blockFactory.getClass();
- while (!bfClass.equals(Object.class)) {
- for (Field field : bfClass.getDeclaredFields()) {
- if (!Modifier.isStatic(field.getModifiers()) ||
- !Modifier.isFinal(field.getModifiers())) {
- throw new IllegalStateException("BlockFactory (" + bfClass +
- ") cannot have any mutable (non 'static final') fields as a " +
- "safety measure, as createBlock function is called from a " +
- "different context then all other functions, use conf argument " +
- "instead, or make it 'static final'. Field present: " + field);
- }
- }
- bfClass = bfClass.getSuperclass();
- }
-
- // Register outputs
- blockFactory.registerOutputs(conf);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
index bdf3233..ea6817f 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
@@ -27,7 +27,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.giraph.block_app.framework.BlockFactory;
+import org.apache.giraph.block_app.framework.BlockUtils;
import org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi;
+import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic;
@@ -50,12 +53,20 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
/**
- * Local in-memory Block application job runner, used for testing.
+ * Local in-memory Block application job runner.
+ * Implementation should be faster then using InternalVertexRunner.
+ *
+ * Useful for fast testing.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public class LocalBlockRunner {
- public static final IntConfOption NUM_WORKERS = new IntConfOption(
- "test.LocalBlockRunner.NUM_WORKERS", 3, "");
+ /** Number of threads to use */
+ public static final IntConfOption NUM_THREADS = new IntConfOption(
+ "test.LocalBlockRunner.NUM_THREADS", 3, "");
+ /**
+ * Whether to run all supported checks. Disable if you are running this
+ * not within a unit test, and on a large graph, where performance matters.
+ */
public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption(
"test.LocalBlockRunner.RUN_ALL_CHECKS", true, "");
// merge into RUN_ALL_CHECKS, after SERIALIZE_MASTER starts working
@@ -66,41 +77,77 @@ public class LocalBlockRunner {
private LocalBlockRunner() { }
/**
+ * Run Block Application specified within the conf, on a given graph,
+ * locally, in-memory.
+ *
* With a boolean flag, you can switch between LocalBlockRunner and
- * InternalVertexRunner for running the unit test.
+ * InternalVertexRunner implementations of local in-memory computation.
*/
public static
<I extends WritableComparable, V extends Writable, E extends Writable>
- TestGraph<I, V, E> runWithInMemoryOutput(
+ TestGraph<I, V, E> runApp(
TestGraph<I, V, E> graph, GiraphConfiguration conf,
boolean useFullDigraphTests) throws Exception {
if (useFullDigraphTests) {
return InternalVertexRunner.runWithInMemoryOutput(conf, graph);
} else {
- runWithInMemoryOutput(graph, conf);
+ runApp(graph, conf);
return graph;
}
}
+ /**
+ * Run Block Application specified within the conf, on a given graph,
+ * locally, in-memory.
+ */
public static
<I extends WritableComparable, V extends Writable, E extends Writable>
- void runWithInMemoryOutput(
- TestGraph<I, V, E> graph, GiraphConfiguration conf) throws Exception {
- VertexSaver<I, V, E> noOpVertexSaver = new VertexSaver<I, V, E>() {
- @Override
- public void saveVertex(Vertex<I, V, E> vertex) {
- // No-op
- }
- };
- runWithVertexSaverOutput(graph, noOpVertexSaver, conf);
+ void runApp(TestGraph<I, V, E> graph, GiraphConfiguration conf) {
+ VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver();
+ runAppWithVertexOutput(graph, noOpVertexSaver, conf);
}
+ /**
+ * Run Block from a specified execution stage on a given graph,
+ * locally, in-memory.
+ */
+ public static
+ <I extends WritableComparable, V extends Writable, E extends Writable>
+ void runBlock(
+ TestGraph<I, V, E> graph, Block block, Object executionStage,
+ GiraphConfiguration conf) {
+ VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver();
+ runBlockWithVertexOutput(
+ block, executionStage, graph, noOpVertexSaver, conf);
+ }
+
+
+ /**
+ * Run Block Application specified within the conf, on a given graph,
+ * locally, in-memory, with a given vertexSaver.
+ */
public static
<I extends WritableComparable, V extends Writable, E extends Writable>
- void runWithVertexSaverOutput(
+ void runAppWithVertexOutput(
TestGraph<I, V, E> graph, final VertexSaver<I, V, E> vertexSaver,
- GiraphConfiguration conf) throws Exception {
- int numWorkers = NUM_WORKERS.get(conf);
+ GiraphConfiguration conf) {
+ BlockFactory<?> factory = BlockUtils.createBlockFactory(conf);
+ runBlockWithVertexOutput(
+ factory.createBlock(conf), factory.createExecutionStage(conf),
+ graph, vertexSaver, conf);
+ }
+
+ /**
+ * Run Block from a specified execution stage on a given graph,
+ * locally, in-memory, with a given vertexSaver.
+ */
+ public static
+ <I extends WritableComparable, V extends Writable, E extends Writable>
+ void runBlockWithVertexOutput(
+ Block block, Object executionStage, TestGraph<I, V, E> graph,
+ final VertexSaver<I, V, E> vertexSaver, GiraphConfiguration conf
+ ) {
+ int numWorkers = NUM_THREADS.get(conf);
boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
boolean serializeMaster = SERIALIZE_MASTER.get(conf);
final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
@@ -111,8 +158,10 @@ public class LocalBlockRunner {
new InternalApi(graph, immConf, runAllChecks);
final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi();
+ BlockUtils.checkBlockTypes(block, executionStage, conf, immConf);
+
BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>();
- blockMasterLogic.initialize(immConf, internalApi);
+ blockMasterLogic.initialize(block, executionStage, internalApi);
BlockWorkerContextLogic workerContextLogic =
internalApi.getWorkerContextLogic();
@@ -231,7 +280,12 @@ public class LocalBlockRunner {
});
}
- latch.await();
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Thread intentionally interrupted", e);
+ }
+
if (exception.get() != null) {
throw new RuntimeException("Worker failed", exception.get());
}
@@ -244,4 +298,16 @@ public class LocalBlockRunner {
workerContextLogic.postApplication();
}
+
+ private static
+ <I extends WritableComparable, E extends Writable, V extends Writable>
+ VertexSaver<I, V, E> noOpVertexSaver() {
+ return new VertexSaver<I, V, E>() {
+ @Override
+ public void saveVertex(Vertex<I, V, E> vertex) {
+ // No-op
+ }
+ };
+ }
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
index 3b87372..4892a33 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
@@ -49,14 +49,26 @@ public class BlockMasterLogic<S> {
private BlockWorkerPieces previousWorkerPieces;
private boolean computationDone;
+ /**
+ * Initialize master logic to execute BlockFactory defined in
+ * the configuration.
+ */
public void initialize(
- GiraphConfiguration conf, final BlockMasterApi masterApi)
- throws InstantiationException, IllegalAccessException {
+ GiraphConfiguration conf, final BlockMasterApi masterApi) {
+ BlockFactory<S> factory = BlockUtils.createBlockFactory(conf);
+ initialize(factory.createBlock(conf), factory.createExecutionStage(conf),
+ masterApi);
+ }
+
+ /**
+ * Initialize Master Logic to execute given block, starting
+ * with given executionStage.
+ */
+ public void initialize(
+ Block executionBlock, S executionStage, final BlockMasterApi masterApi) {
this.masterApi = masterApi;
this.computationDone = false;
- BlockFactory<S> factory = BlockUtils.createBlockFactory(conf);
- Block executionBlock = factory.createBlock(conf);
LOG.info("Executing application - " + executionBlock);
// We register all possible aggregators at the beginning
@@ -82,8 +94,7 @@ public class BlockMasterLogic<S> {
// iterating. So passing piece as null, and initial state as current state,
// so that nothing get's executed in first half, and calculateNextState
// returns initial state.
- previousPiece = new PairedPieceAndStage<>(
- null, factory.createExecutionStage(conf));
+ previousPiece = new PairedPieceAndStage<>(null, executionStage);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
new file mode 100644
index 0000000..b77c797
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.giraph.utils.TestGraph;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.python.google.common.collect.Iterables;
+
+/**
+ * Test of barebones of Blocks Framework.
+ *
+ * Do not look as an example of unit test, or to learn about the Framework,
+ * there are utilities to do things simpler, that we are not trying to test
+ * here.
+ */
+public class BlockExecutionTest {
+
+ private static GiraphConfiguration createConf() {
+ GiraphConfiguration conf = new GiraphConfiguration();
+ GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
+ GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
+ GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
+ return conf;
+ }
+
+ private static TestGraph<LongWritable, LongWritable, NullWritable> createTestGraph(
+ GiraphConfiguration conf) {
+ TestGraph<LongWritable, LongWritable, NullWritable> graph =
+ new TestGraph<LongWritable, LongWritable, NullWritable>(conf);
+ graph.addVertex(new LongWritable(1), new LongWritable());
+ graph.addVertex(new LongWritable(2), new LongWritable());
+ graph.addVertex(new LongWritable(3), new LongWritable());
+ graph.addVertex(new LongWritable(4), new LongWritable());
+
+ graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
+ graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get());
+ graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get());
+ graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get());
+ return graph;
+ }
+
+ @Test
+ public void testMessageSending() {
+ GiraphConfiguration conf = createConf();
+ TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(conf);
+
+ LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, LongWritable, Writable, BooleanWritable, Object>() {
+ @Override
+ public VertexSender<WritableComparable, LongWritable, Writable> getVertexSender(
+ final BlockWorkerSendApi<WritableComparable, LongWritable, Writable, BooleanWritable> workerApi,
+ Object executionStage) {
+ return new InnerVertexSender() {
+ @Override
+ public void vertexSend(Vertex<WritableComparable, LongWritable, Writable> vertex) {
+ workerApi.sendMessageToAllEdges(vertex, new BooleanWritable());
+ }
+ };
+ }
+
+ @Override
+ public VertexReceiver<WritableComparable, LongWritable, Writable, BooleanWritable>
+ getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi,
+ Object executionStage) {
+ return new InnerVertexReceiver() {
+ @Override
+ public void vertexReceive(Vertex<WritableComparable, LongWritable, Writable> vertex,
+ Iterable<BooleanWritable> messages) {
+ vertex.getValue().set(Iterables.size(messages));
+ }
+ };
+ }
+
+ @Override
+ protected Class<BooleanWritable> getMessageClass() {
+ return BooleanWritable.class;
+ }
+ }, new Object(), conf);
+
+ Assert.assertEquals(1, graph.getVertex(new LongWritable(1)).getValue().get());
+ Assert.assertEquals(2, graph.getVertex(new LongWritable(2)).getValue().get());
+ Assert.assertEquals(1, graph.getVertex(new LongWritable(3)).getValue().get());
+ Assert.assertEquals(0, graph.getVertex(new LongWritable(4)).getValue().get());
+ }
+
+ @Test
+ public void testReducing() {
+ GiraphConfiguration conf = createConf();
+ TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(conf);
+
+ final LongWritable value = new LongWritable();
+
+ LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, Writable, Writable, NoMessage, Object>() {
+ private ReducerHandle<LongWritable, LongWritable> numVertices;
+
+ @Override
+ public void registerReducers(CreateReducersApi reduceApi, Object executionStage) {
+ numVertices = reduceApi.createLocalReducer(SumReduce.LONG);
+ }
+
+ @Override
+ public VertexSender<WritableComparable, Writable, Writable> getVertexSender(
+ BlockWorkerSendApi<WritableComparable, Writable, Writable, NoMessage> workerApi,
+ Object executionStage) {
+
+ return new InnerVertexSender() {
+ @Override
+ public void vertexSend(Vertex<WritableComparable, Writable, Writable> vertex) {
+ numVertices.reduce(new LongWritable(1));
+ }
+ };
+ }
+
+ @Override
+ public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
+ value.set(numVertices.getReducedValue(masterApi).get());
+ }
+ }, new Object(), conf);
+
+ Assert.assertEquals(4, value.get());
+ }
+}