You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/26 02:39:46 UTC

[2/2] git commit: updated refs/heads/trunk to 77f8a07

[GIRAPH-1013] Adding reducer handle utilities

Summary: And more functional interfaces, and PairWritable

Test Plan: mvn clean install

Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D40269


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/77f8a075
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/77f8a075
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/77f8a075

Branch: refs/heads/trunk
Commit: 77f8a075ccc029cb608a382f5deb7cc0b27b02e5
Parents: add1d4f
Author: Igor Kabiljo <ik...@fb.com>
Authored: Wed Jun 17 12:47:52 2015 -0700
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Thu Jun 25 17:39:32 2015 -0700

----------------------------------------------------------------------
 giraph-block-app/pom.xml                        |   4 +
 .../reducers/array/ArrayOfHandles.java          | 127 ++++++
 .../block_app/reducers/array/ArrayReduce.java   | 211 ++++++++++
 .../reducers/array/BasicArrayReduce.java        | 353 ++++++++++++++++
 .../reducers/array/HugeArrayUtils.java          | 404 +++++++++++++++++++
 .../block_app/reducers/array/package-info.java  |  21 +
 .../CollectPrimitiveReduceOperation.java        |  84 ++++
 .../collect/CollectReduceOperation.java         |  50 +++
 .../CollectShardedPrimitiveReducerHandle.java   |  96 +++++
 .../collect/CollectShardedReducerHandle.java    |  85 ++++
 ...tShardedTuplesOfPrimitivesReducerHandle.java | 158 ++++++++
 ...ollectTuplesOfPrimitivesReduceOperation.java |  96 +++++
 .../reducers/collect/ShardedReducerHandle.java  | 123 ++++++
 .../reducers/collect/package-info.java          |  21 +
 .../block_app/reducers/map/BasicMapReduce.java  | 276 +++++++++++++
 .../block_app/reducers/map/package-info.java    |  21 +
 .../giraph/block_app/reducers/package-info.java |  21 +
 .../apache/giraph/function/TripleFunction.java  |  41 ++
 .../function/primitive/Obj2DoubleFunction.java  |  33 ++
 .../function/primitive/Obj2FloatFunction.java   |  33 ++
 .../function/primitive/Obj2LongFunction.java    |  33 ++
 .../reducers/array/ObjectStripingTest.java      |  58 +++
 giraph-core/pom.xml                             |   4 +
 .../impl/KryoWrappedReduceOperation.java        |  86 ++++
 .../writable/tuple/DoubleDoubleWritable.java    |  39 ++
 .../writable/tuple/IntDoubleWritable.java       |  40 ++
 .../giraph/writable/tuple/IntIntWritable.java   |  39 ++
 .../giraph/writable/tuple/IntLongWritable.java  |  40 ++
 .../writable/tuple/LongDoubleWritable.java      |  40 ++
 .../giraph/writable/tuple/LongIntWritable.java  |  40 ++
 .../giraph/writable/tuple/LongLongWritable.java |  39 ++
 .../giraph/writable/tuple/PairWritable.java     | 113 ++++++
 .../giraph/writable/tuple/package-info.java     |  21 +
 pom.xml                                         |   6 +
 34 files changed, 2856 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-block-app/pom.xml b/giraph-block-app/pom.xml
