You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/11 04:50:42 UTC
[3/5] git commit: updated refs/heads/trunk to 819d6d3
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
new file mode 100644
index 0000000..fd38520
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.output;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.giraph.block_app.framework.api.BlockOutputApi;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Handler for blocks output - keeps track of outputs and writers created
+ */
+@SuppressWarnings("unchecked")
+public class BlockOutputHandle implements BlockOutputApi {
+ private transient Configuration conf;
+ private transient Progressable progressable;
+ private final Map<String, BlockOutputDesc> outputDescMap;
+ private final Map<String, Queue<BlockOutputWriter>> freeWriters;
+ private final Map<String, Queue<BlockOutputWriter>> occupiedWriters;
+
+ public BlockOutputHandle(String jobIdentifier, Configuration conf,
+ Progressable hadoopProgressable) {
+ outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(
+ conf, jobIdentifier);
+ freeWriters = new HashMap<>();
+ occupiedWriters = new HashMap<>();
+ for (String confOption : outputDescMap.keySet()) {
+ freeWriters.put(confOption,
+ new ConcurrentLinkedQueue<BlockOutputWriter>());
+ occupiedWriters.put(confOption,
+ new ConcurrentLinkedQueue<BlockOutputWriter>());
+ }
+ initialize(conf, hadoopProgressable);
+ }
+
+ public void initialize(Configuration conf, Progressable progressable) {
+ this.conf = conf;
+ this.progressable = progressable;
+ }
+
+
+ @Override
+ public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
+ OD getOutputDesc(String confOption) {
+ return (OD) outputDescMap.get(confOption);
+ }
+
+ @Override
+ public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
+ OW outputWriter = (OW) freeWriters.get(confOption).poll();
+ if (outputWriter == null) {
+ outputWriter = (OW) outputDescMap.get(confOption).createOutputWriter(
+ conf, progressable);
+ }
+ occupiedWriters.get(confOption).add(outputWriter);
+ return outputWriter;
+ }
+
+ public void returnAllWriters() {
+ for (Map.Entry<String, Queue<BlockOutputWriter>> entry :
+ occupiedWriters.entrySet()) {
+ freeWriters.get(entry.getKey()).addAll(entry.getValue());
+ entry.getValue().clear();
+ }
+ }
+
+ public void closeAllWriters() {
+ final Queue<BlockOutputWriter> allWriters = new ConcurrentLinkedQueue<>();
+ for (Queue<BlockOutputWriter> blockOutputWriters : freeWriters.values()) {
+ allWriters.addAll(blockOutputWriters);
+ }
+ if (allWriters.isEmpty()) {
+ return;
+ }
+ // Closing writers can take time - use multiple threads and call progress
+ CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ @Override
+ public Callable<Void> newCallable(int callableId) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ BlockOutputWriter writer = allWriters.poll();
+ while (writer != null) {
+ writer.close();
+ writer = allWriters.poll();
+ }
+ return null;
+ }
+ };
+ }
+ };
+ ProgressableUtils.getResultsWithNCallables(callableFactory,
+ Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf),
+ allWriters.size()), "close-writers-%d", progressable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java
new file mode 100644
index 0000000..5f82612
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.output;
+
+import org.apache.giraph.block_app.framework.api.BlockOutputApi;
+import org.apache.giraph.conf.GiraphConfiguration;
+
+/**
+ * Block output option, with apis to use from application code
+ *
+ * @param <OD> Output description type
+ * @param <OW> Output writer type
+ */
+public class BlockOutputOption<OD extends BlockOutputDesc<OW>,
+ OW extends BlockOutputWriter> {
+ private final String key;
+
+ public BlockOutputOption(String key) {
+ this.key = key;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void register(OD outputDesc, GiraphConfiguration conf) {
+ BlockOutputFormat.addOutputDesc(outputDesc, key, conf);
+ }
+
+ public OD getOutputDesc(BlockOutputApi outputApi) {
+ return outputApi.<OW, OD>getOutputDesc(key);
+ }
+
+ public OW getWriter(BlockOutputApi outputApi) {
+ return outputApi.getWriter(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java
new file mode 100644
index 0000000..5574ab4
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.output;
+
+import java.io.Closeable;
+
+/**
+ * Block output writer
+ */
+public interface BlockOutputWriter extends Closeable {
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java
new file mode 100644
index 0000000..11193b1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Multi-output support for Block Applications
+ */
+package org.apache.giraph.block_app.framework.output;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java
new file mode 100644
index 0000000..0b6934e
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Block application abstraction package.
+ *
+ * Giraph application is represented as a collection of pieces,
+ * aggregated via blocks, ultimately into a single block, that
+ * represents complete application execution.
+ */
+package org.apache.giraph.block_app.framework;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
new file mode 100644
index 0000000..882f4f1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.function.Consumer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Parent of all Pieces, contains comprehensive list of methods Piece
+ * can support. Specific subclasses should be extended directly,
+ * to simplify usage - most frequently for example Piece class.
+ *
+ * Single unit of execution, capturing:
+ * - sending and then receiving messages from vertices
+ * - sending data to be aggregated from workers to master
+ * - sending values from master, via aggregators, to workers
+ * - sending and receiving worker messages
+ *
+ *
+ * Order of execution is:
+ * - On master, once at the start of the application
+ * -- registerAggregators (deprecated, use registerReducers instead)
+ *
+ * - After masterCompute of previous piece, on master:
+ * -- registerReducers
+ *
+ * - Send logic on workers:
+ * -- getVertexSender per each worker thread, and on object returned by it:
+ * --- vertexSend on each vertex
+ * --- postprocess on each worker thread
+ * -- workerContextSend per worker
+ *
+ * - Logic on master:
+ * -- masterCompute
+ *
+ * - Receive logic on workers:
+ * -- workerContextReceive per worker
+ * -- getVertexReceiver per each worker thread, and on object returned by it:
+ * --- vertexReceive on each vertex
+ * --- postprocess on each worker thread
+ *
+ * And before everything, during initialization, registerAggregators.
+ *
+ * Only masterCompute and registerReducers/registerAggregators should modify
+ * the Piece, all of the worker methods should treat Piece as read-only.
+ *
+ * Each piece should be encapsulated unit of execution. Vertex value should be
+ * used as a single implicit "communication" channel between different pieces,
+ * all other dependencies should be explicitly defined and passed through
+ * constructor, via interfaces (as explained below).
+ * I.e. state of the vertex value is invariant that Pieces act upon.
+ * Best is not to depend on explicit vertex value class, but on interface that
+ * provides all needed functions, so that pieces can be freely combined,
+ * as long as vertex value implements appropriate ones.
+ * Similarly, use most abstract class you need - if Piece doesn't depend
+ * on edge value, don't use NullWritable, but Writable. Or if it doesn't
+ * depend on ExecutionStage, use Object for it.
+ *
+ * All other external dependencies should be explicitly passed through
+ * constructor, through interfaces.
+ *
+ * All Pieces will be created within one context - on the master.
+ * They are then going to be replicated across all workers, and across all
+ * threads within each worker, and will see everything that happens in global
+ * context (masterCompute) before them, including any state master has.
+ * Through ObjectHolder/ObjectTransfer, you can pass data between Pieces in
+ * global context, and from global context to worker functions of a Piece
+ * that happens in the future.
+ *
+ * VertexReceiver of previous Piece and VertexSender of next Piece live in
+ * the same context, and vertexReceive of the next Piece is executed
+ * immediately after vertexSend of the previous piece, before vertexSend is
+ * called on the next vertex.
+ * This detail allows you to have external dependency on each other through
+ * memory only mediator objects - like ObjectTransfer.
+ *
+ * All other logic going to live in different contexts,
+ * specifically VertexSender and VertexReceiver of the same Piece,
+ * or workerContextSend and VertexSender of the same Piece, and cannot interact
+ * with each other outside of changing the state of the graph or using
+ * global communication api.
+ *
+ * All methods on this class (or objects it returns) will be called serially,
+ * so there is no need for any Thread synchronization.
+ * Each Thread will have a complete deep copy of the Piece, to achieve that,
+ * so all static fields must be written to be Thread safe!
+ * (i.e. either immutable, or have synchronized/locked access to them)
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings({ "rawtypes" })
+public abstract class AbstractPiece<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable, WV,
+ WM extends Writable, S> implements Block {
+
+ // Overridable functions
+
+ // registerReducers(CreateReducersApi reduceApi, S executionStage)
+
+ /**
+ * Add automatic handling of reducers to registerReducers.
+ * Only for internal use.
+ */
+ public abstract void wrappedRegisterReducers(
+ BlockMasterApi masterApi, S executionStage);
+
+ // getVertexSender(BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage)
+
+ /**
+ * Add automatic handling of reducers to getVertexSender.
+ *
+ * Only for Framework internal use.
+ */
+ public abstract InnerVertexSender getWrappedVertexSender(
+ final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage);
+
+ /**
+ * Override to have worker context send computation.
+ *
+ * Called once per worker, after all vertices have been processed with
+ * getVertexSender.
+ */
+ public void workerContextSend(
+ BlockWorkerContextSendApi<WM> workerContextApi, S executionStage,
+ WV workerValue) {
+ }
+
+ /**
+ * Function that is called on master, after send phase, before receive phase.
+ *
+ * It can:
+ * - read aggregators sent from worker
+ * - do global processing
+ * - send data to workers through aggregators
+ */
+ public void masterCompute(BlockMasterApi masterApi, S executionStage) {
+ }
+
+ /**
+ * Override to have worker context receive computation.
+ *
+ * Called once per worker, before all vertices are going to be processed
+ * with getVertexReceiver.
+ */
+ public void workerContextReceive(
+ BlockWorkerContextReceiveApi workerContextApi, S executionStage,
+ WV workerValue, List<WM> workerMessages) {
+ }
+
+ /**
+ * Override to do vertex receive processing.
+ *
+ * Creates handler that defines what should be executed on worker
+ * for each vertex during receive phase.
+ *
+ * This logic executed last.
+ * This function is called once on each worker on each thread, in parallel,
+ * on their copy of Piece object to create functions handler.
+ *
+ * If returned object implements Postprocessor interface, then corresponding
+ * postprocess() function is going to be called once, after all vertices
+ * corresponding thread needed to process are done.
+ */
+ public VertexReceiver<I, V, E, M> getVertexReceiver(
+ BlockWorkerReceiveApi<I> workerApi, S executionStage) {
+ return null;
+ }
+
+ /**
+ * Returns MessageClasses definition for messages being sent by this Piece.
+ */
+ public abstract MessageClasses<I, M> getMessageClasses(
+ ImmutableClassesGiraphConfiguration conf);
+
+ /**
+ * Override to provide different next execution stage for
+ * Pieces that come after it.
+ *
+ * Execution stage should be immutable, and this function should be
+ * returning a new object, if it needs to return different value.
+ *
+ * It affects pieces that come after this piece,
+ * and isn't applied to execution stage this piece sees.
+ */
+ public S nextExecutionStage(S executionStage) {
+ return executionStage;
+ }
+
+ /**
+ * Override to register any potential aggregators used by this piece.
+ *
+ * @deprecated Use registerReducers instead.
+ */
+ @Deprecated
+ public void registerAggregators(BlockMasterApi masterApi)
+ throws InstantiationException, IllegalAccessException {
+ }
+
+ // Inner classes
+
+ /** Inner class to provide clean use without specifying types */
+ public abstract class InnerVertexSender
+ implements VertexSender<I, V, E>, VertexPostprocessor {
+ @Override
+ public void postprocess() { }
+ }
+
+ /** Inner class to provide clean use without specifying types */
+ public abstract class InnerVertexReceiver
+ implements VertexReceiver<I, V, E, M>, VertexPostprocessor {
+ @Override
+ public void postprocess() { }
+ }
+
+ // Internal implementation
+
+ @Override
+ public final Iterator<AbstractPiece> iterator() {
+ return Iterators.<AbstractPiece>singletonIterator(this);
+ }
+
+ @Override
+ public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+ consumer.apply(this);
+ }
+
+ @Override
+ public String toString() {
+ String name = getClass().getSimpleName();
+ if (name.isEmpty()) {
+ name = getClass().getName();
+ }
+ return name;
+ }
+
+
+ // make hashCode and equals final, forcing them to be based on
+ // reference identity.
+ @Override
+ public final int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public final boolean equals(Object obj) {
+ return super.equals(obj);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java
new file mode 100644
index 0000000..0963efb
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper;
+import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses;
+import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf;
+import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.DefaultMessageFactorySupplierFromConf;
+import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.SupplierFromConfByCopy;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.conf.EnumConfOption;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Additional abstract implementations for all pieces to be used.
+ * Code here is not in AbstractPiece only to allow for non-standard
+ * non-user-defined pieces. <br>
+ * Only logic used by the underlying framework directly is in AbstractPiece
+ * itself.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public abstract class DefaultParentPiece<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable, WV,
+ WM extends Writable, S> extends AbstractPiece<I, V, E, M, WV, WM, S> {
+ // TODO move to GiraphConstants
+ /**
+ * This option will tell which message encode & store enum to force, when
+ * combining is not enabled.
+ *
+ * MESSAGE_ENCODE_AND_STORE_TYPE and this property are basically upper
+ * and lower bound on message store type, when looking them in order from
+ * not doing anything special, to most advanced type:
+ * BYTEARRAY_PER_PARTITION,
+ * EXTRACT_BYTEARRAY_PER_PARTITION,
+ * POINTER_LIST_PER_VERTEX
+ * resulting encode type is going to be:
+ * pieceEncodingType = piece.allowOneMessageToManyIdsEncoding() ?
+ * POINTER_LIST_PER_VERTEX : BYTEARRAY_PER_PARTITION)
+ * Math.max(index(minForce), Math.min(index(maxAllowed), index(pieceType);
+ *
+ * This is useful to force all pieces onto particular message store, even
+ * if they do not overrideallowOneMessageToManyIdsEncoding, though that might
+ * be rarely needed.
+ * This option might be more useful for fully local computation,
+ * where overall job behavior is quite different.
+ */
+ public static final EnumConfOption<MessageEncodeAndStoreType>
+ MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE =
+ EnumConfOption.create("giraph.messageEncodeAndStoreTypeMinForce",
+ MessageEncodeAndStoreType.class,
+ MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
+ "Select the message_encode_and_store_type min force to use");
+
+ private final ReduceUtilsObject reduceUtils = new ReduceUtilsObject();
+ private ReducersForPieceHandler reducersHandler;
+
+ // Overridable functions
+
+ /**
+ * Override to register any potential reducers used by this piece,
+ * through calls to {@code reduceApi}, which will return reducer handles
+ * for simple.
+ * <br/>
+ * Tip: Without defining a field, first write here name of the field and what
+ * you want to reduce, like:
+ * <br/>
+ * {@code totalSum = reduceApi.createLocalReducer(SumReduce.DOUBLE); }
+ * <br/>
+ * and then use tools your IDE provides to generate field signature itself,
+ * which might be slightly complex:
+ * <br/>
+ * {@code ReducerHandle<DoubleWritable, DoubleWritable> totalSum; }
+ */
+ public void registerReducers(CreateReducersApi reduceApi, S executionStage) {
+ }
+
+ /**
+ * Override to do vertex send processing.
+ *
+ * Creates handler that defines what should be executed on worker
+ * during send phase.
+ *
+ * This logic gets executed first.
+ * This function is called once on each worker on each thread, in parallel,
+ * on their copy of Piece object to create functions handler.
+ *
+ * If returned object implements Postprocessor interface, then corresponding
+ * postprocess() function is going to be called once, after all vertices
+ * corresponding thread needed to process are done.
+ */
+ public VertexSender<I, V, E> getVertexSender(
+ BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+ return null;
+ }
+
+ /**
+ * Override to specify type of the message this Piece sends, if it does
+ * send messages.
+ *
+ * If not overwritten, no messages can be sent.
+ */
+ protected Class<M> getMessageClass() {
+ return null;
+ }
+
+ /**
+ * Override to specify message value factory to be used,
+ * which creates objects into which messages will be deserialized.
+ *
+ * If not overwritten, or null is returned, DefaultMessageValueFactory
+ * will be used.
+ */
+ protected MessageValueFactory<M> getMessageFactory(
+ ImmutableClassesGiraphConfiguration conf) {
+ return null;
+ }
+
+ /**
+ * Override to specify message combiner to be used, if any.
+ *
+ * Message combiner itself should be immutable
+ * (i.e. it will be call simultanously from multiple threads)
+ */
+ protected MessageCombiner<? super I, M> getMessageCombiner(
+ ImmutableClassesGiraphConfiguration conf) {
+ return null;
+ }
+
+ /**
+ * Override to specify that this Piece allows one to many ids encoding to be
+ * used for messages.
+ * You should override this function, if you are sending identical message to
+ * all targets, and message itself is not extremely small.
+ */
+ protected boolean allowOneMessageToManyIdsEncoding() {
+ return false;
+ }
+
+ @Override
+ public MessageClasses<I, M> getMessageClasses(
+ ImmutableClassesGiraphConfiguration conf) {
+ Class<M> messageClass = null;
+ MessageValueFactory<M> messageFactory = getMessageFactory(conf);
+ MessageCombiner<? super I, M> messageCombiner = getMessageCombiner(conf);
+
+ if (messageFactory != null) {
+ messageClass = (Class) messageFactory.newInstance().getClass();
+ } else if (messageCombiner != null) {
+ messageClass = (Class) messageCombiner.createInitialMessage().getClass();
+ }
+
+ if (messageClass != null) {
+ Preconditions.checkState(getMessageClass() == null,
+ "Piece %s defines getMessageFactory or getMessageCombiner, " +
+ "so it doesn't need to define getMessageClass.",
+ toString());
+ } else {
+ messageClass = getMessageClass();
+ if (messageClass == null) {
+ messageClass = (Class) NoMessage.class;
+ }
+ }
+
+ SupplierFromConf<MessageValueFactory<M>> messageFactorySupplier;
+ if (messageFactory != null) {
+ messageFactorySupplier =
+ new SupplierFromConfByCopy<MessageValueFactory<M>>(messageFactory);
+ } else {
+ messageFactorySupplier =
+ new DefaultMessageFactorySupplierFromConf<>(messageClass);
+ }
+
+ SupplierFromConf<? extends MessageCombiner<? super I, M>>
+ messageCombinerSupplier;
+ if (messageCombiner != null) {
+ messageCombinerSupplier = new SupplierFromConfByCopy<>(messageCombiner);
+ } else {
+ messageCombinerSupplier = null;
+ }
+
+ int maxAllowed =
+ GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf).ordinal();
+ int minForce = MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE.get(conf).ordinal();
+ Preconditions.checkState(maxAllowed >= minForce);
+
+ int pieceEncodeType = (allowOneMessageToManyIdsEncoding() ?
+ MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX :
+ MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION).ordinal();
+ // bound piece type with boundaries:
+ pieceEncodeType = Math.max(minForce, Math.min(maxAllowed, pieceEncodeType));
+
+ MessageEncodeAndStoreType messageEncodeAndStoreType =
+ MessageEncodeAndStoreType.values()[pieceEncodeType];
+
+ if (messageFactory instanceof GiraphConfigurationSettable) {
+ throw new IllegalStateException(
+ messageFactory.getClass() + " MessageFactory in " + this +
+ " Piece implements GiraphConfigurationSettable");
+ }
+ if (messageCombiner instanceof GiraphConfigurationSettable) {
+ throw new IllegalStateException(
+ messageCombiner.getClass() + " MessageCombiner in " + this +
+ " Piece implements GiraphConfigurationSettable");
+ }
+
+ return new ObjectMessageClasses<>(
+ messageClass, messageFactorySupplier,
+ messageCombinerSupplier, messageEncodeAndStoreType);
+ }
+
+ // Internal implementation
+
+ @Override
+ public final InnerVertexSender getWrappedVertexSender(
+ final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+ reducersHandler.vertexSenderWorkerPreprocess(workerApi);
+ final VertexSender<I, V, E> functions =
+ getVertexSender(workerApi, executionStage);
+ return new InnerVertexSender() {
+ @Override
+ public void vertexSend(Vertex<I, V, E> vertex) {
+ if (functions != null) {
+ functions.vertexSend(vertex);
+ }
+ }
+ @Override
+ public void postprocess() {
+ if (functions instanceof VertexPostprocessor) {
+ ((VertexPostprocessor) functions).postprocess();
+ }
+ reducersHandler.vertexSenderWorkerPostprocess(workerApi);
+ }
+ };
+ }
+
+ @Override
+ public final void wrappedRegisterReducers(
+ BlockMasterApi masterApi, S executionStage) {
+ reducersHandler = new ReducersForPieceHandler();
+ registerReducers(new CreateReducersApiWrapper(
+ masterApi, reducersHandler), executionStage);
+ }
+
+ // utility functions:
+ // TODO Java8 - move these as default functions to VertexSender interface
+ protected final void reduceDouble(
+ ReducerHandle<DoubleWritable, ?> reduceHandle, double value) {
+ reduceUtils.reduceDouble(reduceHandle, value);
+ }
+
+ protected final void reduceFloat(
+ ReducerHandle<FloatWritable, ?> reduceHandle, float value) {
+ reduceUtils.reduceFloat(reduceHandle, value);
+ }
+
+ protected final void reduceLong(
+ ReducerHandle<LongWritable, ?> reduceHandle, long value) {
+ reduceUtils.reduceLong(reduceHandle, value);
+ }
+
+ protected final void reduceInt(
+ ReducerHandle<IntWritable, ?> reduceHandle, int value) {
+ reduceUtils.reduceInt(reduceHandle, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
new file mode 100644
index 0000000..3ad66d1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece;
+
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Piece that should be extended in common usecases, when we want to be:
+ * - sending and then receiving messages from vertices
+ * - sending data to be aggregated from workers to master
+ * - sending values from master, via aggregators, to workers
+ *
+ * (basically - we don't want to use WorkerContext)
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings("rawtypes")
+public class Piece<I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable, S>
+ extends DefaultParentPiece<I, V, E, M, Object, NoMessage, S> {
+
+ // Disallowing use of Worker Context functions:
+ @Override
+ public final void workerContextSend(
+ BlockWorkerContextSendApi<NoMessage> workerContextApi,
+ S executionStage, Object workerValue) {
+ }
+
+ @Override
+ public final void workerContextReceive(
+ BlockWorkerContextReceiveApi workerContextApi,
+ S executionStage, Object workerValue, List<NoMessage> workerMessages) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java
new file mode 100644
index 0000000..a5d0c8c
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Piece that should be extended when WorkerContext is used.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings("rawtypes")
+public class PieceWithWorkerContext<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable, WV,
+ WM extends Writable, S>
+ extends DefaultParentPiece<I, V, E, M, WV, WM, S> {
+
+ /**
+ * Get global worker value.
+ * Value returned can be accessed from may threads, and so all
+ * accesses to it should be done in a thread-safe manner!
+ *
+ * This is the only place in Blocks Framework where you need
+ * to take care of concurrency.
+ */
+ @SuppressWarnings("unchecked")
+ public WV getWorkerValue(BlockWorkerApi<I> workerApi) {
+ return (WV) ((BlockWorkerValueAccessor) workerApi).getWorkerValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
new file mode 100644
index 0000000..23c2d29
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.delegate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Delegate Piece which allows combining multiple pieces in same iteration:
+ * new DelegatePiece(new LogicPiece(), new StatsPiece())
+ * You should be careful when doing so, since those pieces must not interact,
+ * and only one can send messages.
+ * Execution of any of the Piece methods by the framework is going to trigger
+ * sequential execution of that method on all of the pieces this DelegatePiece
+ * wraps. That means for example, getVertexSender is going to be called on all
+ * pieces before masterCompute is called on all pieces, which is called before
+ * getVertexReceiver on all pieces.
+ *
+ * Also, via overriding, it provides an abstract class for filtering. I.e. if
+ * you want piece that filters out calls to masterCompute, you can have:
+ * new FilterMasterPiece(new LogicPiece()),
+ * with FilterMasterPiece extends DelegatePiece, and only overrides getMaster
+ * function and DelegateMasterPiece class.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings("rawtypes")
+public class DelegatePiece<I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable, WV, WM extends Writable, S>
+ extends AbstractPiece<I, V, E, M, WV, WM, S> {
+
+ private final List<AbstractPiece<I, V, E, M, WV, WM, S>> innerPieces;
+
+ @SafeVarargs
+ @SuppressWarnings("unchecked")
+ public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
+ ? super M, ? super WV, ? super WM, ? super S>... innerPieces) {
+ // Pieces are contravariant, but Java generics cannot express that,
+ // so use unchecked cast inside to allow callers to be typesafe
+ this.innerPieces = new ArrayList(Arrays.asList(innerPieces));
+ }
+
+ @SuppressWarnings("unchecked")
+ public DelegatePiece(AbstractPiece<? super I, ? super V, ? super E,
+ ? super M, ? super WV, ? super WM, ? super S> innerPiece) {
+ // Pieces are contravariant, but Java generics cannot express that,
+ // so use unchecked cast inside to allow callers to be typesafe
+ this.innerPieces = new ArrayList(Arrays.asList(innerPiece));
+ }
+
+ protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
+ ArrayList<InnerVertexSender> workerSendFunctions,
+ BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+ return new DelegateWorkerSendFunctions(workerSendFunctions);
+ }
+
+ protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
+ ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
+ BlockWorkerReceiveApi<I> workerApi, S executionStage) {
+ return new DelegateWorkerReceiveFunctions(workerReceiveFunctions);
+ }
+
+ @Override
+ public InnerVertexSender getWrappedVertexSender(
+ BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+ ArrayList<InnerVertexSender> workerSendFunctions =
+ new ArrayList<>(innerPieces.size());
+ for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+ workerSendFunctions.add(
+ innerPiece.getWrappedVertexSender(workerApi, executionStage));
+ }
+ return delegateWorkerSendFunctions(
+ workerSendFunctions, workerApi, executionStage);
+ }
+
+ @Override
+ public InnerVertexReceiver getVertexReceiver(
+ BlockWorkerReceiveApi<I> workerApi, S executionStage) {
+ ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions =
+ new ArrayList<>(innerPieces.size());
+ for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+ workerReceiveFunctions.add(
+ innerPiece.getVertexReceiver(workerApi, executionStage));
+ }
+ return delegateWorkerReceiveFunctions(
+ workerReceiveFunctions, workerApi, executionStage);
+ }
+
+ /** Delegating WorkerSendPiece */
+ protected class DelegateWorkerSendFunctions extends InnerVertexSender {
+ private final ArrayList<InnerVertexSender> workerSendFunctions;
+
+ public DelegateWorkerSendFunctions(
+ ArrayList<InnerVertexSender> workerSendFunctions) {
+ this.workerSendFunctions = workerSendFunctions;
+ }
+
+ @Override
+ public void vertexSend(Vertex<I, V, E> vertex) {
+ for (InnerVertexSender functions : workerSendFunctions) {
+ if (functions != null) {
+ functions.vertexSend(vertex);
+ }
+ }
+ }
+
+ @Override
+ public void postprocess() {
+ for (InnerVertexSender functions : workerSendFunctions) {
+ if (functions != null) {
+ functions.postprocess();
+ }
+ }
+ }
+ }
+
+ /** Delegating WorkerReceivePiece */
+ protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver {
+ private final ArrayList<VertexReceiver<I, V, E, M>>
+ workerReceiveFunctions;
+
+ public DelegateWorkerReceiveFunctions(
+ ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions) {
+ this.workerReceiveFunctions = workerReceiveFunctions;
+ }
+
+ @Override
+ public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
+ for (VertexReceiver<I, V, E, M> functions :
+ workerReceiveFunctions) {
+ if (functions != null) {
+ functions.vertexReceive(vertex, messages);
+ }
+ }
+ }
+
+ @Override
+ public void postprocess() {
+ for (VertexReceiver<I, V, E, M> functions :
+ workerReceiveFunctions) {
+ if (functions instanceof VertexPostprocessor) {
+ ((VertexPostprocessor) functions).postprocess();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void masterCompute(BlockMasterApi api, S executionStage) {
+ for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
+ piece.masterCompute(api, executionStage);
+ }
+ }
+
+ @Override
+ public void workerContextSend(
+ BlockWorkerContextSendApi<WM> workerContextApi, S executionStage,
+ WV workerValue) {
+ for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
+ piece.workerContextSend(workerContextApi, executionStage, workerValue);
+ }
+ }
+
+ @Override
+ public void workerContextReceive(
+ BlockWorkerContextReceiveApi workerContextApi, S executionStage,
+ WV workerValue, List<WM> workerMessages) {
+ for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
+ piece.workerContextReceive(
+ workerContextApi, executionStage, workerValue, workerMessages);
+ }
+ }
+
+ @Override
+ public S nextExecutionStage(S executionStage) {
+ for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+ executionStage = innerPiece.nextExecutionStage(executionStage);
+ }
+ return executionStage;
+ }
+
+ @Override
+ public MessageClasses<I, M> getMessageClasses(
+ ImmutableClassesGiraphConfiguration conf) {
+ MessageClasses<I, M> messageClasses = null;
+ MessageClasses<I, M> firstMessageClasses = null;
+ for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+ MessageClasses<I, M> cur = innerPiece.getMessageClasses(conf);
+ Preconditions.checkState(cur != null);
+ if (!cur.getMessageClass().equals(NoMessage.class)) {
+ if (messageClasses != null) {
+ throw new RuntimeException(
+ "Only one piece combined through delegate (" +
+ toString() + ") can send messages");
+ }
+ messageClasses = cur;
+ }
+ if (firstMessageClasses == null) {
+ firstMessageClasses = cur;
+ }
+ }
+ return messageClasses != null ? messageClasses : firstMessageClasses;
+ }
+
+ @Override
+ public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+ for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+ innerPiece.forAllPossiblePieces(consumer);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void registerAggregators(BlockMasterApi master)
+ throws InstantiationException, IllegalAccessException {
+ for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+ innerPiece.registerAggregators(master);
+ }
+ }
+
+ @Override
+ public void wrappedRegisterReducers(
+ BlockMasterApi masterApi, S executionStage) {
+ for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
+ innerPiece.wrappedRegisterReducers(masterApi, executionStage);
+ }
+ }
+
+ protected String delegationName() {
+ return "Delegate";
+ }
+
+ @Override
+ public String toString() {
+ return delegationName() + innerPieces.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java
new file mode 100644
index 0000000..5c702c5
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.delegate;
+
+import java.util.ArrayList;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Piece which uses a provided suppliers to decide whether or not to run
+ * receive/send piece part on a certain vertex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ * @param <WV> Worker value type
+ * @param <WM> Worker message type
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings({ "rawtypes" })
+public class FilteringPiece<I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable, WV, WM extends Writable, S>
+ extends DelegatePiece<I, V, E, M, WV, WM, S> {
+ private final SupplierFromVertex<I, V, E, Boolean> toCallSend;
+ private final SupplierFromVertex<I, V, E, Boolean> toCallReceive;
+
+ /**
+ * Creates filtering piece which uses passed {@code toCallSend} to filter
+ * calls to {@code vertexSend}, and passed {@code toCallReceive} to filter
+ * calls to {@code vertexReceive}, on passed {@code innerPiece}.
+ */
+ @SuppressWarnings("unchecked")
+ public FilteringPiece(
+ SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
+ SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
+ toCallReceive,
+ AbstractPiece<? super I, ? super V, ? super E, ? super M,
+ ? super WV, ? super WM, ? super S> innerPiece) {
+ super(innerPiece);
+ // Suppliers are contravariant on vertex types,
+ // but Java generics cannot express that,
+ // so use unchecked cast inside to allow callers to be typesafe
+ this.toCallSend = (SupplierFromVertex) toCallSend;
+ this.toCallReceive = (SupplierFromVertex) toCallReceive;
+ Preconditions.checkArgument(
+ toCallSend != null || toCallReceive != null,
+ "Both send and receive filter cannot be null");
+ }
+
+ /**
+ * Creates filtering piece, where both vertexSend and vertexReceive is
+ * filtered based on same supplier.
+ */
+ public FilteringPiece(
+ SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
+ toCallSendAndReceive,
+ AbstractPiece<? super I, ? super V, ? super E, ? super M,
+ ? super WV, ? super WM, ? super S> innerPiece) {
+ this(toCallSendAndReceive, toCallSendAndReceive, innerPiece);
+ }
+
+ /**
+ * Creates filtering piece, that filters only vertexReceive function,
+ * and always calls vertexSend function.
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable, WV, WM extends Writable, S>
+ FilteringPiece<I, V, E, M, WV, WM, S> createReceiveFiltering(
+ SupplierFromVertex<? super I, ? super V, ? super E, Boolean>
+ toCallReceive,
+ AbstractPiece<? super I, ? super V, ? super E, ? super M,
+ ? super WV, ? super WM, ? super S> innerPiece) {
+ return new FilteringPiece<>(null, toCallReceive, innerPiece);
+ }
+
+ /**
+ * Creates filtering block, that filters only vertexSend function,
+ * and always calls vertexReceive function.
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable, WV, WM extends Writable, S>
+ FilteringPiece<I, V, E, M, WV, WM, S> createSendFiltering(
+ SupplierFromVertex<? super I, ? super V, ? super E, Boolean> toCallSend,
+ AbstractPiece<? super I, ? super V, ? super E, ? super M, ? super WV,
+ ? super WM, ? super S> innerPiece) {
+ return new FilteringPiece<>(toCallSend, null, innerPiece);
+ }
+
+ @Override
+ protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
+ ArrayList<InnerVertexSender> workerSendFunctions,
+ BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
+ return new DelegateWorkerSendFunctions(workerSendFunctions) {
+ @Override
+ public void vertexSend(Vertex<I, V, E> vertex) {
+ if (toCallSend == null || toCallSend.get(vertex)) {
+ super.vertexSend(vertex);
+ }
+ }
+ };
+ }
+
+ @Override
+ protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
+ ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
+ BlockWorkerReceiveApi<I> workerApi, S executionStage) {
+ return new DelegateWorkerReceiveFunctions(workerReceiveFunctions) {
+ @Override
+ public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
+ if (toCallReceive == null || toCallReceive.get(vertex)) {
+ super.vertexReceive(vertex, messages);
+ }
+ }
+ };
+ }
+
+ @Override
+ protected String delegationName() {
+ if (toCallSend != null && toCallReceive != null) {
+ if (toCallSend != toCallReceive) {
+ return "AsymFilter";
+ }
+ return "Filter";
+ } else if (toCallSend != null) {
+ return "SendFilter";
+ } else if (toCallReceive != null) {
+ return "ReceiveFilter";
+ } else {
+ throw new IllegalStateException("Both Send and Receive filters are null");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java
new file mode 100644
index 0000000..f367e6c
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Pieces that delegate their work to a set of one or multiple other Pieces.
+ */
+package org.apache.giraph.block_app.framework.piece.delegate;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java
new file mode 100644
index 0000000..f18d1f4
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;
+
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * Handle to a broadcast.
+ *
+ * @param <T> Value type
+ */
+public interface BroadcastHandle<T> {
+ /** Get broadcasted value */
+ T getBroadcast(WorkerBroadcastUsage worker);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java
new file mode 100644
index 0000000..bbec1c6
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Utility object with common primitive reduce operations,
+ * without need to create reusable objects within the piece.
+ */
+public class ReduceUtilsObject {
+ private final DoubleWritable reusableDouble = new DoubleWritable();
+ private final FloatWritable reusableFloat = new FloatWritable();
+ private final LongWritable reusableLong = new LongWritable();
+ private final IntWritable reusableInt = new IntWritable();
+
+ // utility functions:
+ public void reduceDouble(
+ ReducerHandle<DoubleWritable, ?> reduceHandle, double value) {
+ DoubleWritable tmp = reusableDouble;
+ tmp.set(value);
+ reduceHandle.reduce(tmp);
+ }
+
+ public void reduceFloat(
+ ReducerHandle<FloatWritable, ?> reduceHandle, float value) {
+ FloatWritable tmp = reusableFloat;
+ tmp.set(value);
+ reduceHandle.reduce(tmp);
+ }
+
+ public void reduceLong(
+ ReducerHandle<LongWritable, ?> reduceHandle, long value) {
+ LongWritable tmp = reusableLong;
+ tmp.set(value);
+ reduceHandle.reduce(tmp);
+ }
+
+ public void reduceInt(ReducerHandle<IntWritable, ?> reduceHandle, int value) {
+ IntWritable tmp = reusableInt;
+ tmp.set(value);
+ reduceHandle.reduce(tmp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java
new file mode 100644
index 0000000..921c863
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * Handle that wraps both reducerHandle and broadcastHandle, so callers
+ * don't need to have two fields.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public class ReducerAndBroadcastWrapperHandle<S, R> {
+ private ReducerHandle<S, R> reducerHandle;
+ private BroadcastHandle<R> broadcastHandle;
+
+ /** Set reducer handle to just registered handle */
+ public void registeredReducer(ReducerHandle<S, R> reducerHandle) {
+ this.reducerHandle = reducerHandle;
+ }
+
+ /** Reduce single value */
+ public void reduce(S valueToReduce) {
+ reducerHandle.reduce(valueToReduce);
+ }
+
+ /** Get reduced value */
+ public R getReducedValue(MasterGlobalCommUsage master) {
+ return reducerHandle.getReducedValue(master);
+ }
+
+ /**
+ * Broadcast reduced value from master
+ */
+ public void broadcastValue(BlockMasterApi master) {
+ broadcastHandle = reducerHandle.broadcastValue(master);
+ }
+
+ /** Get broadcasted value */
+ public R getBroadcast(WorkerBroadcastUsage worker) {
+ return broadcastHandle.getBroadcast(worker);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java
new file mode 100644
index 0000000..dae40f2
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+
+/**
+ * Handle to a reducer.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public interface ReducerHandle<S, R> {
+ /** Reduce single value */
+ void reduce(S valueToReduce);
+ /** Get reduced value */
+ R getReducedValue(MasterGlobalCommUsage master);
+
+ /**
+ * Broadcast reduced value from master
+ *
+ * @return Handle to the broadcasted value.
+ */
+ BroadcastHandle<R> broadcastValue(BlockMasterApi master);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java
new file mode 100644
index 0000000..7ee54cb
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.array;
+
+/**
+ * Handle to array of handles underneath
+ *
+ * @param <T> Value type
+ */
+public interface ArrayHandle<T> {
+ /**
+ * Get value at index.
+ */
+ T get(int index);
+
+ /**
+ * Size of this array if defined up front, or throws
+ * UnsupportedOperationException if size is dynamic
+ */
+ int getStaticSize();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java
new file mode 100644
index 0000000..bf0d333
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.array;
+
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * Handle to array of broadcasts
+ *
+ * @param <T> Value type
+ */
+public interface BroadcastArrayHandle<T>
+ extends ArrayHandle<BroadcastHandle<T>> {
+
+ /**
+ * Number of elements that were broadcasted.
+ */
+ int getBroadcastedSize(WorkerBroadcastUsage worker);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java
new file mode 100644
index 0000000..a4b99ac
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.array;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+
+/**
+ * Handle to array of reducers
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public interface ReducerArrayHandle<S, R>
+ extends ArrayHandle<ReducerHandle<S, R>> {
+
+ /**
+ * Number of elements that were reduced.
+ */
+ int getReducedSize(BlockMasterApi master);
+
+ /**
+ * Broadcast whole array of reducers to master
+ *
+ * @return Handle to the broadcasted array.
+ */
+ BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java
new file mode 100644
index 0000000..a8beb85
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Interfaces representing arrays of individual handles.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.array;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java
new file mode 100644
index 0000000..c2cc0f2
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.internal;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Wrapping masterApi and reducers handler into API for creating reducer
+ * handles.
+ */
+public class CreateReducersApiWrapper implements CreateReducersApi {
+ private final BlockMasterApi masterApi;
+ private final ReducersForPieceHandler reducersApi;
+
+ public CreateReducersApiWrapper(
+ BlockMasterApi masterApi, ReducersForPieceHandler reducersApi) {
+ this.masterApi = masterApi;
+ this.reducersApi = reducersApi;
+ }
+
+ @Override
+ public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer(
+ ReduceOperation<S, R> reduceOp) {
+ return reducersApi.createLocalReducer(
+ masterApi, reduceOp, reduceOp.createInitialValue());
+ }
+
+ @Override
+ public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer(
+ ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+ return reducersApi.createLocalReducer(
+ masterApi, reduceOp, globalInitialValue);
+ }
+
+ @Override
+ public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer(
+ ReduceOperation<S, R> reduceOp) {
+ return reducersApi.createGlobalReducer(
+ masterApi, reduceOp, reduceOp.createInitialValue());
+ }
+
+ @Override
+ public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer(
+ ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+ return reducersApi.createGlobalReducer(
+ masterApi, reduceOp, globalInitialValue);
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration<?, ?, ?> getConf() {
+ return masterApi.getConf();
+ }
+}