You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/11 04:50:41 UTC

[2/5] git commit: updated refs/heads/trunk to 819d6d3

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java
new file mode 100644
index 0000000..f06dd89
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.worker.WorkerReduceUsage;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * All logic for transforming Giraph's reducer API to reducer handles.
+ * Contains state of active reducers, and is kept within a Piece.
+ */
+public class ReducersForPieceHandler implements VertexSenderObserver {
+  private static final AtomicInteger HANDLER_COUNTER = new AtomicInteger();
+  private static final AtomicInteger BROADCAST_COUNTER = new AtomicInteger();
+
+  private final int handleIndex = HANDLER_COUNTER.incrementAndGet();
+  private final AtomicInteger reduceCounter = new AtomicInteger();
+
+  private final ArrayList<VertexSenderObserver> observers = new ArrayList<>();
+
+  @Override
+  public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
+    for (VertexSenderObserver observer : observers) {
+      observer.vertexSenderWorkerPreprocess(usage);
+    }
+  }
+
+  @Override
+  public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
+    for (VertexSenderObserver observer : observers) {
+      observer.vertexSenderWorkerPostprocess(usage);
+    }
+  }
+
+  public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer(
+      MasterGlobalCommUsage master,  ReduceOperation<S, R> reduceOp,
+      R globalInitialValue) {
+    LocalReduceHandle<S, R> handle = new LocalReduceHandle<>(reduceOp);
+    master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
+    observers.add(handle);
+    return handle;
+  }
+
+  public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer(
+      MasterGlobalCommUsage master,  ReduceOperation<S, R> reduceOp,
+      R globalInitialValue) {
+    ReduceHandleImpl<S, R> handle = new GlobalReduceHandle<>(reduceOp);
+    master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
+    observers.add(handle);
+    return handle;
+  }
+
+  /**
+   * Implementation of BroadcastHandle
+   *
+   * @param <T> Value type
+   */
+  public static class BroadcastHandleImpl<T> implements BroadcastHandle<T> {
+    private final String name;
+
+    public BroadcastHandleImpl() {
+      this.name = "_utils.broadcast." + BROADCAST_COUNTER.incrementAndGet();
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public T getBroadcast(WorkerBroadcastUsage worker) {
+      return worker.getBroadcast(name);
+    }
+  }
+
+  /**
+   * Parent implementation of ReducerHandle
+   *
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  public abstract class ReduceHandleImpl<S, R extends Writable>
+      implements ReducerHandle<S, R>, VertexSenderObserver {
+    protected final ReduceOperation<S, R> reduceOp;
+    private final String name;
+
+    private ReduceHandleImpl(ReduceOperation<S, R> reduceOp) {
+      this.reduceOp = reduceOp;
+      name = "_utils." + handleIndex +
+          ".reduce." + reduceCounter.incrementAndGet();
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public R getReducedValue(MasterGlobalCommUsage master) {
+      return master.getReduced(name);
+    }
+
+    @Override
+    public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
+      return unwrapHandle(master.broadcast(
+          new WrappedReducedValue<>(reduceOp, getReducedValue(master))));
+    }
+  }
+
+  private static <R extends Writable> BroadcastHandle<R> unwrapHandle(
+      final BroadcastHandle<WrappedReducedValue<R>> handle) {
+    return new BroadcastHandle<R>() {
+      @Override
+      public R getBroadcast(WorkerBroadcastUsage worker) {
+        return handle.getBroadcast(worker).getValue();
+      }
+    };
+  }
+
+  /**
+   * Wrapper that makes reduced values self-serializable,
+   * and allows them to be broadcasted.
+   *
+   * @param <R> Reduced value type
+   */
+  public static class WrappedReducedValue<R extends Writable>
+      implements Writable {
+    private ReduceOperation<?, R> reduceOp;
+    private R value;
+
+    public WrappedReducedValue() {
+    }
+
+    public WrappedReducedValue(ReduceOperation<?, R> reduceOp, R value) {
+      this.reduceOp = reduceOp;
+      this.value = value;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeWritableObject(reduceOp, out);
+      value.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      reduceOp = WritableUtils.readWritableObject(in, null);
+      value = reduceOp.createInitialValue();
+      value.readFields(in);
+    }
+
+    public R getValue() {
+      return value;
+    }
+  }
+
+  /**
+   * Global Reduce Handle is implementation of ReducerHandle, that will keep
+   * only one value for each worker, and each call to reduce will have
+   * to obtain a global lock, and incur synchronization costs.
+   * Use only when objects are so large, that having many copies cannot fit
+   * into memory.
+   *
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  public class GlobalReduceHandle<S, R extends Writable>
+      extends ReduceHandleImpl<S, R> {
+    private transient WorkerReduceUsage usage;
+
+    public GlobalReduceHandle(ReduceOperation<S, R> reduceOp) {
+      super(reduceOp);
+    }
+
+    @Override
+    public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
+      this.usage = usage;
+    }
+
+    @Override
+    public void reduce(S valueToReduce) {
+      usage.reduce(getName(), valueToReduce);
+    }
+
+    @Override
+    public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
+    }
+  }
+
+  /**
+   * Local Reduce Handle is implementation of ReducerHandle, that will make a
+   * partially reduced value on each worker thread, which are at the end
+   * reduced all together.
+   * This is preferred implementation, unless it cannot be used due to memory
+   * overhead, because all partially reduced values will not fit the memory.
+   *
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  public class LocalReduceHandle<S, R extends Writable>
+      extends ReduceHandleImpl<S, R> {
+    private transient Reducer<S, R> reducer;
+
+    public LocalReduceHandle(ReduceOperation<S, R> reduceOp) {
+      super(reduceOp);
+    }
+
+    @Override
+    public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
+      this.reducer = new Reducer<>(reduceOp);
+    }
+
+    @Override
+    public void reduce(S valueToReduce) {
+      reducer.reduce(valueToReduce);
+    }
+
+    @Override
+    public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
+      usage.reduceMerge(getName(), reducer.getCurrentValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java
new file mode 100644
index 0000000..5b3485f
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.internal;
+
+import org.apache.giraph.worker.WorkerReduceUsage;
+
+/**
+ * Observer able to hook into vertex sender pre/post processing
+ */
+public interface VertexSenderObserver {
+  void vertexSenderWorkerPreprocess(WorkerReduceUsage usage);
+  void vertexSenderWorkerPostprocess(WorkerReduceUsage usage);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java
new file mode 100644
index 0000000..1ba7c8f
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Reducer and Broadcast Handles internal implementation for automatic handling
+ * of global communication within Pieces, hiding a lot of it's complexities.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.internal;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java
new file mode 100644
index 0000000..50d7818
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.map;
+
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * Handle to map of broadcasts
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ */
+public interface BroadcastMapHandle<K, V>
+  extends MapHandle<K, BroadcastHandle<V>> {
+
+  /**
+   * Number of elements that were broadcasted.
+   */
+  int getBroadcastedSize(WorkerBroadcastUsage worker);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java
new file mode 100644
index 0000000..db01e77
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.map;
+
+/**
+ * Handle to map of handles underneath
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ */
+public interface MapHandle<K, V> {
+  /**
+   * Get value for key.
+   */
+  V get(K key);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java
new file mode 100644
index 0000000..5c31179
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.map;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+
+/**
+ * Handle to array of reducers
+ *
+ * @param <I> Key type
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public interface ReducerMapHandle<I, S, R>
+    extends MapHandle<I, ReducerHandle<S, R>> {
+  /**
+   * Number of elements that were reduced.
+   */
+  int getReducedSize(BlockMasterApi master);
+
+  /**
+   * Broadcast whole map of reducers to master
+   *
+   * @return Handle to the broadcasted map.
+   */
+  BroadcastMapHandle<I, R> broadcastValue(BlockMasterApi master);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java
new file mode 100644
index 0000000..092f864
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Interfaces representing map of individual handles
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.map;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java
new file mode 100644
index 0000000..0d40741
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Interfaces for Reducer and Broadcast Handles for automatic handling
+ * of global communication within Pieces, hiding a lot of it's complexities.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java
new file mode 100644
index 0000000..b6cc749
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.interfaces;
+
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+
+/**
+ * Interface containing a single function - postprocess.
+ *
+ * Marked to not allow seriazliation, as it should be created on the worker,
+ * so should never be serialiized, disallow only for catching problems early.
+ */
+public interface VertexPostprocessor extends NonKryoWritable {
+  /**
+   * Override to finish computation. This method is executed exactly once
+   * after computation for all vertices in the partition is complete.
+   */
+  void postprocess();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java
new file mode 100644
index 0000000..26912ee
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.interfaces;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface representing actions that happen on worker, for each vertex,
+ * during receive phase:
+ * <ul>
+ * <li> to receive messages from vertices </li>
+ * <li> to receive data from master through aggregators </li>
+ * </ul>
+ *
+ * Marked to not allow seriazliation, as it should be created on the worker,
+ * so should never be serialiized, disallow only for catching problems early.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexReceiver<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends NonKryoWritable {
+  /**
+   * Must be defined by user to do computation on a single Vertex.
+   *
+   * @param vertex   Vertex
+   * @param messages Messages that were sent to this vertex in the previous
+   *                 superstep.  Each message is only guaranteed to have
+   *                 a life expectancy as long as next() is not called.
+   */
+  void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java
new file mode 100644
index 0000000..0587032
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.interfaces;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface representing actions that happen on worker, for each vertex,
+ * during send phase:
+ * <ul>
+ * <li> to send messages to vertices </li>
+ * <li> to send data for aggregation on master </li>
+ * </ul>
+ *
+ * Marked to not allow seriazliation, as it should be created on the worker,
+ * so should never be serialiized, disallow only for catching problems early.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexSender<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends NonKryoWritable {
+  /** Must be defined by user to do computation on a single Vertex. */
+  void vertexSend(Vertex<I, V, E> vertex);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java
new file mode 100644
index 0000000..db05e78
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Vertex processing functions for Pieces
+ */
+package org.apache.giraph.block_app.framework.piece.interfaces;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java
new file mode 100644
index 0000000..dd977e6
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.messages;
+
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.writable.kryo.KryoWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * MessageClasses implementation that provides factory and combiner instances
+ * through a provided supplier.
+ *
+ * @param <I> Vertex id type
+ * @param <M> Message type
+ */
+public class ObjectMessageClasses<I extends WritableComparable,
+    M extends Writable> extends KryoWritable implements MessageClasses<I, M> {
+  private final Class<M> messageClass;
+  private final SupplierFromConf<MessageValueFactory<M>>
+  messageValueFactorySupplier;
+  private final SupplierFromConf<? extends MessageCombiner<? super I, M>>
+  messageCombinerSupplier;
+  private final MessageEncodeAndStoreType messageEncodeAndStoreType;
+
+  public ObjectMessageClasses() {
+    this(null, null, null, null);
+  }
+
+  public ObjectMessageClasses(Class<M> messageClass,
+      SupplierFromConf<MessageValueFactory<M>> messageValueFactorySupplier,
+      SupplierFromConf<? extends MessageCombiner<? super I, M>>
+        messageCombinerSupplier,
+      MessageEncodeAndStoreType messageEncodeAndStoreType) {
+    this.messageClass = messageClass;
+    this.messageValueFactorySupplier = messageValueFactorySupplier;
+    this.messageCombinerSupplier = messageCombinerSupplier;
+    this.messageEncodeAndStoreType = messageEncodeAndStoreType;
+  }
+
+  @Override
+  public Class<M> getMessageClass() {
+    return messageClass;
+  }
+
+  @Override
+  public MessageValueFactory<M> createMessageValueFactory(
+      ImmutableClassesGiraphConfiguration conf) {
+    return Preconditions.checkNotNull(messageValueFactorySupplier.apply(conf));
+  }
+
+  @Override
+  public MessageCombiner<? super I, M> createMessageCombiner(
+      ImmutableClassesGiraphConfiguration<I, ? extends Writable,
+        ? extends Writable> conf) {
+    return messageCombinerSupplier != null ?
+      Preconditions.checkNotNull(messageCombinerSupplier.apply(conf)) : null;
+  }
+
+  @Override
+  public boolean useMessageCombiner() {
+    return messageCombinerSupplier != null;
+  }
+
+  @Override
+  public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
+    return messageEncodeAndStoreType;
+  }
+
+  @Override
+  public MessageClasses<I, M> createCopyForNewSuperstep() {
+    return new ObjectMessageClasses<>(
+        messageClass, messageValueFactorySupplier,
+        messageCombinerSupplier, messageEncodeAndStoreType);
+  }
+
+  @Override
+  public void verifyConsistent(ImmutableClassesGiraphConfiguration conf) {
+    MessageValueFactory<M> messageValueFactory =
+        messageValueFactorySupplier.apply(conf);
+    Preconditions.checkState(
+        messageValueFactory.newInstance().getClass().equals(messageClass));
+
+    if (messageCombinerSupplier != null) {
+      MessageCombiner<? super I, M> messageCombiner =
+          messageCombinerSupplier.apply(conf);
+      Preconditions.checkState(messageCombiner.createInitialMessage()
+          .getClass().equals(messageClass));
+      Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
+          MessageCombiner.class, messageCombiner.getClass());
+      ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
+          "Vertex id", messageCombiner.getClass());
+      ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
+          "Outgoing message", messageCombiner.getClass());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java
new file mode 100644
index 0000000..00c86cd
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.messages;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.function.Function;
+import org.apache.giraph.writable.kryo.HadoopKryo;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Supplier from configuration
+ * @param <T> Type of object returned
+ */
+public interface SupplierFromConf<T>
+    extends Function<ImmutableClassesGiraphConfiguration, T> {
+
+  /**
+   * Supplier from configuration, by copying given instance every time.
+   *
+   * @param <T> Type of object returned
+   */
+  public static class SupplierFromConfByCopy<T> implements SupplierFromConf<T> {
+    private final T value;
+
+    public SupplierFromConfByCopy(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public T apply(ImmutableClassesGiraphConfiguration conf) {
+      return HadoopKryo.createCopy(value);
+    }
+  }
+
+  /**
+   * Supplier from configuration returning DefaultMessageValueFactory instances.
+   *
+   * @param <M> Message type
+   */
+  public static class DefaultMessageFactorySupplierFromConf<M extends Writable>
+      implements SupplierFromConf<MessageValueFactory<M>> {
+    private final Class<M> messageClass;
+
+    public DefaultMessageFactorySupplierFromConf(Class<M> messageClass) {
+      this.messageClass = messageClass;
+    }
+
+    @Override
+    public MessageValueFactory<M> apply(
+        ImmutableClassesGiraphConfiguration conf) {
+      return new DefaultMessageValueFactory<>(messageClass, conf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java
new file mode 100644
index 0000000..ba3014c
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utility classes for handling of messages within Pieces
+ */
+package org.apache.giraph.block_app.framework.piece.messages;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java
new file mode 100644
index 0000000..fbc6e92
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Single execution object - Piece, and related classes.
+ *
+ * AbstractPiece is parent class of all Pieces. Most frequentlly
+ * users should extend Piece class itself
+ */
+package org.apache.giraph.block_app.framework.piece;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java
new file mode 100644
index 0000000..2a0e36a
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+
+/**
+ * Function:
+ * (T) -> void
+ *
+ * @param <T> Argument type
+ */
+public interface Consumer<T> extends Serializable {
+  /**
+   * Applies this function to {@code input}
+   */
+  void apply(T input);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java
new file mode 100644
index 0000000..41046ba
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+
+/**
+ * Function:
+ * (F) -> T
+ *
+ * @param <F> Argument type
+ * @param <T> Result type
+ */
+public interface Function<F, T> extends Serializable {
+  /**
+   * Returns the result of applying this function to given {@code input}.
+   *
+   * The returned object may or may not be a new instance,
+   * depending on the implementation.
+   */
+  T apply(F input);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java b/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java
new file mode 100644
index 0000000..012ec82
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+/**
+ * Function:
+ * (T1, T2) -> void
+ *
+ * @param <T1> First argument type
+ * @param <T2> Second argument type
+ */
+public interface PairConsumer<T1, T2> extends Serializable {
+  /**
+   * Applies this function to {@code input1} and {@code input2}
+   */
+  void apply(T1 input1, T2 input2);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java
new file mode 100644
index 0000000..bfff400
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+
+/**
+ * Function:
+ * (F1, F2) -> T
+ *
+ * @param <F1> First argument type
+ * @param <F2> Second argument type
+ * @param <T> Result type
+ */
+public interface PairFunction<F1, F2, T> extends Serializable {
+  /**
+   * Returns the result of applying this function to given
+   * {@code input1} and {@code input2}.
+   *
+   * The returned object may or may not be a new instance,
+   * depending on the implementation.
+   */
+  T apply(F1 input1, F2 input2);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java
new file mode 100644
index 0000000..1813b54
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+/**
+ * Function:
+ * () -> T
+ * <br>
+ * Specialization of com.google.common.base.Supplier, that is also
+ * Serializable.
+ *
+ * @param <T> Result type
+ */
+public interface Supplier<T> extends Serializable {
+  /**
+   * Retrieves an instance of the appropriate type. The returned object may or
+   * may not be a new instance, depending on the implementation.
+   */
+  T get();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java
new file mode 100644
index 0000000..b089da6
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for interfaces representing different functions,
+ * that all extends Serializable in order for Kryo to be able
+ * to serialize them.
+ *
+ * Even when same interface is present in Guava, we do not extend it
+ * due to @Nullable annotations adding requirement of handling nulls.
+ */
+package org.apache.giraph.function;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java
new file mode 100644
index 0000000..2cf74e1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.primitive;
+
+/**
+ * Primitive specialization of Function:
+ * () -> int
+ */
+public interface IntSupplier {
+  /** Retrieves an int value. */
+  int get();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java
new file mode 100644
index 0000000..fdf40ff
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Primitive specializations of interfaces from org.apache.giraph.function
+ * package.
+ */
+package org.apache.giraph.function.primitive;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java
new file mode 100644
index 0000000..87e7f9b
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.vertex;
+
+import java.io.Serializable;
+
+import org.apache.giraph.function.PairConsumer;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Function:
+ * (vertex, T) -> void
+ *
+ * A class that can consume objects of a single type, when given a vertex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <T> Argument type
+ */
+@SuppressWarnings("rawtypes")
+public interface ConsumerWithVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, T>
+    extends PairConsumer<Vertex<I, V, E>, T>, Serializable  {
+  /**
+   * Applies this function to {@code vertex} and {@code input}
+   */
+  @Override
+  void apply(Vertex<I, V, E> vertex, T value);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java
new file mode 100644
index 0000000..fdab5de
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.vertex;
+
+import java.io.Serializable;
+
+import org.apache.giraph.function.PairFunction;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Function:
+ * (vertex, F) -> T
+ *
+ * Determines an output value based on a vertex and an input value.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <F> Argument type
+ * @param <T> Result type
+ */
+@SuppressWarnings("rawtypes")
+public interface FunctionWithVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, F, T>
+    extends PairFunction<Vertex<I, V, E>, F, T>, Serializable {
+  /**
+   * Returns the result of applying this function to given
+   * {@code vertex} and {@code input}.
+   *
+   * The returned object may or may not be a new instance,
+   * depending on the implementation.
+   */
+  @Override
+  T apply(Vertex<I, V, E> vertex, F input);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java
new file mode 100644
index 0000000..bc0f9c1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.vertex;
+
+import java.io.Serializable;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Function:
+ * (vertex) -> T
+ *
+ * A class that can supply objects of a single type, when given a vertex.
+ *
+ * (doesn't extend Function<Vertex<I, V, E>, T>, because of different
+ * method names)
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <T> Result type
+ */
+@SuppressWarnings("rawtypes")
+public interface SupplierFromVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, T> extends Serializable {
+  /**
+   * Retrieves an instance of the appropriate type, given a vertex.
+   * The returned object may or may not be a new instance,
+   * depending on the implementation.
+   */
+  T get(Vertex<I, V, E> vertex);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java
new file mode 100644
index 0000000..bd5b019
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for interfaces representing functions additionally
+ * performed on vertex values.
+ */
+package org.apache.giraph.function.vertex;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
new file mode 100644
index 0000000..6487d95
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import static org.junit.Assert.assertEquals;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class BlockTestingUtils {
+
+  BlockTestingUtils() { }
+
+  private static final int NUM_TRIALS = 10;
+  private static final int REPEAT_TIMES = 10;
+
+  private static int testSequential(Iterator<? extends AbstractPiece> referenceImpl,
+                                    Iterator<? extends AbstractPiece> testImpl) {
+    int length = 0;
+
+    CheckIterator checkIterator = new CheckIterator(referenceImpl, testImpl);
+    while (checkIterator.hasNext()) {
+      checkIterator.next();
+      length++;
+    }
+
+    System.out.println("Length is : " + length);
+    return length;
+  }
+
+  private static boolean anyHasNext(ArrayList<? extends Iterator> arr) {
+    for (Iterator t : arr) {
+      if (t.hasNext()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static void testRandom(int length,
+                                 Iterable<? extends AbstractPiece> referenceImpl,
+                                 Iterable<? extends AbstractPiece> testImpl) {
+    Random rand = new Random();
+
+    ArrayList<CheckIterator<AbstractPiece>> arr = new ArrayList<>();
+    IntArrayList lengths = new IntArrayList(NUM_TRIALS);
+    for (int i = 0; i < NUM_TRIALS; i++) {
+      lengths.add(0);
+    }
+    for (int i = 0; i < NUM_TRIALS; i++) {
+      arr.add(new CheckIterator(referenceImpl.iterator(), testImpl.iterator()));
+    }
+
+    int totalCount = 0;
+    while (anyHasNext(arr)) {
+      int index = rand.nextInt(NUM_TRIALS);
+      while (!arr.get(index).hasNext()) {
+        index = rand.nextInt(NUM_TRIALS);
+      }
+      CheckIterator it = arr.get(index);
+      it.next();
+      int itLength = lengths.getInt(index);
+      lengths.set(index, itLength + 1);
+      totalCount++;
+    }
+    assertEquals("TotalCount should be length * NUM_TRIALS", length * NUM_TRIALS, totalCount);
+    System.out.println("Final count is : " + totalCount);
+  }
+
+  /**
+   * Tests both the length of the iterator returned by the block, as-well as the deterministic behavior
+   * expected by calling .iterator() against the referenceImpl.
+   * @param referenceImpl : A list of pieces in the expected order
+   * @param testImpl : A list of pieces to test against (the Block)
+   */
+  public static void testIndependence(Iterable<? extends AbstractPiece> referenceImpl,
+                                      Iterable<? extends AbstractPiece> testImpl) {
+    int length = testSequential(referenceImpl.iterator(), testImpl.iterator());
+    testRandom(length, referenceImpl, testImpl);
+  }
+
+  /**
+   * Test how the block interacts with a repeatBlock. The expected result is to
+   * see the pieces in referenceImpl show up REPEAT_TIMES many times.
+   * @param referenceImpl : A list of pieces in the expected order
+   * @param block : The block to test
+   */
+  public static void testNestedRepeatBlock(Iterable<? extends AbstractPiece> referenceImpl, Block block) {
+    Block repeatBlock = new RepeatBlock(
+      REPEAT_TIMES,
+      block
+    );
+    testIndependence(
+            Iterables.concat(Collections.nCopies(REPEAT_TIMES, referenceImpl)),
+            repeatBlock
+    );
+  }
+
+  public static class CheckIterator<T> implements Iterator {
+
+    private final Iterator<T> fst;
+    private final Iterator<T> snd;
+
+    public CheckIterator(Iterator<T> fst, Iterator<T> snd) {
+      this.fst = fst;
+      this.snd = snd;
+    }
+
+    @Override
+    public boolean hasNext() {
+      boolean fstHasNxt = fst.hasNext();
+      boolean sndHasNxt = snd.hasNext();
+      Preconditions.checkArgument(fstHasNxt == sndHasNxt, "Expect hasNext() on " +
+              "both iterators to be identical. Got: " + fst.hasNext() + " and " + snd.hasNext());
+      return fstHasNxt;
+    }
+
+    @Override
+    public Object next() {
+      T fstNxt = fst.next();
+      T sndNxt = snd.next();
+      Preconditions.checkArgument(fstNxt == sndNxt, "Expect objs returned by " +
+              "both iterators to be identical. Got: " + fstNxt + " and " + sndNxt);
+      return fstNxt;
+    }
+
+    @Override
+    public void remove() {
+      throw new RuntimeException("Not implemented");
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
new file mode 100644
index 0000000..0dacae1
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+
+import java.util.Arrays;
+
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.function.Supplier;
+import org.junit.Test;
+
+public class TestIfBlock {
+
+  private static final Supplier<Boolean> TRUE_SUPPLIER = new Supplier<Boolean>() {
+    @Override
+    public Boolean get() {
+      return true;
+    }
+  };
+
+  private static final Supplier<Boolean> FALSE_SUPPLIER = new Supplier<Boolean>() {
+    @Override
+    public Boolean get() {
+      return false;
+    }
+  };
+
+  @Test
+  // Test short-circuiting the if -> then
+  public void testIfBlockThen() throws Exception {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block ifBlock = new IfBlock(
+      TRUE_SUPPLIER,
+      new SequenceBlock(piece1, piece2)
+    );
+
+    BlockTestingUtils.testIndependence(
+        Arrays.asList(piece1, piece2),
+        ifBlock);
+  }
+
+  @Test
+  // Test short-circuiting the if -> else
+  public void testIfBlockElse() throws Exception {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block ifBlock = new IfBlock(
+      FALSE_SUPPLIER,
+      new EmptyBlock(),
+      new SequenceBlock(piece1, piece2)
+    );
+
+    BlockTestingUtils.testIndependence(
+        Arrays.asList(piece1, piece2),
+        ifBlock);
+  }
+
+  @Test
+  public void testIfNestedInRepeat() throws Exception {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block ifBlock = new IfBlock(
+      TRUE_SUPPLIER,
+      new SequenceBlock(piece1, piece2)
+    );
+
+    BlockTestingUtils.testNestedRepeatBlock(
+            Arrays.asList(piece1, piece2),
+            ifBlock);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
new file mode 100644
index 0000000..1e096ba
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests repeatBlock's correctness
+ */
+public class TestRepeatBlock {
+
+  public static final int REPEAT_TIMES = 5;
+
+  @Test
+  public void testRepeatBlockBasic() throws Exception {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block innerBlock = new SequenceBlock(piece1, piece2);
+    Block repeatBlock = new RepeatBlock(
+            REPEAT_TIMES,
+            innerBlock
+    );
+    BlockTestingUtils.testIndependence(
+            Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
+            repeatBlock);
+  }
+
+  @Test
+  public void testNestedRepeatBlock() throws Exception {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block innerBlock = new SequenceBlock(piece1, piece2);
+    Block repeatBlock = new RepeatBlock(
+            REPEAT_TIMES,
+            innerBlock
+    );
+    BlockTestingUtils.testNestedRepeatBlock(
+            Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
+            repeatBlock);
+  }
+
+  @Test
+  public void testRepeatBlockEmpty() throws Exception {
+    Block innerBlock = new EmptyBlock();
+    Block repeatBlock = new RepeatBlock(
+            REPEAT_TIMES,
+            innerBlock
+    );
+    List<? extends AbstractPiece> referenceImpl = Collections.emptyList();
+    BlockTestingUtils.testIndependence(
+            // Concatenating EmptyIterator = just EmptyIterator. No obj's to
+            // compare against either
+            referenceImpl,
+            repeatBlock);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
new file mode 100644
index 0000000..242d376
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.function.Supplier;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests repeatUntilBlock's correctness
+ */
+public class TestRepeatUntilBlock {
+
+  public static final int REPEAT_TIMES = 5;
+
+  private static final Supplier<Boolean> falseSupplier = new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return false;
+      }
+  };
+
+  @Test
+  public void testRepeatUntilBlockBasic() throws Exception {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block innerBlock = new SequenceBlock(piece1, piece2);
+    Block repeatBlock = new RepeatUntilBlock(
+      REPEAT_TIMES,
+      innerBlock,
+      falseSupplier
+    );
+    BlockTestingUtils.testIndependence(
+      Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
+      repeatBlock);
+  }
+
+  @Test
+  public void testNestedRepeatUntilBlock() throws Exception {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block innerBlock = new SequenceBlock(piece1, piece2);
+    Block repeatBlock = new RepeatUntilBlock(
+      REPEAT_TIMES,
+      innerBlock,
+      falseSupplier
+    );
+    BlockTestingUtils.testNestedRepeatBlock(
+      Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
+      repeatBlock);
+  }
+
+  @Test
+  public void testRepeatUntilBlockUnlimited() throws Exception {
+    Block innerBlock = new SequenceBlock(new Piece());
+    // Can't test with testIndependence - spin up our own test inline
+    Supplier<Boolean> countingSupplier = new Supplier<Boolean>() {
+      private int i = 0;
+
+      @Override
+      public Boolean get() {
+        i++;
+        return i > REPEAT_TIMES;
+      }
+    };
+    Block repeatBlock = RepeatUntilBlock.unlimited(
+      innerBlock,
+      countingSupplier
+    );
+    int count = 0;
+    Iterator<AbstractPiece> it = repeatBlock.iterator();
+    while (it.hasNext()) {
+      it.next();
+      count++;
+    }
+    assertEquals("Count must be equal to REPEAT_TIMES", REPEAT_TIMES, count);
+  }
+
+}