You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/26 02:39:46 UTC
[2/2] git commit: updated refs/heads/trunk to 77f8a07
[GIRAPH-1013] Adding reducer handle utilities
Summary: Also adds more functional interfaces and PairWritable
Test Plan: mvn clean install
Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo
Reviewed By: maja.kabiljo
Differential Revision: https://reviews.facebook.net/D40269
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/77f8a075
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/77f8a075
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/77f8a075
Branch: refs/heads/trunk
Commit: 77f8a075ccc029cb608a382f5deb7cc0b27b02e5
Parents: add1d4f
Author: Igor Kabiljo <ik...@fb.com>
Authored: Wed Jun 17 12:47:52 2015 -0700
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Thu Jun 25 17:39:32 2015 -0700
----------------------------------------------------------------------
giraph-block-app/pom.xml | 4 +
.../reducers/array/ArrayOfHandles.java | 127 ++++++
.../block_app/reducers/array/ArrayReduce.java | 211 ++++++++++
.../reducers/array/BasicArrayReduce.java | 353 ++++++++++++++++
.../reducers/array/HugeArrayUtils.java | 404 +++++++++++++++++++
.../block_app/reducers/array/package-info.java | 21 +
.../CollectPrimitiveReduceOperation.java | 84 ++++
.../collect/CollectReduceOperation.java | 50 +++
.../CollectShardedPrimitiveReducerHandle.java | 96 +++++
.../collect/CollectShardedReducerHandle.java | 85 ++++
...tShardedTuplesOfPrimitivesReducerHandle.java | 158 ++++++++
...ollectTuplesOfPrimitivesReduceOperation.java | 96 +++++
.../reducers/collect/ShardedReducerHandle.java | 123 ++++++
.../reducers/collect/package-info.java | 21 +
.../block_app/reducers/map/BasicMapReduce.java | 276 +++++++++++++
.../block_app/reducers/map/package-info.java | 21 +
.../giraph/block_app/reducers/package-info.java | 21 +
.../apache/giraph/function/TripleFunction.java | 41 ++
.../function/primitive/Obj2DoubleFunction.java | 33 ++
.../function/primitive/Obj2FloatFunction.java | 33 ++
.../function/primitive/Obj2LongFunction.java | 33 ++
.../reducers/array/ObjectStripingTest.java | 58 +++
giraph-core/pom.xml | 4 +
.../impl/KryoWrappedReduceOperation.java | 86 ++++
.../writable/tuple/DoubleDoubleWritable.java | 39 ++
.../writable/tuple/IntDoubleWritable.java | 40 ++
.../giraph/writable/tuple/IntIntWritable.java | 39 ++
.../giraph/writable/tuple/IntLongWritable.java | 40 ++
.../writable/tuple/LongDoubleWritable.java | 40 ++
.../giraph/writable/tuple/LongIntWritable.java | 40 ++
.../giraph/writable/tuple/LongLongWritable.java | 39 ++
.../giraph/writable/tuple/PairWritable.java | 113 ++++++
.../giraph/writable/tuple/package-info.java | 21 +
pom.xml | 6 +
34 files changed, 2856 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-block-app/pom.xml b/giraph-block-app/pom.xml
index 1f653bb..a05c1c5 100644
--- a/giraph-block-app/pom.xml
+++ b/giraph-block-app/pom.xml
@@ -87,6 +87,10 @@ under the License.
<dependencies>
<!-- compile dependencies. sorted lexicographically. -->
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java
new file mode 100644
index 0000000..053fd61
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.array;
+
+import java.util.ArrayList;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * ArrayHandle implemented as an array of individual handles.
+ *
+ * @param <H> Handle type
+ */
+public class ArrayOfHandles<H> implements ArrayHandle<H> {
+ protected final ArrayList<H> handles;
+
+ public ArrayOfHandles(int count, Supplier<H> reduceHandleFactory) {
+ handles = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ handles.add(reduceHandleFactory.get());
+ }
+ }
+
+ public ArrayOfHandles(int count, Int2ObjFunction<H> reduceHandleFactory) {
+ handles = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ handles.add(reduceHandleFactory.apply(i));
+ }
+ }
+
+ @Override
+ public H get(int index) {
+ return handles.get(index);
+ }
+
+ @Override
+ public int getStaticSize() {
+ return handles.size();
+ }
+
+ /**
+ * ReducerArrayHandle implemented as an array of separate reducer handles.
+ *
+ * @param <H> Handle type
+ */
+ public static class ArrayOfReducers<S, R>
+ extends ArrayOfHandles<ReducerHandle<S, R>>
+ implements ReducerArrayHandle<S, R> {
+
+ public ArrayOfReducers(
+ int count, Supplier<ReducerHandle<S, R>> reduceHandleFactory) {
+ super(count, reduceHandleFactory);
+ }
+
+ public ArrayOfReducers(
+ int count, Int2ObjFunction<ReducerHandle<S, R>> reduceHandleFactory) {
+ super(count, reduceHandleFactory);
+ }
+
+ @Override
+ public int getReducedSize(BlockMasterApi master) {
+ return getStaticSize();
+ }
+
+ @Override
+ public BroadcastArrayHandle<R> broadcastValue(final BlockMasterApi master) {
+ return new ArrayOfBroadcasts<>(
+ getStaticSize(),
+ new Int2ObjFunction<BroadcastHandle<R>>() {
+ @Override
+ public BroadcastHandle<R> apply(int index) {
+ return get(index).broadcastValue(master);
+ }
+ });
+ }
+ }
+
+ /**
+ * BroadcastArrayHandle implemented as an array of separate broadcast handles.
+ *
+ * @param <T> Handle type
+ */
+ public static class ArrayOfBroadcasts<T>
+ extends ArrayOfHandles<BroadcastHandle<T>>
+ implements BroadcastArrayHandle<T> {
+
+ public ArrayOfBroadcasts(
+ int count,
+ Int2ObjFunction<BroadcastHandle<T>> broadcastHandleFactory) {
+ super(count, broadcastHandleFactory);
+ }
+
+ public ArrayOfBroadcasts(
+ int count,
+ Supplier<BroadcastHandle<T>> broadcastHandleFactory) {
+ super(count, broadcastHandleFactory);
+ }
+
+ @Override
+ public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+ return getStaticSize();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java
new file mode 100644
index 0000000..f2cdf8c
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.array;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.ArrayWritable;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * One reducer representing reduction of array of individual values.
+ * Elements are represented as object, and so BasicArrayReduce should be
+ * used instead when elements are primitive types.
+ *
+ * @param <S> Single value type, objects passed on workers
+ * @param <R> Reduced value type
+ */
+public class ArrayReduce<S, R extends Writable>
+ implements ReduceOperation<Pair<IntRef, S>, ArrayWritable<R>> {
+ private int fixedSize;
+ private ReduceOperation<S, R> elementReduceOp;
+ private Class<R> elementClass;
+
+ public ArrayReduce() {
+ }
+
+ /**
+ * Create ReduceOperation that reduces arrays by reducing individual
+ * elements.
+ *
+ * @param fixedSize Number of elements
+ * @param elementReduceOp ReduceOperation for individual elements
+ */
+ public ArrayReduce(int fixedSize, ReduceOperation<S, R> elementReduceOp) {
+ this.fixedSize = fixedSize;
+ this.elementReduceOp = elementReduceOp;
+ init();
+ }
+
+ /**
+ * Registers one new reducer, that will reduce array of objects,
+ * by reducing individual elements using {@code elementReduceOp}.
+ *
+ * This function will return ReducerArrayHandle to it, by which
+ * individual elements can be manipulated separately.
+ *
+ * @param fixedSize Number of elements
+ * @param elementReduceOp ReduceOperation for individual elements
+ * @param createFunction Function for creating a reducer
+ * @return Created ReducerArrayHandle
+ */
+ public static <S, T extends Writable>
+ ReducerArrayHandle<S, T> createArrayHandles(
+ final int fixedSize, ReduceOperation<S, T> elementReduceOp,
+ CreateReducerFunctionApi createFunction) {
+ final ReducerHandle<Pair<IntRef, S>, ArrayWritable<T>> reduceHandle =
+ createFunction.createReducer(
+ new ArrayReduce<>(fixedSize, elementReduceOp));
+
+ final IntRef curIndex = new IntRef(0);
+ final MutablePair<IntRef, S> reusablePair =
+ MutablePair.of(new IntRef(0), null);
+ final ReducerHandle<S, T> elementReduceHandle = new ReducerHandle<S, T>() {
+ @Override
+ public T getReducedValue(MasterGlobalCommUsage master) {
+ ArrayWritable<T> result = reduceHandle.getReducedValue(master);
+ return result.get()[curIndex.value];
+ }
+
+ @Override
+ public void reduce(S valueToReduce) {
+ reusablePair.getLeft().value = curIndex.value;
+ reusablePair.setRight(valueToReduce);
+ reduceHandle.reduce(reusablePair);
+ }
+
+ @Override
+ public BroadcastHandle<T> broadcastValue(BlockMasterApi master) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ return new ReducerArrayHandle<S, T>() {
+ @Override
+ public ReducerHandle<S, T> get(int index) {
+ curIndex.value = index;
+ return elementReduceHandle;
+ }
+
+ @Override
+ public int getStaticSize() {
+ return fixedSize;
+ }
+
+ @Override
+ public int getReducedSize(BlockMasterApi master) {
+ return getStaticSize();
+ }
+
+ @Override
+ public BroadcastArrayHandle<T> broadcastValue(BlockMasterApi master) {
+ final BroadcastHandle<ArrayWritable<T>> broadcastHandle =
+ reduceHandle.broadcastValue(master);
+ final IntRef curIndex = new IntRef(0);
+ final BroadcastHandle<T>
+ elementBroadcastHandle = new BroadcastHandle<T>() {
+ @Override
+ public T getBroadcast(WorkerBroadcastUsage worker) {
+ ArrayWritable<T> result = broadcastHandle.getBroadcast(worker);
+ return result.get()[curIndex.value];
+ }
+ };
+ return new BroadcastArrayHandle<T>() {
+ @Override
+ public BroadcastHandle<T> get(int index) {
+ curIndex.value = index;
+ return elementBroadcastHandle;
+ }
+
+ @Override
+ public int getStaticSize() {
+ return fixedSize;
+ }
+
+ @Override
+ public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+ return getStaticSize();
+ }
+ };
+ }
+ };
+ }
+
+ private void init() {
+ elementClass = (Class<R>) elementReduceOp.createInitialValue().getClass();
+ }
+
+ @Override
+ public ArrayWritable<R> createInitialValue() {
+ R[] values = (R[]) Array.newInstance(elementClass, fixedSize);
+ for (int i = 0; i < fixedSize; i++) {
+ values[i] = elementReduceOp.createInitialValue();
+ }
+ return new ArrayWritable<>(elementClass, values);
+ }
+
+ @Override
+ public ArrayWritable<R> reduce(
+ ArrayWritable<R> curValue, Pair<IntRef, S> valueToReduce) {
+ int index = valueToReduce.getLeft().value;
+ curValue.get()[index] =
+ elementReduceOp.reduce(curValue.get()[index], valueToReduce.getRight());
+ return curValue;
+ }
+
+ @Override
+ public ArrayWritable<R> reduceMerge(
+ ArrayWritable<R> curValue, ArrayWritable<R> valueToReduce) {
+ for (int i = 0; i < fixedSize; i++) {
+ curValue.get()[i] =
+ elementReduceOp.reduceMerge(
+ curValue.get()[i], valueToReduce.get()[i]);
+ }
+ return curValue;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(fixedSize);
+ WritableUtils.writeWritableObject(elementReduceOp, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ fixedSize = in.readInt();
+ elementReduceOp = WritableUtils.readWritableObject(in, null);
+ init();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java
new file mode 100644
index 0000000..91ced16
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.array;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Efficient generic primitive array reduce operation.
+ *
+ * Allows two modes - fixed size, and infinite size
+ * (with keeping only actually used elements and resizing).
+ * A {@code fixedSize} of -1 selects the unbounded mode.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public class BasicArrayReduce<S, R extends Writable>
+    implements ReduceOperation<Pair<IntRef, S>, BasicArrayList<R>> {
+  /**
+   * Upper bound used when growing capacity speculatively; kept slightly
+   * below Integer.MAX_VALUE because some JVMs cannot allocate arrays of
+   * exactly that length.
+   */
+  private static final int MAX_SPECULATIVE_CAPACITY = Integer.MAX_VALUE - 8;
+
+  /** Number of elements, or -1 when the size is unbounded */
+  private int fixedSize;
+  /** TypeOps of individual elements */
+  private PrimitiveTypeOps<R> typeOps;
+  /** ReduceOperation applied to individual elements */
+  private ReduceOperation<S, R> elementReduceOp;
+  /** Value used to fill newly added slots */
+  private R initialElement;
+  /** Scratch value holding the current element during reduction */
+  private R reusable;
+  /** Second scratch value, holding the incoming element in reduceMerge */
+  private R reusable2;
+
+  /** No-arg constructor; fields are populated later by readFields. */
+  public BasicArrayReduce() {
+  }
+
+  /**
+   * Create ReduceOperation that reduces BasicArrays by reducing individual
+   * elements, with predefined size.
+   *
+   * @param fixedSize Number of elements, or -1 for unbounded size
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   */
+  public BasicArrayReduce(
+      int fixedSize,
+      PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp) {
+    this.fixedSize = fixedSize;
+    this.typeOps = typeOps;
+    this.elementReduceOp = elementReduceOp;
+    init();
+  }
+
+  /**
+   * Create ReduceOperation that reduces BasicArrays by reducing individual
+   * elements, with unbounded size.
+   *
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   */
+  public BasicArrayReduce(
+      PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp) {
+    this(-1, typeOps, elementReduceOp);
+  }
+
+  /**
+   * Registers one new local reducer, that will reduce BasicArray,
+   * by reducing individual elements using {@code elementReduceOp},
+   * with unbounded size.
+   *
+   * This function will return ReducerArrayHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * @param <S> Single value type
+   * @param <R> Reduced element type
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi API for creating reducers
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createLocalArrayHandles(
+      PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
+      CreateReducersApi reduceApi) {
+    return createLocalArrayHandles(-1, typeOps, elementReduceOp, reduceApi);
+  }
+
+  /**
+   * Registers one new local reducer, that will reduce BasicArray,
+   * by reducing individual elements using {@code elementReduceOp},
+   * with predefined size.
+   *
+   * This function will return ReducerArrayHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * @param <S> Single value type
+   * @param <R> Reduced element type
+   * @param fixedSize Number of elements, or -1 for unbounded size
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi API for creating reducers
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createLocalArrayHandles(
+      int fixedSize, PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp,
+      final CreateReducersApi reduceApi) {
+    return createArrayHandles(fixedSize, typeOps, elementReduceOp,
+        new CreateReducerFunctionApi() {
+          // Type parameters renamed so they do not shadow the outer S and R.
+          @Override
+          public <T, W extends Writable> ReducerHandle<T, W> createReducer(
+              ReduceOperation<T, W> reduceOp) {
+            return reduceApi.createLocalReducer(reduceOp);
+          }
+        });
+  }
+
+  /**
+   * Registers one new reducer, that will reduce BasicArray,
+   * by reducing individual elements using {@code elementReduceOp},
+   * with unbounded size.
+   *
+   * This function will return ReducerArrayHandle, by which
+   * individual elements can be manipulated separately.
+   *
+   * @param <S> Single value type
+   * @param <R> Reduced element type
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param createFunction Function for creating a reducer
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createArrayHandles(
+      PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
+      CreateReducerFunctionApi createFunction) {
+    return createArrayHandles(-1, typeOps, elementReduceOp, createFunction);
+  }
+
+  /**
+   * Registers one new reducer, that will reduce BasicArray,
+   * by reducing individual elements using {@code elementReduceOp},
+   * with predefined size.
+   *
+   * This function will return ReducerArrayHandle, by which
+   * individual elements can be manipulated separately. The returned handle
+   * hands out one shared element handle: {@code get(index)} re-points it at
+   * {@code index}, and values it returns are reused scratch objects.
+   *
+   * @param <S> Single value type
+   * @param <R> Reduced element type
+   * @param fixedSize Number of elements, or -1 for unbounded size
+   * @param typeOps TypeOps of individual elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param createFunction Function for creating a reducer
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createArrayHandles(
+      final int fixedSize, final PrimitiveTypeOps<R> typeOps,
+      ReduceOperation<S, R> elementReduceOp,
+      CreateReducerFunctionApi createFunction) {
+    final ReducerHandle<Pair<IntRef, S>, BasicArrayList<R>> reduceHandle =
+        createFunction.createReducer(
+            new BasicArrayReduce<>(fixedSize, typeOps, elementReduceOp));
+    // Element currently addressed by the shared element handle.
+    final IntRef curIndex = new IntRef(0);
+    final R reusableValue = typeOps.create();
+    // Reported for indices never reduced into (unbounded mode only).
+    final R initialValue = elementReduceOp.createInitialValue();
+    // Reused for every reduce call, to avoid allocating a pair each time.
+    final MutablePair<IntRef, S> reusablePair =
+        MutablePair.of(new IntRef(0), null);
+    final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
+      @Override
+      public R getReducedValue(MasterGlobalCommUsage master) {
+        return readElement(reduceHandle.getReducedValue(master),
+            curIndex.value, fixedSize, typeOps, initialValue, reusableValue);
+      }
+
+      @Override
+      public void reduce(S valueToReduce) {
+        reusablePair.getLeft().value = curIndex.value;
+        reusablePair.setRight(valueToReduce);
+        reduceHandle.reduce(reusablePair);
+      }
+
+      @Override
+      public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
+        throw new UnsupportedOperationException();
+      }
+    };
+
+    return new ReducerArrayHandle<S, R>() {
+      @Override
+      public ReducerHandle<S, R> get(int index) {
+        curIndex.value = index;
+        return elementReduceHandle;
+      }
+
+      @Override
+      public int getStaticSize() {
+        if (fixedSize == -1) {
+          throw new UnsupportedOperationException(
+              "Cannot call size, when one is not specified upfront");
+        }
+        return fixedSize;
+      }
+
+      @Override
+      public int getReducedSize(BlockMasterApi master) {
+        return reduceHandle.getReducedValue(master).size();
+      }
+
+      @Override
+      public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
+        final BroadcastHandle<BasicArrayList<R>> broadcastHandle =
+            reduceHandle.broadcastValue(master);
+        // Separate cursor and scratch value from the reducer side.
+        final IntRef curIndex = new IntRef(0);
+        final R reusableValue = typeOps.create();
+        final BroadcastHandle<R>
+        elementBroadcastHandle = new BroadcastHandle<R>() {
+          @Override
+          public R getBroadcast(WorkerBroadcastUsage worker) {
+            return readElement(broadcastHandle.getBroadcast(worker),
+                curIndex.value, fixedSize, typeOps, initialValue,
+                reusableValue);
+          }
+        };
+        return new BroadcastArrayHandle<R>() {
+          @Override
+          public BroadcastHandle<R> get(int index) {
+            curIndex.value = index;
+            return elementBroadcastHandle;
+          }
+
+          @Override
+          public int getStaticSize() {
+            if (fixedSize == -1) {
+              throw new UnsupportedOperationException(
+                  "Cannot call size, when one is not specified upfront");
+            }
+            return fixedSize;
+          }
+
+          @Override
+          public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+            return broadcastHandle.getBroadcast(worker).size();
+          }
+        };
+      }
+    };
+  }
+
+  /**
+   * Read element {@code index} of {@code list} into {@code target}.
+   * In unbounded mode, an index past the end of the list was never reduced
+   * into, so {@code initialValue} is copied instead.
+   *
+   * @param <T> Element type
+   * @param list Reduced or broadcasted array
+   * @param index Element index
+   * @param fixedSize Number of elements, or -1 for unbounded size
+   * @param typeOps TypeOps of individual elements
+   * @param initialValue Value reported for never-reduced elements
+   * @param target Scratch value to fill
+   * @return {@code target}, holding the element's value
+   */
+  private static <T extends Writable> T readElement(
+      BasicArrayList<T> list, int index, int fixedSize,
+      PrimitiveTypeOps<T> typeOps, T initialValue, T target) {
+    if (fixedSize == -1 && index >= list.size()) {
+      typeOps.set(target, initialValue);
+    } else {
+      list.getInto(index, target);
+    }
+    return target;
+  }
+
+  /** Recompute derived fields after construction or deserialization. */
+  private void init() {
+    initialElement = elementReduceOp.createInitialValue();
+    reusable = typeOps.create();
+    reusable2 = typeOps.create();
+  }
+
+  @Override
+  public BasicArrayList<R> createInitialValue() {
+    if (fixedSize != -1) {
+      BasicArrayList<R> list = typeOps.createArrayList(fixedSize);
+      fill(list, fixedSize);
+      return list;
+    } else {
+      // Unbounded: start empty, elements are added on demand by fill().
+      return typeOps.createArrayList(1);
+    }
+  }
+
+  /**
+   * Append copies of the initial element until {@code list} has at least
+   * {@code newSize} elements.
+   *
+   * @param list List to extend
+   * @param newSize Required minimum size
+   * @throws IllegalArgumentException if {@code newSize} exceeds fixedSize
+   */
+  private void fill(BasicArrayList<R> list, int newSize) {
+    if (fixedSize != -1 && newSize > fixedSize) {
+      throw new IllegalArgumentException(newSize + " larger than " + fixedSize);
+    }
+
+    int capacity = list.capacity();
+    if (capacity < newSize) {
+      // Grow geometrically: in unbounded mode reduce() calls fill() with
+      // index + 1, and growing to exactly newSize would reallocate (and
+      // copy) on every new trailing index, i.e. quadratic total work.
+      long grown = Math.min(2L * capacity, MAX_SPECULATIVE_CAPACITY);
+      int newCapacity = (int) Math.max(newSize, grown);
+      if (fixedSize != -1) {
+        // Never allocate beyond the declared size (newSize <= fixedSize here).
+        newCapacity = Math.min(newCapacity, fixedSize);
+      }
+      list.setCapacity(newCapacity);
+    }
+    while (list.size() < newSize) {
+      list.add(initialElement);
+    }
+  }
+
+  @Override
+  public BasicArrayList<R> reduce(
+      BasicArrayList<R> curValue, Pair<IntRef, S> valueToReduce) {
+    int index = valueToReduce.getLeft().value;
+    fill(curValue, index + 1);
+    curValue.getInto(index, reusable);
+    R result = elementReduceOp.reduce(reusable, valueToReduce.getRight());
+    curValue.set(index, result);
+    return curValue;
+  }
+
+  @Override
+  public BasicArrayList<R> reduceMerge(
+      BasicArrayList<R> curValue, BasicArrayList<R> valueToReduce) {
+    fill(curValue, valueToReduce.size());
+    for (int i = 0; i < valueToReduce.size(); i++) {
+      valueToReduce.getInto(i, reusable2);
+      curValue.getInto(i, reusable);
+      R result = elementReduceOp.reduceMerge(reusable, reusable2);
+      curValue.set(i, result);
+    }
+
+    return curValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(fixedSize);
+    TypeOpsUtils.writeTypeOps(typeOps, out);
+    WritableUtils.writeWritableObject(elementReduceOp, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    fixedSize = in.readInt();
+    typeOps = TypeOpsUtils.readTypeOps(in);
+    elementReduceOp = WritableUtils.readWritableObject(in, null);
+    init();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
new file mode 100644
index 0000000..be5d4fe
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.array;
+
+import java.util.ArrayList;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
+import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfBroadcasts;
+import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfReducers;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.function.ObjectHolder;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.utils.ArrayWritable;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Utility class when we are dealing with huge arrays (i.e. large number of
+ * elements) within reducing/broadcasting.
+ *
+ * In Giraph, for each reducer there is a worker machine which is its owner,
+ * and which does partial aggregation for it. So if we have only a single huge
+ * reducer, other workers will have to wait while that single worker is doing
+ * a huge reducing operation.
+ * On the other hand, each reducer has a meaningful overhead, so we should try
+ * to keep the number of reducers as low as possible (in total less than 10k
+ * is a good number).
+ * What we want is to split such huge reducers into somewhat more pieces than
+ * there are workers, and NUM_STRIPES = 50000 is used here as a good middle
+ * ground.
+ *
+ * So when we have a huge array, we don't want one reducer/broadcast for each
+ * element, but we also don't want one reducer/broadcast for the whole array.
+ *
+ * This class allows a transparent split into a reasonable number of reducers
+ * (~50000 by default), which solves both of the above issues.
+ */
+public class HugeArrayUtils {
+  /**
+   * Maximal number of stripes (reducers or broadcasts) a huge array is split
+   * into. The default evenly stripes reducers of up to ~25GB, at ~500KB per
+   * stripe (i.e. 500KB * NUM_STRIPES).
+   */
+  private static final IntConfOption NUM_STRIPES = new IntConfOption(
+      "giraph.reducers.HugeArrayUtils.num_stripes", 50000,
+      "Number of distinct reducers to create. If array is smaller than this " +
+      "number, each element will be its own reducer");
+
+  /** Utility class, not meant to be instantiated */
+  private HugeArrayUtils() { }
+
+  /**
+   * Create global array of reducers, by splitting the huge array
+   * into NUM_STRIPES number of parts.
+   *
+   * @param fixedSize Number of elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi Api for creating reducers
+   * @param <S> Single value type
+   * @param <R> Reduced element type
+   * @return Created ReducerArrayHandle
+   */
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
+      final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
+      final CreateReducersApi reduceApi) {
+    return createGlobalReducerArrayHandle(
+        fixedSize, elementReduceOp, reduceApi,
+        NUM_STRIPES.get(reduceApi.getConf()));
+  }
+
+  /**
+   * Create global array of reducers, by splitting the huge array
+   * into {@code maxNumStripes} number of parts.
+   *
+   * If {@code fixedSize < maxNumStripes}, one reducer is created per
+   * element. Otherwise the elements are split into {@code maxNumStripes}
+   * consecutive ranges, each reduced as one array (BasicArrayList-backed
+   * when the element type has primitive type ops).
+   *
+   * @param fixedSize Number of elements
+   * @param elementReduceOp ReduceOperation for individual elements
+   * @param reduceApi Api for creating reducers
+   * @param maxNumStripes Maximal number of reducers to create. Must be
+   *                      positive whenever striping is needed.
+   * @param <S> Single value type
+   * @param <R> Reduced element type
+   * @return Created ReducerArrayHandle
+   */
+  @SuppressWarnings("unchecked")
+  public static <S, R extends Writable>
+  ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
+      final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
+      final CreateReducersApi reduceApi, int maxNumStripes) {
+    // null when R has no primitive type ops
+    PrimitiveTypeOps<R> typeOps = TypeOpsUtils.getPrimitiveTypeOpsOrNull(
+        (Class<R>) elementReduceOp.createInitialValue().getClass());
+
+    // Type parameters renamed so they don't shadow the enclosing <S, R>.
+    final CreateReducerFunctionApi createReducer =
+        new CreateReducerFunctionApi() {
+          @Override
+          public <S2, R2 extends Writable> ReducerHandle<S2, R2> createReducer(
+              ReduceOperation<S2, R2> reduceOp) {
+            return reduceApi.createGlobalReducer(reduceOp);
+          }
+        };
+
+    if (fixedSize < maxNumStripes) {
+      // Small enough: one reducer per element.
+      return new ArrayOfReducers<>(
+          fixedSize,
+          new Supplier<ReducerHandle<S, R>>() {
+            @Override
+            public ReducerHandle<S, R> get() {
+              return createReducer.createReducer(elementReduceOp);
+            }
+          });
+    }
+
+    final ObjectStriping striping =
+        new ObjectStriping(fixedSize, maxNumStripes);
+
+    final ArrayList<ReducerArrayHandle<S, R>> handles =
+        new ArrayList<>(striping.getSplits());
+    for (int i = 0; i < striping.getSplits(); i++) {
+      if (typeOps != null) {
+        handles.add(BasicArrayReduce.createArrayHandles(
+            striping.getSplitSize(i), typeOps,
+            elementReduceOp, createReducer));
+      } else {
+        handles.add(ArrayReduce.createArrayHandles(
+            striping.getSplitSize(i), elementReduceOp, createReducer));
+      }
+    }
+
+    return new ReducerArrayHandle<S, R>() {
+      @Override
+      public ReducerHandle<S, R> get(int index) {
+        if ((index >= fixedSize) || (index < 0)) {
+          throw new IndexOutOfBoundsException(
+              "Reducer Access out of bounds: requested : " +
+              index + " from array of size : " + fixedSize);
+        }
+        int reducerIndex = striping.getSplitIndex(index);
+        int insideIndex = striping.getInsideIndex(index);
+        return handles.get(reducerIndex).get(insideIndex);
+      }
+
+      @Override
+      public int getStaticSize() {
+        return fixedSize;
+      }
+
+      @Override
+      public int getReducedSize(BlockMasterApi master) {
+        return getStaticSize();
+      }
+
+      @Override
+      public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
+        throw new UnsupportedOperationException("for now not supported");
+      }
+    };
+  }
+
+  /**
+   * Broadcast a huge array, by splitting into NUM_STRIPES number of parts.
+   * Values returned by valueSupplier are broadcast as-is (not copied), so it
+   * must return a distinct object for each index.
+   *
+   * @param count Number of elements
+   * @param valueSupplier Supplier of value to be broadcasted for a given index
+   * @param master Master API
+   * @param <V> Element type
+   * @return Created BroadcastArrayHandle
+   */
+  public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
+      final int count,
+      final Int2ObjFunction<V> valueSupplier,
+      final BlockMasterApi master) {
+    return broadcast(count, valueSupplier, null, master);
+  }
+
+  /**
+   * Broadcast a huge array, by splitting into NUM_STRIPES number of parts.
+   * Efficient for primitive types, using BasicArray underneath.
+   *
+   * When {@code count} is at least NUM_STRIPES, the returned array hands out
+   * one shared, reusable BroadcastHandle: the handle returned by
+   * {@code get(i)} is only valid until the next {@code get} call on the same
+   * array. For primitive types the value returned by {@code getBroadcast} is
+   * a reused object as well. Copy values that must outlive those calls.
+   *
+   * If {@code typeOps} is null, values are not copied, so valueSupplier must
+   * return a distinct object for each index.
+   *
+   * @param count Number of elements
+   * @param valueSupplier Supplier of value to be broadcasted for a given index
+   * @param typeOps Element TypeOps, or null for non-primitive elements
+   * @param master Master API
+   * @param <V> Element type
+   * @return Created BroadcastArrayHandle
+   */
+  public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
+      final int count,
+      final Int2ObjFunction<V> valueSupplier,
+      final PrimitiveTypeOps<V> typeOps,
+      final BlockMasterApi master) {
+    int numStripes = NUM_STRIPES.get(master.getConf());
+    if (count < numStripes) {
+      return new ArrayOfBroadcasts<>(
+          count,
+          new Int2ObjFunction<BroadcastHandle<V>>() {
+            @Override
+            public BroadcastHandle<V> apply(int i) {
+              // We create a copy because the valueSupplier might return a
+              // reusable object. This is NOT safe if typeOps is null and
+              // valueSupplier returns a reusable object.
+              return master.broadcast(
+                  typeOps != null ?
+                      typeOps.createCopy(valueSupplier.apply(i)) :
+                      valueSupplier.apply(i));
+            }
+          });
+    }
+
+    ObjectStriping striping = new ObjectStriping(count, numStripes);
+    final Int2ObjFunction<BroadcastHandle<V>> handleSupplier;
+    if (typeOps != null) {
+      handleSupplier = getPrimitiveBroadcastHandleSupplier(
+          valueSupplier, typeOps, master, striping);
+    } else {
+      handleSupplier = getObjectBroadcastHandleSupplier(
+          valueSupplier, master, striping);
+    }
+    return new BroadcastArrayHandle<V>() {
+      @Override
+      public BroadcastHandle<V> get(int index) {
+        if (index >= count || index < 0) {
+          throw new IndexOutOfBoundsException(
+              "Broadcast Access out of bounds: requested: " +
+              index + " from array of size : " + count);
+        }
+        return handleSupplier.apply(index);
+      }
+
+      @Override
+      public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+        return count;
+      }
+
+      @Override
+      public int getStaticSize() {
+        return count;
+      }
+    };
+  }
+
+  /**
+   * Broadcasts each stripe of non-primitive values as one ArrayWritable and
+   * returns the index -> handle function reading from those stripes.
+   *
+   * @param valueSupplier Supplier of value for a given index
+   * @param master Master API
+   * @param striping Striping of the indices
+   * @param <V> Element type
+   * @return Function returning the (shared, reusable) handle for an index
+   */
+  @SuppressWarnings("unchecked")
+  private static <V extends Writable>
+  Int2ObjFunction<BroadcastHandle<V>> getObjectBroadcastHandleSupplier(
+      final Int2ObjFunction<V> valueSupplier,
+      final BlockMasterApi master, final ObjectStriping striping) {
+    // Class of the first generated element, passed to ArrayWritable.
+    final ObjectHolder<Class<V>> elementClass = new ObjectHolder<>();
+    final ArrayOfHandles<BroadcastHandle<ArrayWritable<V>>> arrayOfBroadcasts =
+      new ArrayOfHandles<>(
+        striping.getSplits(),
+        new Int2ObjFunction<BroadcastHandle<ArrayWritable<V>>>() {
+          @Override
+          public BroadcastHandle<ArrayWritable<V>> apply(int value) {
+            int size = striping.getSplitSize(value);
+            int start = striping.getSplitStart(value);
+            V[] array = (V[]) new Writable[size];
+            for (int i = 0; i < size; i++) {
+              array[i] = valueSupplier.apply(start + i);
+              if (elementClass.get() == null) {
+                elementClass.apply((Class<V>) array[i].getClass());
+              }
+            }
+            return master.broadcast(
+                new ArrayWritable<>(elementClass.get(), array));
+          }
+        });
+
+    final IntRef insideIndex = new IntRef(-1);
+    final ObjectHolder<BroadcastHandle<ArrayWritable<V>>> handleHolder =
+        new ObjectHolder<>();
+
+    final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
+      @Override
+      public V getBroadcast(WorkerBroadcastUsage worker) {
+        return handleHolder.get().getBroadcast(worker).get()[insideIndex.value];
+      }
+    };
+
+    return createBroadcastHandleSupplier(
+        striping, arrayOfBroadcasts, insideIndex, handleHolder,
+        reusableHandle);
+  }
+
+  /**
+   * Broadcasts each stripe of primitive values as one BasicArrayList and
+   * returns the index -> handle function reading from those stripes.
+   *
+   * @param valueSupplier Supplier of value for a given index
+   * @param typeOps Element TypeOps
+   * @param master Master API
+   * @param striping Striping of the indices
+   * @param <V> Element type
+   * @return Function returning the (shared, reusable) handle for an index
+   */
+  private static <V extends Writable>
+  Int2ObjFunction<BroadcastHandle<V>> getPrimitiveBroadcastHandleSupplier(
+      final Int2ObjFunction<V> valueSupplier, final PrimitiveTypeOps<V> typeOps,
+      final BlockMasterApi master, final ObjectStriping striping) {
+    final ArrayOfHandles<BroadcastHandle<BasicArrayList<V>>> arrayOfBroadcasts =
+      new ArrayOfHandles<>(
+        striping.getSplits(),
+        new Int2ObjFunction<BroadcastHandle<BasicArrayList<V>>>() {
+          @Override
+          public BroadcastHandle<BasicArrayList<V>> apply(int value) {
+            int size = striping.getSplitSize(value);
+            int start = striping.getSplitStart(value);
+            BasicArrayList<V> array = typeOps.createArrayList(size);
+            for (int i = 0; i < size; i++) {
+              array.add(valueSupplier.apply(start + i));
+            }
+            return master.broadcast(array);
+          }
+        });
+
+    final IntRef insideIndex = new IntRef(-1);
+    final ObjectHolder<BroadcastHandle<BasicArrayList<V>>> handleHolder =
+        new ObjectHolder<>();
+    final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
+      /** Reused for every returned value, to avoid allocations */
+      private final V reusable = typeOps.create();
+
+      @Override
+      public V getBroadcast(WorkerBroadcastUsage worker) {
+        handleHolder.get().getBroadcast(worker).getInto(
+            insideIndex.value, reusable);
+        return reusable;
+      }
+    };
+
+    return createBroadcastHandleSupplier(
+        striping, arrayOfBroadcasts, insideIndex, handleHolder,
+        reusableHandle);
+  }
+
+  /**
+   * Creates the index -> handle function for a striped broadcast. Each call
+   * points the shared {@code insideIndex} and {@code handleHolder} at the
+   * requested element and returns the same {@code reusableHandle}, which
+   * reads through them. A returned handle is therefore only valid until the
+   * next call.
+   *
+   * @param striping Striping of the indices
+   * @param arrayOfBroadcasts One broadcast handle per stripe
+   * @param insideIndex Shared holder of the index within the stripe
+   * @param handleHolder Shared holder of the current stripe handle
+   * @param reusableHandle Handle reading the current element
+   * @param <V> Element type
+   * @param <A> Stripe (array) type
+   * @return Index to handle function
+   */
+  private static <V extends Writable, A>
+  Int2ObjFunction<BroadcastHandle<V>> createBroadcastHandleSupplier(
+      final ObjectStriping striping,
+      final ArrayOfHandles<BroadcastHandle<A>> arrayOfBroadcasts,
+      final IntRef insideIndex,
+      final ObjectHolder<BroadcastHandle<A>> handleHolder,
+      final BroadcastHandle<V> reusableHandle) {
+    return new Int2ObjFunction<BroadcastHandle<V>>() {
+      @Override
+      public BroadcastHandle<V> apply(int index) {
+        int broadcastIndex = striping.getSplitIndex(index);
+        insideIndex.value = striping.getInsideIndex(index);
+        handleHolder.apply(arrayOfBroadcasts.get(broadcastIndex));
+        return reusableHandle;
+      }
+    };
+  }
+
+  /**
+   * Handles index calculations when splitting one range into a smaller
+   * number of splits, where indices within each split stay consecutive.
+   * The first {@code size % splits} splits get one extra index.
+   */
+  static class ObjectStriping {
+    /** Number of splits */
+    private final int splits;
+    /** Number of indices in each split without an extra index */
+    private final int indicesPerObject;
+    /** Number of leading splits that hold one extra index */
+    private final int overflowNum;
+    /** Total number of indices held by the leading overflow splits */
+    private final int beforeOverflow;
+
+    /**
+     * Constructor
+     *
+     * @param size Number of indices to split
+     * @param splits Number of splits, must be positive
+     */
+    public ObjectStriping(int size, int splits) {
+      if (splits <= 0) {
+        throw new IllegalArgumentException(
+            "Number of splits must be positive, got " + splits);
+      }
+      this.splits = splits;
+      this.indicesPerObject = size / splits;
+      this.overflowNum = size % splits;
+      this.beforeOverflow = overflowNum * (indicesPerObject + 1);
+    }
+
+    /** @return Number of splits */
+    public int getSplits() {
+      return splits;
+    }
+
+    /**
+     * @param splitIndex Split index
+     * @return Number of indices in the given split
+     */
+    public int getSplitSize(int splitIndex) {
+      return indicesPerObject + (splitIndex < overflowNum ? 1 : 0);
+    }
+
+    /**
+     * @param splitIndex Split index
+     * @return First index belonging to the given split
+     */
+    public int getSplitStart(int splitIndex) {
+      if (splitIndex < overflowNum) {
+        return splitIndex * (indicesPerObject + 1);
+      } else {
+        return beforeOverflow + (splitIndex - overflowNum) * indicesPerObject;
+      }
+    }
+
+    /**
+     * @param objectIndex Index within the whole range
+     * @return Index of the split containing it
+     */
+    public int getSplitIndex(int objectIndex) {
+      if (objectIndex < beforeOverflow) {
+        return objectIndex / (indicesPerObject + 1);
+      } else {
+        return (objectIndex - beforeOverflow) / indicesPerObject + overflowNum;
+      }
+    }
+
+    /**
+     * @param objectIndex Index within the whole range
+     * @return Position of it inside its split
+     */
+    public int getInsideIndex(int objectIndex) {
+      if (objectIndex < beforeOverflow) {
+        return objectIndex % (indicesPerObject + 1);
+      } else {
+        return (objectIndex - beforeOverflow) % indicesPerObject;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java
new file mode 100644
index 0000000..33f8a24
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Reducers for collecting arrays of objects.
+ */
+package org.apache.giraph.block_app.reducers.array;
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java
new file mode 100644
index 0000000..13dd153
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.ResettableIterator;
+import org.apache.giraph.utils.WritableUtils;
+
+/**
+ * Reduce operation collecting primitive values into a single
+ * BasicArrayList.
+ *
+ * @param <S> Primitive Writable type, which has its type ops
+ */
+public class CollectPrimitiveReduceOperation<S>
+    extends KryoWrappedReduceOperation<S, BasicArrayList<S>> {
+  /**
+   * Type ops of S. Always required (createList() and write() dereference
+   * it); it is only null between the reflection constructor and the
+   * following readFields() call.
+   */
+  private PrimitiveTypeOps<S> typeOps;
+
+  /** For reflection only */
+  public CollectPrimitiveReduceOperation() {
+  }
+
+  /**
+   * Constructor
+   *
+   * @param typeOps Type ops of the collected values, must not be null
+   */
+  public CollectPrimitiveReduceOperation(PrimitiveTypeOps<S> typeOps) {
+    if (typeOps == null) {
+      // Fail fast here instead of on first createList()/write().
+      throw new NullPointerException("typeOps must not be null");
+    }
+    this.typeOps = typeOps;
+  }
+
+  @Override
+  public BasicArrayList<S> createValue() {
+    return createList();
+  }
+
+  @Override
+  public void reduce(BasicArrayList<S> reduceInto, S value) {
+    reduceInto.add(value);
+  }
+
+  /** Appends every element of toReduce to reduceInto. */
+  @Override
+  public void reduceMerge(BasicArrayList<S> reduceInto,
+      BasicArrayList<S> toReduce) {
+    ResettableIterator<S> iterator = toReduce.fastIterator();
+    while (iterator.hasNext()) {
+      reduceInto.add(iterator.next());
+    }
+  }
+
+  /**
+   * Creates a new list for values of type S.
+   *
+   * @return New list
+   */
+  public BasicArrayList<S> createList() {
+    return typeOps.createArrayList();
+  }
+
+  /** Writes only the value class; typeOps is recovered from it. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeClass(typeOps.getTypeClass(), out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    typeOps = TypeOpsUtils.getPrimitiveTypeOps(
+        WritableUtils.<S>readClass(in));
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java
new file mode 100644
index 0000000..304ac47
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+
+/**
+ * Reduce operation gathering every reduced value into a single List
+ * (an ArrayList unless createList() is overridden).
+ *
+ * @param <S> Type of values to collect
+ */
+public class CollectReduceOperation<S>
+    extends KryoWrappedReduceOperation<S, List<S>> {
+  /**
+   * Creates the empty list values are collected into; createValue()
+   * delegates here.
+   *
+   * @return New empty list
+   */
+  public List<S> createList() {
+    return new ArrayList<S>();
+  }
+
+  @Override
+  public List<S> createValue() {
+    return createList();
+  }
+
+  @Override
+  public void reduce(List<S> reduceInto, S value) {
+    reduceInto.add(value);
+  }
+
+  /** Appends all of toReduce to the end of reduceInto. */
+  @Override
+  public void reduceMerge(List<S> reduceInto, List<S> toReduce) {
+    reduceInto.addAll(toReduce);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java
new file mode 100644
index 0000000..b29b297
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * ShardedReducerHandle that collects every reduced value into a list,
+ * specialized for primitive types through BasicArrayList.
+ *
+ * @param <S> Single value type
+ */
+public class CollectShardedPrimitiveReducerHandle<S>
+    extends ShardedReducerHandle<S, BasicArrayList<S>> {
+  /** Type ops of S, looked up with TypeOpsUtils.getPrimitiveTypeOps */
+  private final PrimitiveTypeOps<S> typeOps;
+
+  /**
+   * Constructor
+   *
+   * @param reduceApi Api passed on to register()
+   * @param valueClass Class of the collected values
+   */
+  public CollectShardedPrimitiveReducerHandle(final CreateReducersApi reduceApi,
+      Class<S> valueClass) {
+    // typeOps is assigned before register(), which may need it.
+    typeOps = TypeOpsUtils.getPrimitiveTypeOps(valueClass);
+    register(reduceApi);
+  }
+
+  @Override
+  public ReduceOperation<S, KryoWritableWrapper<BasicArrayList<S>>>
+  createReduceOperation() {
+    return new CollectPrimitiveReduceOperation<>(typeOps);
+  }
+
+  /** Creates the result list, sized for the values of all shards. */
+  @Override
+  public BasicArrayList<S> createReduceResult(MasterGlobalCommUsage master) {
+    int total = 0;
+    for (int shard = 0; shard < REDUCER_COUNT; shard++) {
+      total += reducers.get(shard).getReducedValue(master).get().size();
+    }
+    return createList(total);
+  }
+
+  /**
+   * Creates a list for values of type S.
+   *
+   * @param size Size hint passed to typeOps.createArrayList
+   * @return New list
+   */
+  public BasicArrayList<S> createList(int size) {
+    return typeOps.createArrayList(size);
+  }
+
+  @Override
+  public BroadcastHandle<BasicArrayList<S>> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<BasicArrayList<S>>> broadcasts) {
+    return new CollectShardedPrimitiveBroadcastHandle(broadcasts);
+  }
+
+  /**
+   * Broadcast handle for CollectShardedPrimitiveReducerHandle
+   */
+  public class CollectShardedPrimitiveBroadcastHandle
+      extends ShardedBroadcastHandle {
+    /**
+     * Constructor
+     *
+     * @param broadcasts Per-shard broadcast handles
+     */
+    public CollectShardedPrimitiveBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<BasicArrayList<S>>>
+            broadcasts) {
+      super(broadcasts);
+    }
+
+    /** Creates the result list, sized for the values of all shards. */
+    @Override
+    public BasicArrayList<S> createBroadcastResult(
+        WorkerBroadcastUsage worker) {
+      int total = 0;
+      for (int shard = 0; shard < REDUCER_COUNT; shard++) {
+        total += broadcasts.get(shard).getBroadcast(worker).get().size();
+      }
+      return createList(total);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java
new file mode 100644
index 0000000..5132ecf
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * ShardedReducerHandle that collects every reduced value into a List.
+ *
+ * @param <S> Single value type
+ */
+public class CollectShardedReducerHandle<S>
+    extends ShardedReducerHandle<S, List<S>> {
+  /**
+   * Constructor
+   *
+   * @param reduceApi Api passed on to register()
+   */
+  public CollectShardedReducerHandle(CreateReducersApi reduceApi) {
+    register(reduceApi);
+  }
+
+  @Override
+  public ReduceOperation<S, KryoWritableWrapper<List<S>>>
+  createReduceOperation() {
+    return new CollectReduceOperation<>();
+  }
+
+  /** Creates the result list, sized for the values of all shards. */
+  @Override
+  public List<S> createReduceResult(MasterGlobalCommUsage master) {
+    int total = 0;
+    for (int shard = 0; shard < REDUCER_COUNT; shard++) {
+      total += reducers.get(shard).getReducedValue(master).get().size();
+    }
+    return createList(total);
+  }
+
+  /**
+   * Creates an empty list with the given initial capacity.
+   *
+   * @param size Initial capacity
+   * @return New empty ArrayList
+   */
+  public List<S> createList(int size) {
+    return new ArrayList<>(size);
+  }
+
+  @Override
+  public BroadcastHandle<List<S>> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
+    return new CollectShardedBroadcastHandle(broadcasts);
+  }
+
+  /**
+   * BroadcastHandle for CollectShardedReducerHandle
+   */
+  public class CollectShardedBroadcastHandle extends ShardedBroadcastHandle {
+    /**
+     * Constructor
+     *
+     * @param broadcasts Per-shard broadcast handles
+     */
+    public CollectShardedBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
+      super(broadcasts);
+    }
+
+    /** Creates the result list, sized for the values of all shards. */
+    @Override
+    public List<S> createBroadcastResult(WorkerBroadcastUsage worker) {
+      int total = 0;
+      for (int shard = 0; shard < REDUCER_COUNT; shard++) {
+        total += broadcasts.get(shard).getBroadcast(worker).get().size();
+      }
+      return createList(total);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java
new file mode 100644
index 0000000..3222c17
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * ShardedReducerHandle where we keep a list of reduced values,
+ * and values consist of multiple primitives, so we keep one primitive
+ * list for each.
+ * <p>
+ * Each reduced value is a tuple: a List with one entry per value class
+ * given to the constructor. The result is stored column-wise, as one
+ * BasicArrayList per tuple position holding that position's value from
+ * every reduced tuple.
+ */
+@SuppressWarnings("unchecked")
+public class CollectShardedTuplesOfPrimitivesReducerHandle
+    extends ShardedReducerHandle<List<Object>, List<BasicArrayList>> {
+  /**
+   * Type ops for each tuple position, in the order of the value classes
+   * passed to the constructor
+   */
+  private final List<PrimitiveTypeOps> typeOpsList;
+
+  /**
+   * Creates the handle and registers its shard reducers.
+   *
+   * @param reduceApi API used to create the underlying local reducers
+   * @param valueClasses classes of the tuple positions, in order
+   */
+  public CollectShardedTuplesOfPrimitivesReducerHandle(
+      final CreateReducersApi reduceApi, Class<?>... valueClasses) {
+    typeOpsList = new ArrayList<>(valueClasses.length);
+    for (Class<?> valueClass : valueClasses) {
+      typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(valueClass));
+    }
+    // typeOpsList has to be filled before register(): the shard reducers
+    // it creates use createReduceOperation(), which reads typeOpsList.
+    register(reduceApi);
+  }
+
+  /**
+   * Creates a new tuple holding one freshly created value per position.
+   *
+   * @return new mutable tuple
+   */
+  public List<Object> createSingleValue() {
+    List<Object> ret = new ArrayList<>(typeOpsList.size());
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      ret.add(typeOps.create());
+    }
+    return ret;
+  }
+
+  @Override
+  public ReduceOperation<List<Object>,
+      KryoWritableWrapper<List<BasicArrayList>>> createReduceOperation() {
+    return new CollectTuplesOfPrimitivesReduceOperation(typeOpsList);
+  }
+
+  /**
+   * Creates the master-side columns that all shards are merged into,
+   * pre-sized to the total number of tuples across shards.
+   *
+   * @param master master global comm usage used to read shard values
+   * @return empty columns with enough capacity for all tuples
+   */
+  @Override
+  public List<BasicArrayList> createReduceResult(
+      MasterGlobalCommUsage master) {
+    int size = 0;
+    for (int i = 0; i < REDUCER_COUNT; i++) {
+      size += tupleCount(reducers.get(i).getReducedValue(master).get());
+    }
+    return createLists(size);
+  }
+
+  /**
+   * Creates one empty primitive list per tuple position.
+   *
+   * @param size initial capacity of each list
+   * @return new empty columns
+   */
+  public List<BasicArrayList> createLists(int size) {
+    List<BasicArrayList> ret = new ArrayList<>(typeOpsList.size());
+    for (PrimitiveTypeOps typeOps : typeOpsList) {
+      ret.add(typeOps.createArrayList(size));
+    }
+    return ret;
+  }
+
+  /**
+   * Number of tuples held by column-wise lists: the length of the first
+   * column, or 0 when there are no columns (no value classes).
+   *
+   * @param columns one list per tuple position
+   * @return number of tuples
+   */
+  private static int tupleCount(List<BasicArrayList> columns) {
+    return columns.isEmpty() ? 0 : columns.get(0).size();
+  }
+
+  @Override
+  public BroadcastHandle<List<BasicArrayList>> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<List<BasicArrayList>>>
+          broadcasts) {
+    return new CollectShardedTuplesOfPrimitivesBroadcastHandle(broadcasts);
+  }
+
+  /**
+   * BroadcastHandle for CollectShardedTuplesOfPrimitivesReducerHandle
+   */
+  public class CollectShardedTuplesOfPrimitivesBroadcastHandle
+      extends ShardedBroadcastHandle {
+    /**
+     * @param broadcasts broadcasts of all shards
+     */
+    public CollectShardedTuplesOfPrimitivesBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<List<BasicArrayList>>>
+            broadcasts) {
+      super(broadcasts);
+    }
+
+    @Override
+    public List<BasicArrayList> createBroadcastResult(
+        WorkerBroadcastUsage worker) {
+      int size = 0;
+      for (int i = 0; i < REDUCER_COUNT; i++) {
+        // Count tuples (length of a column), not the number of columns,
+        // so the worker side is pre-sized the same way as the master side.
+        size += tupleCount(broadcasts.get(i).getBroadcast(worker).get());
+      }
+      return createLists(size);
+    }
+  }
+
+  /**
+   * Reduce broadcast wrapper: holds the reducer handle and, after
+   * broadcastValue(), the matching broadcast handle.
+   */
+  public static class CollectShardedTuplesOfPrimitivesReduceBroadcast {
+    /** Reducer handle, set by registeredReducer() */
+    private CollectShardedTuplesOfPrimitivesReducerHandle reducerHandle;
+    /** Broadcast handle, set by broadcastValue() */
+    private BroadcastHandle<List<BasicArrayList>> broadcastHandle;
+
+    /** Set reducer handle to just registered handle */
+    public void registeredReducer(CreateReducersApi reduceApi,
+        Class<?>... valueClasses) {
+      this.reducerHandle = new CollectShardedTuplesOfPrimitivesReducerHandle(
+          reduceApi, valueClasses);
+    }
+
+    /** Create a new tuple with one fresh value per position */
+    public List<Object> createSingleValue() {
+      return reducerHandle.createSingleValue();
+    }
+
+    /** Reduce single value */
+    public void reduce(List<Object> valueToReduce) {
+      reducerHandle.reduce(valueToReduce);
+    }
+
+    /** Get reduced value */
+    public List<BasicArrayList> getReducedValue(MasterGlobalCommUsage master) {
+      return reducerHandle.getReducedValue(master);
+    }
+
+    /**
+     * Broadcast reduced value from master
+     */
+    public void broadcastValue(BlockMasterApi master) {
+      broadcastHandle = reducerHandle.broadcastValue(master);
+    }
+
+    /** Get broadcasted value; uses the handle stored by broadcastValue() */
+    public List<BasicArrayList> getBroadcast(WorkerBroadcastUsage worker) {
+      return broadcastHandle.getBroadcast(worker);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java
new file mode 100644
index 0000000..afaba7a
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.ResettableIterator;
+import org.apache.giraph.utils.WritableUtils;
+
+/**
+ * Collect tuples of primitive values reduce operation.
+ * <p>
+ * Results are kept column-wise: position i of every reduced tuple is
+ * appended to the i-th BasicArrayList of the reduced value.
+ */
+public class CollectTuplesOfPrimitivesReduceOperation
+    extends KryoWrappedReduceOperation<List<Object>, List<BasicArrayList>> {
+  /** Type ops for each tuple position; serialized in write/readFields */
+  private List<PrimitiveTypeOps> typeOpsList;
+
+  /** For reflection only */
+  public CollectTuplesOfPrimitivesReduceOperation() {
+  }
+
+  /**
+   * @param typeOpsList type ops for each tuple position, in order
+   */
+  public CollectTuplesOfPrimitivesReduceOperation(
+      List<PrimitiveTypeOps> typeOpsList) {
+    this.typeOpsList = typeOpsList;
+  }
+
+  @Override
+  public List<BasicArrayList> createValue() {
+    int columnCount = typeOpsList.size();
+    List<BasicArrayList> columns = new ArrayList<>(columnCount);
+    for (int col = 0; col < columnCount; col++) {
+      columns.add(typeOpsList.get(col).createArrayList());
+    }
+    return columns;
+  }
+
+  @Override
+  public void reduce(List<BasicArrayList> reduceInto, List<Object> value) {
+    int col = 0;
+    for (BasicArrayList column : reduceInto) {
+      column.add(value.get(col));
+      col++;
+    }
+  }
+
+  @Override
+  public void reduceMerge(List<BasicArrayList> reduceInto,
+      List<BasicArrayList> toReduce) {
+    for (int col = 0; col < reduceInto.size(); col++) {
+      BasicArrayList target = reduceInto.get(col);
+      ResettableIterator source = toReduce.get(col).fastIterator();
+      while (source.hasNext()) {
+        target.add(source.next());
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    int columnCount = typeOpsList.size();
+    out.writeInt(columnCount);
+    for (int col = 0; col < columnCount; col++) {
+      WritableUtils.writeClass(typeOpsList.get(col).getTypeClass(), out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int columnCount = in.readInt();
+    typeOpsList = new ArrayList<>(columnCount);
+    while (typeOpsList.size() < columnCount) {
+      typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(
+          WritableUtils.readClass(in)));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java
new file mode 100644
index 0000000..0c17216
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.collect;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
+import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+import org.apache.giraph.writable.kryo.TransientRandom;
+
+/**
+ * Reducing values into a list of reducers, randomly,
+ * and getting the results of all reducers together
+ * <p>
+ * Each reduce() call goes to one of REDUCER_COUNT local reducers chosen
+ * at random. Reading the value on master, or the broadcast on a worker,
+ * merges all shards together using a fresh reduce operation.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public abstract class ShardedReducerHandle<S, R>
+    implements ReducerHandle<S, R> {
+  // Use a prime number for number of reducers, large enough to make sure
+  // request sizes are within expected size (0.5MB)
+  protected static final int REDUCER_COUNT = 39989;
+
+  /** Source of randomness used to pick the shard for each value */
+  protected final TransientRandom random = new TransientRandom();
+
+  /** Shard reducers, created by register() */
+  protected ArrayOfHandles.ArrayOfReducers<S, KryoWritableWrapper<R>> reducers;
+
+  /**
+   * Creates the REDUCER_COUNT local shard reducers through reduceApi,
+   * using createReduceOperation() for their operations. The subclasses in
+   * this package call it from their constructor once their own state is
+   * set.
+   *
+   * @param reduceApi API used to create the local reducers
+   */
+  public final void register(final CreateReducersApi reduceApi) {
+    reducers = new ArrayOfHandles.ArrayOfReducers<>(REDUCER_COUNT,
+        new Supplier<ReducerHandle<S, KryoWritableWrapper<R>>>() {
+          @Override
+          public ReducerHandle<S, KryoWritableWrapper<R>> get() {
+            return reduceApi.createLocalReducer(createReduceOperation());
+          }
+        });
+  }
+
+  /** Reduces the value into a randomly chosen shard. */
+  @Override
+  public final void reduce(S value) {
+    reducers.get(random.nextInt(REDUCER_COUNT)).reduce(value);
+  }
+
+  /**
+   * Merges the reduced values of all shards, starting from
+   * createReduceResult(master).
+   */
+  @Override
+  public final R getReducedValue(MasterGlobalCommUsage master) {
+    KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
+        createReduceResult(master));
+    ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
+        createReduceOperation();
+    for (int i = 0; i < REDUCER_COUNT; i++) {
+      // Keep the value returned by reduceMerge instead of assuming the
+      // merge happened in place.
+      ret = reduceOperation.reduceMerge(ret,
+          reducers.get(i).getReducedValue(master));
+    }
+    return ret.get();
+  }
+
+  /**
+   * Creates the operation used by each shard and for merging shards.
+   *
+   * @return new reduce operation
+   */
+  public abstract ReduceOperation<S, KryoWritableWrapper<R>>
+      createReduceOperation();
+
+  /**
+   * Creates the master-side value the shards are merged into. Defaults to
+   * the initial value of createReduceOperation(); overridable, e.g. to
+   * pre-size collections.
+   *
+   * @param master master global comm usage
+   * @return empty merge target
+   */
+  public R createReduceResult(MasterGlobalCommUsage master) {
+    return createReduceOperation().createInitialValue().get();
+  }
+
+  /**
+   * Wraps the per-shard broadcasts in a handle that merges them.
+   *
+   * @param broadcasts broadcasts of all shards
+   * @return broadcast handle for the merged value
+   */
+  public BroadcastHandle<R> createBroadcastHandle(
+      BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
+    return new ShardedBroadcastHandle(broadcasts);
+  }
+
+  /** Broadcasts every shard and returns a handle that merges them. */
+  @Override
+  public final BroadcastHandle<R> broadcastValue(BlockMasterApi masterApi) {
+    return createBroadcastHandle(reducers.broadcastValue(masterApi));
+  }
+
+  /**
+   * Broadcast for ShardedReducerHandle
+   */
+  public class ShardedBroadcastHandle implements BroadcastHandle<R> {
+    /** Broadcasts of all shards */
+    protected final BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts;
+
+    /**
+     * @param broadcasts broadcasts of all shards
+     */
+    public ShardedBroadcastHandle(
+        BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
+      this.broadcasts = broadcasts;
+    }
+
+    /**
+     * Creates the worker-side value the shard broadcasts are merged into.
+     * Defaults to the initial value of createReduceOperation().
+     *
+     * @param worker worker broadcast usage
+     * @return empty merge target
+     */
+    public R createBroadcastResult(WorkerBroadcastUsage worker) {
+      return createReduceOperation().createInitialValue().get();
+    }
+
+    /**
+     * Merges the broadcasts of all shards, starting from
+     * createBroadcastResult(worker).
+     */
+    @Override
+    public final R getBroadcast(WorkerBroadcastUsage worker) {
+      KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
+          createBroadcastResult(worker));
+      ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
+          createReduceOperation();
+      for (int i = 0; i < REDUCER_COUNT; i++) {
+        // Keep the value returned by reduceMerge instead of assuming the
+        // merge happened in place.
+        ret = reduceOperation.reduceMerge(ret,
+            broadcasts.get(i).getBroadcast(worker));
+      }
+      return ret.get();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java
new file mode 100644
index 0000000..dc640f7
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Reducers for distributed collection of objects.
+ */
+package org.apache.giraph.block_app.reducers.collect;
http://git-wip-us.apache.org/repos/asf/giraph/blob/77f8a075/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java
new file mode 100644
index 0000000..0e1e113
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.reducers.map;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.map.BroadcastMapHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.map.ReducerMapHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.PrimitiveTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.WritableWriter;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Efficient generic primitive map of values reduce operation.
+ * (it is BasicMap Reduce, not to be confused with MapReduce)
+ *
+ * @param <K> Key type
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public class BasicMapReduce<K extends WritableComparable, S,
+ R extends Writable>
+ implements ReduceOperation<Pair<K, S>, Basic2ObjectMap<K, R>> {
+ private PrimitiveIdTypeOps<K> keyTypeOps;
+ private PrimitiveTypeOps<R> typeOps;
+ private ReduceOperation<S, R> elementReduceOp;
+ private WritableWriter<R> writer;
+
+ public BasicMapReduce() {
+ }
+
+ /**
+ * Create ReduceOperation that reduces BasicMaps by reducing individual
+ * elements corresponding to the same key.
+ *
+ * @param keyTypeOps TypeOps of keys
+ * @param typeOps TypeOps of individual elements
+ * @param elementReduceOp ReduceOperation for individual elements
+ */
+ public BasicMapReduce(
+ PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
+ ReduceOperation<S, R> elementReduceOp) {
+ this.keyTypeOps = keyTypeOps;
+ this.typeOps = typeOps;
+ this.elementReduceOp = elementReduceOp;
+ init();
+ }
+
+ /**
+ * Registers one new local reducer, that will reduce BasicMap,
+ * by reducing individual elements corresponding to the same key
+ * using {@code elementReduceOp}.
+ *
+ * This function will return ReducerMapHandle, by which
+ * individual elements can be manipulated separately.
+ *
+ * @param keyTypeOps TypeOps of keys
+ * @param typeOps TypeOps of individual elements
+ * @param elementReduceOp ReduceOperation for individual elements
+ * @param reduceApi API for creating reducers
+ * @return Created ReducerMapHandle
+ */
+ public static <K extends WritableComparable, S, R extends Writable>
+ ReducerMapHandle<K, S, R> createLocalMapHandles(
+ PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
+ ReduceOperation<S, R> elementReduceOp,
+ final CreateReducersApi reduceApi) {
+ return createMapHandles(
+ keyTypeOps, typeOps, elementReduceOp,
+ new CreateReducerFunctionApi() {
+ @Override
+ public <S, R extends Writable> ReducerHandle<S, R> createReducer(
+ ReduceOperation<S, R> reduceOp) {
+ return reduceApi.createLocalReducer(reduceOp);
+ }
+ });
+ }
+
+ /**
+ * Registers one new reducer, that will reduce BasicMap,
+ * by reducing individual elements corresponding to the same key
+ * using {@code elementReduceOp}.
+ *
+ * This function will return ReducerMapHandle, by which
+ * individual elements can be manipulated separately.
+ *
+ * @param keyTypeOps TypeOps of keys
+ * @param typeOps TypeOps of individual elements
+ * @param elementReduceOp ReduceOperation for individual elements
+ * @param createFunction Function for creating a reducer
+ * @return Created ReducerMapHandle
+ */
+ public static <K extends WritableComparable, S, R extends Writable>
+ ReducerMapHandle<K, S, R> createMapHandles(
+ final PrimitiveIdTypeOps<K> keyTypeOps, final PrimitiveTypeOps<R> typeOps,
+ ReduceOperation<S, R> elementReduceOp,
+ CreateReducerFunctionApi createFunction) {
+ final ReducerHandle<Pair<K, S>, Basic2ObjectMap<K, R>> reduceHandle =
+ createFunction.createReducer(
+ new BasicMapReduce<>(keyTypeOps, typeOps, elementReduceOp));
+ final K curIndex = keyTypeOps.create();
+ final R reusableValue = typeOps.create();
+ final R initialValue = elementReduceOp.createInitialValue();
+ final MutablePair<K, S> reusablePair = MutablePair.of(null, null);
+ final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
+ @Override
+ public R getReducedValue(MasterGlobalCommUsage master) {
+ Basic2ObjectMap<K, R> result = reduceHandle.getReducedValue(master);
+ R value = result.get(curIndex);
+ if (value == null) {
+ typeOps.set(reusableValue, initialValue);
+ } else {
+ typeOps.set(reusableValue, value);
+ }
+ return reusableValue;
+ }
+
+ @Override
+ public void reduce(S valueToReduce) {
+ reusablePair.setLeft(curIndex);
+ reusablePair.setRight(valueToReduce);
+ reduceHandle.reduce(reusablePair);
+ }
+
+ @Override
+ public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ return new ReducerMapHandle<K, S, R>() {
+ @Override
+ public ReducerHandle<S, R> get(K key) {
+ keyTypeOps.set(curIndex, key);
+ return elementReduceHandle;
+ }
+
+ @Override
+ public int getReducedSize(BlockMasterApi master) {
+ return reduceHandle.getReducedValue(master).size();
+ }
+
+ @Override
+ public BroadcastMapHandle<K, R> broadcastValue(BlockMasterApi master) {
+ final BroadcastHandle<Basic2ObjectMap<K, R>> broadcastHandle =
+ reduceHandle.broadcastValue(master);
+ final K curIndex = keyTypeOps.create();
+ final R reusableValue = typeOps.create();
+ final BroadcastHandle<R>
+ elementBroadcastHandle = new BroadcastHandle<R>() {
+ @Override
+ public R getBroadcast(WorkerBroadcastUsage worker) {
+ Basic2ObjectMap<K, R> result = broadcastHandle.getBroadcast(worker);
+ R value = result.get(curIndex);
+ if (value == null) {
+ typeOps.set(reusableValue, initialValue);
+ } else {
+ typeOps.set(reusableValue, value);
+ }
+ return reusableValue;
+ }
+ };
+ return new BroadcastMapHandle<K, R>() {
+ @Override
+ public BroadcastHandle<R> get(K key) {
+ keyTypeOps.set(curIndex, key);
+ return elementBroadcastHandle;
+ }
+
+ @Override
+ public int getBroadcastedSize(WorkerBroadcastUsage worker) {
+ return broadcastHandle.getBroadcast(worker).size();
+ }
+ };
+ }
+ };
+ }
+
+ private void init() {
+ writer = new WritableWriter<R>() {
+ @Override
+ public void write(DataOutput out, R value) throws IOException {
+ value.write(out);
+ }
+
+ @Override
+ public R readFields(DataInput in) throws IOException {
+ R result = typeOps.create();
+ result.readFields(in);
+ return result;
+ }
+ };
+ }
+
+ @Override
+ public Basic2ObjectMap<K, R> createInitialValue() {
+ return keyTypeOps.create2ObjectOpenHashMap(writer);
+ }
+
+ @Override
+ public Basic2ObjectMap<K, R> reduce(
+ Basic2ObjectMap<K, R> curValue, Pair<K, S> valueToReduce) {
+ R result = curValue.get(valueToReduce.getLeft());
+ if (result == null) {
+ result = typeOps.create();
+ }
+ result = elementReduceOp.reduce(result, valueToReduce.getRight());
+ curValue.put(valueToReduce.getLeft(), result);
+ return curValue;
+ }
+
+ @Override
+ public Basic2ObjectMap<K, R> reduceMerge(
+ Basic2ObjectMap<K, R> curValue, Basic2ObjectMap<K, R> valueToReduce) {
+ for (Iterator<K> iter = valueToReduce.fastKeyIterator(); iter.hasNext();) {
+ K key = iter.next();
+
+ R result = curValue.get(key);
+ if (result == null) {
+ result = typeOps.create();
+ }
+ result = elementReduceOp.reduceMerge(result, valueToReduce.get(key));
+ curValue.put(key, result);
+ }
+ return curValue;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ TypeOpsUtils.writeTypeOps(keyTypeOps, out);
+ TypeOpsUtils.writeTypeOps(typeOps, out);
+ WritableUtils.writeWritableObject(elementReduceOp, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ keyTypeOps = TypeOpsUtils.readTypeOps(in);
+ typeOps = TypeOpsUtils.readTypeOps(in);
+ elementReduceOp = WritableUtils.readWritableObject(in, null);
+ init();
+ }
+}