You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/16 20:13:11 UTC

git commit: updated refs/heads/trunk to 27f234f

Repository: giraph
Updated Branches:
  refs/heads/trunk b9d20edec -> 27f234f1f


[GIRAPH-1013] Add migration library

Summary:
Add a library that simplifies migration to the Blocks Framework.

Copied one of the example tests, which uses master compute, computation and worker context,
to show that it all works without any code changes.

Test Plan: mvn clean install -Phadoop_facebook

Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D39891


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/27f234f1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/27f234f1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/27f234f1

Branch: refs/heads/trunk
Commit: 27f234f1f2f7e5f30dd6dfaffaa0109a98be49ac
Parents: b9d20ed
Author: Igor Kabiljo <ik...@fb.com>
Authored: Wed Jun 10 10:33:04 2015 -0700
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Tue Jun 16 11:12:30 2015 -0700

----------------------------------------------------------------------
 findbugs-exclude.xml                            |   4 +
 .../migration/MigrationAbstractComputation.java | 183 +++++++++
 .../migration/MigrationFullBlockFactory.java    | 107 +++++
 .../migration/MigrationMasterCompute.java       | 261 ++++++++++++
 .../block_app/migration/MigrationPiece.java     | 399 +++++++++++++++++++
 .../migration/MigrationSuperstepStage.java      |  26 ++
 .../migration/MigrationSuperstepStageImpl.java  |  50 +++
 .../migration/MigrationWorkerContext.java       | 103 +++++
 .../block_app/migration/package-info.java       |  24 ++
 .../apache/giraph/function/ObjectHolder.java    |  56 +++
 .../apache/giraph/function/ObjectTransfer.java  | 113 ++++++
 .../function/primitive/PrimitiveRefs.java       | 107 +++++
 giraph-examples/pom.xml                         |   4 +
 .../SimpleMigrationMasterBlockFactory.java      | 160 ++++++++
 .../giraph/examples/block_app/package-info.java |  21 +
 .../block_app/TestMigrationBspBasic.java        |  60 +++
 .../giraph/examples/block_app/package-info.java |  21 +
 17 files changed, 1699 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index 0c2ab96..cc1a05a 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -97,6 +97,10 @@
     <Bug pattern="UL_UNRELEASED_LOCK"/>
   </Match>
   <Match>
+    <Class name="~org.apache.giraph.function.primitive.PrimitiveRefs.*Ref"/>
+    <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
+  </Match>
+  <Match>
     <!-- Java Serialization is not used, so this is never an actual issue.
       On the other hand, Kryo needs lambdas to be Serializable to work. -->
     <Bug pattern="SE_BAD_FIELD,SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationAbstractComputation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationAbstractComputation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationAbstractComputation.java
