You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/11 04:50:42 UTC

[3/5] git commit: updated refs/heads/trunk to 819d6d3

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
new file mode 100644
index 0000000..fd38520
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.output;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.giraph.block_app.framework.api.BlockOutputApi;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Handler for blocks output - keeps track of outputs and writers created
+ */
+@SuppressWarnings("unchecked")
+public class BlockOutputHandle implements BlockOutputApi {
+  private transient Configuration conf;
+  private transient Progressable progressable;
+  private final Map<String, BlockOutputDesc> outputDescMap;
+  private final Map<String, Queue<BlockOutputWriter>> freeWriters;
+  private final Map<String, Queue<BlockOutputWriter>> occupiedWriters;
+
+  public BlockOutputHandle(String jobIdentifier, Configuration conf,
+      Progressable hadoopProgressable) {
+    outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(
+        conf, jobIdentifier);
+    freeWriters = new HashMap<>();
+    occupiedWriters = new HashMap<>();
+    for (String confOption : outputDescMap.keySet()) {
+      freeWriters.put(confOption,
+          new ConcurrentLinkedQueue<BlockOutputWriter>());
+      occupiedWriters.put(confOption,
+          new ConcurrentLinkedQueue<BlockOutputWriter>());
+    }
+    initialize(conf, hadoopProgressable);
+  }
+
+  public void initialize(Configuration conf, Progressable progressable) {
+    this.conf = conf;
+    this.progressable = progressable;
+  }
+
+
+  @Override
+  public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
+  OD getOutputDesc(String confOption) {
+    return (OD) outputDescMap.get(confOption);
+  }
+
+  @Override
+  public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
+    OW outputWriter = (OW) freeWriters.get(confOption).poll();
+    if (outputWriter == null) {
+      outputWriter = (OW) outputDescMap.get(confOption).createOutputWriter(
+          conf, progressable);
+    }
+    occupiedWriters.get(confOption).add(outputWriter);
+    return outputWriter;
+  }
+
+  public void returnAllWriters() {
+    for (Map.Entry<String, Queue<BlockOutputWriter>> entry :
+        occupiedWriters.entrySet()) {
+      freeWriters.get(entry.getKey()).addAll(entry.getValue());
+      entry.getValue().clear();
+    }
+  }
+
+  public void closeAllWriters() {
+    final Queue<BlockOutputWriter> allWriters = new ConcurrentLinkedQueue<>();
+    for (Queue<BlockOutputWriter> blockOutputWriters : freeWriters.values()) {
+      allWriters.addAll(blockOutputWriters);
+    }
+    if (allWriters.isEmpty()) {
+      return;
+    }
+    // Closing writers can take time - use multiple threads and call progress
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            BlockOutputWriter writer = allWriters.poll();
+            while (writer != null) {
+              writer.close();
+              writer = allWriters.poll();
+            }
+            return null;
+          }
+        };
+      }
+    };
+    ProgressableUtils.getResultsWithNCallables(callableFactory,
+        Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf),
+            allWriters.size()), "close-writers-%d", progressable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java
new file mode 100644
index 0000000..5f82612
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.output;
+
+import org.apache.giraph.block_app.framework.api.BlockOutputApi;
+import org.apache.giraph.conf.GiraphConfiguration;
+
+/**
+ * Block output option, with apis to use from application code
+ *
+ * @param <OD> Output description type
+ * @param <OW> Output writer type
+ */
+public class BlockOutputOption<OD extends BlockOutputDesc<OW>,
+    OW extends BlockOutputWriter> {
+  private final String key;
+
+  public BlockOutputOption(String key) {
+    this.key = key;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public void register(OD outputDesc, GiraphConfiguration conf) {
+    BlockOutputFormat.addOutputDesc(outputDesc, key, conf);
+  }
+
+  public OD getOutputDesc(BlockOutputApi outputApi) {
+    return outputApi.<OW, OD>getOutputDesc(key);
+  }
+
+  public OW getWriter(BlockOutputApi outputApi) {
+    return outputApi.getWriter(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java
new file mode 100644
index 0000000..5574ab4
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.output;
+
+import java.io.Closeable;
+
+/**
+ * Block output writer
+ */
+public interface BlockOutputWriter extends Closeable {
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java
new file mode 100644
index 0000000..11193b1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Multi-output support for Block Applications
+ */
+package org.apache.giraph.block_app.framework.output;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java
new file mode 100644
index 0000000..0b6934e
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Block application abstraction package.
+ *
+ * Giraph application is represented as a collection of pieces,
+ * aggregated via blocks, ultimately into a single block, that
+ * represents complete application execution.
+ */
+package org.apache.giraph.block_app.framework;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
new file mode 100644
index 0000000..882f4f1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.function.Consumer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Parent of all Pieces, contains comprehensive list of methods Piece
+ * can support. Specific subclasses should be extended directly,
+ * to simplify usage - most frequently for example Piece class.
+ *
+ * Single unit of execution, capturing:
+ * - sending and then receiving messages from vertices
+ * - sending data to be aggregated from workers to master
+ * - sending values from master, via aggregators, to workers
+ * - sending and receiving worker messages
+ *
+ *
+ * Order of execution is:
+ * - On master, once at the start of the application
+ * -- registerAggregators (deprecated, use registerReducers instead)
+ *
+ * - After masterCompute of previous piece, on master:
+ * -- registerReducers
+ *
+ * - Send logic on workers:
+ * -- getVertexSender per each worker thread, and on object returned by it:
+ * --- vertexSend on each vertex
+ * --- postprocess on each worker thread
+ * -- workerContextSend per worker
+ *
+ * - Logic on master:
+ * -- masterCompute
+ *
+ * - Receive logic on workers:
+ * -- workerContextReceive per worker
+ * -- getVertexReceiver per each worker thread, and on object returned by it:
+ * --- vertexReceive on each vertex
+ * --- postprocess on each worker thread
+ *
+ * And before everything, during initialization, registerAggregators.
+ *
+ * Only masterCompute and registerReducers/registerAggregators should modify
+ * the Piece, all of the worker methods should treat Piece as read-only.
+ *
+ * Each piece should be encapsulated unit of execution. Vertex value should be
+ * used as a single implicit "communication" channel between different pieces,
+ * all other dependencies should be explicitly defined and passed through
+ * constructor, via interfaces (as explained below).
+ * I.e. state of the vertex value is invariant that Pieces act upon.
+ * Best is not to depend on explicit vertex value class, but on interface that
+ * provides all needed functions, so that pieces can be freely combined,
+ * as long as vertex value implements appropriate ones.
+ * Similarly, use most abstract class you need - if Piece doesn't depend
+ * on edge value, don't use NullWritable, but Writable. Or if it doesn't
+ * depend on ExecutionStage, use Object for it.
+ *
+ * All other external dependencies should be explicitly passed through
+ * constructor, through interfaces.
+ *
+ * All Pieces will be created within one context - on the master.
+ * They are then going to be replicated across all workers, and across all
+ * threads within each worker, and will see everything that happens in global
+ * context (masterCompute) before them, including any state master has.
+ * Through ObjectHolder/ObjectTransfer, you can pass data between Pieces in
+ * global context, and from global context to worker functions of a Piece
+ * that happens in the future.
+ *
+ * VertexReceiver of previous Piece and VertexSender of next Piece live in
+ * the same context, and vertexReceive of the next Piece is executed
+ * immediately after vertexSend of the previous piece, before vertexSend is
+ * called on the next vertex.
+ * This detail allows you to have external dependency on each other through
+ * memory only mediator objects - like ObjectTransfer.
+ *
+ * All other logic going to live in different contexts,
+ * specifically VertexSender and VertexReceiver of the same Piece,
+ * or workerContextSend and VertexSender of the same Piece, and cannot interact
+ * with each other outside of changing the state of the graph or using
+ * global communication api.
+ *
+ * All methods on this class (or objects it returns) will be called serially,
+ * so there is no need for any Thread synchronization.
+ * Each Thread will have a complete deep copy of the Piece, to achieve that,
+ * so all static fields must be written to be Thread safe!
+ * (i.e. either immutable, or have synchronized/locked access to them)
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings({ "rawtypes" })
+public abstract class AbstractPiece<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable, WV,
+    WM extends Writable, S> implements Block {
+
+  // Overridable functions
+
+  // registerReducers(CreateReducersApi reduceApi, S executionStage)
+
+  /**
+   * Add automatic handling of reducers to registerReducers.
+   * Only for internal use.
+   */
+  public abstract void wrappedRegisterReducers(
+      BlockMasterApi masterApi, S executionStage);
+
+  // getVertexSender(BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage)
+
+  /**
+   * Add automatic handling of reducers to getVertexSender.
+   *
+   * Only for Framework internal use.
+   */
+  public abstract InnerVertexSender getWrappedVertexSender(
+      final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage);
+
+  /**
+   * Override to have worker context send computation.
+   *
+   * Called once per worker, after all vertices have been processed with
+   * getVertexSender.
+   */
+  public void workerContextSend(
+      BlockWorkerContextSendApi<WM> workerContextApi, S executionStage,
+      WV workerValue) {
+  }
+
+  /**
+   * Function that is called on master, after send phase, before receive phase.
+   *
+   * It can:
+   * - read aggregators sent from worker
+   * - do global processing
+   * - send data to workers through aggregators
+   */
+  public void masterCompute(BlockMasterApi masterApi, S executionStage) {
+  }
+
+  /**
+   * Override to have worker context receive computation.
+   *
+   * Called once per worker, before all vertices are going to be processed
+   * with getVertexReceiver.
+   */
+  public void workerContextReceive(
+      BlockWorkerContextReceiveApi workerContextApi, S executionStage,
+      WV workerValue, List<WM> workerMessages) {
+  }
+
+  /**
+   * Override to do vertex receive processing.
+   *
+   * Creates handler that defines what should be executed on worker
+   * for each vertex during receive phase.
+   *
+   * This logic executed last.
+   * This function is called once on each worker on each thread, in parallel,
+   * on their copy of Piece object to create functions handler.
+   *
+   * If returned object implements Postprocessor interface, then corresponding
+   * postprocess() function is going to be called once, after all vertices
+   * corresponding thread needed to process are done.
+   */
+  public VertexReceiver<I, V, E, M> getVertexReceiver(
+      BlockWorkerReceiveApi<I> workerApi, S executionStage) {
+    return null;
+  }
+
+  /**
+   * Returns MessageClasses definition for messages being sent by this Piece.
+   */
+  public abstract MessageClasses<I, M> getMessageClasses(
+      ImmutableClassesGiraphConfiguration conf);
+
+  /**
+   * Override to provide different next execution stage for
+   * Pieces that come after it.
+   *
+   * Execution stage should be immutable, and this function should be
+   * returning a new object, if it needs to return different value.
+   *
+   * It affects pieces that come after this piece,
+   * and isn't applied to execution stage this piece sees.
+   */
+  public S nextExecutionStage(S executionStage) {
+    return executionStage;
+  }
+
+  /**
+   * Override to register any potential aggregators used by this piece.
+   *
+   * @deprecated Use registerReducers instead.
+   */
+  @Deprecated
+  public void registerAggregators(BlockMasterApi masterApi)
+      throws InstantiationException, IllegalAccessException {
+  }
+
+  // Inner classes
+
+  /** Inner class to provide clean use without specifying types */
+  public abstract class InnerVertexSender
+      implements VertexSender<I, V, E>, VertexPostprocessor {
+    @Override
+    public void postprocess() { }
+  }
+
+  /** Inner class to provide clean use without specifying types */
+  public abstract class InnerVertexReceiver
+      implements VertexReceiver<I, V, E, M>, VertexPostprocessor {
+    @Override
+    public void postprocess() { }
+  }
+
+  // Internal implementation
+
+  @Override
+  public final Iterator<AbstractPiece> iterator() {
+    return Iterators.<AbstractPiece>singletonIterator(this);
+  }
+
+  @Override
+  public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+    consumer.apply(this);
+  }
+
+  @Override
+  public String toString() {
+    String name = getClass().getSimpleName();
+    if (name.isEmpty()) {
+      name = getClass().getName();
+    }
+    return name;
+  }
+
+
+  // make hashCode and equals final, forcing them to be based on
+  // reference identity.
+  @Override
+  public final int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public final boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java
new file mode 100644
index 0000000..0963efb
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper;
+import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses;
+import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf;
+import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.DefaultMessageFactorySupplierFromConf;
+import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.SupplierFromConfByCopy;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.conf.EnumConfOption;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Additional abstract implementations for all pieces to be used.
+ * Code here is not in AbstractPiece only to allow for non-standard
+ * non-user-defined pieces. <br>
+ * Only logic used by the underlying framework directly is in AbstractPiece
+ * itself.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public abstract class DefaultParentPiece<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable, WV,
+    WM extends Writable, S> extends AbstractPiece<I, V, E, M, WV, WM, S> {
+  // TODO move to GiraphConstants
+  /**
+   * This option will tell which message encode & store enum to force, when
+   * combining is not enabled.
+   *
+   * MESSAGE_ENCODE_AND_STORE_TYPE and this property are basically upper
+   * and lower bound on message store type, when looking them in order from
+   * not doing anything special, to most advanced type:
+   * BYTEARRAY_PER_PARTITION,
+   * EXTRACT_BYTEARRAY_PER_PARTITION,
+   * POINTER_LIST_PER_VERTEX
+   * resulting encode type is going to be:
+   * pieceEncodingType = piece.allowOneMessageToManyIdsEncoding() ?
+   *    POINTER_LIST_PER_VERTEX : BYTEARRAY_PER_PARTITION)
+   * Math.max(index(minForce), Math.min(index(maxAllowed), index(pieceType);
+   *
+   * This is useful to force all pieces onto particular message store, even
+   * if they do not overrideallowOneMessageToManyIdsEncoding, though that might
+   * be rarely needed.
+   * This option might be more useful for fully local computation,
+   * where overall job behavior is quite different.
+   */
+  public static final EnumConfOption<MessageEncodeAndStoreType>
+  MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE =
+      EnumConfOption.create("giraph.messageEncodeAndStoreTypeMinForce",
+          MessageEncodeAndStoreType.class,
+          MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
+          "Select the message_encode_and_store_type min force to use");
+
+  private final ReduceUtilsObject reduceUtils = new ReduceUtilsObject();
+  private ReducersForPieceHandler reducersHandler;
+
+  // Overridable functions
+
+  /**
+   * Override to register any potential reducers used by this piece,
+   * through calls to {@code reduceApi}, which will return reducer handles
+   * for simple.
+   * <br/>
+   * Tip: Without defining a field, first write here name of the field and what
+   * you want to reduce, like:
+   * <br/>
+   * {@code totalSum = reduceApi.createLocalReducer(SumReduce.DOUBLE); }
+   * <br/>
+   * and then use tools your IDE provides to generate field signature itself,
+   * which might be slightly complex:
+   * <br/>
+   * {@code ReducerHandle<DoubleWritable, DoubleWritable> totalSum; }
+   */
+  public void registerReducers(CreateReducersApi reduceApi, S executionStage) {
+  }
+
+  /**
+   * Override to do vertex send processing.
+   *
+   * Creates handler that defines what should be executed on worker
+   * during send phase.
+   *
+   * This logic gets executed first.
+   * This function is called once on each worker on each thread, in parallel,
+   * on their copy of Piece object to create functions handler.
+   *
+   * If returned object implements Postprocessor interface, then corresponding
+   * postprocess() function is going to be called once, after all vertices
+   * corresponding thread needed to process are done.
+   */
+  public VertexSender<I, V, E> getVertexSender(
+      BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+    return null;
+  }
+
+  /**
+   * Override to specify type of the message this Piece sends, if it does
+   * send messages.
+   *
+   * If not overwritten, no messages can be sent.
+   */
+  protected Class<M> getMessageClass() {
+    return null;
+  }
+
+  /**
+   * Override to specify message value factory to be used,
+   * which creates objects into which messages will be deserialized.
+   *
+   * If not overwritten, or null is returned, DefaultMessageValueFactory
+   * will be used.
+   */
+  protected MessageValueFactory<M> getMessageFactory(
+      ImmutableClassesGiraphConfiguration conf) {
+    return null;
+  }
+
+  /**
+   * Override to specify message combiner to be used, if any.
+   *
+   * Message combiner itself should be immutable
+   * (i.e. it will be call simultanously from multiple threads)
+   */
+  protected MessageCombiner<? super I, M> getMessageCombiner(
+      ImmutableClassesGiraphConfiguration conf) {
+    return null;
+  }
+
+  /**
+   * Override to specify that this Piece allows one to many ids encoding to be
+   * used for messages.
+   * You should override this function, if you are sending identical message to
+   * all targets, and message itself is not extremely small.
+   */
+  protected boolean allowOneMessageToManyIdsEncoding() {
+    return false;
+  }
+
+  @Override
+  public MessageClasses<I, M> getMessageClasses(
+      ImmutableClassesGiraphConfiguration conf) {
+    Class<M> messageClass = null;
+    MessageValueFactory<M> messageFactory = getMessageFactory(conf);
+    MessageCombiner<? super I, M> messageCombiner = getMessageCombiner(conf);
+
+    if (messageFactory != null) {
+      messageClass = (Class) messageFactory.newInstance().getClass();
+    } else if (messageCombiner != null) {
+      messageClass = (Class) messageCombiner.createInitialMessage().getClass();
+    }
+
+    if (messageClass != null) {
+      Preconditions.checkState(getMessageClass() == null,
+          "Piece %s defines getMessageFactory or getMessageCombiner, " +
+          "so it doesn't need to define getMessageClass.",
+          toString());
+    } else {
+      messageClass = getMessageClass();
+      if (messageClass == null) {
+        messageClass = (Class) NoMessage.class;
+      }
+    }
+
+    SupplierFromConf<MessageValueFactory<M>> messageFactorySupplier;
+    if (messageFactory != null) {
+      messageFactorySupplier =
+          new SupplierFromConfByCopy<MessageValueFactory<M>>(messageFactory);
+    } else {
+      messageFactorySupplier =
+          new DefaultMessageFactorySupplierFromConf<>(messageClass);
+    }
+
+    SupplierFromConf<? extends MessageCombiner<? super I, M>>
+    messageCombinerSupplier;
+    if (messageCombiner != null) {
+      messageCombinerSupplier = new SupplierFromConfByCopy<>(messageCombiner);
+    } else {
+      messageCombinerSupplier = null;
+    }
+
+    int maxAllowed =
+        GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf).ordinal();
+    int minForce = MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE.get(conf).ordinal();
+    Preconditions.checkState(maxAllowed >= minForce);
+
+    int pieceEncodeType = (allowOneMessageToManyIdsEncoding() ?
+        MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX :
+        MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION).ordinal();
+    // bound piece type with boundaries:
+    pieceEncodeType = Math.max(minForce, Math.min(maxAllowed, pieceEncodeType));
+
+    MessageEncodeAndStoreType messageEncodeAndStoreType =
+        MessageEncodeAndStoreType.values()[pieceEncodeType];
+
+    if (messageFactory instanceof GiraphConfigurationSettable) {
+      throw new IllegalStateException(
+          messageFactory.getClass() + " MessageFactory in " + this +
+          " Piece implements GiraphConfigurationSettable");
+    }
+    if (messageCombiner instanceof GiraphConfigurationSettable) {
+      throw new IllegalStateException(
+          messageCombiner.getClass() + " MessageCombiner in " + this +
+          " Piece implements GiraphConfigurationSettable");
+    }
+
+    return new ObjectMessageClasses<>(
+        messageClass, messageFactorySupplier,
+        messageCombinerSupplier, messageEncodeAndStoreType);
+  }
+
+  // Internal implementation
+
+  @Override
+  public final InnerVertexSender getWrappedVertexSender(
+      final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+    reducersHandler.vertexSenderWorkerPreprocess(workerApi);
+    final VertexSender<I, V, E> functions =
+        getVertexSender(workerApi, executionStage);
+    return new InnerVertexSender() {
+      @Override
+      public void vertexSend(Vertex<I, V, E> vertex) {
+        if (functions != null) {
+          functions.vertexSend(vertex);
+        }
+      }
+      @Override
+      public void postprocess() {
+        if (functions instanceof VertexPostprocessor) {
+          ((VertexPostprocessor) functions).postprocess();
+        }
+        reducersHandler.vertexSenderWorkerPostprocess(workerApi);
+      }
+    };
+  }
+
+  @Override
+  public final void wrappedRegisterReducers(
+      BlockMasterApi masterApi, S executionStage) {
+    reducersHandler = new ReducersForPieceHandler();
+    registerReducers(new CreateReducersApiWrapper(
+        masterApi, reducersHandler), executionStage);
+  }
+
+  // utility functions:
+  // TODO Java8 - move these as default functions to VertexSender interface
+  protected final void reduceDouble(
+      ReducerHandle<DoubleWritable, ?> reduceHandle, double value) {
+    reduceUtils.reduceDouble(reduceHandle, value);
+  }
+
+  protected final void reduceFloat(
+      ReducerHandle<FloatWritable, ?> reduceHandle, float value) {
+    reduceUtils.reduceFloat(reduceHandle, value);
+  }
+
+  protected final void reduceLong(
+      ReducerHandle<LongWritable, ?> reduceHandle, long value) {
+    reduceUtils.reduceLong(reduceHandle, value);
+  }
+
+  protected final void reduceInt(
+      ReducerHandle<IntWritable, ?> reduceHandle, int value) {
+    reduceUtils.reduceInt(reduceHandle, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
new file mode 100644
index 0000000..3ad66d1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece;
+
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Piece that should be extended in common usecases, when we want to be:
+ * - sending and then receiving messages from vertices
+ * - sending data to be aggregated from workers to master
+ * - sending values from master, via aggregators, to workers
+ *
+ * (basically - we don't want to use WorkerContext)
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings("rawtypes")
+public class Piece<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable, S>
+    extends DefaultParentPiece<I, V, E, M, Object, NoMessage, S> {
+
+  // Disallowing use of Worker Context functions:
+  @Override
+  public final void workerContextSend(
+      BlockWorkerContextSendApi<NoMessage> workerContextApi,
+      S executionStage, Object workerValue) {
+  }
+
+  @Override
+  public final void workerContextReceive(
+      BlockWorkerContextReceiveApi workerContextApi,
+      S executionStage, Object workerValue, List<NoMessage> workerMessages) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java
new file mode 100644
index 0000000..a5d0c8c
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Piece that should be extended when WorkerContext is used.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings("rawtypes")
+public class PieceWithWorkerContext<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable, WV,
+    WM extends Writable, S>
+    extends DefaultParentPiece<I, V, E, M, WV, WM, S> {
+
+  /**
+   * Get global worker value.
+   * Value returned can be accessed from may threads, and so all
+   * accesses to it should be done in a thread-safe manner!
+   *
+   * This is the only place in Blocks Framework where you need
+   * to take care of concurrency.
+   */
+  @SuppressWarnings("unchecked")
+  public WV getWorkerValue(BlockWorkerApi<I> workerApi) {
+    return (WV) ((BlockWorkerValueAccessor) workerApi).getWorkerValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
new file mode 100644
index 0000000..23c2d29
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.delegate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Delegate Piece which allows combining multiple pieces in same iteration:
+ * new DelegatePiece(new LogicPiece(), new StatsPiece())
+ * You should be careful when doing so, since those pieces must not interact,
+ * and only one can send messages.
+ * Execution of any of the Piece methods by the framework is going to trigger
+ * sequential execution of that method on all of the pieces this DelegatePiece
+ * wraps. That means for example, getVertexSender is going to be called on all
+ * pieces before masterCompute is called on all pieces, which is called before
+ * getVertexReceiver on all pieces.
+ *
+ * Also, via overriding, it provides an abstract class for filtering. I.e. if
+ * you want piece that filters out calls to masterCompute, you can have:
+ * new FilterMasterPiece(new LogicPiece()),
+ * with FilterMasterPiece extends DelegatePiece, and only overrides getMaster
+ * function and DelegateMasterPiece class.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings("rawtypes")
+public class DelegatePiece<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable, WV, WM extends Writable, S>
+    extends AbstractPiece<I, V, E, M, WV, WM, S> {
+
+  private final List<AbstractPiece<I, V, E, M, WV, WM, S>> innerPieces;
+
+  @SafeVarargs
+  @SuppressWarnings("unchecked")
+  public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
+      ? super M, ? super WV, ? super WM, ? super S>... innerPieces) {
+    // Pieces are contravariant, but Java generics cannot express that,
+    // so use unchecked cast inside to allow callers to be typesafe
+    this.innerPieces = new ArrayList(Arrays.asList(innerPieces));
+  }
+
+  @SuppressWarnings("unchecked")
+  public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
+      ? super M, ? super WV, ? super WM, ? super S> innerPiece) {
+    // Pieces are contravariant, but Java generics cannot express that,
+    // so use unchecked cast inside to allow callers to be typesafe
+    this.innerPieces = new ArrayList(Arrays.asList(innerPiece));
+  }
+
+  protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
+      ArrayList<InnerVertexSender> workerSendFunctions,
+      BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+    return new DelegateWorkerSendFunctions(workerSendFunctions);
+  }
+
+  protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
+      ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
+      BlockWorkerReceiveApi<I> workerApi, S executionStage) {
+    return new DelegateWorkerReceiveFunctions(workerReceiveFunctions);
+  }
+
+  @Override
+  public InnerVertexSender getWrappedVertexSender(
+      BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+    ArrayList<InnerVertexSender> workerSendFunctions =
+        new ArrayList<>(innerPieces.size());
+    for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+      workerSendFunctions.add(
+          innerPiece.getWrappedVertexSender(workerApi, executionStage));
+    }
+    return delegateWorkerSendFunctions(
+        workerSendFunctions, workerApi, executionStage);
+  }
+
+  @Override
+  public InnerVertexReceiver getVertexReceiver(
+      BlockWorkerReceiveApi<I> workerApi, S executionStage) {
+    ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions =
+        new ArrayList<>(innerPieces.size());
+    for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+      workerReceiveFunctions.add(
+          innerPiece.getVertexReceiver(workerApi, executionStage));
+    }
+    return delegateWorkerReceiveFunctions(
+        workerReceiveFunctions, workerApi, executionStage);
+  }
+
+  /** Delegating WorkerSendPiece */
+  protected class DelegateWorkerSendFunctions extends InnerVertexSender {
+    private final ArrayList<InnerVertexSender> workerSendFunctions;
+
+    public DelegateWorkerSendFunctions(
+        ArrayList<InnerVertexSender> workerSendFunctions) {
+      this.workerSendFunctions = workerSendFunctions;
+    }
+
+    @Override
+    public void vertexSend(Vertex<I, V, E> vertex) {
+      for (InnerVertexSender functions : workerSendFunctions) {
+        if (functions != null) {
+          functions.vertexSend(vertex);
+        }
+      }
+    }
+
+    @Override
+    public void postprocess() {
+      for (InnerVertexSender functions : workerSendFunctions) {
+        if (functions != null) {
+          functions.postprocess();
+        }
+      }
+    }
+  }
+
+  /** Delegating WorkerReceivePiece */
+  protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver {
+    private final ArrayList<VertexReceiver<I, V, E, M>>
+    workerReceiveFunctions;
+
+    public DelegateWorkerReceiveFunctions(
+        ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions) {
+      this.workerReceiveFunctions = workerReceiveFunctions;
+    }
+
+    @Override
+    public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
+      for (VertexReceiver<I, V, E, M> functions :
+            workerReceiveFunctions) {
+        if (functions != null) {
+          functions.vertexReceive(vertex, messages);
+        }
+      }
+    }
+
+    @Override
+    public void postprocess() {
+      for (VertexReceiver<I, V, E, M> functions :
+            workerReceiveFunctions) {
+        if (functions instanceof VertexPostprocessor) {
+          ((VertexPostprocessor) functions).postprocess();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void masterCompute(BlockMasterApi api, S executionStage) {
+    for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
+      piece.masterCompute(api, executionStage);
+    }
+  }
+
+  @Override
+  public void workerContextSend(
+      BlockWorkerContextSendApi<WM> workerContextApi, S executionStage,
+      WV workerValue) {
+    for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
+      piece.workerContextSend(workerContextApi, executionStage, workerValue);
+    }
+  }
+
+  @Override
+  public void workerContextReceive(
+      BlockWorkerContextReceiveApi workerContextApi, S executionStage,
+      WV workerValue, List<WM> workerMessages) {
+    for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
+      piece.workerContextReceive(
+          workerContextApi, executionStage, workerValue, workerMessages);
+    }
+  }
+
+  @Override
+  public S nextExecutionStage(S executionStage) {
+    for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+      executionStage = innerPiece.nextExecutionStage(executionStage);
+    }
+    return executionStage;
+  }
+
+  @Override
+  public MessageClasses<I, M> getMessageClasses(
+      ImmutableClassesGiraphConfiguration conf) {
+    MessageClasses<I, M> messageClasses = null;
+    MessageClasses<I, M> firstMessageClasses = null;
+    for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+      MessageClasses<I, M> cur = innerPiece.getMessageClasses(conf);
+      Preconditions.checkState(cur != null);
+      if (!cur.getMessageClass().equals(NoMessage.class)) {
+        if (messageClasses != null) {
+          throw new RuntimeException(
+              "Only one piece combined through delegate (" +
+              toString() + ") can send messages");
+        }
+        messageClasses = cur;
+      }
+      if (firstMessageClasses == null) {
+        firstMessageClasses = cur;
+      }
+    }
+    return messageClasses != null ? messageClasses : firstMessageClasses;
+  }
+
+  @Override
+  public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+    for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+      innerPiece.forAllPossiblePieces(consumer);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void registerAggregators(BlockMasterApi master)
+      throws InstantiationException, IllegalAccessException {
+    for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+      innerPiece.registerAggregators(master);
+    }
+  }
+
+  @Override
+  public void wrappedRegisterReducers(
+      BlockMasterApi masterApi, S executionStage) {
+    for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+      innerPiece.wrappedRegisterReducers(masterApi, executionStage);
+    }
+  }
+
+  protected String delegationName() {
+    return "Delegate";
+  }
+
+  @Override
+  public String toString() {
+    return delegationName() + innerPieces.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java
new file mode 100644
index 0000000..5c702c5
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.delegate;
+
+import java.util.ArrayList;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Piece which uses a provided suppliers to decide whether or not to run
+ * receive/send piece part on a certain vertex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings({ "rawtypes" })
+public class FilteringPiece<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable, WV, WM extends Writable, S>
+    extends DelegatePiece<I, V, E, M, WV, WM, S> {
+  private final SupplierFromVertex<I, V, E, Boolean> toCallSend;
+  private final SupplierFromVertex<I, V, E, Boolean> toCallReceive;
+
+  /**
+   * Creates filtering piece which uses passed {@code toCallSend} to filter
+   * calls to {@code vertexSend}, and passed {@code toCallReceive} to filter
+   * calls to {@code vertexReceive}, on passed {@code innerPiece}.
+   */
+  @SuppressWarnings("unchecked")
+  public FilteringPiece(
+      SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
+      SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
+        toCallReceive,
+      AbstractPiece<? super I, ? super V, ? super E, ? super M,
+        ? super WV, ? super WM, ? super S> innerPiece) {
+    super(innerPiece);
+    // Suppliers are contravariant on vertex types,
+    // but Java generics cannot express that,
+    // so use unchecked cast inside to allow callers to be typesafe
+    this.toCallSend = (SupplierFromVertex) toCallSend;
+    this.toCallReceive = (SupplierFromVertex) toCallReceive;
+    Preconditions.checkArgument(
+        toCallSend != null || toCallReceive != null,
+        "Both send and receive filter cannot be null");
+  }
+
+  /**
+   * Creates filtering piece, where both vertexSend and vertexReceive is
+   * filtered based on same supplier.
+   */
+  public FilteringPiece(
+      SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
+        toCallSendAndReceive,
+      AbstractPiece<? super I, ? super V, ? super E, ? super M,
+        ? super WV, ? super WM, ? super S> innerPiece) {
+    this(toCallSendAndReceive, toCallSendAndReceive, innerPiece);
+  }
+
+  /**
+   * Creates filtering piece, that filters only vertexReceive function,
+   * and always calls vertexSend function.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable, WV, WM extends Writable, S>
+  FilteringPiece<I, V, E, M, WV, WM, S> createReceiveFiltering(
+      SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
+        toCallReceive,
+      AbstractPiece<? super I, ? super V, ? super E, ? super M,
+        ? super WV, ? super WM, ? super S> innerPiece) {
+    return new FilteringPiece<>(null, toCallReceive, innerPiece);
+  }
+
+  /**
+   * Creates filtering block, that filters only vertexSend function,
+   * and always calls vertexReceive function.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable, WV, WM extends Writable, S>
+  FilteringPiece<I, V, E, M, WV, WM, S> createSendFiltering(
+      SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
+      AbstractPiece<? super I, ? super V, ? super E, ? super M, ? super WV,
+        ? super WM, ? super S> innerPiece) {
+    return new FilteringPiece<>(toCallSend, null, innerPiece);
+  }
+
+  @Override
+  protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
+      ArrayList<InnerVertexSender> workerSendFunctions,
+      BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+    return new DelegateWorkerSendFunctions(workerSendFunctions) {
+      @Override
+      public void vertexSend(Vertex<I, V, E> vertex) {
+        if (toCallSend == null || toCallSend.get(vertex)) {
+          super.vertexSend(vertex);
+        }
+      }
+    };
+  }
+
+  @Override
+  protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
+      ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
+      BlockWorkerReceiveApi<I> workerApi, S executionStage) {
+    return new DelegateWorkerReceiveFunctions(workerReceiveFunctions) {
+      @Override
+      public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
+        if (toCallReceive == null || toCallReceive.get(vertex)) {
+          super.vertexReceive(vertex, messages);
+        }
+      }
+    };
+  }
+
+  @Override
+  protected String delegationName() {
+    if (toCallSend != null && toCallReceive != null) {
+      if (toCallSend != toCallReceive) {
+        return "AsymFilter";
+      }
+      return "Filter";
+    } else if (toCallSend != null) {
+      return "SendFilter";
+    } else if (toCallReceive != null) {
+      return "ReceiveFilter";
+    } else {
+      throw new IllegalStateException("Both Send and Receive filters are null");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java
new file mode 100644
index 0000000..f367e6c
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Pieces that delegate their work to a set of one or multiple other Pieces.
+ */
+package org.apache.giraph.block_app.framework.piece.delegate;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java
new file mode 100644
index 0000000..f18d1f4
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;
+
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * Handle to a broadcast.
+ *
+ * @param <T> Value type
+ */
+public interface BroadcastHandle<T> {
+  /** Get broadcasted value */
+  T getBroadcast(WorkerBroadcastUsage worker);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java
new file mode 100644
index 0000000..bbec1c6
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Utility object with common primitive reduce operations,
+ * without need to create reusable objects within the piece.
+ */
+public class ReduceUtilsObject {
+  private final DoubleWritable reusableDouble = new DoubleWritable();
+  private final FloatWritable reusableFloat = new FloatWritable();
+  private final LongWritable reusableLong = new LongWritable();
+  private final IntWritable reusableInt = new IntWritable();
+
+  // utility functions:
+  public void reduceDouble(
+      ReducerHandle<DoubleWritable, ?> reduceHandle, double value) {
+    DoubleWritable tmp = reusableDouble;
+    tmp.set(value);
+    reduceHandle.reduce(tmp);
+  }
+
+  public void reduceFloat(
+      ReducerHandle<FloatWritable, ?> reduceHandle, float value) {
+    FloatWritable tmp = reusableFloat;
+    tmp.set(value);
+    reduceHandle.reduce(tmp);
+  }
+
+  public void reduceLong(
+      ReducerHandle<LongWritable, ?> reduceHandle, long value) {
+    LongWritable tmp = reusableLong;
+    tmp.set(value);
+    reduceHandle.reduce(tmp);
+  }
+
+  public void reduceInt(ReducerHandle<IntWritable, ?> reduceHandle, int value) {
+    IntWritable tmp = reusableInt;
+    tmp.set(value);
+    reduceHandle.reduce(tmp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java
new file mode 100644
index 0000000..921c863
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * Handle that wraps both reducerHandle and broadcastHandle, so callers
+ * don't need to have two fields.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public class ReducerAndBroadcastWrapperHandle<S, R> {
+  private ReducerHandle<S, R> reducerHandle;
+  private BroadcastHandle<R> broadcastHandle;
+
+  /** Set reducer handle to just registered handle */
+  public void registeredReducer(ReducerHandle<S, R> reducerHandle) {
+    this.reducerHandle = reducerHandle;
+  }
+
+  /** Reduce single value */
+  public void reduce(S valueToReduce) {
+    reducerHandle.reduce(valueToReduce);
+  }
+
+  /** Get reduced value */
+  public R getReducedValue(MasterGlobalCommUsage master) {
+    return reducerHandle.getReducedValue(master);
+  }
+
+  /**
+   * Broadcast reduced value from master
+   */
+  public void broadcastValue(BlockMasterApi master) {
+    broadcastHandle = reducerHandle.broadcastValue(master);
+  }
+
+  /** Get broadcasted value */
+  public R getBroadcast(WorkerBroadcastUsage worker) {
+    return broadcastHandle.getBroadcast(worker);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java
new file mode 100644
index 0000000..dae40f2
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+
+/**
+ * Handle to a reducer.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public interface ReducerHandle<S, R> {
+  /** Reduce single value */
+  void reduce(S valueToReduce);
+  /** Get reduced value */
+  R getReducedValue(MasterGlobalCommUsage master);
+
+  /**
+   * Broadcast reduced value from master
+   *
+   * @return Handle to the broadcasted value.
+   */
+  BroadcastHandle<R> broadcastValue(BlockMasterApi master);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java
new file mode 100644
index 0000000..7ee54cb
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.array;
+
+/**
+ * Handle to array of handles underneath
+ *
+ * @param <T> Value type
+ */
+public interface ArrayHandle<T> {
+  /**
+   * Get value at index.
+   */
+  T get(int index);
+
+  /**
+   * Size of this array if defined up front, or throws
+   * UnsupportedOperationException if size is dynamic
+   */
+  int getStaticSize();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java
new file mode 100644
index 0000000..bf0d333
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.array;
+
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * Handle to array of broadcasts
+ *
+ * @param <T> Value type
+ */
+public interface BroadcastArrayHandle<T>
+  extends ArrayHandle<BroadcastHandle<T>> {
+
+  /**
+   * Number of elements that were broadcasted.
+   */
+  int getBroadcastedSize(WorkerBroadcastUsage worker);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java
new file mode 100644
index 0000000..a4b99ac
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.array;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+
+/**
+ * Handle to array of reducers
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public interface ReducerArrayHandle<S, R>
+    extends ArrayHandle<ReducerHandle<S, R>> {
+
+  /**
+   * Number of elements that were reduced.
+   */
+  int getReducedSize(BlockMasterApi master);
+
+  /**
+   * Broadcast whole array of reducers to master
+   *
+   * @return Handle to the broadcasted array.
+   */
+  BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java
new file mode 100644
index 0000000..a8beb85
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Interfaces representing arrays of individual handles.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.array;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java
new file mode 100644
index 0000000..c2cc0f2
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.internal;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Wrapping masterApi and reducers handler into API for creating reducer
+ * handles.
+ */
+public class CreateReducersApiWrapper implements CreateReducersApi {
+  private final BlockMasterApi masterApi;
+  private final ReducersForPieceHandler reducersApi;
+
+  public CreateReducersApiWrapper(
+      BlockMasterApi masterApi, ReducersForPieceHandler reducersApi) {
+    this.masterApi = masterApi;
+    this.reducersApi = reducersApi;
+  }
+
+  @Override
+  public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer(
+      ReduceOperation<S, R> reduceOp) {
+    return reducersApi.createLocalReducer(
+        masterApi, reduceOp, reduceOp.createInitialValue());
+  }
+
+  @Override
+  public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer(
+      ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+    return reducersApi.createLocalReducer(
+        masterApi, reduceOp, globalInitialValue);
+  }
+
+  @Override
+  public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer(
+      ReduceOperation<S, R> reduceOp) {
+    return reducersApi.createGlobalReducer(
+        masterApi, reduceOp, reduceOp.createInitialValue());
+  }
+
+  @Override
+  public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer(
+      ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+    return reducersApi.createGlobalReducer(
+        masterApi, reduceOp, globalInitialValue);
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<?, ?, ?> getConf() {
+    return masterApi.getConf();
+  }
+}