index 1f653bb..a05c1c5 100644
--- a/giraph-block-app/pom.xml
+++ b/giraph-block-app/pom.xml
@@ -87,6 +87,10 @@ under the License.
   <dependencies>
     <!-- compile dependencies. sorted lexicographically. -->
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java
new file mode 100644
index 0000000..053fd61
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.array;
+
+import java.util.ArrayList;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * ArrayHandle backed by a list with one handle entry per index.
+ *
+ * @param <H> Type of the individual handles held by this array
+ */
+public class ArrayOfHandles<H> implements ArrayHandle<H> {
+  protected final ArrayList<H> handles;
+
+  public ArrayOfHandles(int count, Supplier<H> reduceHandleFactory) {
+    handles = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      handles.add(reduceHandleFactory.get());
+    }
+  }
+
+  public ArrayOfHandles(int count, Int2ObjFunction<H> reduceHandleFactory) {
+    handles = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      handles.add(reduceHandleFactory.apply(i));
+    }
+  }
+
+  @Override
+  public H get(int index) {
+    return handles.get(index);
+  }
+
+  @Override
+  public int getStaticSize() {
+    return handles.size();
+  }
+
+  /**
+   * ReducerArrayHandle implemented as an array of separate reducer handles.
+   * @param <S> Single value type of the wrapped reducer handles
+   * @param <R> Reduced value type of the wrapped reducer handles
+   */
+  public static class ArrayOfReducers<S, R>
+      extends ArrayOfHandles<ReducerHandle<S, R>>
+      implements ReducerArrayHandle<S, R> {
+
+    public ArrayOfReducers(
+        int count, Supplier<ReducerHandle<S, R>> reduceHandleFactory) {
+      super(count, reduceHandleFactory);
+    }
+
+    public ArrayOfReducers(
+        int count, Int2ObjFunction<ReducerHandle<S, R>> reduceHandleFactory) {
+      super(count, reduceHandleFactory);
+    }
+
+    @Override
+    public int getReducedSize(BlockMasterApi master) {
+      return getStaticSize();
+    }
+
+    @Override
+    public BroadcastArrayHandle<R> broadcastValue(final BlockMasterApi master) {
+      return new ArrayOfBroadcasts<>(
+          getStaticSize(),
+          new Int2ObjFunction<BroadcastHandle<R>>() {
+            @Override
+            public BroadcastHandle<R> apply(int index) {
+              return get(index).broadcastValue(master);
+            }
+          });
+    }
+  }
+
+  /**
+   * BroadcastArrayHandle implemented as an array of separate broadcast handles.
+   *
+   * @param <T> Value type returned by each wrapped BroadcastHandle
+   */
+  public static class ArrayOfBroadcasts<T>
+      extends ArrayOfHandles<BroadcastHandle<T>>
+      implements BroadcastArrayHandle<T> {
+
+    public ArrayOfBroadcasts(
+        int count,
+        Int2ObjFunction<BroadcastHandle<T>> broadcastHandleFactory) {
+      super(count, broadcastHandleFactory);
+    }
+
+    public ArrayOfBroadcasts(
+        int count,
+        Supplier<BroadcastHandle<T>> broadcastHandleFactory) {
+      super(count, broadcastHandleFactory);
+    }
+
+    @Override
+    public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+      return getStaticSize();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java
new file mode 100644
index 0000000..f2cdf8c
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.array;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.ArrayWritable;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Single reducer that reduces a fixed-size array element by element.
+ * Elements are held as objects, so BasicArrayReduce should be
+ * used instead when elements are primitive types.
+ *
+ * @param <S> Single value type, objects passed on workers
+ * @param <R> Reduced value type
+ */
+public class ArrayReduce<S, R extends Writable>
+    implements ReduceOperation<Pair<IntRef, S>, ArrayWritable<R>> {
+  private int fixedSize;
+  private ReduceOperation<S, R> elementReduceOp;
+  private Class<R> elementClass;
+
+  public ArrayReduce() {
+  }
+
+  /**
+   * Create ReduceOperation that reduces arrays by reducing individual
+   * elements.
+   *
+   * @param fixedSize Number of elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   */
+  public ArrayReduce(int fixedSize, ReduceOperation<S, R> elementReduceOp) {
+    this.fixedSize = fixedSize;
+    this.elementReduceOp = elementReduceOp;
+    init();
+  }
+
+  /**
+   * Registers one new reducer that reduces an array of objects
+   * by reducing individual elements using {@code elementReduceOp}.
+   *
+   * Every {@code get(index)} on the returned handle hands back the same
+   * shared element handle and only updates the index it operates on.
+   *
+   * @param fixedSize Number of elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param createFunction Function for creating a reducer
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, T extends Writable>
+  ReducerArrayHandle<S, T> createArrayHandles(
+      final int fixedSize, ReduceOperation<S, T> elementReduceOp,
+      CreateReducerFunctionApi createFunction) {
+    final ReducerHandle<Pair<IntRef, S>, ArrayWritable<T>> reduceHandle =
+        createFunction.createReducer(
+            new ArrayReduce<>(fixedSize, elementReduceOp));
+
+    final IntRef curIndex = new IntRef(0);
+    final MutablePair<IntRef, S> reusablePair =
+        MutablePair.of(new IntRef(0), null);
+    final ReducerHandle<S, T> elementReduceHandle = new ReducerHandle<S, T>() {
+      @Override
+      public T getReducedValue(MasterGlobalCommUsage master) {
+        ArrayWritable<T> result = reduceHandle.getReducedValue(master);
+        return result.get()[curIndex.value];
+      }
+
+      @Override
+      public void reduce(S valueToReduce) {
+        reusablePair.getLeft().value = curIndex.value;
+        reusablePair.setRight(valueToReduce);
+        reduceHandle.reduce(reusablePair);
+      }
+
+      @Override
+      public BroadcastHandle<T> broadcastValue(BlockMasterApi master) {
+        throw new UnsupportedOperationException();
+      }
+    };
+
+    return new ReducerArrayHandle<S, T>() {
+      @Override
+      public ReducerHandle<S, T> get(int index) {
+        curIndex.value = index;
+        return elementReduceHandle;
+      }
+
+      @Override
+      public int getStaticSize() {
+        return fixedSize;
+      }
+
+      @Override
+      public int getReducedSize(BlockMasterApi master) {
+        return getStaticSize();
+      }
+
+      @Override
+      public BroadcastArrayHandle<T> broadcastValue(BlockMasterApi master) {
+        final BroadcastHandle<ArrayWritable<T>> broadcastHandle =
+            reduceHandle.broadcastValue(master);
+        final IntRef curIndex = new IntRef(0);
+        final BroadcastHandle<T>
+        elementBroadcastHandle = new BroadcastHandle<T>() {
+          @Override
+          public T getBroadcast(WorkerBroadcastUsage worker) {
+            ArrayWritable<T> result = broadcastHandle.getBroadcast(worker);
+            return result.get()[curIndex.value];
+          }
+        };
+        return new BroadcastArrayHandle<T>() {
+          @Override
+          public BroadcastHandle<T> get(int index) {
+            curIndex.value = index;
+            return elementBroadcastHandle;
+          }
+
+          @Override
+          public int getStaticSize() {
+            return fixedSize;
+          }
+
+          @Override
+          public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+            return getStaticSize();
+          }
+        };
+      }
+    };
+  }
+
+  private void init() {
+    elementClass = (Class<R>) elementReduceOp.createInitialValue().getClass();
+  }
+
+  @Override
+  public ArrayWritable<R> createInitialValue() {
+    R[] values = (R[]) Array.newInstance(elementClass, fixedSize);
+    for (int i = 0; i < fixedSize; i++) {
+      values[i] = elementReduceOp.createInitialValue();
+    }
+    return new ArrayWritable<>(elementClass, values);
+  }
+
+  @Override
+  public ArrayWritable<R> reduce(
+      ArrayWritable<R> curValue, Pair<IntRef, S> valueToReduce) {
+    int index = valueToReduce.getLeft().value;
+    curValue.get()[index] =
+        elementReduceOp.reduce(curValue.get()[index], valueToReduce.getRight());
+    return curValue;
+  }
+
+  @Override
+  public ArrayWritable<R> reduceMerge(
+      ArrayWritable<R> curValue, ArrayWritable<R> valueToReduce) {
+    for (int i = 0; i < fixedSize; i++) {
+      curValue.get()[i] =
+          elementReduceOp.reduceMerge(
+              curValue.get()[i], valueToReduce.get()[i]);
+    }
+    return curValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(fixedSize);
+    WritableUtils.writeWritableObject(elementReduceOp, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    fixedSize = in.readInt();
+    elementReduceOp = WritableUtils.readWritableObject(in, null);
+    init();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java
new file mode 100644
index 0000000..91ced16
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.array;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Efficient generic primitive array reduce operation.
+ *
+ * Allows two modes - fixed size, and unbounded size (fixedSize of -1),
+ * where the list grows on demand to fit the largest index reduced.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public class BasicArrayReduce<S, R extends Writable>
+    implements ReduceOperation<Pair<IntRef, S>, BasicArrayList<R>> {
+  private int fixedSize;
+  private PrimitiveTypeOps<R> typeOps;
+  private ReduceOperation<S, R> elementReduceOp;
+  private R initialElement;
+  private R reusable;
+  private R reusable2;
+
+  public BasicArrayReduce() {
+  }
+
+
+  /**
+   * Create ReduceOperation that reduces BasicArrays by reducing individual
+   * elements, with predefined size.
+   *
+   * @param fixedSize Number of elements, or -1 for unbounded size
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   */
+  public BasicArrayReduce(
+      int fixedSize,
+      PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp) {
+    this.fixedSize = fixedSize;
+    this.typeOps = typeOps;
+    this.elementReduceOp = elementReduceOp;
+    init();
+  }
+
+
+  /**
+   * Create ReduceOperation that reduces BasicArrays by reducing individual
+   * elements, with unbounded size.
+   *
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   */
+  public BasicArrayReduce(
+      PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp) {
+    this(-1, typeOps, elementReduceOp);
+  }
+
+
+  /**
+   * Registers one new local reducer, that will reduce BasicArray,
+   * by reducing individual elements using {@code elementReduceOp},
+   * with unbounded size.
+   *
+   * This function will return ReducerArrayHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi API for creating reducers
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createLocalArrayHandles(
+      PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
+      CreateReducersApi reduceApi) {
+    return createLocalArrayHandles(-1, typeOps, elementReduceOp, reduceApi);
+  }
+
+  /**
+   * Registers one new local reducer, that will reduce BasicArray,
+   * by reducing individual elements using {@code elementReduceOp},
+   * with predefined size.
+   *
+   * This function will return ReducerArrayHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * @param fixedSize Number of elements, or -1 for unbounded size
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi API for creating reducers
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createLocalArrayHandles(
+      int fixedSize, PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp,
+      final CreateReducersApi reduceApi) {
+    return createArrayHandles(fixedSize, typeOps, elementReduceOp,
+        new CreateReducerFunctionApi() {
+          @Override
+          public <S, R extends Writable> ReducerHandle<S, R> createReducer(
+              ReduceOperation<S, R> reduceOp) {
+            return reduceApi.createLocalReducer(reduceOp);
+          }
+        });
+  }
+
+  /**
+   * Registers one new reducer, that will reduce BasicArray,
+   * by reducing individual elements using {@code elementReduceOp},
+   * with unbounded size.
+   *
+   * This function will return ReducerArrayHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param createFunction Function for creating a reducer
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createArrayHandles(
+      PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
+      CreateReducerFunctionApi createFunction) {
+    return createArrayHandles(-1, typeOps, elementReduceOp, createFunction);
+  }
+
+  /**
+   * Registers one new reducer, that will reduce BasicArray,
+   * by reducing individual elements using {@code elementReduceOp},
+   * with predefined size.
+   *
+   * The returned ReducerArrayHandle returns one shared element handle from
+   * every {@code get(index)}; read values are copied into a reused object.
+   *
+   * @param fixedSize Number of elements, or -1 for unbounded size
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param createFunction Function for creating a reducer
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createArrayHandles(
+      final int fixedSize, final PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp,
+      CreateReducerFunctionApi createFunction) {
+    final ReducerHandle<Pair<IntRef, S>, BasicArrayList<R>> reduceHandle =
+        createFunction.createReducer(
+            new BasicArrayReduce<>(fixedSize, typeOps, elementReduceOp));
+    final IntRef curIndex = new IntRef(0);
+    final R reusableValue = typeOps.create();
+    final R initialValue = elementReduceOp.createInitialValue();
+    final MutablePair<IntRef, S> reusablePair =
+        MutablePair.of(new IntRef(0), null);
+    final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
+      @Override
+      public R getReducedValue(MasterGlobalCommUsage master) {
+        BasicArrayList<R> result = reduceHandle.getReducedValue(master);
+        if (fixedSize == -1 && curIndex.value >= result.size()) {
+          typeOps.set(reusableValue, initialValue);
+        } else {
+          result.getInto(curIndex.value, reusableValue);
+        }
+        return reusableValue;
+      }
+
+      @Override
+      public void reduce(S valueToReduce) {
+        reusablePair.getLeft().value = curIndex.value;
+        reusablePair.setRight(valueToReduce);
+        reduceHandle.reduce(reusablePair);
+      }
+
+      @Override
+      public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
+        throw new UnsupportedOperationException();
+      }
+    };
+
+    return new ReducerArrayHandle<S, R>() {
+      @Override
+      public ReducerHandle<S, R> get(int index) {
+        curIndex.value = index;
+        return elementReduceHandle;
+      }
+
+      @Override
+      public int getStaticSize() {
+        if (fixedSize == -1) {
+          throw new UnsupportedOperationException(
+              "Cannot call size, when one is not specified upfront");
+        }
+        return fixedSize;
+      }
+
+      @Override
+      public int getReducedSize(BlockMasterApi master) {
+        return reduceHandle.getReducedValue(master).size();
+      }
+
+      @Override
+      public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
+        final BroadcastHandle<BasicArrayList<R>> broadcastHandle =
+            reduceHandle.broadcastValue(master);
+        final IntRef curIndex = new IntRef(0);
+        final R reusableValue = typeOps.create();
+        final BroadcastHandle<R>
+        elementBroadcastHandle = new BroadcastHandle<R>() {
+          @Override
+          public R getBroadcast(WorkerBroadcastUsage worker) {
+            BasicArrayList<R> result = broadcastHandle.getBroadcast(worker);
+            if (fixedSize == -1 && curIndex.value >= result.size()) {
+              typeOps.set(reusableValue, initialValue);
+            } else {
+              result.getInto(curIndex.value, reusableValue);
+            }
+            return reusableValue;
+          }
+        };
+        return new BroadcastArrayHandle<R>() {
+          @Override
+          public BroadcastHandle<R> get(int index) {
+            curIndex.value = index;
+            return elementBroadcastHandle;
+          }
+
+          @Override
+          public int getStaticSize() {
+            if (fixedSize == -1) {
+              throw new UnsupportedOperationException(
+                  "Cannot call size, when one is not specified upfront");
+            }
+            return fixedSize;
+          }
+
+          @Override
+          public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+            return broadcastHandle.getBroadcast(worker).size();
+          }
+        };
+      }
+    };
+  }
+
+
+  private void init() {
+    initialElement = elementReduceOp.createInitialValue();
+    reusable = typeOps.create();
+    reusable2 = typeOps.create();
+  }
+
+  @Override
+  public BasicArrayList<R> createInitialValue() {
+    if (fixedSize != -1) {
+      BasicArrayList<R> list = typeOps.createArrayList(fixedSize);
+      fill(list, fixedSize);
+      return list;
+    } else {
+      return typeOps.createArrayList(1);
+    }
+  }
+
+  private void fill(BasicArrayList<R> list, int newSize) {
+    if (fixedSize != -1 && newSize > fixedSize) {
+      throw new IllegalArgumentException(newSize + " larger then " + fixedSize);
+    }
+
+    if (list.capacity() < newSize) {
+      list.setCapacity(newSize);
+    }
+    while (list.size() < newSize) {
+      list.add(initialElement);
+    }
+  }
+
+  @Override
+  public BasicArrayList<R> reduce(
+      BasicArrayList<R> curValue, Pair<IntRef, S> valueToReduce) {
+    int index = valueToReduce.getLeft().value;
+    fill(curValue, index + 1);
+    curValue.getInto(index, reusable);
+    R result = elementReduceOp.reduce(reusable, valueToReduce.getRight());
+    curValue.set(index, result);
+    return curValue;
+  }
+
+  @Override
+  public BasicArrayList<R> reduceMerge(
+      BasicArrayList<R> curValue, BasicArrayList<R> valueToReduce) {
+    fill(curValue, valueToReduce.size());
+    for (int i = 0; i < valueToReduce.size(); i++) {
+      valueToReduce.getInto(i, reusable2);
+      curValue.getInto(i, reusable);
+      R result = elementReduceOp.reduceMerge(reusable, reusable2);
+      curValue.set(i, result);
+    }
+
+    return curValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(fixedSize);
+    TypeOpsUtils.writeTypeOps(typeOps, out);
+    WritableUtils.writeWritableObject(elementReduceOp, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    fixedSize = in.readInt();
+    typeOps = TypeOpsUtils.readTypeOps(in);
+    elementReduceOp = WritableUtils.readWritableObject(in, null);
+    init();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
new file mode 100644
index 0000000..be5d4fe
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.array;
+
+import java.util.ArrayList;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
+import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfBroadcasts;
+import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfReducers;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.function.ObjectHolder;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.utils.ArrayWritable;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Utility class when we are dealing with huge arrays (i.e. large number of
+ * elements) within reducing/broadcasting.
+ *
+ * In Giraph, for each reducer there is a worker machine which is its owner,
+ * which does partial aggregation for it. So if we have only a single huge
+ * reducer, other workers will have to wait while that single worker is doing
+ * a huge reducing operation.
+ * On the other hand, each reducer has a meaningful overhead, so we should try
+ * to keep the number of reducers as low as possible (in total fewer than 10k
+ * is a good number).
+ * What we want is to split such huge reducers into slightly more than the
+ * number of worker reducers, and NUM_STRIPES = 50000 is used here as a good
+ * middle ground.
+ *
+ * So when we have huge array, we don't want one reducer/broadcast for each
+ * element, but we also don't want one reducer/broadcast for the whole array.
+ *
+ * This class allows transparent split into reasonable number of reducers
+ * (~50000), which solves both of the above issues.
+ */
+public class HugeArrayUtils {
+  // Striping perfectly reducers of up to 25GB (i.e. 500KB * NUM_STRIPES).
+  // The description below is user-visible text; the two string fragments
+  // must be separated by a space, otherwise they render as "thisnumber".
+  private static final IntConfOption NUM_STRIPES = new IntConfOption(
+      "giraph.reducers.HugeArrayUtils.num_stripes", 50000,
+      "Number of distinct reducers to create. If array is smaller than this " +
+      "number, each element will be its own reducer");
+
+  /** Utility class, no instances are created */
+  private HugeArrayUtils() { }
+
+  /**
+   * Create global array of reducers for {@code fixedSize} elements, using
+   * the NUM_STRIPES value read from the configuration of {@code reduceApi}
+   * as the maximal number of reducers.
+   *
+   * @param fixedSize Number of elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi Api for creating reducers
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
+      final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
+      final CreateReducersApi reduceApi) {
+    final int configuredStripes = NUM_STRIPES.get(reduceApi.getConf());
+    return createGlobalReducerArrayHandle(
+        fixedSize, elementReduceOp, reduceApi, configuredStripes);
+  }
+
+  /**
+   * Create global array of reducers, by splitting the huge array
+   * into {@code maxNumStripes} number of parts.
+   *
+   * If {@code fixedSize} is smaller than {@code maxNumStripes}, the array is
+   * not striped and an ArrayOfReducers built from {@code elementReduceOp} is
+   * returned instead. Otherwise the index range is divided by ObjectStriping
+   * and every split is backed by its own array-reducer handle
+   * (BasicArrayReduce when primitive type ops exist for the reduced type,
+   * ArrayReduce otherwise).
+   *
+   * The handle returned for the striped case does not support
+   * broadcastValue(), and its get() throws RuntimeException for indices
+   * outside of [0, fixedSize).
+   *
+   * @param fixedSize Number of elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi Api for creating reducers
+   * @param maxNumStripes Maximal number of reducers to create.
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
+      final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
+      final CreateReducersApi reduceApi, int maxNumStripes) {
+    // Type ops are looked up from the runtime class of an initial value;
+    // null selects the generic (non-primitive) ArrayReduce path below.
+    PrimitiveTypeOps<R> typeOps = TypeOpsUtils.getPrimitiveTypeOpsOrNull(
+        (Class<R>) elementReduceOp.createInitialValue().getClass());
+
+    // Adapter which makes every created reducer a global reducer.
+    // Its S and R type parameters shadow the ones of the enclosing method.
+    final CreateReducerFunctionApi
+    createReducer = new CreateReducerFunctionApi() {
+      @Override
+      public <S, R extends Writable> ReducerHandle<S, R> createReducer(
+          ReduceOperation<S, R> reduceOp) {
+        return reduceApi.createGlobalReducer(reduceOp);
+      }
+    };
+
+    if (fixedSize < maxNumStripes) {
+      // Fewer elements than allowed stripes: no striping is needed.
+      // NOTE(review): presumably ArrayOfReducers calls the supplier once per
+      // element - confirm in ArrayOfHandles.
+      return new ArrayOfReducers<>(
+          fixedSize,
+          new Supplier<ReducerHandle<S, R>>() {
+            @Override
+            public ReducerHandle<S, R> get() {
+              return createReducer.createReducer(elementReduceOp);
+            }
+          });
+    } else {
+      final ObjectStriping striping =
+          new ObjectStriping(fixedSize, maxNumStripes);
+
+      // One array-reducer handle per split, sized to that split
+      final ArrayList<ReducerArrayHandle<S, R>> handles =
+          new ArrayList<>(striping.getSplits());
+      for (int i = 0; i < striping.getSplits(); i++) {
+        if (typeOps != null) {
+          handles.add(BasicArrayReduce.createArrayHandles(
+              striping.getSplitSize(i), typeOps,
+              elementReduceOp, createReducer));
+        } else {
+          handles.add(ArrayReduce.createArrayHandles(
+              striping.getSplitSize(i), elementReduceOp, createReducer));
+        }
+      }
+
+      return new ReducerArrayHandle<S, R>() {
+        @Override
+        public ReducerHandle<S, R> get(int index) {
+          if ((index >= fixedSize) || (index < 0)) {
+            throw new RuntimeException(
+                "Reducer Access out of bounds: requested : " +
+                    index + " from array of size : " + fixedSize);
+          }
+          // Translate the global index into (split, index inside the split)
+          int reducerIndex = striping.getSplitIndex(index);
+          int insideIndex = striping.getInsideIndex(index);
+          return handles.get(reducerIndex).get(insideIndex);
+        }
+
+        @Override
+        public int getStaticSize() {
+          return fixedSize;
+        }
+
+        @Override
+        public int getReducedSize(BlockMasterApi master) {
+          // The size is fixed up front, so it does not depend on the master
+          return getStaticSize();
+        }
+
+        @Override
+        public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
+          throw new UnsupportedOperationException("for now not supported");
+        }
+      };
+    }
+  }
+
+  /**
+   * Broadcast a huge array, by splitting into NUM_STRIPES number of parts.
+   * Delegates to the overload taking type ops, passing null for them, so
+   * values returned by {@code valueSupplier} are never copied.
+   *
+   * @param count Number of elements
+   * @param valueSupplier Supplier of value to be broadcasted for a given index
+   * @param master Master API
+   * @param <V> Type of broadcasted values
+   * @return Created BroadcastArrayHandle
+   */
+  public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
+      final int count,
+      final Int2ObjFunction<V> valueSupplier,
+      final BlockMasterApi master) {
+    // No type ops available: the generic (non-primitive) path is used
+    final PrimitiveTypeOps<V> noTypeOps = null;
+    return broadcast(count, valueSupplier, noTypeOps, master);
+  }
+
+  /**
+   * Broadcast a huge array, by splitting into NUM_STRIPES number of parts.
+   * Efficient for primitive types, using BasicArray underneath.
+   *
+   * When {@code count} is smaller than the configured number of stripes,
+   * every element is broadcasted on its own; otherwise elements are grouped
+   * into stripes and each stripe is broadcasted as a single value.
+   *
+   * @param count Number of elements
+   * @param valueSupplier Supplier of value to be broadcasted for a given index
+   * @param typeOps Element TypeOps, or null to use the generic object path
+   * @param master Master API
+   * @param <V> Type of broadcasted values
+   * @return Created BroadcastArrayHandle
+   */
+  public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
+      final int count,
+      final Int2ObjFunction<V> valueSupplier,
+      final PrimitiveTypeOps<V> typeOps,
+      final BlockMasterApi master) {
+    final int stripes = NUM_STRIPES.get(master.getConf());
+
+    if (count >= stripes) {
+      final ObjectStriping striping = new ObjectStriping(count, stripes);
+      final Int2ObjFunction<BroadcastHandle<V>> handleSupplier;
+      if (typeOps == null) {
+        handleSupplier = getObjectBroadcastHandleSupplier(
+            valueSupplier, master, striping);
+      } else {
+        handleSupplier = getPrimitiveBroadcastHandleSupplier(
+            valueSupplier, typeOps, master, striping);
+      }
+
+      return new BroadcastArrayHandle<V>() {
+        @Override
+        public BroadcastHandle<V> get(int index) {
+          if (index < 0 || index >= count) {
+            throw new RuntimeException(
+                "Broadcast Access out of bounds: requested: " +
+                  index + " from array of size : " + count);
+          }
+          return handleSupplier.apply(index);
+        }
+
+        @Override
+        public int getStaticSize() {
+          return count;
+        }
+
+        @Override
+        public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+          return count;
+        }
+      };
+    }
+
+    // Fewer elements than stripes: one broadcast per element
+    return new ArrayOfBroadcasts<>(
+        count,
+        new Int2ObjFunction<BroadcastHandle<V>>() {
+          @Override
+          public BroadcastHandle<V> apply(int i) {
+            // The supplied value is copied because the valueSupplier might
+            // return a reusable obj. Without typeOps no copy can be made, so
+            // this is NOT safe if typeOps is null & valueSupplier returns
+            // reusable
+            V value = valueSupplier.apply(i);
+            if (typeOps != null) {
+              value = typeOps.createCopy(value);
+            }
+            return master.broadcast(value);
+          }
+        });
+  }
+
+  /**
+   * Creates the index to BroadcastHandle function for non-primitive values.
+   * Every stripe is broadcasted as one ArrayWritable holding the values of
+   * that stripe.
+   *
+   * Values are stored in the array by reference, without copying, so
+   * {@code valueSupplier} must not return a reused object.
+   *
+   * The returned function always hands out the same handle instance, which
+   * reads the stripe and position selected by the most recent apply() call.
+   * A handle must therefore be used before the function is applied again.
+   *
+   * @param valueSupplier Supplier of value to be broadcasted for a given index
+   * @param master Master API
+   * @param striping Mapping between global indices and stripes
+   * @param <V> Type of broadcasted values
+   * @return Function returning a broadcast handle for a given global index
+   */
+  private static <V extends Writable>
+  Int2ObjFunction<BroadcastHandle<V>> getObjectBroadcastHandleSupplier(
+      final Int2ObjFunction<V> valueSupplier,
+      final BlockMasterApi master, final ObjectStriping striping) {
+    // Element class is taken from the first value produced and then reused
+    // for all stripes
+    final ObjectHolder<Class<V>> elementClass = new ObjectHolder<>();
+    final ArrayOfHandles<BroadcastHandle<ArrayWritable<V>>> arrayOfBroadcasts =
+      new ArrayOfHandles<>(
+        striping.getSplits(),
+        new Int2ObjFunction<BroadcastHandle<ArrayWritable<V>>>() {
+          @Override
+          public BroadcastHandle<ArrayWritable<V>> apply(int value) {
+            // value is the stripe index; gather all elements of that stripe
+            int size = striping.getSplitSize(value);
+            int start = striping.getSplitStart(value);
+            V[] array = (V[]) new Writable[size];
+            for (int i = 0; i < size; i++) {
+              array[i] = valueSupplier.apply(start + i);
+              if (elementClass.get() == null) {
+                elementClass.apply((Class<V>) array[i].getClass());
+              }
+            }
+            return master.broadcast(
+                new ArrayWritable<>(elementClass.get(), array));
+          }
+        });
+
+    // Mutable selection state shared between the supplier and the handle
+    final IntRef insideIndex = new IntRef(-1);
+    final ObjectHolder<BroadcastHandle<ArrayWritable<V>>> handleHolder =
+        new ObjectHolder<>();
+
+    final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
+      @Override
+      public V getBroadcast(WorkerBroadcastUsage worker) {
+        // Reads whichever stripe/position is currently selected
+        return handleHolder.get().getBroadcast(worker).get()[insideIndex.value];
+      }
+    };
+
+    return createBroadcastHandleSupplier(
+        striping, arrayOfBroadcasts, insideIndex, handleHolder,
+        reusableHandle);
+  }
+
+  /**
+   * Creates the index to BroadcastHandle function for primitive values.
+   * Every stripe is broadcasted as one BasicArrayList created through
+   * {@code typeOps}.
+   *
+   * The returned function always hands out the same handle instance, which
+   * reads the stripe and position selected by the most recent apply() call,
+   * and getBroadcast() of that handle always returns the same reusable value
+   * object. Both must be consumed before the next call.
+   *
+   * @param valueSupplier Supplier of value to be broadcasted for a given index
+   * @param typeOps Element TypeOps
+   * @param master Master API
+   * @param striping Mapping between global indices and stripes
+   * @param <V> Type of broadcasted values
+   * @return Function returning a broadcast handle for a given global index
+   */
+  private static <V extends Writable>
+  Int2ObjFunction<BroadcastHandle<V>> getPrimitiveBroadcastHandleSupplier(
+      final Int2ObjFunction<V> valueSupplier, final PrimitiveTypeOps<V> typeOps,
+      final BlockMasterApi master, final ObjectStriping striping) {
+    final ArrayOfHandles<BroadcastHandle<BasicArrayList<V>>> arrayOfBroadcasts =
+      new ArrayOfHandles<>(
+        striping.getSplits(),
+        new Int2ObjFunction<BroadcastHandle<BasicArrayList<V>>>() {
+          @Override
+          public BroadcastHandle<BasicArrayList<V>> apply(int value) {
+            // value is the stripe index; gather all elements of that stripe
+            int size = striping.getSplitSize(value);
+            int start = striping.getSplitStart(value);
+            BasicArrayList<V> array = typeOps.createArrayList(size);
+            for (int i = 0; i < size; i++) {
+              array.add(valueSupplier.apply(start + i));
+            }
+            return master.broadcast(array);
+          }
+        });
+
+    // Mutable selection state shared between the supplier and the handle
+    final IntRef insideIndex = new IntRef(-1);
+    final ObjectHolder<BroadcastHandle<BasicArrayList<V>>> handleHolder =
+            new ObjectHolder<>();
+    final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
+      private final V reusable = typeOps.create();
+      @Override
+      public V getBroadcast(WorkerBroadcastUsage worker) {
+        // Copies the selected element into the single reusable value object
+        handleHolder.get().getBroadcast(worker).getInto(
+            insideIndex.value, reusable);
+        return reusable;
+      }
+    };
+
+    return createBroadcastHandleSupplier(
+        striping, arrayOfBroadcasts, insideIndex, handleHolder,
+        reusableHandle);
+  }
+
+  /**
+   * Creates a function which, for a given global index, selects the matching
+   * stripe and position (by updating {@code insideIndex} and
+   * {@code handleHolder}) and then returns {@code reusableHandle}.
+   * The same handle instance is returned for every index.
+   *
+   * @param striping Mapping between global indices and stripes
+   * @param arrayOfBroadcasts Broadcast handles of individual stripes
+   * @param insideIndex Holder of the currently selected position in a stripe
+   * @param handleHolder Holder of the currently selected stripe handle
+   * @param reusableHandle Handle reading through the two holders
+   * @param <V> Type of broadcasted values
+   * @param <A> Type of the container broadcasted for each stripe
+   * @return Function returning a broadcast handle for a given global index
+   */
+  private static <V extends Writable, A>
+  Int2ObjFunction<BroadcastHandle<V>> createBroadcastHandleSupplier(
+      final ObjectStriping striping,
+      final ArrayOfHandles<BroadcastHandle<A>> arrayOfBroadcasts,
+      final IntRef insideIndex,
+      final ObjectHolder<BroadcastHandle<A>> handleHolder,
+      final BroadcastHandle<V> reusableHandle) {
+    return new Int2ObjFunction<BroadcastHandle<V>>() {
+      @Override
+      public BroadcastHandle<V> apply(int index) {
+        final int stripe = striping.getSplitIndex(index);
+        insideIndex.value = striping.getInsideIndex(index);
+        final BroadcastHandle<A> stripeHandle = arrayOfBroadcasts.get(stripe);
+        handleHolder.apply(stripeHandle);
+        return reusableHandle;
+      }
+    };
+  }
+
+  /**
+   * Index arithmetic for splitting one range of indices into a smaller
+   * number of splits, where indices stay consecutive. The first
+   * {@code size % splits} splits hold one index more than the remaining ones.
+   */
+  static class ObjectStriping {
+    /** Number of splits */
+    private final int splits;
+    /** Number of indices in each of the smaller splits */
+    private final int indicesPerObject;
+    /** Number of leading splits which hold one additional index */
+    private final int overflowNum;
+    /** Total number of indices covered by the larger, leading splits */
+    private final int beforeOverflow;
+
+    public ObjectStriping(int size, int splits) {
+      final int quotient = size / splits;
+      final int remainder = size % splits;
+      this.splits = splits;
+      this.indicesPerObject = quotient;
+      this.overflowNum = remainder;
+      this.beforeOverflow = remainder * (quotient + 1);
+    }
+
+    public int getSplits() {
+      return splits;
+    }
+
+    public int getSplitSize(int splitIndex) {
+      if (splitIndex < overflowNum) {
+        return indicesPerObject + 1;
+      }
+      return indicesPerObject;
+    }
+
+    public int getSplitStart(int splitIndex) {
+      if (splitIndex >= overflowNum) {
+        final int smallSplitsBefore = splitIndex - overflowNum;
+        return beforeOverflow + smallSplitsBefore * indicesPerObject;
+      }
+      return splitIndex * (indicesPerObject + 1);
+    }
+
+    public int getSplitIndex(int objectIndex) {
+      if (objectIndex >= beforeOverflow) {
+        final int shifted = objectIndex - beforeOverflow;
+        return shifted / indicesPerObject + overflowNum;
+      }
+      return objectIndex / (indicesPerObject + 1);
+    }
+
+    public int getInsideIndex(int objectIndex) {
+      if (objectIndex >= beforeOverflow) {
+        final int shifted = objectIndex - beforeOverflow;
+        return shifted % indicesPerObject;
+      }
+      return objectIndex % (indicesPerObject + 1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java
new file mode 100644
index 0000000..33f8a24
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Reducers for collecting arrays of objects.
+ */
+package org.apache.giraph.block_app.reducers.array;

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java
new file mode 100644
index 0000000..13dd153
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.ResettableIterator;
+import org.apache.giraph.utils.WritableUtils;
+
+/**
+ * Reduce operation which collects all reduced primitive values into a
+ * BasicArrayList.
+ *
+ * @param <S> Primitive Writable type, which has its type ops
+ */
+public class CollectPrimitiveReduceOperation<S>
+    extends KryoWrappedReduceOperation<S, BasicArrayList<S>> {
+  /**
+   * Type ops of the collected values. Set by the constructor taking type
+   * ops, or by readFields(); not set by the no-argument constructor.
+   */
+  private PrimitiveTypeOps<S> typeOps;
+
+  /** For reflection only */
+  public CollectPrimitiveReduceOperation() {
+  }
+
+  public CollectPrimitiveReduceOperation(PrimitiveTypeOps<S> typeOps) {
+    this.typeOps = typeOps;
+  }
+
+  public BasicArrayList<S> createList() {
+    return typeOps.createArrayList();
+  }
+
+  @Override
+  public BasicArrayList<S> createValue() {
+    return createList();
+  }
+
+  @Override
+  public void reduce(BasicArrayList<S> reduceInto, S value) {
+    reduceInto.add(value);
+  }
+
+  @Override
+  public void reduceMerge(BasicArrayList<S> reduceInto,
+      BasicArrayList<S> toReduce) {
+    // Append every element of the other list
+    for (ResettableIterator<S> elements = toReduce.fastIterator();
+        elements.hasNext();) {
+      reduceInto.add(elements.next());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Only the value class is written; type ops are looked up from it again
+    // in readFields()
+    WritableUtils.writeClass(typeOps.getTypeClass(), out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final Class<S> typeClass = WritableUtils.<S>readClass(in);
+    typeOps = TypeOpsUtils.getPrimitiveTypeOps(typeClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java
new file mode 100644
index 0000000..304ac47
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+
+/**
+ * Reduce operation which collects all reduced values into a List.
+ *
+ * @param <S> Type of values to collect
+ */
+public class CollectReduceOperation<S>
+    extends KryoWrappedReduceOperation<S, List<S>> {
+  /**
+   * Creates the list into which values are collected.
+   *
+   * @return New empty list
+   */
+  public List<S> createList() {
+    return new ArrayList<S>();
+  }
+
+  @Override
+  public List<S> createValue() {
+    return createList();
+  }
+
+  @Override
+  public void reduce(List<S> reduceInto, S value) {
+    reduceInto.add(value);
+  }
+
+  @Override
+  public void reduceMerge(List<S> reduceInto, List<S> toReduce) {
+    reduceInto.addAll(toReduce);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java
new file mode 100644
index 0000000..b29b297
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * ShardedReducerHandle where we keep a list of reduced values,
+ * when primitives are used
+ *
+ * @param <S> Single value type
+ */
+public class CollectShardedPrimitiveReducerHandle<S>
+    extends ShardedReducerHandle<S, BasicArrayList<S>> {
+  /** Type ops of the collected value type */
+  private final PrimitiveTypeOps<S> typeOps;
+
+  public CollectShardedPrimitiveReducerHandle(final CreateReducersApi reduceApi,
+      Class<S> valueClass) {
+    typeOps = TypeOpsUtils.getPrimitiveTypeOps(valueClass);
+    register(reduceApi);
+  }
+
+  /**
+   * Creates an empty list through the type ops.
+   *
+   * @param size Value passed to the type ops when creating the list
+   * @return Newly created list
+   */
+  public BasicArrayList<S> createList(int size) {
+    return typeOps.createArrayList(size);
+  }
+
+  @Override
+  public ReduceOperation<S, KryoWritableWrapper<BasicArrayList<S>>>
+  createReduceOperation() {
+    return new CollectPrimitiveReduceOperation<>(typeOps);
+  }
+
+  @Override
+  public BasicArrayList<S> createReduceResult(MasterGlobalCommUsage master) {
+    // Sum up the sizes of all shards to create the result list
+    int totalSize = 0;
+    for (int shard = 0; shard < REDUCER_COUNT; shard++) {
+      totalSize += reducers.get(shard).getReducedValue(master).get().size();
+    }
+    return createList(totalSize);
+  }
+
+  @Override
+  public BroadcastHandle<BasicArrayList<S>> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<BasicArrayList<S>>> broadcasts) {
+    return new CollectShardedPrimitiveBroadcastHandle(broadcasts);
+  }
+
+  /**
+   * Broadcast handle for CollectShardedPrimitiveReducerHandle
+   */
+  public class CollectShardedPrimitiveBroadcastHandle
+      extends ShardedBroadcastHandle {
+    public CollectShardedPrimitiveBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<BasicArrayList<S>>>
+            broadcasts) {
+      super(broadcasts);
+    }
+
+    @Override
+    public BasicArrayList<S> createBroadcastResult(
+        WorkerBroadcastUsage worker) {
+      // Sum up the sizes of all broadcasted shards to create the result list
+      int totalSize = 0;
+      for (int shard = 0; shard < REDUCER_COUNT; shard++) {
+        totalSize += broadcasts.get(shard).getBroadcast(worker).get().size();
+      }
+      return createList(totalSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java
new file mode 100644
index 0000000..5132ecf
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * ShardedReducerHandle where we keep a list of reduced values
+ *
+ * @param <S> Single value type
+ */
+public class CollectShardedReducerHandle<S>
+    extends ShardedReducerHandle<S, List<S>> {
+  public CollectShardedReducerHandle(CreateReducersApi reduceApi) {
+    register(reduceApi);
+  }
+
+  /**
+   * Creates an empty list with the given initial capacity.
+   *
+   * @param size Initial capacity of the list
+   * @return Newly created list
+   */
+  public List<S> createList(int size) {
+    return new ArrayList<S>(size);
+  }
+
+  @Override
+  public ReduceOperation<S, KryoWritableWrapper<List<S>>>
+  createReduceOperation() {
+    return new CollectReduceOperation<>();
+  }
+
+  @Override
+  public List<S> createReduceResult(MasterGlobalCommUsage master) {
+    // Sum up the sizes of all shards to pre-size the result list
+    int totalSize = 0;
+    for (int shard = 0; shard < REDUCER_COUNT; shard++) {
+      totalSize += reducers.get(shard).getReducedValue(master).get().size();
+    }
+    return createList(totalSize);
+  }
+
+  @Override
+  public BroadcastHandle<List<S>> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
+    return new CollectShardedBroadcastHandle(broadcasts);
+  }
+
+  /**
+   * BroadcastHandle for CollectShardedReducerHandle
+   */
+  public class CollectShardedBroadcastHandle extends ShardedBroadcastHandle {
+    public CollectShardedBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
+      super(broadcasts);
+    }
+
+    @Override
+    public List<S> createBroadcastResult(WorkerBroadcastUsage worker) {
+      // Sum up the sizes of all broadcasted shards to pre-size the result list
+      int totalSize = 0;
+      for (int shard = 0; shard < REDUCER_COUNT; shard++) {
+        totalSize += broadcasts.get(shard).getBroadcast(worker).get().size();
+      }
+      return createList(totalSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java
new file mode 100644
index 0000000..3222c17
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * ShardedReducerHandle where we keep a list of reduced values,
+ * and values consist of multiple primitives, so we keep one primitive
+ * list for each
+ */
+@SuppressWarnings("unchecked")
+public class CollectShardedTuplesOfPrimitivesReducerHandle
+extends ShardedReducerHandle<List<Object>, List<BasicArrayList>> {
+  /**
+   * Type ops if available, or null
+   */
+  private final List<PrimitiveTypeOps> typeOpsList;
+
+  public CollectShardedTuplesOfPrimitivesReducerHandle(
+      final CreateReducersApi reduceApi, Class<?>... valueClasses) {
+    typeOpsList = new ArrayList<>();
+    for (Class<?> valueClass : valueClasses) {
+      typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(valueClass));
+    }
+    register(reduceApi);
+  }
+
+  public List<Object> createSingleValue() {
+    List<Object> ret = new ArrayList<>();
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      ret.add(typeOps.create());
+    }
+    return ret;
+  }
+
+  @Override
+  public ReduceOperation<List<Object>,
+      KryoWritableWrapper<List<BasicArrayList>>> createReduceOperation() {
+    return new CollectTuplesOfPrimitivesReduceOperation(typeOpsList);
+  }
+
+  @Override
+  public List<BasicArrayList> createReduceResult(
+      MasterGlobalCommUsage master) {
+    int size = 0;
+    for (int i = 0; i < REDUCER_COUNT; i++) {
+      size += reducers.get(i).getReducedValue(master).get().get(0).size();
+    }
+    return createLists(size);
+  }
+
+  public List<BasicArrayList> createLists(int size) {
+    List<BasicArrayList> ret = new ArrayList<>();
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      ret.add(typeOps.createArrayList(size));
+    }
+    return ret;
+  }
+
+  @Override
+  public BroadcastHandle<List<BasicArrayList>> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<List<BasicArrayList>>>
+          broadcasts) {
+    return new CollectShardedTuplesOfPrimitivesBroadcastHandle(broadcasts);
+  }
+
+  /**
+   * BroadcastHandle for CollectShardedTuplesOfPrimitivesReducerHandle
+   */
+  public class CollectShardedTuplesOfPrimitivesBroadcastHandle
+      extends ShardedBroadcastHandle {
+    public CollectShardedTuplesOfPrimitivesBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<List<BasicArrayList>>>
+            broadcasts) {
+      super(broadcasts);
+    }
+
+    @Override
+    public List<BasicArrayList> createBroadcastResult(
+        WorkerBroadcastUsage worker) {
+      int size = 0;
+      for (int i = 0; i < REDUCER_COUNT; i++) {
+        size += broadcasts.get(i).getBroadcast(worker).get().size();
+      }
+      return createLists(size);
+    }
+  }
+
+  /**
+   * Reduce broadcast wrapper
+   */
+  public static class CollectShardedTuplesOfPrimitivesReduceBroadcast {
+    private CollectShardedTuplesOfPrimitivesReducerHandle reducerHandle;
+    private BroadcastHandle<List<BasicArrayList>> broadcastHandle;
+
+    /** Set reducer handle to just registered handle */
+    public void registeredReducer(CreateReducersApi reduceApi,
+        Class<?>... valueClasses) {
+      this.reducerHandle = new CollectShardedTuplesOfPrimitivesReducerHandle(
+          reduceApi, valueClasses);
+    }
+
+    public List<Object> createSingleValue() {
+      return reducerHandle.createSingleValue();
+    }
+
+    /** Reduce single value */
+    public void reduce(List<Object> valueToReduce) {
+      reducerHandle.reduce(valueToReduce);
+    }
+
+    /** Get reduced value */
+    public List<BasicArrayList> getReducedValue(MasterGlobalCommUsage master) {
+      return reducerHandle.getReducedValue(master);
+    }
+
+    /**
+     * Broadcast reduced value from master
+     */
+    public void broadcastValue(BlockMasterApi master) {
+      broadcastHandle = reducerHandle.broadcastValue(master);
+    }
+
+    /** Get broadcasted value */
+    public List<BasicArrayList> getBroadcast(WorkerBroadcastUsage worker) {
+      return broadcastHandle.getBroadcast(worker);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java
new file mode 100644
index 0000000..afaba7a
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.ResettableIterator;
+import org.apache.giraph.utils.WritableUtils;
+
+/**
+ * Collect tuples of primitive values reduce operation
+ */
+public class CollectTuplesOfPrimitivesReduceOperation
+    extends KryoWrappedReduceOperation<List<Object>, List<BasicArrayList>> {
+  /**
+   * Type ops if available, or null
+   */
+  private List<PrimitiveTypeOps> typeOpsList;
+
+  /** For reflection only */
+  public CollectTuplesOfPrimitivesReduceOperation() {
+  }
+
+  public CollectTuplesOfPrimitivesReduceOperation(
+      List<PrimitiveTypeOps> typeOpsList) {
+    this.typeOpsList = typeOpsList;
+  }
+
+  @Override
+  public List<BasicArrayList> createValue() {
+    List<BasicArrayList> ret = new ArrayList<>(typeOpsList.size());
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      ret.add(typeOps.createArrayList());
+    }
+    return ret;
+  }
+
+  @Override
+  public void reduce(List<BasicArrayList> reduceInto, List<Object> value) {
+    for (int i = 0; i < reduceInto.size(); i++) {
+      reduceInto.get(i).add(value.get(i));
+    }
+  }
+
+  @Override
+  public void reduceMerge(List<BasicArrayList> reduceInto,
+      List<BasicArrayList> toReduce) {
+    for (int i = 0; i < reduceInto.size(); i++) {
+      ResettableIterator iterator = toReduce.get(i).fastIterator();
+      while (iterator.hasNext()) {
+        reduceInto.get(i).add(iterator.next());
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(typeOpsList.size());
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      WritableUtils.writeClass(typeOps.getTypeClass(), out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    typeOpsList = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(
+          WritableUtils.readClass(in)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java
new file mode 100644
index 0000000..0c17216
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+import org.apache.giraph.writable.kryo.TransientRandom;
+
+/**
+ * Reducing values into a list of reducers, randomly,
+ * and getting the results of all reducers together
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public abstract class ShardedReducerHandle<S, R>
+    implements ReducerHandle<S, R> {
+  // Use a prime number for number of reducers, large enough to make sure
+  // request sizes are within expected size (0.5MB)
+  protected static final int REDUCER_COUNT = 39989;
+
+  protected final TransientRandom random = new TransientRandom();
+
+  protected ArrayOfHandles.ArrayOfReducers<S, KryoWritableWrapper<R>> reducers;
+
+  public final void register(final CreateReducersApi reduceApi) {
+    reducers = new ArrayOfHandles.ArrayOfReducers<>(REDUCER_COUNT,
+        new Supplier<ReducerHandle<S, KryoWritableWrapper<R>>>() {
+          @Override
+          public ReducerHandle<S, KryoWritableWrapper<R>> get() {
+            return reduceApi.createLocalReducer(createReduceOperation());
+          }
+        });
+  }
+
+  @Override
+  public final void reduce(S value) {
+    reducers.get(random.nextInt(REDUCER_COUNT)).reduce(value);
+  }
+
+  @Override
+  public final R getReducedValue(MasterGlobalCommUsage master) {
+    KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
+        createReduceResult(master));
+    ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
+        createReduceOperation();
+    for (int i = 0; i < REDUCER_COUNT; i++) {
+      reduceOperation.reduceMerge(ret,
+          reducers.get(i).getReducedValue(master));
+    }
+    return ret.get();
+  }
+
+  public abstract ReduceOperation<S, KryoWritableWrapper<R>>
+  createReduceOperation();
+
+  public R createReduceResult(MasterGlobalCommUsage master) {
+    return createReduceOperation().createInitialValue().get();
+  }
+
+  public BroadcastHandle<R> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
+    return new ShardedBroadcastHandle(broadcasts);
+  }
+
+  @Override
+  public final BroadcastHandle<R> broadcastValue(BlockMasterApi masterApi) {
+    return createBroadcastHandle(reducers.broadcastValue(masterApi));
+  }
+
+  /**
+   * Broadcast for ShardedReducerHandle
+   */
+  public class ShardedBroadcastHandle implements BroadcastHandle<R> {
+    protected final BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts;
+
+    public ShardedBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
+      this.broadcasts = broadcasts;
+    }
+
+    public R createBroadcastResult(WorkerBroadcastUsage worker) {
+      return createReduceOperation().createInitialValue().get();
+    }
+
+    @Override
+    public final R getBroadcast(WorkerBroadcastUsage worker) {
+      KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
+          createBroadcastResult(worker));
+      ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
+          createReduceOperation();
+      for (int i = 0; i < REDUCER_COUNT; i++) {
+        reduceOperation.reduceMerge(ret,
+            broadcasts.get(i).getBroadcast(worker));
+      }
+      return ret.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java
new file mode 100644
index 0000000..dc640f7
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Reducers for distributed collection of objects.
+ */
+package org.apache.giraph.block_app.reducers.collect;

http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java
new file mode 100644
index 0000000..0e1e113
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.map;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.map.BroadcastMapHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.map.ReducerMapHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.WritableWriter;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Efficient generic primitive map of values reduce operation.
+ * (it is BasicMap Reduce, not to be confused with MapReduce)
+ *
+ * @param <K> Key type
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public class BasicMapReduce<K extends WritableComparable, S,
+    R extends Writable>
+    implements ReduceOperation<Pair<K, S>, Basic2ObjectMap<K, R>> {
+  /** TypeOps of keys */
+  private PrimitiveIdTypeOps<K> keyTypeOps;
+  /** TypeOps of individual elements */
+  private PrimitiveTypeOps<R> typeOps;
+  /** ReduceOperation for individual elements */
+  private ReduceOperation<S, R> elementReduceOp;
+  /** Writer for element values, created by init */
+  private WritableWriter<R> writer;
+
+  /** No-argument constructor; fields are filled in by readFields */
+  public BasicMapReduce() {
+  }
+
+  /**
+   * Create ReduceOperation that reduces BasicMaps by reducing individual
+   * elements corresponding to the same key.
+   *
+   * @param keyTypeOps TypeOps of keys
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   */
+  public BasicMapReduce(
+      PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp) {
+    this.keyTypeOps = keyTypeOps;
+    this.typeOps = typeOps;
+    this.elementReduceOp = elementReduceOp;
+    init();
+  }
+
+  /**
+   * Registers one new local reducer, that will reduce BasicMap,
+   * by reducing individual elements corresponding to the same key
+   * using {@code elementReduceOp}.
+   *
+   * This function will return ReducerMapHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * @param keyTypeOps TypeOps of keys
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi API for creating reducers
+   * @return Created ReducerMapHandle
+   */
+  public static <K extends WritableComparable, S, R extends Writable>
+  ReducerMapHandle<K, S, R> createLocalMapHandles(
+      PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp,
+      final CreateReducersApi reduceApi) {
+    return createMapHandles(
+        keyTypeOps, typeOps, elementReduceOp,
+        new CreateReducerFunctionApi() {
+          @Override
+          public <S, R extends Writable> ReducerHandle<S, R> createReducer(
+              ReduceOperation<S, R> reduceOp) {
+            return reduceApi.createLocalReducer(reduceOp);
+          }
+        });
+  }
+
+  /**
+   * Registers one new reducer, that will reduce BasicMap,
+   * by reducing individual elements corresponding to the same key
+   * using {@code elementReduceOp}.
+   *
+   * This function will return ReducerMapHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * The handles returned by {@code get(key)} are shared objects that
+   * operate on the most recently requested key, and the values they
+   * return are reused between calls.
+   *
+   * @param keyTypeOps TypeOps of keys
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param createFunction Function for creating a reducer
+   * @return Created ReducerMapHandle
+   */
+  public static <K extends WritableComparable, S, R extends Writable>
+  ReducerMapHandle<K, S, R> createMapHandles(
+      final PrimitiveIdTypeOps<K> keyTypeOps, final PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp,
+      CreateReducerFunctionApi createFunction) {
+    final ReducerHandle<Pair<K, S>, Basic2ObjectMap<K, R>> reduceHandle =
+      createFunction.createReducer(
+          new BasicMapReduce<>(keyTypeOps, typeOps, elementReduceOp));
+    // Key the shared element handle currently points at; set by get(key)
+    final K curIndex = keyTypeOps.create();
+    final R reusableValue = typeOps.create();
+    // Value reported for keys that have no entry in the reduced map
+    final R initialValue = elementReduceOp.createInitialValue();
+    final MutablePair<K, S> reusablePair = MutablePair.of(null, null);
+    final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
+      @Override
+      public R getReducedValue(MasterGlobalCommUsage master) {
+        Basic2ObjectMap<K, R> result = reduceHandle.getReducedValue(master);
+        R value = result.get(curIndex);
+        if (value == null) {
+          typeOps.set(reusableValue, initialValue);
+        } else {
+          typeOps.set(reusableValue, value);
+        }
+        return reusableValue;
+      }
+
+      @Override
+      public void reduce(S valueToReduce) {
+        reusablePair.setLeft(curIndex);
+        reusablePair.setRight(valueToReduce);
+        reduceHandle.reduce(reusablePair);
+      }
+
+      @Override
+      public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
+        // Only the whole map can be broadcast, through the map handle
+        throw new UnsupportedOperationException();
+      }
+    };
+
+    return new ReducerMapHandle<K, S, R>() {
+      @Override
+      public ReducerHandle<S, R> get(K key) {
+        keyTypeOps.set(curIndex, key);
+        return elementReduceHandle;
+      }
+
+      @Override
+      public int getReducedSize(BlockMasterApi master) {
+        return reduceHandle.getReducedValue(master).size();
+      }
+
+      @Override
+      public BroadcastMapHandle<K, R> broadcastValue(BlockMasterApi master) {
+        final BroadcastHandle<Basic2ObjectMap<K, R>> broadcastHandle =
+          reduceHandle.broadcastValue(master);
+        // The broadcast handle gets its own key and value holders,
+        // separate from the ones used by the reducer handle
+        final K broadcastIndex = keyTypeOps.create();
+        final R broadcastReusableValue = typeOps.create();
+        final BroadcastHandle<R>
+        elementBroadcastHandle = new BroadcastHandle<R>() {
+          @Override
+          public R getBroadcast(WorkerBroadcastUsage worker) {
+            Basic2ObjectMap<K, R> result = broadcastHandle.getBroadcast(worker);
+            R value = result.get(broadcastIndex);
+            if (value == null) {
+              typeOps.set(broadcastReusableValue, initialValue);
+            } else {
+              typeOps.set(broadcastReusableValue, value);
+            }
+            return broadcastReusableValue;
+          }
+        };
+        return new BroadcastMapHandle<K, R>() {
+          @Override
+          public BroadcastHandle<R> get(K key) {
+            keyTypeOps.set(broadcastIndex, key);
+            return elementBroadcastHandle;
+          }
+
+          @Override
+          public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+            return broadcastHandle.getBroadcast(worker).size();
+          }
+        };
+      }
+    };
+  }
+
+  /**
+   * Creates the writer used for element values; it writes a value with
+   * its own {@code write} and reads into a value created by typeOps.
+   */
+  private void init() {
+    writer = new WritableWriter<R>() {
+      @Override
+      public void write(DataOutput out, R value) throws IOException {
+        value.write(out);
+      }
+
+      @Override
+      public R readFields(DataInput in) throws IOException {
+        R result = typeOps.create();
+        result.readFields(in);
+        return result;
+      }
+    };
+  }
+
+  /**
+   * Creates an empty map from keys to reduced element values.
+   *
+   * @return Map created by the key type ops, using this class' writer
+   */
+  @Override
+  public Basic2ObjectMap<K, R> createInitialValue() {
+    return keyTypeOps.create2ObjectOpenHashMap(writer);
+  }
+
+  /**
+   * Reduces the right side of the pair into the map entry of the key on
+   * its left side.
+   *
+   * @param curValue Map to reduce into
+   * @param valueToReduce Pair of key and single value to reduce
+   * @return {@code curValue}
+   */
+  @Override
+  public Basic2ObjectMap<K, R> reduce(
+      Basic2ObjectMap<K, R> curValue, Pair<K, S> valueToReduce) {
+    R result = curValue.get(valueToReduce.getLeft());
+    if (result == null) {
+      // Start a missing key from the element operation's initial value,
+      // the same value the handles report for keys without an entry.
+      result = elementReduceOp.createInitialValue();
+    }
+    result = elementReduceOp.reduce(result, valueToReduce.getRight());
+    curValue.put(valueToReduce.getLeft(), result);
+    return curValue;
+  }
+
+  /**
+   * Merges every entry of {@code valueToReduce} into the entry of the
+   * same key in {@code curValue}.
+   *
+   * @param curValue Map to merge into
+   * @param valueToReduce Map whose entries are merged
+   * @return {@code curValue}
+   */
+  @Override
+  public Basic2ObjectMap<K, R> reduceMerge(
+      Basic2ObjectMap<K, R> curValue, Basic2ObjectMap<K, R> valueToReduce) {
+    for (Iterator<K> iter = valueToReduce.fastKeyIterator(); iter.hasNext();) {
+      K key = iter.next();
+
+      R result = curValue.get(key);
+      if (result == null) {
+        // Start a missing key from the element operation's initial value,
+        // the same value the handles report for keys without an entry.
+        result = elementReduceOp.createInitialValue();
+      }
+      result = elementReduceOp.reduceMerge(result, valueToReduce.get(key));
+      curValue.put(key, result);
+    }
+    return curValue;
+  }
+
+  /**
+   * Writes the key type ops, the element type ops and the element
+   * reduce operation, in that order.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TypeOpsUtils.writeTypeOps(keyTypeOps, out);
+    TypeOpsUtils.writeTypeOps(typeOps, out);
+    WritableUtils.writeWritableObject(elementReduceOp, out);
+  }
+
+  /**
+   * Reads the fields in the order used by write and recreates the writer.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    keyTypeOps = TypeOpsUtils.readTypeOps(in);
+    typeOps = TypeOpsUtils.readTypeOps(in);
+    elementReduceOp = WritableUtils.readWritableObject(in, null);
+    init();
+  }
+}