new file mode 100644
index 0000000..5520558
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationAbstractComputation.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.migration;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.conf.TypesHolder;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Replacement for AbstractComputation when migrating to the
+ * Blocks Framework. Calls are delegated to the BlockWorkerSendApi passed to
+ * init(); functions tied to execution order are not exposed.
+ *
+ * @param <I> Vertex ID type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M1> Incoming message type
+ * @param <M2> Outgoing message type
+ */
+@SuppressWarnings("rawtypes")
+public class MigrationAbstractComputation<I extends WritableComparable,
+    V extends Writable, E extends Writable, M1 extends Writable,
+    M2 extends Writable> extends WorkerAggregatorDelegator<I, V, E>
+    implements TypesHolder<I, V, E, M1, M2>, Writable {
+  private BlockWorkerSendApi<I, V, E, M2> api;
+  private MigrationWorkerContext workerContext;
+  private long superstep;
+
+  final void init(
+      BlockWorkerSendApi<I, V, E, M2> workerApi,
+      MigrationWorkerContext workerContext,
+      long superstep) {
+    this.api = workerApi;
+    this.workerContext = workerContext;
+    this.superstep = superstep;
+    setWorkerGlobalCommUsage((WorkerGlobalCommUsage) workerApi);
+    setConf(workerApi.getConf());
+  }
+
+  public void compute(Vertex<I, V, E> vertex,
+      Iterable<M1> messages) throws IOException {
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+  }
+
+  public void preSuperstep() {
+  }
+
+  public void postSuperstep() {
+  }
+
+  @SuppressWarnings("deprecation")
+  public long getTotalNumVertices() {
+    return api.getTotalNumVertices();
+  }
+
+  @SuppressWarnings("deprecation")
+  public long getTotalNumEdges() {
+    return api.getTotalNumEdges();
+  }
+
+  public void sendMessage(I id, M2 message) {
+    api.sendMessage(id, message);
+  }
+
+  public final void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
+    api.sendMessageToAllEdges(vertex, message);
+  }
+
+  public final void sendMessageToMultipleEdges(
+      Iterator<I> vertexIdIterator, M2 message) {
+    api.sendMessageToMultipleEdges(vertexIdIterator, message);
+  }
+
+  public final void addVertexRequest(I id, V value,
+      OutEdges<I, E> edges) throws IOException {
+    api.addVertexRequest(id, value, edges);
+  }
+
+  public final void addVertexRequest(I id, V value) throws IOException {
+    api.addVertexRequest(id, value);
+  }
+
+  public final void removeVertexRequest(I vertexId) throws IOException {
+    api.removeVertexRequest(vertexId);
+  }
+
+  public final void addEdgeRequest(I sourceVertexId,
+      Edge<I, E> edge) throws IOException {
+    api.addEdgeRequest(sourceVertexId, edge);
+  }
+
+  public final void removeEdgesRequest(I sourceVertexId,
+      I targetVertexId) throws IOException {
+    api.removeEdgesRequest(sourceVertexId, targetVertexId);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <W extends MigrationWorkerContext> W getWorkerContext() {
+    return (W) workerContext;
+  }
+
+  /**
+   * Drop-in replacement for BasicComputation when migrating to the
+   * Blocks Framework, disallowing functions that are tied to
+   * execution order. Incoming and outgoing messages share one type.
+   *
+   * @param <I> Vertex ID type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @param <M> Message type (both incoming and outgoing)
+   */
+  public static class MigrationBasicComputation<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      extends MigrationAbstractComputation<I, V, E, M, M> {
+  }
+
+  /**
+   * Drop-in replacement for AbstractComputation when migrating to the
+   * Blocks Framework; additionally exposes getSuperstep().
+   *
+   * @param <I> Vertex ID type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @param <M1> Incoming message type
+   * @param <M2> Outgoing message type
+   */
+  public static class MigrationFullAbstractComputation
+      <I extends WritableComparable, V extends Writable, E extends Writable,
+      M1 extends Writable, M2 extends Writable>
+      extends MigrationAbstractComputation<I, V, E, M1, M2> {
+    public long getSuperstep() {
+      return super.superstep;
+    }
+  }
+
+  /**
+   * Drop-in replacement for BasicComputation when migrating to the
+   * Blocks Framework; one message type, and getSuperstep() is available.
+   *
+   * @param <I> Vertex ID type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @param <M> Message type (both incoming and outgoing)
+   */
+  public static class MigrationFullBasicComputation
+      <I extends WritableComparable, V extends Writable, E extends Writable,
+      M extends Writable>
+      extends MigrationFullAbstractComputation<I, V, E, M, M> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationFullBlockFactory.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationFullBlockFactory.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationFullBlockFactory.java
new file mode 100644
index 0000000..3a8c9e9
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationFullBlockFactory.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.migration;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.AbstractBlockFactory;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
+import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.function.Consumer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+/**
+ * Base BlockFactory to extend when using the drop-in migration classes.
+ */
+public abstract class MigrationFullBlockFactory
+    extends AbstractBlockFactory<MigrationSuperstepStage> {
+
+  @Override
+  public MigrationSuperstepStage createExecutionStage(
+      GiraphConfiguration conf) {
+    return new MigrationSuperstepStageImpl();
+  }
+
+  @Override
+  protected Class<? extends MigrationWorkerContext> getWorkerContextValueClass(
+      GiraphConfiguration conf) {
+    return MigrationWorkerContext.class;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public <I extends WritableComparable, V extends Writable, E extends Writable,
+  MR extends Writable, MS extends Writable>
+  Block createMigrationAppBlock(
+      Class<? extends MigrationFullAbstractComputation<I, V, E, MR, MS>>
+        computationClass,
+      MigrationFullMasterCompute masterCompute,
+      Class<MS> messageClass,
+      Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass,
+      GiraphConfiguration conf) {
+    final MigrationPiece<I, V, E, MR, MS> piece =
+        MigrationPiece.createFirstFullMigrationPiece(
+            computationClass, masterCompute, messageClass,
+            messageCombinerClass);
+    piece.sanityTypeChecks(conf, null);
+
+    return new SequenceBlock(
+        new Piece<WritableComparable, Writable, Writable,
+            Writable, MigrationSuperstepStage>() {
+          @Override
+          public MigrationSuperstepStage nextExecutionStage(
+              MigrationSuperstepStage executionStage) {
+            return executionStage.changedMigrationSuperstep(0);
+          }
+        },
+        new Block() {
+          private MigrationPiece curPiece = piece; // advanced by iterator()
+
+          @Override
+          public Iterator<AbstractPiece> iterator() {
+            return Iterators.concat(
+                Iterators.singletonIterator(curPiece),
+                new AbstractIterator<AbstractPiece>() {
+                  @Override
+                  protected AbstractPiece computeNext() {
+                    curPiece = curPiece.getNextPiece();
+                    if (curPiece == null) {
+                      endOfData();
+                    }
+                    return curPiece;
+                  }
+                });
+          }
+
+          @Override
+          public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+            consumer.apply(curPiece); // reports only the current piece
+          }
+        }
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationMasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationMasterCompute.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationMasterCompute.java
new file mode 100644
index 0000000..67d25c8
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationMasterCompute.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.migration;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.StatusReporter;
+import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.TypesHolder;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Replacement for MasterCompute when migrating to the Blocks Framework.
+ * Delegates to BlockMasterApi; order-dependent functions are not exposed.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public abstract class MigrationMasterCompute
+    extends DefaultImmutableClassesGiraphConfigurable implements Writable {
+  private BlockMasterApi api;
+
+  final void init(BlockMasterApi masterApi) {
+    this.api = masterApi;
+    setConf(masterApi.getConf());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+  }
+
+  public void compute() {
+  }
+
+  public void initialize() throws InstantiationException,
+      IllegalAccessException {
+  }
+
+  @SuppressWarnings("deprecation")
+  public long getTotalNumVertices() {
+    return api.getTotalNumVertices();
+  }
+
+  @SuppressWarnings("deprecation")
+  public long getTotalNumEdges() {
+    return api.getTotalNumEdges();
+  }
+
+  public final <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp) {
+    api.registerReducer(name, reduceOp);
+  }
+
+  public final <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+    api.registerReducer(
+        name, reduceOp, globalInitialValue);
+  }
+
+  public final <T extends Writable> T getReduced(String name) {
+    return api.getReduced(name);
+  }
+
+  public final void broadcast(String name, Writable object) {
+    api.broadcast(name, object);
+  }
+
+  public final <A extends Writable> boolean registerAggregator(
+    String name, Class<? extends Aggregator<A>> aggregatorClass)
+    throws InstantiationException, IllegalAccessException {
+    return api.registerAggregator(
+        name, aggregatorClass);
+  }
+
+  @SuppressWarnings("deprecation")
+  public final <A extends Writable> boolean registerPersistentAggregator(
+      String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    return api.registerPersistentAggregator(name, aggregatorClass);
+  }
+
+  public final <A extends Writable> A getAggregatedValue(String name) {
+    return api.<A>getAggregatedValue(name);
+  }
+
+  public final <A extends Writable> void setAggregatedValue(
+      String name, A value) {
+    api.setAggregatedValue(name, value);
+  }
+
+  public final void logToCommandLine(String line) {
+    api.logToCommandLine(line);
+  }
+
+  public final StatusReporter getContext() {
+    return api;
+  }
+
+  /**
+   * Drop-in replacement for MasterCompute that also tracks the superstep,
+   * the halt flag, and computation/message/combiner class changes.
+   */
+  public static class MigrationFullMasterCompute
+      extends MigrationMasterCompute {
+    private long superstep;
+    private boolean halt;
+    private Class<? extends MigrationAbstractComputation> computationClass;
+    private Class<? extends MigrationAbstractComputation> newComputationClass;
+    private Class<? extends Writable> originalMessage;
+    private Class<? extends Writable> newMessage;
+    private Class<? extends MessageCombiner> originalMessageCombiner;
+    private Class<? extends MessageCombiner> newMessageCombiner;
+
+    final void init(
+        long superstep,
+        Class<? extends MigrationAbstractComputation> computationClass,
+        Class<? extends Writable> message,
+        Class<? extends MessageCombiner> messageCombiner) {
+      this.superstep = superstep;
+      this.halt = false;
+      this.computationClass = computationClass;
+      this.newComputationClass = null;
+      this.originalMessage = message;
+      this.newMessage = null;
+      this.originalMessageCombiner = messageCombiner;
+      this.newMessageCombiner = null;
+    }
+
+    public final long getSuperstep() {
+      return superstep;
+    }
+
+    @Override
+    public final long getTotalNumVertices() {
+      if (superstep == 0) {
+        throw new RuntimeException(
+            "getTotalNumVertices not available in superstep=0");
+      }
+      return super.getTotalNumVertices();
+    }
+
+    @Override
+    public final long getTotalNumEdges() {
+      if (superstep == 0) {
+        throw new RuntimeException(
+            "getTotalNumEdges not available in superstep=0");
+      }
+      return super.getTotalNumEdges();
+    }
+
+
+    public final void haltComputation() {
+      halt = true;
+    }
+
+    public final boolean isHalted() {
+      return halt;
+    }
+
+    public final void setComputation(
+        Class<? extends MigrationFullAbstractComputation> computation) {
+      if (computation != null) {
+        newComputationClass = computation;
+      } else {
+        // TODO(review): null only clears computationClass; confirm intent
+        this.computationClass = null;
+      }
+    }
+
+    public final
+    Class<? extends MigrationAbstractComputation> getComputation() {
+      if (newComputationClass != null) {
+        return newComputationClass;
+      }
+      if (computationClass != null) {
+        return computationClass;
+      }
+      return null;
+    }
+
+    public final void setMessageCombiner(
+        Class<? extends MessageCombiner> combinerClass) {
+      this.newMessageCombiner = combinerClass;
+    }
+
+    public final Class<? extends MessageCombiner> getMessageCombiner() {
+      return newMessageCombiner != null ?
+        newMessageCombiner : originalMessageCombiner;
+    }
+
+    public final void setIncomingMessage(
+        Class<? extends Writable> incomingMessageClass) {
+      if (!originalMessage.equals(incomingMessageClass)) {
+        throw new IllegalArgumentException(
+            originalMessage + " and " + incomingMessageClass + " must be same");
+      }
+    }
+
+    public final void setOutgoingMessage(
+        Class<? extends Writable> outgoingMessageClass) {
+      newMessage = outgoingMessageClass;
+    }
+
+    final Class<? extends Writable> getOutgoingMessage() {
+      if (newMessage != null) {
+        return newMessage;
+      }
+
+      if (newComputationClass == null) {
+        return originalMessage;
+      }
+      Class[] computationTypes = ReflectionUtils.getTypeArguments(
+          TypesHolder.class, newComputationClass);
+      return computationTypes[4]; // presumably M2 (5th TypesHolder type arg)
+    }
+
+    final Class<? extends MigrationAbstractComputation> getComputationClass() {
+      return newComputationClass != null ?
+        newComputationClass : computationClass;
+    }
+
+    final
+    Class<? extends MigrationAbstractComputation> getNewComputationClass() {
+      return newComputationClass;
+    }
+
+    final Class<? extends Writable> getNewMessage() {
+      return newMessage;
+    }
+
+    final Class<? extends MessageCombiner> getNewMessageCombiner() {
+      return newMessageCombiner;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java
new file mode 100644
index 0000000..c60ea2c
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.migration;
+
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
+import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
+import org.apache.giraph.block_app.migration.MigrationWorkerContext.MigrationFullWorkerContext;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.conf.DefaultMessageClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.conf.TypesHolder;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Piece used when migrating applications to Blocks Framework.
+ *
+ * There are two migration levels:
+ * <ul>
+ * <li>
+ * drop-in replacement migration is completely compatible with previous code.
+ * Only necessary thing is to change parent classes from (AbstractComputation,
+ * MasterCompute, WorkerContext) to (MigrationFullAbstractComputation,
+ * MigrationFullMasterCompute and MigrationFullWorkerContext).
+ * After that, all you need to do is extend MigrationFullBlockFactory, pass
+ * appropriate types and call createMigrationAppBlock with initial computations.
+ * <br>
+ * You can now combine multiple applications, or use any library written in the
+ * framework, but your application is left as one whole indivisible block.
+ * </li>
+ * <li>
+ * Piece-wise migration - which gives a set of independent pieces, which can
+ * then be combined with appropriate ordering logic within a BlockFactory.
+ * You need to modify parent classes in your code to
+ * (MigrationAbstractComputation, MigrationMasterCompute and
+ * MigrationWorkerContext), which don't have any methods that affect computation
+ * ordering - and so calling those methods should be
+ * moved to logic within BlockFactory.
+ * Calling MigrationPiece.createMigrationPiece and passing appropriate
+ * computations, gives you an independent piece, that you can then use in the
+ * same way as before, but also combine it in any other way with other pieces
+ * you have or are written within a library.
+ * </li>
+ * </ul>
+ *
+ * Generally, migration path can be to first move to drop-in replacement without
+ * any effort, and then see which parts need to be modified to be able to use
+ * piece-wise migration. At the end, it should be trivial to move from
+ * piece-wise migration to directly using pieces, by just moving code around,
+ * if you want to.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <MPrev> Previous piece message type
+ * @param <M> Message type
+ */
+@SuppressWarnings("rawtypes")
+public final class MigrationPiece<I extends WritableComparable,
+    V extends Writable, E extends Writable, MPrev extends Writable,
+    M extends Writable> extends PieceWithWorkerContext<I, V, E, M,
+    MigrationWorkerContext, Writable, MigrationSuperstepStage> {
+
+  /**
+   * Computation instantiated in getVertexSender; when null, no vertex
+   * sender is created and type checks are skipped.
+   */
+  private final Class<? extends MigrationAbstractComputation<I, V, E, MPrev, M>>
+  computationClass;
+
+  /** Master logic run in registerAggregators and masterCompute, may be null */
+  private final transient MigrationMasterCompute masterCompute;
+  /** Queried per vertex for the messages handed to compute, may be null */
+  private final Supplier<Iterable<MPrev>> previousMessagesSupplier;
+  /** Given the messages each vertex receives, may be null */
+  private final Consumer<Iterable<M>> currentMessagesConsumer;
+  /** Message class reported through getMessageClasses */
+  private final transient Class<M> messageClass;
+  /** Message combiner class reported through getMessageClasses */
+  private final transient Class<? extends MessageCombiner<? super I, M>>
+  messageCombinerClass;
+
+  /** Whether this piece is part of a drop-in (full) migration */
+  private final boolean isFullMigration;
+  /**
+   * When true, getVertexSender returns null and workerContextSend does
+   * nothing, so only the receiving half of the piece does work.
+   */
+  private final boolean isFirstStep;
+
+  /** Piece chosen by masterCompute, handed out once by getNextPiece */
+  private transient MigrationPiece nextPiece;
+  /** Copy of the halted flag of a full master compute, set in masterCompute */
+  private boolean isHalted;
+
+  /**
+   * Stores all settings and runs {@link #sanityChecks()}. Instances are
+   * obtained through the static factory methods of this class.
+   *
+   * @param computationClass Computation to run in this piece
+   * @param masterCompute Master logic to run in this piece
+   * @param previousMessagesSupplier Source of messages passed to compute
+   * @param currentMessagesConsumer Sink for messages received by vertices
+   * @param messageClass Class of messages this piece sends
+   * @param messageCombinerClass Combiner for messages this piece sends
+   * @param isFullMigration Whether piece belongs to a full migration
+   * @param isFirstStep Whether piece is the first one executed
+   */
+  private MigrationPiece(
+      Class<? extends MigrationAbstractComputation<I, V, E, MPrev, M>>
+        computationClass,
+      MigrationMasterCompute masterCompute,
+      Supplier<Iterable<MPrev>> previousMessagesSupplier,
+      Consumer<Iterable<M>> currentMessagesConsumer,
+      Class<M> messageClass,
+      Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
+      boolean isFullMigration,
+      boolean isFirstStep) {
+    // Mutable state starts out cleared
+    this.nextPiece = null;
+    this.isHalted = false;
+
+    // Logic to execute
+    this.computationClass = computationClass;
+    this.masterCompute = masterCompute;
+    this.isFullMigration = isFullMigration;
+    this.isFirstStep = isFirstStep;
+
+    // Message plumbing
+    this.messageClass = messageClass;
+    this.messageCombinerClass = messageCombinerClass;
+    this.previousMessagesSupplier = previousMessagesSupplier;
+    this.currentMessagesConsumer = currentMessagesConsumer;
+
+    // Needs computationClass and isFullMigration to be assigned already
+    sanityChecks();
+  }
+
+
+  @SuppressWarnings("unchecked")
+  static <I extends WritableComparable, V extends Writable, E extends Writable,
+  MR extends Writable, MS extends Writable>
+  MigrationPiece<I, V, E, MR, MS> createFirstFullMigrationPiece(
+      Class<? extends MigrationAbstractComputation<I, V, E, MR, MS>>
+        computationClass,
+      MigrationFullMasterCompute masterCompute,
+      Class<MS> messageClass,
+      Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass) {
+    ObjectTransfer transfer = new ObjectTransfer();
+    return new MigrationPiece<>(
+        computationClass, masterCompute, transfer, transfer, messageClass,
+        messageCombinerClass,
+        true, true);
+  }
+
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, MR extends Writable, MS extends Writable>
+  MigrationPiece<I, V, E, MR, MS> createMigrationPiece(
+      Class<? extends MigrationAbstractComputation<I, V, E, MR, MS>>
+        computationClass,
+      MigrationMasterCompute masterCompute,
+      Supplier<Iterable<MR>> previousMessagesSupplier,
+      Consumer<Iterable<MS>> currentMessagesConsumer,
+      Class<MS> messageClass,
+      Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass) {
+    return new MigrationPiece<>(
+        computationClass, masterCompute, previousMessagesSupplier,
+        currentMessagesConsumer, messageClass, messageCombinerClass,
+        false, false);
+  }
+
+
+  /**
+   * Checks that the computation class matches the migration level of this
+   * piece: a full migration piece needs a computation class extending
+   * MigrationFullAbstractComputation, any other piece must not have one.
+   *
+   * A null computation class counts as "not a full migration computation".
+   * Other methods of this class accept a null computation class, while
+   * passing null to Class.isAssignableFrom throws NullPointerException.
+   *
+   * @throws IllegalStateException if computation class and migration
+   *         level do not match
+   */
+  private void sanityChecks() {
+    boolean isFullComputation = computationClass != null &&
+        MigrationFullAbstractComputation.class
+          .isAssignableFrom(computationClass);
+    Preconditions.checkState(isFullMigration == isFullComputation,
+        "Computation class %s does not match isFullMigration=%s",
+        computationClass, isFullMigration);
+  }
+
+  /**
+   * Verifies the five type arguments that the computation class binds on
+   * TypesHolder: vertex id, vertex value and edge value against the
+   * configuration, the received message type against the given class
+   * (unless that is null) and the sent message type against the message
+   * class of this piece. Does nothing when there is no computation class.
+   *
+   * @param conf Configuration holding vertex id, value and edge classes
+   * @param previousMessageClass Class of messages sent by the previous
+   *        piece, or null to skip that check
+   */
+  void sanityTypeChecks(
+      GiraphConfiguration conf, Class<?> previousMessageClass) {
+    if (computationClass == null) {
+      return;
+    }
+
+    final Class<?> confVertexId = GiraphConstants.VERTEX_ID_CLASS.get(conf);
+    final Class<?> confVertexValue =
+        GiraphConstants.VERTEX_VALUE_CLASS.get(conf);
+    final Class<?> confEdgeValue =
+        GiraphConstants.EDGE_VALUE_CLASS.get(conf);
+
+    Class<?>[] declaredTypes = getTypeArguments(
+        TypesHolder.class, computationClass);
+    Preconditions.checkArgument(declaredTypes.length == 5);
+
+    ReflectionUtils.verifyTypes(
+        confVertexId, declaredTypes[0], "vertexId", computationClass);
+    ReflectionUtils.verifyTypes(
+        confVertexValue, declaredTypes[1], "vertexValue", computationClass);
+    ReflectionUtils.verifyTypes(
+        confEdgeValue, declaredTypes[2], "edgeValue", computationClass);
+    // Received messages can be checked only if previous class is known
+    if (previousMessageClass != null) {
+      ReflectionUtils.verifyTypes(
+          previousMessageClass, declaredTypes[3], "recvMessage",
+          computationClass);
+    }
+    ReflectionUtils.verifyTypes(
+        messageClass, declaredTypes[4], "sendMessage", computationClass);
+  }
+
+  /**
+   * Binds the master compute, if there is one, to the given api and then
+   * calls its initialize method.
+   *
+   * @param masterApi Master api handed to the master compute
+   */
+  @Override
+  public void registerAggregators(BlockMasterApi masterApi)
+      throws InstantiationException, IllegalAccessException {
+    if (masterCompute == null) {
+      return;
+    }
+    masterCompute.init(masterApi);
+    masterCompute.initialize();
+  }
+
+  /**
+   * Creates the sender that runs the computation on each vertex. A new
+   * computation instance is created, initialized with the migration
+   * superstep of the stage minus one, and its preSuperstep is called right
+   * away; postSuperstep is called from postprocess of the returned sender.
+   *
+   * @param workerApi Worker api handed to the computation
+   * @param executionStage Stage holding current migration superstep
+   * @return Sender running compute, or null if there is no computation
+   *         class or this is the first step
+   */
+  @Override
+  public VertexSender<I, V, E> getVertexSender(
+      BlockWorkerSendApi<I, V, E, M> workerApi,
+      MigrationSuperstepStage executionStage) {
+    boolean nothingToCompute = isFirstStep || computationClass == null;
+    if (nothingToCompute) {
+      return null;
+    }
+
+    final MigrationAbstractComputation<I, V, E, MPrev, M> computation =
+        ReflectionUtils.newInstance(computationClass);
+    computation.init(
+        workerApi, getWorkerValue(workerApi),
+        executionStage.getMigrationSuperstep() - 1);
+    computation.preSuperstep();
+
+    return new InnerVertexSender() {
+      @Override
+      public void vertexSend(Vertex<I, V, E> vertex) {
+        try {
+          Iterable<MPrev> messages = previousMessagesSupplier == null ?
+              null : previousMessagesSupplier.get();
+          // Missing supplier or missing value both mean no messages
+          if (messages == null) {
+            messages = Collections.<MPrev>emptyList();
+          }
+          computation.compute(vertex, messages);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void postprocess() {
+        computation.postSuperstep();
+      }
+    };
+  }
+
+  /**
+   * Binds the worker context to the send api and calls its postSuperstep.
+   * Skipped when there is no worker context or this is the first step.
+   *
+   * @param workerContextApi Api the worker context is bound to
+   * @param executionStage Current execution stage, not used here
+   * @param workerValue Worker context, may be null
+   */
+  @Override
+  public void workerContextSend(
+      BlockWorkerContextSendApi<Writable> workerContextApi,
+      MigrationSuperstepStage executionStage,
+      MigrationWorkerContext workerValue) {
+    if (workerValue == null || isFirstStep) {
+      return;
+    }
+    workerValue.setApi(workerContextApi);
+    workerValue.postSuperstep();
+  }
+
+  /**
+   * Runs the master compute, if there is one, and for full migration
+   * decides which piece is to be returned by getNextPiece.
+   *
+   * For full migration the next piece is: nothing when the master compute
+   * halted; a new piece when it changed computation class, message class
+   * or message combiner; this piece otherwise. A first-step piece is never
+   * reused, a non-first-step copy is created instead.
+   *
+   * For piece-wise migration only the invariants are checked, that the
+   * piece is not halted and has no next piece.
+   *
+   * @param masterApi Master api handed to the master compute
+   * @param executionStage Stage holding current migration superstep
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void masterCompute(BlockMasterApi masterApi,
+      MigrationSuperstepStage executionStage) {
+    // Full-migration view of master compute, null for piece-wise migration
+    MigrationFullMasterCompute masterComputeF =
+        isFullMigration ? (MigrationFullMasterCompute) masterCompute : null;
+
+    if (masterCompute != null) {
+      masterCompute.init(masterApi);
+
+      if (masterComputeF != null) {
+        // Tell master compute which superstep and classes are in effect
+        masterComputeF.init(
+            executionStage.getMigrationSuperstep(),
+            computationClass, messageClass, messageCombinerClass);
+      }
+
+      masterCompute.compute();
+    }
+
+    if (isFullMigration) {
+      if (masterComputeF != null) {
+        isHalted = masterComputeF.isHalted();
+        if (masterComputeF.isHalted()) {
+          // Halted, nothing runs after this piece
+          nextPiece = null;
+        } else {
+          if (masterComputeF.getNewComputationClass() != null ||
+              masterComputeF.getNewMessage() != null ||
+                  masterComputeF.getNewMessageCombiner() != null) {
+            // Something changed, next superstep needs a different piece
+            nextPiece = new MigrationPiece(
+                masterComputeF.getComputationClass(),
+                masterComputeF,
+                previousMessagesSupplier,
+                currentMessagesConsumer,
+                masterComputeF.getOutgoingMessage(),
+                masterComputeF.getMessageCombiner(),
+                true, false);
+          } else {
+            nextPiece = this;
+          }
+        }
+      } else {
+        // Without master compute nothing can change or halt
+        nextPiece = this;
+      }
+      if (nextPiece != null) {
+        if (nextPiece.isFirstStep) {
+          // Same settings as this piece, but no longer the first step
+          nextPiece = new MigrationPiece<>(
+              computationClass,
+              masterComputeF,
+              previousMessagesSupplier,
+              currentMessagesConsumer,
+              messageClass,
+              messageCombinerClass,
+              true, false);
+        }
+        // Next piece receives what this piece sends
+        nextPiece.sanityTypeChecks(masterApi.getConf(), messageClass);
+      }
+    } else {
+      Preconditions.checkState(!isHalted);
+      Preconditions.checkState(nextPiece == null);
+    }
+  }
+
+  /**
+   * Binds the worker context to the receive api, hands it the received
+   * worker messages and calls its lifecycle hooks: preApplication on the
+   * first step (full worker context only), then either preSuperstep when
+   * not halted, or postApplication when halted (full worker context only).
+   * Does nothing without a worker context.
+   *
+   * @param workerContextApi Api the worker context is bound to
+   * @param executionStage Current execution stage, not used here
+   * @param workerValue Worker context, may be null
+   * @param workerMessages Messages received from other workers
+   */
+  @Override
+  public void workerContextReceive(
+      BlockWorkerContextReceiveApi workerContextApi,
+      MigrationSuperstepStage executionStage,
+      MigrationWorkerContext workerValue, List<Writable> workerMessages) {
+    if (workerValue == null) {
+      return;
+    }
+
+    workerValue.setApi(workerContextApi);
+    workerValue.setReceivedMessages(workerMessages);
+
+    boolean isFullContext = workerValue instanceof MigrationFullWorkerContext;
+
+    if (isFirstStep && isFullContext) {
+      try {
+        ((MigrationFullWorkerContext) workerValue).preApplication();
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (isHalted) {
+      if (isFullContext) {
+        ((MigrationFullWorkerContext) workerValue).postApplication();
+      }
+    } else {
+      workerValue.preSuperstep();
+    }
+  }
+
+  /**
+   * Creates the receiver that forwards the messages of each vertex to the
+   * messages consumer.
+   *
+   * @param workerApi Worker api, not used here
+   * @param executionStage Current execution stage, not used here
+   * @return Forwarding receiver, or null if there is no consumer or the
+   *         piece is halted
+   */
+  @Override
+  public VertexReceiver<I, V, E, M> getVertexReceiver(
+      BlockWorkerReceiveApi<I> workerApi,
+      MigrationSuperstepStage executionStage) {
+    if (currentMessagesConsumer != null && !isHalted) {
+      return new InnerVertexReceiver() {
+        @Override
+        public void vertexReceive(
+            Vertex<I, V, E> vertex, Iterable<M> messages) {
+          currentMessagesConsumer.apply(messages);
+        }
+      };
+    }
+    return null;
+  }
+
+  @Override
+  public MessageClasses<I, M> getMessageClasses(
+      ImmutableClassesGiraphConfiguration conf) {
+    return new DefaultMessageClasses(
+        messageClass,
+        DefaultMessageValueFactory.class,
+        messageCombinerClass,
+        GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf));
+  }
+
+  @Override
+  public MigrationSuperstepStage nextExecutionStage(
+      MigrationSuperstepStage executionStage) {
+    return executionStage.changedMigrationSuperstep(
+        executionStage.getMigrationSuperstep() + 1);
+  }
+
+  /**
+   * Hands out the piece chosen by the last masterCompute call and forgets
+   * it, so a second call in a row returns null. Allowed only for full
+   * migration.
+   *
+   * @return Next piece to execute, or null if there is none
+   * @throws IllegalStateException if this is not a full migration piece
+   */
+  public MigrationPiece getNextPiece() {
+    Preconditions.checkState(isFullMigration);
+    final MigrationPiece handedOut = nextPiece;
+    nextPiece = null;
+    return handedOut;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationSuperstepStage.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationSuperstepStage.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationSuperstepStage.java
new file mode 100644
index 0000000..a473848
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationSuperstepStage.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.migration;
+
+/**
+ * Execution stage holding information about migration superstep.
+ */
+public interface MigrationSuperstepStage {
+  /**
+   * Get migration superstep held by this stage.
+   *
+   * @return Migration superstep
+   */
+  int getMigrationSuperstep();
+
+  /**
+   * Get execution stage holding the given migration superstep.
+   *
+   * @param superstep Migration superstep for the returned stage
+   * @return Stage holding the given migration superstep
+   */
+  MigrationSuperstepStage changedMigrationSuperstep(int superstep);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationSuperstepStageImpl.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationSuperstepStageImpl.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationSuperstepStageImpl.java
new file mode 100644
index 0000000..b8ceeaf
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationSuperstepStageImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.migration;
+
+
+/**
+ * Implementation of execution stage holding information about migration
+ * superstep.
+ */
+public class MigrationSuperstepStageImpl implements MigrationSuperstepStage {
+  private final int superstep;
+
+  public MigrationSuperstepStageImpl() {
+    this.superstep = 0;
+  }
+
+  public MigrationSuperstepStageImpl(int superstep) {
+    this.superstep = superstep;
+  }
+
+  @Override
+  public int getMigrationSuperstep() {
+    return superstep;
+  }
+
+  @Override
+  public MigrationSuperstepStageImpl changedMigrationSuperstep(int superstep) {
+    return new MigrationSuperstepStageImpl(superstep);
+  }
+
+  @Override
+  public String toString() {
+    return "MigrationSuperstep[" + superstep + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java
new file mode 100644
index 0000000..014c269
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.migration;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Stand-in for WorkerContext while migrating to Blocks Framework, which
+ * does not offer functions that are tied to execution order.
+ *
+ * Most methods delegate to the api given through {@link #setApi}, so that
+ * has to be called before they are used.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MigrationWorkerContext
+    extends DefaultImmutableClassesGiraphConfigurable
+    implements Writable {
+  /** Api installed by the latest setApi call */
+  private BlockWorkerContextApi api;
+  /** Messages installed by setReceivedMessages, null once fetched */
+  private List<Writable> receivedMessages;
+
+  /**
+   * Installs the api that other methods delegate to, and adopts its
+   * configuration as the configuration of this object.
+   *
+   * @param api Worker context api to delegate to
+   */
+  public void setApi(BlockWorkerContextApi api) {
+    this.api = api;
+    setConf(api.getConf());
+  }
+
+  /**
+   * Installs the messages to be returned by
+   * {@link #getAndClearMessagesFromOtherWorkers()}.
+   *
+   * @param receivedMessages Messages received from other workers
+   */
+  public void setReceivedMessages(List<Writable> receivedMessages) {
+    this.receivedMessages = receivedMessages;
+  }
+
+  /**
+   * Hook to be overridden, does nothing by default.
+   */
+  public void preSuperstep() {
+  }
+
+  /**
+   * Hook to be overridden, does nothing by default.
+   */
+  public void postSuperstep() {
+  }
+
+  /**
+   * Get total number of vertices, as reported by the api.
+   *
+   * @return Total number of vertices
+   */
+  @SuppressWarnings("deprecation")
+  public long getTotalNumVertices() {
+    return api.getTotalNumVertices();
+  }
+
+  /**
+   * Get total number of edges, as reported by the api.
+   *
+   * @return Total number of edges
+   */
+  @SuppressWarnings("deprecation")
+  public long getTotalNumEdges() {
+    return api.getTotalNumEdges();
+  }
+
+  /**
+   * Reads nothing by default.
+   *
+   * @param in Input to read from
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+  }
+
+  /**
+   * Writes nothing by default.
+   *
+   * @param out Output to write to
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+  }
+
+  /**
+   * Get number of workers, as reported by the api.
+   *
+   * @return Number of workers
+   */
+  public final int getWorkerCount() {
+    return api.getWorkerCount();
+  }
+
+  /**
+   * Get index of this worker, as reported by the api.
+   *
+   * @return Index of this worker
+   */
+  public final int getMyWorkerIndex() {
+    return api.getMyWorkerIndex();
+  }
+
+  /**
+   * Hands out the installed messages and forgets them, so a second call
+   * in a row returns null.
+   *
+   * @return Messages installed by the latest setReceivedMessages call, or
+   *         null if there are none or they were already fetched
+   */
+  public final List<Writable> getAndClearMessagesFromOtherWorkers() {
+    final List<Writable> pending = receivedMessages;
+    receivedMessages = null;
+    return pending;
+  }
+
+  /**
+   * Sends a message to another worker. The installed api is cast to
+   * BlockWorkerContextSendApi for that.
+   *
+   * @param message Message to send
+   * @param workerIndex Index of the worker to send the message to
+   */
+  public final void sendMessageToWorker(Writable message, int workerIndex) {
+    BlockWorkerContextSendApi<Writable> sendApi =
+        (BlockWorkerContextSendApi<Writable>) api;
+    sendApi.sendMessageToWorker(message, workerIndex);
+  }
+
+  /**
+   * Drop-in replacement for WorkerContext when migrating to
+   * Blocks Framework, adding application level hooks.
+   */
+  public static class MigrationFullWorkerContext
+      extends MigrationWorkerContext {
+    /**
+     * Hook to be overridden, does nothing by default.
+     */
+    public void preApplication()
+        throws InstantiationException, IllegalAccessException {
+    }
+
+    /**
+     * Hook to be overridden, does nothing by default.
+     */
+    public void postApplication() {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/package-info.java
new file mode 100644
index 0000000..c5b06e1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Migration utility for transforming standard Giraph applications, into
+ * Block Applications.
+ *
+ * Read Javadoc in MigrationPiece.
+ */
+package org.apache.giraph.block_app.migration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/function/ObjectHolder.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/ObjectHolder.java b/giraph-block-app/src/main/java/org/apache/giraph/function/ObjectHolder.java
new file mode 100644
index 0000000..fd531bd
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/ObjectHolder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+/**
+ * Default object holder, intermediary between producers and consumers.
+ *
+ * Holds value in memory, so can be used only when producer and consumer
+ * are in the same context.
+ *
+ * Useful when value is set on the master, and later read in block logic
+ * (RepeatUntilBlock), or in a different Piece, either on worker or master.
+ * If it is read within the same piece - just use local field.
+ *
+ * @param <T> Type of object to hold.
+ */
+public class ObjectHolder<T> implements Supplier<T>, Consumer<T> {
+  private T value;
+
+  public ObjectHolder(T value) {
+    this.value = value;
+  }
+
+  public ObjectHolder() {
+  }
+
+  @Override
+  public T get() {
+    return value;
+  }
+
+  @Override
+  public void apply(T value) {
+    this.value = value;
+  }
+
+  @Override
+  public String toString() {
+    return getClass() + " [value=" + value + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/function/ObjectTransfer.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/ObjectTransfer.java b/giraph-block-app/src/main/java/org/apache/giraph/function/ObjectTransfer.java
new file mode 100644
index 0000000..8d72017
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/ObjectTransfer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * One-shot in-memory hand-over of an object from a producer to a
+ * consumer, usable both as a Supplier and as a Consumer.
+ *
+ * Value lives only in memory, so producer and consumer have to be in the
+ * same context. Supplying the value also clears it, so each object is
+ * returned only once and a second consecutive call to 'get' returns null.
+ *
+ * Useful for both:
+ *
+ * - passing data from vertexReceive function of WorkerReceivePiece of previous
+ * Piece to vertexSend function WorkerSendPiece of next Piece, of the same
+ * vertex.
+ * - when value is set on the master, and later read in block logic
+ * (RepeatUntilBlock), or in a different Piece, either on worker or master.
+ * If it is read within the same piece - just use local field.
+ *
+ * @param <T> Type of object to transfer.
+ */
+public class ObjectTransfer<T> implements Supplier<T>, Consumer<T> {
+  /** Object waiting to be picked up, null if there is none */
+  private T value;
+
+  /**
+   * Creates a transfer with nothing to pick up.
+   */
+  public ObjectTransfer() {
+  }
+
+  /**
+   * Creates a transfer with the given value ready to be picked up.
+   *
+   * @param value Initial value
+   */
+  public ObjectTransfer(T value) {
+    this.value = value;
+  }
+
+  /**
+   * Stores the given value, replacing one not yet picked up.
+   *
+   * @param value Value to transfer
+   */
+  @Override
+  public void apply(T value) {
+    this.value = value;
+  }
+
+  /**
+   * Picks up the stored value and clears it.
+   *
+   * @return Stored value, or null if there is none
+   */
+  @Override
+  public T get() {
+    try {
+      return value;
+    } finally {
+      value = null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    text.append(getClass()).append(" [value=").append(value).append("]");
+    return text.toString();
+  }
+
+  /**
+   * To be called when needing to pass it as a Supplier - making it
+   * obvious that I, V and E on supplier side can be any types,
+   * and to make code work without compile errors/warnings.
+   * The vertex given to the returned supplier is ignored.
+   *
+   * In Java7, some callsites might need explicit types:
+   * object.<LongWritable, DoubleWritable, Writable>castToSupplier()
+   * In Java8, object.castToSupplier() is always going to be enough.
+   *
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Supplier view delegating to {@link #get()}
+   */
+  // TODO Java8: cleanup callers
+  @SuppressWarnings("rawtypes")
+  public <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, T> castToSupplier() {
+    final ObjectTransfer<T> transfer = this;
+    return new SupplierFromVertex<I, V, E, T>() {
+      @Override
+      public T get(Vertex<I, V, E> vertex) {
+        return transfer.get();
+      }
+    };
+  }
+
+  /**
+   * To be called when needing to pass it as a Consumer - making it
+   * obvious that I, V and E on consumer side can be any types,
+   * and to make code work without compile errors/warnings.
+   * The vertex given to the returned consumer is ignored.
+   *
+   * In Java7, some callsites might need explicit types:
+   * object.<LongWritable, DoubleWritable, Writable>castToConsumer()
+   * In Java8, object.castToConsumer() is always going to be enough.
+   *
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Consumer view delegating to {@link #apply(Object)}
+   */
+  // TODO Java8: cleanup callers
+  @SuppressWarnings("rawtypes")
+  public <I extends WritableComparable, V extends Writable, E extends Writable>
+  ConsumerWithVertex<I, V, E, T> castToConsumer() {
+    final ObjectTransfer<T> transfer = this;
+    return new ConsumerWithVertex<I, V, E, T>() {
+      @Override
+      public void apply(Vertex<I, V, E> vertex, T value) {
+        transfer.apply(value);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/PrimitiveRefs.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/PrimitiveRefs.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/PrimitiveRefs.java
new file mode 100644
index 0000000..9a9f742
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/PrimitiveRefs.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.primitive;
+
+/**
+ * Convenience classes holding primitive values - a reference
+ * to a mutable value.
+ * For use when lambdas need to mutate a captured primitive local variable
+ * (since lambdas cannot modify the local variables they capture).
+ */
+public interface PrimitiveRefs {
+
+  // Have public field for convenience, since classes have no logic inside
+  // CHECKSTYLE: stop VisibilityModifierCheck
+
+  /**
+   * Convenience class holding int value,
+   * for use when lambdas need to mutate a captured int local variable.
+   */
+  public class IntRef {
+    public int value;
+
+    public IntRef(int value) {
+      this.value = value;
+    }
+  }
+
+  /**
+   * Convenience class holding long value,
+   * for use when lambdas need to mutate a captured long local variable.
+   */
+  public class LongRef {
+    public long value;
+
+    public LongRef(long value) {
+      this.value = value;
+    }
+  }
+
+  /**
+   * Convenience class holding short value,
+   * for use when lambdas need to mutate a captured short local variable.
+   */
+  public class ShortRef {
+    public short value;
+
+    public ShortRef(short value) {
+      this.value = value;
+    }
+  }
+
+
+  /**
+   * Convenience class holding float value,
+   * for use when lambdas need to mutate a captured float local variable.
+   */
+  public class FloatRef {
+    public float value;
+
+    public FloatRef(float value) {
+      this.value = value;
+    }
+  }
+
+  /**
+   * Convenience class holding double value,
+   * for use when lambdas need to mutate a captured double local variable.
+   */
+  public class DoubleRef {
+    public double value;
+
+    public DoubleRef(double value) {
+      this.value = value;
+    }
+  }
+
+  /**
+   * Convenience class holding object values,
+   * for use when lambdas need to mutate a captured object local variable.
+   *
+   * @param <O> type of the held object
+   */
+  public class ObjRef<O> {
+    public O value;
+
+    public ObjRef(O value) {
+      this.value = value;
+    }
+  }
+
+  // CHECKSTYLE: resume VisibilityModifierCheck
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-examples/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml
index 414c8c9..934f984 100644
--- a/giraph-examples/pom.xml
+++ b/giraph-examples/pom.xml
@@ -432,6 +432,10 @@ under the License.
       <version>1.2.0-SNAPSHOT</version>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.giraph</groupId>
+      <artifactId>giraph-block-app</artifactId>
+    </dependency>
 
     <!-- runtime dependency -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-examples/src/main/java/org/apache/giraph/examples/block_app/SimpleMigrationMasterBlockFactory.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/block_app/SimpleMigrationMasterBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/block_app/SimpleMigrationMasterBlockFactory.java
new file mode 100644
index 0000000..38bcba5
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/block_app/SimpleMigrationMasterBlockFactory.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.examples.block_app;
+
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation;
+import org.apache.giraph.block_app.migration.MigrationFullBlockFactory;
+import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
+import org.apache.giraph.block_app.migration.MigrationWorkerContext.MigrationFullWorkerContext;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+/**
+ * Demonstrates using migration library for Blocks Framework,
+ * as a drop-in replacement, without any changes.
+ */
+public class SimpleMigrationMasterBlockFactory
+    extends MigrationFullBlockFactory {
+  @Override
+  public Block createBlock(GiraphConfiguration conf) {
+    return createMigrationAppBlock(
+        SimpleMigrationMasterComputation.class,
+        new SimpleMigrationMasterCompute(),
+        DoubleWritable.class,
+        null,
+        conf);
+  }
+
+  @Override
+  protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) {
+    return LongWritable.class;
+  }
+
+  @Override
+  protected Class<DoubleWritable> getVertexValueClass(
+      GiraphConfiguration conf) {
+    return DoubleWritable.class;
+  }
+
+  @Override
+  protected Class<FloatWritable> getEdgeValueClass(GiraphConfiguration conf) {
+    return FloatWritable.class;
+  }
+
+  @Override
+  protected
+  Class<SimpleMigrationMasterWorkerContext> getWorkerContextValueClass(
+      GiraphConfiguration conf) {
+    return SimpleMigrationMasterWorkerContext.class;
+  }
+
+  // Full copy from org.apache.giraph.examples.SimpleMasterComputeComputation
+  // Just extending MigrationFull drop-in replacements instead.
+
+  /**
+   * Demonstrates a computation with a centralized part implemented via a
+   * MasterCompute.
+   */
+  public static class SimpleMigrationMasterComputation
+      extends MigrationFullBasicComputation<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> {
+    /** Aggregator to get values from the master to the workers */
+    public static final String SMC_AGG = "simplemastercompute.aggregator";
+
+    /** Logger */
+    private static final Logger LOG =
+        Logger.getLogger(SimpleMigrationMasterComputation.class);
+
+    @Override
+    public void compute(
+        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+        Iterable<DoubleWritable> messages) throws IOException {
+      double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
+      double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
+      double newSum = oldSum + newValue;
+      vertex.setValue(new DoubleWritable(newSum));
+      SimpleMigrationMasterWorkerContext workerContext = getWorkerContext();
+      workerContext.setFinalSum(newSum); // static method, called via instance
+      LOG.info("Current sum: " + newSum);
+    }
+  }
+
+  /**
+   * Worker context used with {@link SimpleMigrationMasterComputation}.
+   */
+  public static class SimpleMigrationMasterWorkerContext
+      extends MigrationFullWorkerContext {
+    /** Final sum value for verification for local jobs */
+    private static double FINAL_SUM;
+
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+    }
+
+    @Override
+    public void preSuperstep() {
+    }
+
+    @Override
+    public void postSuperstep() {
+    }
+
+    @Override
+    public void postApplication() {
+    }
+
+    public static void setFinalSum(double sum) {
+      FINAL_SUM = sum;
+    }
+
+    public static double getFinalSum() {
+      return FINAL_SUM;
+    }
+  }
+
+  /**
+   * MasterCompute used with {@link SimpleMigrationMasterComputation}.
+   */
+  public static class SimpleMigrationMasterCompute
+      extends MigrationFullMasterCompute {
+    @Override
+    public void compute() {
+      setAggregatedValue(SimpleMigrationMasterComputation.SMC_AGG,
+          new DoubleWritable(((double) getSuperstep()) / 2 + 1));
+      if (getSuperstep() == 10) {
+        haltComputation();
+      }
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(SimpleMigrationMasterComputation.SMC_AGG,
+          DoubleOverwriteAggregator.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-examples/src/main/java/org/apache/giraph/examples/block_app/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/block_app/package-info.java b/giraph-examples/src/main/java/org/apache/giraph/examples/block_app/package-info.java
new file mode 100644
index 0000000..881f2c7
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/block_app/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Giraph examples for Block Applications.
+ */
+package org.apache.giraph.examples.block_app;

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-examples/src/test/java/org/apache/giraph/examples/block_app/TestMigrationBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/block_app/TestMigrationBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/examples/block_app/TestMigrationBspBasic.java
new file mode 100644
index 0000000..cb30ab0
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/block_app/TestMigrationBspBasic.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples.block_app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
+import org.apache.giraph.job.GiraphJob;
+import org.junit.Test;
+
+/**
+ * Unit test for the Blocks Framework migration library.
+ */
+public class
+    TestMigrationBspBasic extends BspCase {
+
+  public TestMigrationBspBasic() {
+    super(TestMigrationBspBasic.class.getName());
+  }
+
+  /**
+   * Run a sample BSP job locally and test using migration
+   * library for Blocks Framework, as a drop-in replacement.
+   */
+  @Test
+  public void testBspMigrationToBlocksFramework() throws Exception {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+    BlockUtils.setAndInitBlockFactoryClass(
+        conf, SimpleMigrationMasterBlockFactory.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), conf);
+    assertTrue(job.run(true));
+    if (!runningInDistributedMode()) {
+      double finalSum =
+          SimpleMigrationMasterBlockFactory.SimpleMigrationMasterWorkerContext.getFinalSum();
+      System.out.println("testBspMasterCompute: finalSum=" + finalSum);
+      assertEquals(32.5, finalSum, 0d); // presumably sum of s/2+1, s=0..9
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/27f234f1/giraph-examples/src/test/java/org/apache/giraph/examples/block_app/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/block_app/package-info.java b/giraph-examples/src/test/java/org/apache/giraph/examples/block_app/package-info.java
new file mode 100644
index 0000000..d6912e6
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/block_app/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of tests for Giraph examples for Block Applications.
+ */
+package org.apache.giraph.examples.block_app;