You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/12/01 23:24:20 UTC
git commit: updated refs/heads/trunk to fda1bb3
Repository: giraph
Updated Branches:
refs/heads/trunk bad44d472 -> fda1bb382
Improving and adding reducers
Summary:
- adding common reducers (MaxReduce, MinReduce, SumReduce), written once against NumericTypeOps so there is no per-type code duplication
- adding NumericTypeOps functions to support the above
- adding Varint encoding class
- renaming registerReduce -> registerReducer
Test Plan: mvn install
Reviewers: pavanka, sergey.edunov, maja.kabiljo
Reviewed By: maja.kabiljo
Differential Revision: https://reviews.facebook.net/D28983
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fda1bb38
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fda1bb38
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fda1bb38
Branch: refs/heads/trunk
Commit: fda1bb3820ac574ed5d59cb779171e313798336d
Parents: bad44d4
Author: Igor Kabiljo <ik...@fb.com>
Authored: Mon Dec 1 14:21:12 2014 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Dec 1 14:24:10 2014 -0800
----------------------------------------------------------------------
.../matrix/dense/FloatDenseMatrix.java | 8 +-
.../matrix/dense/IntDenseMatrix.java | 8 +-
.../matrix/sparse/DoubleSparseVector.java | 14 +-
.../matrix/sparse/FloatSparseMatrix.java | 8 +-
.../matrix/sparse/IntSparseMatrix.java | 8 +-
.../matrix/sparse/LongSparseVector.java | 14 +-
.../giraph/benchmark/ReducersBenchmark.java | 8 +-
.../AggregatorToGlobalCommTranslation.java | 4 +-
.../giraph/master/MasterAggregatorHandler.java | 6 +-
.../org/apache/giraph/master/MasterCompute.java | 8 +-
.../giraph/master/MasterGlobalCommUsage.java | 4 +-
.../apache/giraph/reducers/impl/MaxReduce.java | 83 ++++++++
.../apache/giraph/reducers/impl/MinReduce.java | 83 ++++++++
.../apache/giraph/reducers/impl/SumReduce.java | 81 ++++++++
.../giraph/reducers/impl/package-info.java | 21 ++
.../apache/giraph/types/ops/DoubleTypeOps.java | 34 +++-
.../apache/giraph/types/ops/FloatTypeOps.java | 33 +++-
.../org/apache/giraph/types/ops/IntTypeOps.java | 33 +++-
.../apache/giraph/types/ops/LongTypeOps.java | 33 +++-
.../apache/giraph/types/ops/NumericTypeOps.java | 67 +++++++
.../org/apache/giraph/types/ops/TypeOps.java | 3 +
.../apache/giraph/types/ops/TypeOpsUtils.java | 27 +++
.../types/ops/collections/BasicArrayList.java | 41 ++++
.../java/org/apache/giraph/utils/Varint.java | 198 +++++++++++++++++++
.../org/apache/giraph/utils/WritableUtils.java | 39 ++++
.../worker/WorkerAggregatorDelegator.java | 5 +
.../giraph/worker/WorkerAggregatorHandler.java | 16 +-
.../apache/giraph/worker/WorkerReduceUsage.java | 9 +
28 files changed, 854 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java
index ce75d6d..588a92a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java
@@ -26,9 +26,9 @@ import java.util.ArrayList;
*/
public class FloatDenseMatrix {
/** The number of rows in the matrix */
- private int numRows;
+ private final int numRows;
/** The number of columns in the matrix */
- private int numColumns;
+ private final int numColumns;
/** The rows of the matrix */
private ArrayList<FloatDenseVector> rows = null;
@@ -109,7 +109,7 @@ public class FloatDenseMatrix {
* @param i the row number
* @return the row of the matrix
*/
- FloatDenseVector getRow(int i) {
+ public FloatDenseVector getRow(int i) {
return rows.get(i);
}
@@ -118,7 +118,7 @@ public class FloatDenseMatrix {
*
* @param vec the vector to add
*/
- void addRow(FloatDenseVector vec) {
+ public void addRow(FloatDenseVector vec) {
if (rows.size() >= numRows) {
throw new RuntimeException("Cannot add more rows!");
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java
index ed85574..9ec5439 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java
@@ -26,9 +26,9 @@ import java.util.ArrayList;
*/
public class IntDenseMatrix {
/** The number of rows in the matrix */
- private int numRows;
+ private final int numRows;
/** The number of columns in the matrix */
- private int numColumns;
+ private final int numColumns;
/** The rows of the matrix */
private ArrayList<IntDenseVector> rows = null;
@@ -109,7 +109,7 @@ public class IntDenseMatrix {
* @param i the row number
* @return the row of the matrix
*/
- IntDenseVector getRow(int i) {
+ public IntDenseVector getRow(int i) {
return rows.get(i);
}
@@ -118,7 +118,7 @@ public class IntDenseMatrix {
*
* @param vec the vector to add
*/
- void addRow(IntDenseVector vec) {
+ public void addRow(IntDenseVector vec) {
if (rows.size() >= numRows) {
throw new RuntimeException("Cannot add more rows!");
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java
index fb54459..7abf0e6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java
@@ -19,14 +19,13 @@
package org.apache.giraph.aggregators.matrix.sparse;
import it.unimi.dsi.fastutil.ints.Int2DoubleMap;
+import it.unimi.dsi.fastutil.ints.Int2DoubleOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import it.unimi.dsi.fastutil.ints.Int2DoubleOpenHashMap;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
import org.apache.hadoop.io.Writable;
/**
@@ -85,6 +84,15 @@ public class DoubleSparseVector implements Writable {
}
/**
+ * Increment value for a given key
+ * @param key Key
+ * @param value Increment
+ */
+ public void add(int key, double value) {
+ entries.addTo(key, value);
+ }
+
+ /**
* Clear the contents of the vector.
*/
public void clear() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java
index a54ae31..e021555 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java
@@ -26,9 +26,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
*/
public class FloatSparseMatrix {
/** The number of rows in the matrix */
- private int numRows;
+ private final int numRows;
/** The rows of the matrix */
- private Int2ObjectOpenHashMap<FloatSparseVector> rows;
+ private final Int2ObjectOpenHashMap<FloatSparseVector> rows;
/**
* Create a new matrix with the given number of rows.
@@ -88,7 +88,7 @@ public class FloatSparseMatrix {
* @param i the row number
* @return the row of the matrix
*/
- FloatSparseVector getRow(int i) {
+ public FloatSparseVector getRow(int i) {
return rows.get(i);
}
@@ -98,7 +98,7 @@ public class FloatSparseMatrix {
* @param i the row
* @param vec the vector to set as the row
*/
- void setRow(int i, FloatSparseVector vec) {
+ public void setRow(int i, FloatSparseVector vec) {
rows.put(i, vec);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java
index b7cde77..29e8482 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java
@@ -26,9 +26,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
*/
public class IntSparseMatrix {
/** The number of rows in the matrix */
- private int numRows;
+ private final int numRows;
/** The rows of the matrix */
- private Int2ObjectOpenHashMap<IntSparseVector> rows;
+ private final Int2ObjectOpenHashMap<IntSparseVector> rows;
/**
* Create a new matrix with the given number of rows.
@@ -88,7 +88,7 @@ public class IntSparseMatrix {
* @param i the row number
* @return the row of the matrix
*/
- IntSparseVector getRow(int i) {
+ public IntSparseVector getRow(int i) {
return rows.get(i);
}
@@ -98,7 +98,7 @@ public class IntSparseMatrix {
* @param i the row
* @param vec the vector to set as the row
*/
- void setRow(int i, IntSparseVector vec) {
+ public void setRow(int i, IntSparseVector vec) {
rows.put(i, vec);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java
index 6337215..6a16525 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java
@@ -19,14 +19,13 @@
package org.apache.giraph.aggregators.matrix.sparse;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
+import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
import org.apache.hadoop.io.Writable;
/**
@@ -85,6 +84,15 @@ public class LongSparseVector implements Writable {
}
/**
+ * Increment value for a given key
+ * @param key Key
+ * @param value Increment
+ */
+ public void add(int key, long value) {
+ entries.addTo(key, value);
+ }
+
+ /**
* Clear the contents of the vector.
*/
public void clear() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
index 263274d..16c33e9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
@@ -126,15 +126,15 @@ public class ReducersBenchmark extends GiraphBenchmark {
String mi = "m" + i;
String pi = "p" + i;
- registerReduce(wi, TestLongSumReducer.INSTANCE);
- registerReduce(mi, new TestLongSumReducer());
+ registerReducer(wi, TestLongSumReducer.INSTANCE);
+ registerReducer(mi, new TestLongSumReducer());
if (superstep > 0) {
broadcast(wi, getReduced(wi));
broadcast(mi, new LongWritable(-superstep * i));
broadcast(pi, getReduced(pi));
- registerReduce(pi, new TestLongSumReducer(),
+ registerReducer(pi, new TestLongSumReducer(),
(LongWritable) getReduced(pi));
assertEquals(superstep * (getTotalNumVertices() * i) + w,
@@ -142,7 +142,7 @@ public class ReducersBenchmark extends GiraphBenchmark {
assertEquals(superstep * getTotalNumVertices() * i,
((LongWritable) getReduced(pi)).get());
} else {
- registerReduce(pi, new TestLongSumReducer());
+ registerReducer(pi, new TestLongSumReducer());
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
index eb25182..fa3f376 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
@@ -135,10 +135,10 @@ public class AggregatorToGlobalCommTranslation
AggregatorReduceOperation<Writable> cleanReduceOp =
entry.getValue().createReduceOp();
if (entry.getValue().isPersistent()) {
- globalComm.registerReduce(
+ globalComm.registerReducer(
entry.getKey(), cleanReduceOp, value);
} else {
- globalComm.registerReduce(
+ globalComm.registerReducer(
entry.getKey(), cleanReduceOp);
}
entry.getValue().setCurrentValue(null);
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index ccee656..98de9d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -80,13 +80,13 @@ public class MasterAggregatorHandler
}
@Override
- public final <S, R extends Writable> void registerReduce(
+ public final <S, R extends Writable> void registerReducer(
String name, ReduceOperation<S, R> reduceOp) {
- registerReduce(name, reduceOp, reduceOp.createInitialValue());
+ registerReducer(name, reduceOp, reduceOp.createInitialValue());
}
@Override
- public <S, R extends Writable> void registerReduce(
+ public <S, R extends Writable> void registerReducer(
String name, ReduceOperation<S, R> reduceOp,
R globalInitialValue) {
if (reducerMap.containsKey(name)) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 68eb416..eb4144a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -190,15 +190,15 @@ public abstract class MasterCompute
}
@Override
- public final <S, R extends Writable> void registerReduce(
+ public final <S, R extends Writable> void registerReducer(
String name, ReduceOperation<S, R> reduceOp) {
- serviceMaster.getGlobalCommHandler().registerReduce(name, reduceOp);
+ serviceMaster.getGlobalCommHandler().registerReducer(name, reduceOp);
}
@Override
- public final <S, R extends Writable> void registerReduce(
+ public final <S, R extends Writable> void registerReducer(
String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
- serviceMaster.getGlobalCommHandler().registerReduce(
+ serviceMaster.getGlobalCommHandler().registerReducer(
name, reduceOp, globalInitialValue);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
index c3ce0ea..7ee9048 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
@@ -33,7 +33,7 @@ public interface MasterGlobalCommUsage {
* @param <S> Single value type
* @param <R> Reduced value type
*/
- <S, R extends Writable> void registerReduce(
+ <S, R extends Writable> void registerReducer(
String name, ReduceOperation<S, R> reduceOp);
/**
@@ -48,7 +48,7 @@ public interface MasterGlobalCommUsage {
* @param <S> Single value type
* @param <R> Reduced value type
*/
- <S, R extends Writable> void registerReduce(
+ <S, R extends Writable> void registerReducer(
String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java
new file mode 100644
index 0000000..9d603a1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.types.ops.DoubleTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Reducer for calculating max of values
+ * @param <T> Value type
+ */
+public class MaxReduce<T extends WritableComparable>
+ extends OnSameReduceOperation<T> {
+ /** DoubleWritable specialization */
+ public static final MaxReduce<DoubleWritable> DOUBLE =
+ new MaxReduce<>(DoubleTypeOps.INSTANCE);
+ /** LongWritable specialization */
+ public static final MaxReduce<LongWritable> LONG =
+ new MaxReduce<>(LongTypeOps.INSTANCE);
+
+ /** Value type operations */
+ private NumericTypeOps<T> typeOps;
+
+ /** Constructor used for deserialization only */
+ public MaxReduce() {
+ }
+
+ /**
+ * Constructor
+ * @param typeOps Value type operations
+ */
+ public MaxReduce(NumericTypeOps<T> typeOps) {
+ this.typeOps = typeOps;
+ }
+
+ @Override
+ public T createInitialValue() {
+ return typeOps.createMinNegativeValue();
+ }
+
+ @Override
+ public T reduceSingle(T curValue, T valueToReduce) {
+ if (curValue.compareTo(valueToReduce) < 0) {
+ typeOps.set(curValue, valueToReduce);
+ }
+ return curValue;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ TypeOpsUtils.writeTypeOps(typeOps, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ typeOps = TypeOpsUtils.readTypeOps(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java
new file mode 100644
index 0000000..9972340
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.types.ops.DoubleTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Reducer for calculating min of values
+ * @param <T> Value type
+ */
+public class MinReduce<T extends WritableComparable>
+ extends OnSameReduceOperation<T> {
+ /** DoubleWritable specialization */
+ public static final MinReduce<DoubleWritable> DOUBLE =
+ new MinReduce<>(DoubleTypeOps.INSTANCE);
+ /** LongWritable specialization */
+ public static final MinReduce<LongWritable> LONG =
+ new MinReduce<>(LongTypeOps.INSTANCE);
+
+ /** Value type operations */
+ private NumericTypeOps<T> typeOps;
+
+ /** Constructor used for deserialization only */
+ public MinReduce() {
+ }
+
+ /**
+ * Constructor
+ * @param typeOps Value type operations
+ */
+ public MinReduce(NumericTypeOps<T> typeOps) {
+ this.typeOps = typeOps;
+ }
+
+ @Override
+ public T createInitialValue() {
+ return typeOps.createMaxPositiveValue();
+ }
+
+ @Override
+ public T reduceSingle(T curValue, T valueToReduce) {
+ if (curValue.compareTo(valueToReduce) > 0) {
+ typeOps.set(curValue, valueToReduce);
+ }
+ return curValue;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ TypeOpsUtils.writeTypeOps(typeOps, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ typeOps = TypeOpsUtils.readTypeOps(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java
new file mode 100644
index 0000000..3138733
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.types.ops.DoubleTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Reducer for calculating sum of values
+ * @param <T> Value type
+ */
+public class SumReduce<T extends Writable>
+ extends OnSameReduceOperation<T> {
+ /** DoubleWritable specialization */
+ public static final SumReduce<DoubleWritable> DOUBLE =
+ new SumReduce<>(DoubleTypeOps.INSTANCE);
+ /** LongWritable specialization */
+ public static final SumReduce<LongWritable> LONG =
+ new SumReduce<>(LongTypeOps.INSTANCE);
+
+ /** Value type operations */
+ private NumericTypeOps<T> typeOps;
+
+ /** Constructor used for deserialization only */
+ public SumReduce() {
+ }
+
+ /**
+ * Constructor
+ * @param typeOps Value type operations
+ */
+ public SumReduce(NumericTypeOps<T> typeOps) {
+ this.typeOps = typeOps;
+ }
+
+ @Override
+ public T createInitialValue() {
+ return typeOps.createZero();
+ }
+
+ @Override
+ public T reduceSingle(T curValue, T valueToReduce) {
+ typeOps.plusInto(curValue, valueToReduce);
+ return curValue;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ TypeOpsUtils.writeTypeOps(typeOps, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ typeOps = TypeOpsUtils.readTypeOps(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java
new file mode 100644
index 0000000..ba61ce8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Giraph reducers.
+ */
+package org.apache.giraph.reducers.impl;
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
index af8c38f..1ca7796 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
@@ -21,7 +21,9 @@ import org.apache.giraph.types.ops.collections.BasicArrayList.BasicDoubleArrayLi
import org.apache.hadoop.io.DoubleWritable;
/** TypeOps implementation for working with DoubleWritable type */
-public enum DoubleTypeOps implements PrimitiveTypeOps<DoubleWritable> {
+public enum DoubleTypeOps
+ implements PrimitiveTypeOps<DoubleWritable>,
+ NumericTypeOps<DoubleWritable> {
/** Singleton instance */
INSTANCE();
@@ -49,4 +51,34 @@ public enum DoubleTypeOps implements PrimitiveTypeOps<DoubleWritable> {
public BasicDoubleArrayList createArrayList(int capacity) {
return new BasicDoubleArrayList(capacity);
}
+
+ @Override
+ public DoubleWritable createMinNegativeValue() {
+ return new DoubleWritable(Double.NEGATIVE_INFINITY);
+ }
+
+ @Override
+ public DoubleWritable createMaxPositiveValue() {
+ return new DoubleWritable(Double.POSITIVE_INFINITY);
+ }
+
+ @Override
+ public DoubleWritable createZero() {
+ return new DoubleWritable(0);
+ }
+
+ @Override
+ public void plusInto(DoubleWritable value, DoubleWritable increment) {
+ value.set(value.get() + increment.get());
+ }
+
+ @Override
+ public void multiplyInto(DoubleWritable value, DoubleWritable multiplier) {
+ value.set(value.get() * multiplier.get());
+ }
+
+ @Override
+ public void negate(DoubleWritable value) {
+ value.set(-value.get());
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
index 3ca8409..3c69868 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
@@ -21,7 +21,8 @@ import org.apache.giraph.types.ops.collections.BasicArrayList.BasicFloatArrayLis
import org.apache.hadoop.io.FloatWritable;
/** TypeOps implementation for working with FloatWritable type */
-public enum FloatTypeOps implements PrimitiveTypeOps<FloatWritable> {
+public enum FloatTypeOps
+ implements PrimitiveTypeOps<FloatWritable>, NumericTypeOps<FloatWritable> {
/** Singleton instance */
INSTANCE();
@@ -49,4 +50,34 @@ public enum FloatTypeOps implements PrimitiveTypeOps<FloatWritable> {
public BasicFloatArrayList createArrayList(int capacity) {
return new BasicFloatArrayList(capacity);
}
+
+ @Override
+ public FloatWritable createMinNegativeValue() {
+ return new FloatWritable(Float.NEGATIVE_INFINITY);
+ }
+
+ @Override
+ public FloatWritable createMaxPositiveValue() {
+ return new FloatWritable(Float.POSITIVE_INFINITY);
+ }
+
+ @Override
+ public FloatWritable createZero() {
+ return new FloatWritable(0);
+ }
+
+ @Override
+ public void plusInto(FloatWritable value, FloatWritable increment) {
+ value.set(value.get() + increment.get());
+ }
+
+ @Override
+ public void multiplyInto(FloatWritable value, FloatWritable multiplier) {
+ value.set(value.get() * multiplier.get());
+ }
+
+ @Override
+ public void negate(FloatWritable value) {
+ value.set(-value.get());
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
index f9a32c0..57e1b53 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
@@ -26,7 +26,8 @@ import org.apache.giraph.types.ops.collections.BasicSet.BasicIntOpenHashSet;
import org.apache.hadoop.io.IntWritable;
/** TypeOps implementation for working with IntWritable type */
-public enum IntTypeOps implements PrimitiveIdTypeOps<IntWritable> {
+public enum IntTypeOps
+ implements PrimitiveIdTypeOps<IntWritable>, NumericTypeOps<IntWritable> {
/** Singleton instance */
INSTANCE;
@@ -65,4 +66,34 @@ public enum IntTypeOps implements PrimitiveIdTypeOps<IntWritable> {
int capacity) {
return new BasicInt2ObjectOpenHashMap<>(capacity);
}
+
+ @Override
+ public IntWritable createMinNegativeValue() {
+ return new IntWritable(Integer.MIN_VALUE);
+ }
+
+ @Override
+ public IntWritable createMaxPositiveValue() {
+ return new IntWritable(Integer.MAX_VALUE);
+ }
+
+ @Override
+ public IntWritable createZero() {
+ return new IntWritable(0);
+ }
+
+ @Override
+ public void plusInto(IntWritable value, IntWritable increment) {
+ value.set(value.get() + increment.get());
+ }
+
+ @Override
+ public void multiplyInto(IntWritable value, IntWritable multiplier) {
+ value.set(value.get() * multiplier.get());
+ }
+
+ @Override
+ public void negate(IntWritable value) {
+ value.set(-value.get());
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
index 4e5ca54..d7fa198 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
@@ -26,7 +26,8 @@ import org.apache.giraph.types.ops.collections.BasicSet.BasicLongOpenHashSet;
import org.apache.hadoop.io.LongWritable;
/** TypeOps implementation for working with LongWritable type */
-public enum LongTypeOps implements PrimitiveIdTypeOps<LongWritable> {
+public enum LongTypeOps
+ implements PrimitiveIdTypeOps<LongWritable>, NumericTypeOps<LongWritable> {
/** Singleton instance */
INSTANCE;
@@ -65,4 +66,34 @@ public enum LongTypeOps implements PrimitiveIdTypeOps<LongWritable> {
int capacity) {
return new BasicLong2ObjectOpenHashMap<>(capacity);
}
+
+ @Override
+ public LongWritable createMinNegativeValue() {
+ return new LongWritable(Long.MIN_VALUE);
+ }
+
+ @Override
+ public LongWritable createMaxPositiveValue() {
+ return new LongWritable(Long.MAX_VALUE);
+ }
+
+ @Override
+ public LongWritable createZero() {
+ return new LongWritable(0);
+ }
+
+ @Override
+ public void plusInto(LongWritable value, LongWritable increment) {
+ value.set(value.get() + increment.get());
+ }
+
+ @Override
+ public void multiplyInto(LongWritable value, LongWritable multiplier) {
+ value.set(value.get() * multiplier.get());
+ }
+
+ @Override
+ public void negate(LongWritable value) {
+ value.set(-value.get());
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
new file mode 100644
index 0000000..396c914
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+/**
+ * Numeric type operations, allowing working generically with types,
+ * but still having efficient code.
+ *
+ * Using any of the provided operations should lead to no boxing/unboxing.
+ *
+ * @param <T> Type
+ */
+public interface NumericTypeOps<T> extends TypeOps<T> {
+ /**
+ * Minimal negative value representable via current type.
+ * Negative infinity for floating point numbers.
+ * @return New object with min negative value
+ */
+ T createMinNegativeValue();
+ /**
+ * Maximal positive value representable via current type.
+ * Positive infinity for floating point numbers.
+ * @return New object with max positive value
+ */
+ T createMaxPositiveValue();
+ /**
+ * Value of zero
+ * @return New object with value of zero
+ */
+ T createZero();
+
+ /**
+ * value+=increment
+ *
+ * @param value Value to modify
+ * @param increment Increment
+ */
+ void plusInto(T value, T increment);
+ /**
+ * value*=multiplier
+ *
+ * @param value Value to modify
+ * @param multiplier Multiplier
+ */
+ void multiplyInto(T value, T multiplier);
+
+ /**
+ * value=-value
+ * @param value Value to negate
+ */
+ void negate(T value);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
index b7f9479..c4bd702 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
@@ -23,6 +23,9 @@ package org.apache.giraph.types.ops;
* but still having efficient code.
* For example, by reducing object allocation via reuse.
*
+ * Implementations should use the enum singleton pattern, with a single
+ * enum value INSTANCE; serialization code depends on them being enums.
+ *
* @param <T> Type
*/
public interface TypeOps<T> {
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
index df5f2bd..785fda1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
@@ -17,6 +17,11 @@
*/
package org.apache.giraph.types.ops;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.DoubleWritable;
@@ -146,4 +151,26 @@ public class TypeOpsUtils {
type + " not supported in TypeOps");
}
}
+
+ /**
+ * Write TypeOps object into a stream
+ * @param typeOps type ops instance
+ * @param output output stream
+ * @param <T> Corresponding type
+ */
+ public static <T> void writeTypeOps(TypeOps<T> typeOps,
+ DataOutput output) throws IOException {
+ WritableUtils.writeEnum((Enum) typeOps, output);
+ }
+
+ /**
+ * Read TypeOps object from the stream
+ * @param input input stream
+ * @param <O> Concrete TypeOps type
+ * @return type ops instance
+ */
+ public static <O extends TypeOps<?>> O readTypeOps(
+ DataInput input) throws IOException {
+ return (O) WritableUtils.readEnum(input);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java
index df5ca24..a96fb69 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java
@@ -58,6 +58,17 @@ public abstract class BasicArrayList<T> implements Writable {
*/
public abstract int size();
/**
+ * Sets the size of this list.
+ *
+ * <P>
+ * If the specified size is smaller than the current size,
+ * the last elements are discarded.
+ * Otherwise, they are filled with 0/<code>null</code>/<code>false</code>.
+ *
+ * @param newSize the new size.
+ */
+ public abstract void size(int newSize);
+ /**
* Capacity of currently allocated memory
* @return capacity
*/
@@ -168,6 +179,11 @@ public abstract class BasicArrayList<T> implements Writable {
}
@Override
+ public void size(int newSize) {
+ list.size(newSize);
+ }
+
+ @Override
public int capacity() {
return list.elements().length;
}
@@ -250,6 +266,11 @@ public abstract class BasicArrayList<T> implements Writable {
}
@Override
+ public void size(int newSize) {
+ list.size(newSize);
+ }
+
+ @Override
public int capacity() {
return list.elements().length;
}
@@ -332,6 +353,11 @@ public abstract class BasicArrayList<T> implements Writable {
}
@Override
+ public void size(int newSize) {
+ list.size(newSize);
+ }
+
+ @Override
public int capacity() {
return list.elements().length;
}
@@ -414,6 +440,11 @@ public abstract class BasicArrayList<T> implements Writable {
}
@Override
+ public void size(int newSize) {
+ list.size(newSize);
+ }
+
+ @Override
public int capacity() {
return list.elements().length;
}
@@ -496,6 +527,11 @@ public abstract class BasicArrayList<T> implements Writable {
}
@Override
+ public void size(int newSize) {
+ list.size(newSize);
+ }
+
+ @Override
public int capacity() {
return list.elements().length;
}
@@ -578,6 +614,11 @@ public abstract class BasicArrayList<T> implements Writable {
}
@Override
+ public void size(int newSize) {
+ list.size(newSize);
+ }
+
+ @Override
public int capacity() {
return list.elements().length;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
new file mode 100644
index 0000000..89d4e90
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+/**
+ * This Code is Copied from main/java/org/apache/mahout/math/Varint.java
+ *
+ * Only modification is throwing exceptions for passing negative values to
+ * unsigned functions, instead of serializing them.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * <p>
+ * Encodes signed and unsigned values using a common variable-length scheme,
+ * found for example in <a
+ * href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+ * Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values,
+ * but will use slightly more bytes to encode large values.
+ * </p>
+ *
+ * <p>
+ * Signed values are further encoded using so-called zig-zag encoding in order
+ * to make them "compatible" with variable-length encoding.
+ * </p>
+ */
+public final class Varint {
+
+ /**
+ * private constructor
+ */
+ private Varint() {
+ }
+
+ /**
+ * Encodes a value using the variable-length encoding from <a
+ * href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+ * Google Protocol Buffers</a>. Zig-zag is not used, so input must not be
+ * negative. If values can be negative, use
+ * {@link #writeSignedVarLong(long, DataOutput)} instead. This method rejects
+ * negative input by throwing an {@link IllegalArgumentException}.
+ *
+ * @param value
+ * value to encode
+ * @param out
+ * to write bytes to
+ * @throws IOException
+ * if {@link DataOutput} throws {@link IOException}
+ */
+ public static void writeUnsignedVarLong(
+ long value, DataOutput out) throws IOException {
+ if (value < 0) {
+ throw new IllegalArgumentException(
+ "Negative value passed into writeUnsignedVarLong - " + value);
+ }
+ while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+ out.writeByte(((int) value & 0x7F) | 0x80);
+ value >>>= 7;
+ }
+ out.writeByte((int) value & 0x7F);
+ }
+
+ /**
+ * @see #writeUnsignedVarLong(long, DataOutput)
+ * @param value
+ * value to encode
+ * @param out
+ * to write bytes to
+ */
+ public static void writeUnsignedVarInt(
+ int value, DataOutput out) throws IOException {
+ if (value < 0) {
+ throw new IllegalArgumentException(
+ "Negative value passed into writeUnsignedVarInt - " + value);
+ }
+ while ((value & 0xFFFFFF80) != 0L) {
+ out.writeByte((value & 0x7F) | 0x80);
+ value >>>= 7;
+ }
+ out.writeByte(value & 0x7F);
+ }
+
+ /**
+ * @param in
+ * to read bytes from
+ * @return decoded value
+ * @throws IOException
+ * if {@link DataInput} throws {@link IOException}
+ * @throws IllegalArgumentException
+ * if variable-length value does not terminate after 9 bytes have
+ * been read
+ * @see #writeUnsignedVarLong(long, DataOutput)
+ */
+ public static long readUnsignedVarLong(DataInput in) throws IOException {
+ long value = 0L;
+ int i = 0;
+ long b = in.readByte();
+ while ((b & 0x80L) != 0) {
+ value |= (b & 0x7F) << i;
+ i += 7;
+ if (i > 63) {
+ throw new IllegalArgumentException(
+ "Variable length quantity is too long");
+ }
+ b = in.readByte();
+ }
+ return value | (b << i);
+ }
+
+ /**
+ * @throws IllegalArgumentException
+ * if variable-length value does not terminate after
+ * 5 bytes have been read
+ * @throws IOException
+ * if {@link DataInput} throws {@link IOException}
+ * @param in to read bytes from.
+ * @return decoded value.
+ */
+ public static int readUnsignedVarInt(DataInput in) throws IOException {
+ int value = 0;
+ int i = 0;
+ int b = in.readByte();
+ while ((b & 0x80) != 0) {
+ value |= (b & 0x7F) << i;
+ i += 7;
+ if (i > 35) {
+ throw new IllegalArgumentException(
+ "Variable length quantity is too long");
+ }
+ b = in.readByte();
+ }
+ return value | (b << i);
+ }
+ /**
+ * Simulation for what will happen when writing an unsigned long value
+ * as varlong.
+ * @param value the value
+ * @return the number of bytes needed to write value.
+ * @throws IOException declared, but never thrown by this method
+ */
+ public static long sizeOfUnsignedVarLong(long value) throws IOException {
+ long cnt = 0;
+ while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+ cnt++;
+ value >>>= 7;
+ }
+ return ++cnt;
+ }
+
+ /**
+ * Simulation for what will happen when writing an unsigned int value
+ * as varint.
+ * @param value the value
+ * @return the number of bytes needed to write value.
+ * @throws IOException declared, but never thrown by this method
+ */
+ public static long sizeOfUnsignedVarInt(int value) throws IOException {
+ long cnt = 0;
+ while ((value & 0xFFFFFF80) != 0L) {
+ cnt++;
+ value >>>= 7;
+ }
+ return ++cnt;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 3c37bec..592ef7e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -27,6 +27,7 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
@@ -857,4 +858,42 @@ public class WritableUtils {
return res;
}
+ /**
+ * Writes enum into a stream, by serializing class name and its ordinal
+ * @param enumValue Enum value
+ * @param output Output stream
+ * @param <T> Enum type
+ */
+ public static <T extends Enum<T>> void writeEnum(T enumValue,
+ DataOutput output) throws IOException {
+ writeClass(
+ enumValue != null ? enumValue.getDeclaringClass() : null, output);
+ if (enumValue != null) {
+ Varint.writeUnsignedVarInt(enumValue.ordinal(), output);
+ }
+ }
+
+ /**
+ * Reads enum from the stream, serialized by writeEnum
+ * @param input Input stream
+ * @param <T> Enum type
+ * @return Enum value
+ */
+ public static <T extends Enum<T>> T readEnum(DataInput input) throws
+ IOException {
+ Class<T> clazz = readClass(input);
+ if (clazz != null) {
+ int ordinal = Varint.readUnsignedVarInt(input);
+ try {
+ T[] values = (T[]) clazz.getDeclaredMethod("values").invoke(null);
+ return values[ordinal];
+ } catch (IllegalAccessException | IllegalArgumentException
+ | InvocationTargetException | NoSuchMethodException
+ | SecurityException e) {
+ throw new IOException("Cannot read enum", e);
+ }
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
index 916e7a0..6472850 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
@@ -54,6 +54,11 @@ public abstract class WorkerAggregatorDelegator<I extends WritableComparable,
}
@Override
+ public void reducePartial(String name, Writable value) {
+ workerGlobalCommUsage.reducePartial(name, value);
+ }
+
+ @Override
public final <B extends Writable> B getBroadcast(String name) {
return workerGlobalCommUsage.getBroadcast(name);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index ee47542..96d239d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -113,7 +113,8 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
* @param name Name of the reducer
* @param valueToReduce Partial value to reduce
*/
- protected void reducePartial(String name, Writable valueToReduce) {
+ @Override
+ public void reducePartial(String name, Writable valueToReduce) {
Reducer<Object, Writable> reducer = reducerMap.get(name);
if (reducer != null) {
progressable.progress();
@@ -329,6 +330,19 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
}
@Override
+ public void reducePartial(String name, Writable value) {
+ Reducer<Object, Writable> reducer = threadReducerMap.get(name);
+ if (reducer != null) {
+ progressable.progress();
+ reducer.reducePartial(value);
+ } else {
+ throw new IllegalStateException("reducePartial: " +
+ AggregatorUtils.getUnregisteredAggregatorMessage(name,
+ threadReducerMap.size() != 0, conf));
+ }
+ }
+
+ @Override
public <B extends Writable> B getBroadcast(String name) {
return WorkerAggregatorHandler.this.getBroadcast(name);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
index 9c2e90d..fe7cd32 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
@@ -17,6 +17,8 @@
*/
package org.apache.giraph.worker;
+import org.apache.hadoop.io.Writable;
+
/**
* Methods on worker can provide values to reduce through this interface
*/
@@ -27,4 +29,11 @@ public interface WorkerReduceUsage {
* @param value Single value to reduce
*/
void reduce(String name, Object value);
+
+ /**
+ * Reduce given partial value.
+ * @param name Name of the reducer
+ * @param value Partial value to reduce
+ */
+ void reducePartial(String name, Writable value);
}