You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:27 UTC
[44/47] git commit: updated refs/heads/release-1.1 to 4c139ee
Fix using aggregators before aggregation
Summary:
If we register an aggregator and immediately ask for its aggregated value,
the previous code returned the initial value, so we have to do the same.
Additionally, clean up errors/exceptions to be more understandable
(vs. a bare NullPointerException, for example).
Test Plan:
mvn install
AggregatorsBenchmark and ReducersBenchmark
Reviewers: majakabiljo, pavanka, sergey.edunov, maja.kabiljo
Differential Revision: https://reviews.facebook.net/D24951
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d32c429a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d32c429a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d32c429a
Branch: refs/heads/release-1.1
Commit: d32c429a1d475b322b3fe44738f0cc8f30a97b48
Parents: 7c61dcf
Author: Igor Kabiljo <ik...@fb.com>
Authored: Mon Oct 20 09:50:55 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Oct 20 09:54:37 2014 -0700
----------------------------------------------------------------------
.../aggregators/ArrayAggregatorFactory.java | 128 -------------------
.../aggregators/ClassAggregatorFactory.java | 72 -----------
.../giraph/benchmark/ReducersBenchmark.java | 3 +-
.../comm/aggregators/AggregatorUtils.java | 51 ++++++++
.../requests/SendAggregatorsToOwnerRequest.java | 2 +-
.../giraph/master/AggregatorBroadcast.java | 75 +++++++++++
.../master/AggregatorReduceOperation.java | 66 +++++++---
.../AggregatorToGlobalCommTranslation.java | 113 +++++++++++-----
.../apache/giraph/master/BspServiceMaster.java | 2 +-
.../giraph/master/MasterAggregatorHandler.java | 29 ++++-
.../giraph/master/MasterAggregatorUsage.java | 16 ---
.../org/apache/giraph/master/MasterCompute.java | 9 --
.../giraph/reducers/OnSameReduceOperation.java | 4 +-
.../apache/giraph/reducers/ReduceOperation.java | 10 +-
.../org/apache/giraph/reducers/Reducer.java | 41 ++++--
.../apache/giraph/utils/WritableFactory.java | 28 ----
.../org/apache/giraph/utils/WritableUtils.java | 6 +-
.../worker/WorkerAggregatorDelegator.java | 4 +-
.../giraph/worker/WorkerAggregatorHandler.java | 8 +-
.../giraph/worker/WorkerBroadcastUsage.java | 33 +++++
.../giraph/worker/WorkerGlobalCommUsage.java | 17 +--
.../apache/giraph/worker/WorkerReduceUsage.java | 30 +++++
.../giraph/aggregators/TestArrayAggregator.java | 50 --------
23 files changed, 396 insertions(+), 401 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
deleted file mode 100644
index c977c57..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.aggregators;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Array;
-
-import org.apache.giraph.utils.ArrayWritable;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Generic array aggregator factory, used to aggregate elements
- * of ArrayWritable via passed element aggregator.
- *
- * @param <A> Type of individual element
- */
-public class ArrayAggregatorFactory<A extends Writable>
- implements WritableFactory<Aggregator<ArrayWritable<A>>> {
- /** number of elements in array */
- private int n;
- /** element aggregator class */
- private WritableFactory<? extends Aggregator<A>> elementAggregatorFactory;
-
- /**
- * Constructor
- * @param n Number of elements in array
- * @param elementAggregatorClass Type of element aggregator
- */
- public ArrayAggregatorFactory(
- int n, Class<? extends Aggregator<A>> elementAggregatorClass) {
- this(n, new ClassAggregatorFactory<>(elementAggregatorClass));
- }
-
- /**
- * Constructor
- * @param n Number of elements in array
- * @param elementAggregatorFactory Element aggregator factory
- */
- public ArrayAggregatorFactory(int n,
- WritableFactory<? extends Aggregator<A>> elementAggregatorFactory) {
- this.n = n;
- this.elementAggregatorFactory = elementAggregatorFactory;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- n = in.readInt();
- elementAggregatorFactory = WritableUtils.readWritableObject(in, null);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(n);
- WritableUtils.writeWritableObject(elementAggregatorFactory, out);
- }
-
- @Override
- public Aggregator<ArrayWritable<A>> create() {
- return new ArrayAggregator<>(
- n, elementAggregatorFactory.create());
- }
-
- /**
- * Stateful aggregator that aggregates ArrayWritable by
- * aggregating individual elements
- *
- * @param <A> Type of individual element
- */
- public static class ArrayAggregator<A extends Writable>
- extends BasicAggregator<ArrayWritable<A>> {
- /** number of elements in array */
- private final int n;
- /** element aggregator */
- private final Aggregator<A> elementAggregator;
-
- /**
- * Constructor
- * @param n Number of elements in array
- * @param elementAggregator Element aggregator
- */
- public ArrayAggregator(int n, Aggregator<A> elementAggregator) {
- super(null);
- this.n = n;
- this.elementAggregator = elementAggregator;
- reset();
- }
-
- @Override
- public void aggregate(ArrayWritable<A> other) {
- A[] array = getAggregatedValue().get();
- for (int i = 0; i < n; i++) {
- elementAggregator.setAggregatedValue(array[i]);
- elementAggregator.aggregate(other.get()[i]);
- array[i] = elementAggregator.getAggregatedValue();
- }
- }
-
- @Override
- public ArrayWritable<A> createInitialValue() {
- Class<A> elementClass =
- (Class) elementAggregator.createInitialValue().getClass();
- A[] array = (A[]) Array.newInstance(elementClass, n);
- for (int i = 0; i < n; i++) {
- array[i] = elementAggregator.createInitialValue();
- }
- return new ArrayWritable<>(elementClass, array);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
deleted file mode 100644
index a022480..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.aggregators;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Aggregator factory based on aggregatorClass.
- *
- * @param <T> Aggregated value type
- */
-public class ClassAggregatorFactory<T extends Writable>
- implements WritableFactory<Aggregator<T>> {
- /** Aggregator class */
- private Class<? extends Aggregator<T>> aggregatorClass;
-
- /** Constructor */
- public ClassAggregatorFactory() {
- }
-
- /**
- * Constructor
- * @param aggregatorClass Aggregator class
- */
- public ClassAggregatorFactory(
- Class<? extends Aggregator<T>> aggregatorClass) {
- Preconditions.checkNotNull(aggregatorClass,
- "aggregatorClass cannot be null in ClassAggregatorFactory");
- this.aggregatorClass = aggregatorClass;
- }
-
- @Override
- public Aggregator<T> create() {
- return ReflectionUtils.newInstance(aggregatorClass, null);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- aggregatorClass = WritableUtils.readClass(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- Preconditions.checkNotNull(aggregatorClass,
- "aggregatorClass cannot be null in ClassAggregatorFactory");
- WritableUtils.writeClass(aggregatorClass, out);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
index ce5c96e..263274d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
@@ -64,9 +64,10 @@ public class ReducersBenchmark extends GiraphBenchmark {
}
@Override
- public void reduceSingle(
+ public LongWritable reduceSingle(
LongWritable curValue, LongWritable valueToReduce) {
curValue.set(curValue.get() + valueToReduce.get());
+ return curValue;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
index ecb3a6b..dc0ceed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
@@ -99,4 +99,55 @@ public class AggregatorUtils {
}
return message;
}
+
+ /**
+ * Get the warning message about usage of unregistered reducer to be
+ * printed to user. If user didn't register any reducers also provide
+ * the explanation on how to do so.
+ *
+ * @param reducerName The name of the aggregator which user tried to
+ * access
+ * @param hasRegisteredReducers True iff user registered some aggregators
+ * @param conf Giraph configuration
+ * @return Warning message
+ */
+ public static String getUnregisteredReducerMessage(
+ String reducerName, boolean hasRegisteredReducers,
+ ImmutableClassesGiraphConfiguration conf) {
+ String message = "Tried to access reducer which wasn't registered " +
+ reducerName;
+ if (!hasRegisteredReducers) {
+ message = message + "; Aggregators can be registered from " +
+ "MasterCompute by calling registerReducer function. " +
+ "Also be sure that you are correctly setting MasterCompute class, " +
+ "currently using " + conf.getMasterComputeClass().getName();
+ }
+ return message;
+ }
+
+ /**
+ * Get the warning message when user tries to access broadcast, without
+ * previously setting it, to be printed to user.
+ * If user didn't broadcast any value also provide
+ * the explanation on how to do so.
+ *
+ * @param broadcastName The name of the broadcast which user tried to
+ * access
+ * @param hasBroadcasted True iff user has broadcasted value before
+ * @param conf Giraph configuration
+ * @return Warning message
+ */
+ public static String getUnregisteredBroadcastMessage(
+ String broadcastName, boolean hasBroadcasted,
+ ImmutableClassesGiraphConfiguration conf) {
+ String message = "Tried to access broadcast which wasn't set before " +
+ broadcastName;
+ if (!hasBroadcasted) {
+ message = message + "; Values can be broadcasted from " +
+ "MasterCompute by calling broadcast function. " +
+ "Also be sure that you are correctly setting MasterCompute class, " +
+ "currently using " + conf.getMasterComputeClass().getName();
+ }
+ return message;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
index 2d5cc51..8f168a2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
@@ -77,7 +77,7 @@ public class SendAggregatorsToOwnerRequest
if (type == GlobalCommType.REDUCE_OPERATIONS) {
ReduceOperation<Object, Writable> reduceOpCopy =
(ReduceOperation<Object, Writable>)
- WritableUtils.createCopy(reusedOut, reusedIn, value);
+ WritableUtils.createCopy(reusedOut, reusedIn, value, conf);
serverData.getOwnerAggregatorData().registerReducer(
name, reduceOpCopy);
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java
new file mode 100644
index 0000000..81ea654
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable representation of aggregated value
+ *
+ * @param <A> Aggregation object type
+ */
+public class AggregatorBroadcast<A extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable
+ implements Writable {
+ /** Aggregator class */
+ private Class<? extends Aggregator<A>> aggregatorClass;
+ /** Aggregated value */
+ private A value;
+
+ /** Constructor */
+ public AggregatorBroadcast() {
+ }
+
+ /**
+ * Constructor
+ * @param aggregatorClass Aggregator class
+ * @param value Aggregated value
+ */
+ public AggregatorBroadcast(
+ Class<? extends Aggregator<A>> aggregatorClass, A value) {
+ this.aggregatorClass = aggregatorClass;
+ this.value = value;
+ }
+
+ public A getValue() {
+ return value;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeClass(aggregatorClass, out);
+ value.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ aggregatorClass = WritableUtils.readClass(in);
+ value = ReflectionUtils.newInstance(aggregatorClass, getConf())
+ .createInitialValue();
+ value.readFields(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
index 1673f6d..54d421b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
@@ -22,8 +22,10 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.reducers.OnSameReduceOperation;
-import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
@@ -33,11 +35,13 @@ import org.apache.hadoop.io.Writable;
* @param <A> Aggregation object type
*/
public class AggregatorReduceOperation<A extends Writable>
- extends OnSameReduceOperation<A> {
- /** Aggregator factory */
- private WritableFactory<? extends Aggregator<A>> aggregatorFactory;
+ extends OnSameReduceOperation<A> implements GiraphConfigurationSettable {
+ /** Aggregator class */
+ private Class<? extends Aggregator<A>> aggregatorClass;
/** Aggregator */
private Aggregator<A> aggregator;
+ /** Configuration */
+ private ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
/** Constructor */
public AggregatorReduceOperation() {
@@ -45,18 +49,32 @@ public class AggregatorReduceOperation<A extends Writable>
/**
* Constructor
- * @param aggregatorFactory Aggregator factory
+ * @param aggregatorClass Aggregator class
+ * @param conf Configuration
*/
public AggregatorReduceOperation(
- WritableFactory<? extends Aggregator<A>> aggregatorFactory) {
- this.aggregatorFactory = aggregatorFactory;
- this.aggregator = aggregatorFactory.create();
- this.aggregator.setAggregatedValue(null);
+ Class<? extends Aggregator<A>> aggregatorClass,
+ ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
+ this.aggregatorClass = aggregatorClass;
+ this.conf = conf;
+ initAggregator();
+ }
+
+ /** Initialize aggregator */
+ private void initAggregator() {
+ aggregator = ReflectionUtils.newInstance(aggregatorClass, conf);
+ aggregator.setAggregatedValue(null);
}
@Override
public A createInitialValue() {
- return aggregator.createInitialValue();
+ A agg = aggregator.createInitialValue();
+ if (agg == null) {
+ throw new IllegalStateException(
+ "Aggregators initial value must not be null, but is for " +
+ aggregator);
+ }
+ return agg;
}
/**
@@ -64,29 +82,37 @@ public class AggregatorReduceOperation<A extends Writable>
* @return copy
*/
public AggregatorReduceOperation<A> createCopy() {
- return new AggregatorReduceOperation<>(aggregatorFactory);
+ return new AggregatorReduceOperation<>(aggregatorClass, conf);
+ }
+
+ public Class<? extends Aggregator<A>> getAggregatorClass() {
+ return aggregatorClass;
}
@Override
- public synchronized void reduceSingle(A curValue, A valueToReduce) {
+ public synchronized A reduceSingle(A curValue, A valueToReduce) {
aggregator.setAggregatedValue(curValue);
aggregator.aggregate(valueToReduce);
- if (curValue != aggregator.getAggregatedValue()) {
- throw new IllegalStateException(
- "Aggregator " + aggregator + " aggregates by creating new value");
- }
+ A aggregated = aggregator.getAggregatedValue();
aggregator.setAggregatedValue(null);
+ return aggregated;
+ }
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
+ this.conf = conf;
}
@Override
public void write(DataOutput out) throws IOException {
- WritableUtils.writeWritableObject(aggregatorFactory, out);
+ WritableUtils.writeClass(aggregatorClass, out);
}
@Override
public void readFields(DataInput in) throws IOException {
- aggregatorFactory = WritableUtils.readWritableObject(in, null);
- aggregator = aggregatorFactory.create();
- this.aggregator.setAggregatedValue(null);
+ aggregatorClass = WritableUtils.readClass(in);
+ initAggregator();
}
+
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
index 7492fc7..36a4553 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
@@ -24,10 +24,10 @@ import java.util.HashMap;
import java.util.Map.Entry;
import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.aggregators.ClassAggregatorFactory;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
import com.google.common.base.Preconditions;
@@ -36,8 +36,11 @@ import com.google.common.base.Preconditions;
* reduce and broadcast operations supported by the MasterAggregatorHandler.
*/
public class AggregatorToGlobalCommTranslation
- extends DefaultImmutableClassesGiraphConfigurable
implements MasterAggregatorUsage, Writable {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(AggregatorToGlobalCommTranslation.class);
+
/** Class providing reduce and broadcast interface to use */
private final MasterGlobalCommUsage globalComm;
/** List of registered aggregators */
@@ -45,21 +48,64 @@ public class AggregatorToGlobalCommTranslation
registeredAggregators = new HashMap<>();
/**
+ * List of init aggregator values, in case someone tries to
+ * access aggregator immediately after registering it.
+ *
+ * Instead of simply returning value, we need to store it during
+ * that superstep, so consecutive calls will return identical object,
+ * which they can modify.
+ */
+ private final HashMap<String, Writable>
+ initAggregatorValues = new HashMap<>();
+
+ /** Conf */
+ private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
+
+ /**
* Constructor
+ * @param conf Configuration
* @param globalComm Global communication interface
*/
- public AggregatorToGlobalCommTranslation(MasterGlobalCommUsage globalComm) {
+ public AggregatorToGlobalCommTranslation(
+ ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
+ MasterGlobalCommUsage globalComm) {
+ this.conf = conf;
this.globalComm = globalComm;
}
@Override
public <A extends Writable> A getAggregatedValue(String name) {
- return globalComm.getReduced(name);
+ AggregatorWrapper<Writable> agg = registeredAggregators.get(name);
+ if (agg == null) {
+ LOG.warn("getAggregatedValue: " +
+ AggregatorUtils.getUnregisteredAggregatorMessage(name,
+ registeredAggregators.size() != 0, conf));
+ // to make sure we are not accessing reducer of the same name.
+ return null;
+ }
+
+ A value = globalComm.getReduced(name);
+ if (value == null) {
+ value = (A) initAggregatorValues.get(name);
+ }
+
+ if (value == null) {
+ value = (A) agg.getReduceOp().createInitialValue();
+ initAggregatorValues.put(name, value);
+ }
+
+ Preconditions.checkState(value != null);
+ return value;
}
@Override
public <A extends Writable> void setAggregatedValue(String name, A value) {
AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name);
+ if (aggregator == null) {
+ throw new IllegalArgumentException("setAggregatedValue: " +
+ AggregatorUtils.getUnregisteredAggregatorMessage(name,
+ registeredAggregators.size() != 0, conf));
+ }
aggregator.setCurrentValue(value);
}
@@ -72,14 +118,15 @@ public class AggregatorToGlobalCommTranslation
// register reduce with the same value
for (Entry<String, AggregatorWrapper<Writable>> entry :
registeredAggregators.entrySet()) {
- Writable value = entry.getValue().currentValue != null ?
- entry.getValue().getCurrentValue() :
- globalComm.getReduced(entry.getKey());
+ Writable value = entry.getValue().getCurrentValue();
if (value == null) {
- value = entry.getValue().getReduceOp().createInitialValue();
+ value = globalComm.getReduced(entry.getKey());
}
+ Preconditions.checkState(value != null);
+
+ globalComm.broadcast(entry.getKey(), new AggregatorBroadcast<>(
+ entry.getValue().getReduceOp().getAggregatorClass(), value));
- globalComm.broadcast(entry.getKey(), value);
// Always register clean instance of reduceOp, not to conflict with
// reduceOp from previous superstep.
AggregatorReduceOperation<Writable> cleanReduceOp =
@@ -93,31 +140,21 @@ public class AggregatorToGlobalCommTranslation
}
entry.getValue().setCurrentValue(null);
}
+ initAggregatorValues.clear();
}
@Override
public <A extends Writable> boolean registerAggregator(String name,
Class<? extends Aggregator<A>> aggregatorClass) throws
InstantiationException, IllegalAccessException {
- ClassAggregatorFactory<A> aggregatorFactory =
- new ClassAggregatorFactory<A>(aggregatorClass);
- return registerAggregator(name, aggregatorFactory, false) != null;
- }
-
- @Override
- public <A extends Writable> boolean registerAggregator(String name,
- WritableFactory<? extends Aggregator<A>> aggregator) throws
- InstantiationException, IllegalAccessException {
- return registerAggregator(name, aggregator, false) != null;
+ return registerAggregator(name, aggregatorClass, false) != null;
}
@Override
public <A extends Writable> boolean registerPersistentAggregator(String name,
Class<? extends Aggregator<A>> aggregatorClass) throws
InstantiationException, IllegalAccessException {
- ClassAggregatorFactory<A> aggregatorFactory =
- new ClassAggregatorFactory<A>(aggregatorClass);
- return registerAggregator(name, aggregatorFactory, true) != null;
+ return registerAggregator(name, aggregatorClass, true) != null;
}
@Override
@@ -140,27 +177,35 @@ public class AggregatorToGlobalCommTranslation
agg.readFields(in);
registeredAggregators.put(name, agg);
}
+ initAggregatorValues.clear();
}
/**
* Helper function for registering aggregators.
*
- * @param name Name of the aggregator
- * @param aggregatorFactory Aggregator factory
- * @param persistent Whether aggregator is persistent or not
- * @param <A> Aggregated value type
+ * @param name Name of the aggregator
+ * @param aggregatorClass Aggregator class
+ * @param persistent Whether aggregator is persistent or not
+ * @param <A> Aggregated value type
* @return Newly registered aggregator or aggregator which was previously
* created with selected name, if any
*/
private <A extends Writable> AggregatorWrapper<A> registerAggregator
- (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+ (String name, Class<? extends Aggregator<A>> aggregatorClass,
boolean persistent) throws InstantiationException,
IllegalAccessException {
AggregatorWrapper<A> aggregatorWrapper =
(AggregatorWrapper<A>) registeredAggregators.get(name);
if (aggregatorWrapper == null) {
aggregatorWrapper =
- new AggregatorWrapper<A>(aggregatorFactory, persistent);
+ new AggregatorWrapper<A>(aggregatorClass, persistent);
+ // postMasterCompute uses previously reduced value to broadcast,
+ // unless current value is set. After aggregator is registered,
+ // there was no previously reduced value, so set current value
+ // to default to avoid calling getReduced() on unregistered reducer.
+ // (which logs unnecessary warnings)
+ aggregatorWrapper.setCurrentValue(
+ aggregatorWrapper.getReduceOp().createInitialValue());
registeredAggregators.put(
name, (AggregatorWrapper<Writable>) aggregatorWrapper);
}
@@ -171,7 +216,7 @@ public class AggregatorToGlobalCommTranslation
* Object holding all needed data related to single Aggregator
* @param <A> Aggregated value type
*/
- private static class AggregatorWrapper<A extends Writable>
+ private class AggregatorWrapper<A extends Writable>
implements Writable {
/** False iff aggregator should be reset at the end of each super step */
private boolean persistent;
@@ -186,14 +231,14 @@ public class AggregatorToGlobalCommTranslation
/**
* Constructor
- * @param aggregatorFactory Aggregator factory
+ * @param aggregatorClass Aggregator class
* @param persistent Is persistent
*/
public AggregatorWrapper(
- WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+ Class<? extends Aggregator<A>> aggregatorClass,
boolean persistent) {
this.persistent = persistent;
- this.reduceOp = new AggregatorReduceOperation<>(aggregatorFactory);
+ this.reduceOp = new AggregatorReduceOperation<>(aggregatorClass, conf);
}
public AggregatorReduceOperation<A> getReduceOp() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index ab1289d..af7e5fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -895,7 +895,7 @@ public class BspServiceMaster<I extends WritableComparable,
globalCommHandler = new MasterAggregatorHandler(
getConfiguration(), getContext());
aggregatorTranslation = new AggregatorToGlobalCommTranslation(
- globalCommHandler);
+ getConfiguration(), globalCommHandler);
globalCommHandler.initialize(this);
masterCompute = getConfiguration().createMasterCompute();
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 5f7bd73..ccee656 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -28,6 +28,7 @@ import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.SuperstepState;
import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.reducers.Reducer;
@@ -61,6 +62,9 @@ public class MasterAggregatorHandler
/** Progressable used to report progress */
private final Progressable progressable;
+ /** Conf */
+ private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
+
/**
* Constructor
*
@@ -71,6 +75,7 @@ public class MasterAggregatorHandler
ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
Progressable progressable) {
this.progressable = progressable;
+ this.conf = conf;
aggregatorWriter = conf.createAggregatorWriter();
}
@@ -86,10 +91,18 @@ public class MasterAggregatorHandler
R globalInitialValue) {
if (reducerMap.containsKey(name)) {
throw new IllegalArgumentException(
- "Reducer with name " + name + " was already registered");
+ "Reducer with name " + name + " was already registered, " +
+ " and is " + reducerMap.get(name) + ", and we are trying to " +
+ " register " + reduceOp);
}
if (reduceOp == null) {
- throw new IllegalArgumentException("null reduce cannot be registered");
+ throw new IllegalArgumentException(
+ "null reducer cannot be registered, with name " + name);
+ }
+ if (globalInitialValue == null) {
+ throw new IllegalArgumentException(
+ "global initial value for reducer cannot be null, but is for " +
+ reduceOp + " with naem" + name);
}
Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
@@ -98,7 +111,13 @@ public class MasterAggregatorHandler
@Override
public <T extends Writable> T getReduced(String name) {
- return (T) reducedMap.get(name);
+ T value = (T) reducedMap.get(name);
+ if (value == null) {
+ LOG.warn("getReduced: " +
+ AggregatorUtils.getUnregisteredReducerMessage(name,
+ reducedMap.size() != 0, conf));
+ }
+ return value;
}
@Override
@@ -310,14 +329,14 @@ public class MasterAggregatorHandler
for (int i = 0; i < numReducers; i++) {
String name = in.readUTF();
Reducer<Object, Writable> reducer = new Reducer<>();
- reducer.readFields(in);
+ reducer.readFields(in, conf);
reducerMap.put(name, reducer);
}
int numBroadcast = in.readInt();
for (int i = 0; i < numBroadcast; i++) {
String name = in.readUTF();
- Writable value = WritableUtils.readWritableObject(in, null);
+ Writable value = WritableUtils.readWritableObject(in, conf);
broadcastMap.put(name, value);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
index 91f5d24..cadae67 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
@@ -20,7 +20,6 @@ package org.apache.giraph.master;
import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.aggregators.AggregatorUsage;
-import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.Writable;
/**
@@ -41,21 +40,6 @@ public interface MasterAggregatorUsage extends AggregatorUsage {
InstantiationException, IllegalAccessException;
/**
- * Register an aggregator in preSuperstep() and/or preApplication(). This
- * aggregator will have its value reset at the end of each super step.
- *
- * Aggregator should either implement Writable, or have no-arg constructor.
- *
- * @param name of aggregator
- * @param aggregatorFactory aggregator factory
- * @param <A> Aggregator type
- * @return True iff aggregator wasn't already registered
- */
- <A extends Writable> boolean registerAggregator(String name,
- WritableFactory<? extends Aggregator<A>> aggregatorFactory) throws
- InstantiationException, IllegalAccessException;
-
- /**
* Register persistent aggregator in preSuperstep() and/or
* preApplication(). This aggregator will not reset value at the end of
* super step.
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 72e4d0a..68eb416 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -25,7 +25,6 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.reducers.ReduceOperation;
-import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -222,14 +221,6 @@ public abstract class MasterCompute
}
@Override
- public final <A extends Writable> boolean registerAggregator(
- String name, WritableFactory<? extends Aggregator<A>> aggregator)
- throws InstantiationException, IllegalAccessException {
- return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
- name, aggregator);
- }
-
- @Override
public final <A extends Writable> boolean registerPersistentAggregator(
String name,
Class<? extends Aggregator<A>> aggregatorClass) throws
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
index a675f4d..cb9f6e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
public abstract class OnSameReduceOperation<R extends Writable>
implements ReduceOperation<R, R> {
@Override
- public final void reducePartial(R curValue, R valueToReduce) {
- reduceSingle(curValue, valueToReduce);
+ public final R reducePartial(R curValue, R valueToReduce) {
+ return reduceSingle(curValue, valueToReduce);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
index 434e21a..adbc4d8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
@@ -43,15 +43,21 @@ public interface ReduceOperation<S, R extends Writable> extends Writable {
* Add a new value.
* Needs to be commutative and associative
*
+ * Commonly, returned value should be same as curValue argument.
+ *
* @param curValue Partial value into which to reduce and store the result
* @param valueToReduce Single value to be reduced
+ * @return reduced value
*/
- void reduceSingle(R curValue, S valueToReduce);
+ R reduceSingle(R curValue, S valueToReduce);
/**
* Add partially reduced value to current partially reduced value.
*
+ * Commonly, returned value should be same as curValue argument.
+ *
* @param curValue Partial value into which to reduce and store the result
* @param valueToReduce Partial value to be reduced
+ * @return reduced value
*/
- void reducePartial(R curValue, R valueToReduce);
+ R reducePartial(R curValue, R valueToReduce);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
index 9f821b4..6759276 100644
--- a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
@@ -32,7 +33,7 @@ import org.apache.hadoop.io.Writable;
* @param <S> Single value type, objects passed on workers
* @param <R> Reduced value type
*/
-public class Reducer<S, R extends Writable> implements Writable {
+public class Reducer<S, R extends Writable> {
/** Reduce operations */
private ReduceOperation<S, R> reduceOp;
/** Current (partially) reduced value*/
@@ -49,7 +50,7 @@ public class Reducer<S, R extends Writable> implements Writable {
*/
public Reducer(ReduceOperation<S, R> reduceOp) {
this.reduceOp = reduceOp;
- this.currentValue = reduceOp.createInitialValue();
+ this.currentValue = createInitialValue();
}
/**
* Constructor
@@ -66,21 +67,26 @@ public class Reducer<S, R extends Writable> implements Writable {
* @param valueToReduce Single value to reduce
*/
public void reduceSingle(S valueToReduce) {
- reduceOp.reduceSingle(currentValue, valueToReduce);
+ currentValue = reduceOp.reduceSingle(currentValue, valueToReduce);
}
/**
* Reduce given partially reduced value into current reduced value.
* @param valueToReduce Partial value to reduce
*/
public void reducePartial(R valueToReduce) {
- reduceOp.reducePartial(currentValue, valueToReduce);
+ currentValue = reduceOp.reducePartial(currentValue, valueToReduce);
}
/**
* Return new initial reduced value.
* @return New initial reduced value
*/
public R createInitialValue() {
- return reduceOp.createInitialValue();
+ R value = reduceOp.createInitialValue();
+ if (value == null) {
+ throw new IllegalStateException(
+ "Initial value for reducer cannot be null, but is for " + reduceOp);
+ }
+ return value;
}
public ReduceOperation<S, R> getReduceOp() {
@@ -95,16 +101,31 @@ public class Reducer<S, R extends Writable> implements Writable {
this.currentValue = currentValue;
}
- @Override
+ /**
+ * Serialize the fields of this object to <code>out</code>.
+ *
+ * @param out <code>DataOutput</code> to serialize this object into.
+ * @throws IOException
+ */
public void write(DataOutput out) throws IOException {
WritableUtils.writeWritableObject(reduceOp, out);
currentValue.write(out);
}
- @Override
- public void readFields(DataInput in) throws IOException {
- reduceOp = WritableUtils.readWritableObject(in, null);
- currentValue = reduceOp.createInitialValue();
+ /**
+ * Deserialize the fields of this object from <code>in</code>.
+ *
+ * <p>For efficiency, implementations should attempt to re-use storage in the
+ * existing object where possible.</p>
+ *
+ * @param in <code>DataInput</code> to deserialize this object from.
+ * @param conf Configuration
+ * @throws IOException
+ */
+ public void readFields(DataInput in,
+ ImmutableClassesGiraphConfiguration conf) throws IOException {
+ reduceOp = WritableUtils.readWritableObject(in, conf);
+ currentValue = createInitialValue();
currentValue.readFields(in);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
deleted file mode 100644
index 43bed7e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Factory that can be serialized.
- * @param <T> Type of object factory creates
- */
-public interface WritableFactory<T> extends Writable, Factory<T> {
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 923d369..8c24697 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -745,13 +745,15 @@ public class WritableUtils {
* @param reusableOut Reusable output stream to serialize into
* @param reusableIn Reusable input stream to deserialize out of
* @param original Original value of which to make a copy
+ * @param conf Configuration
* @param <T> Type of the object
* @return Copy of the original value
*/
public static <T extends Writable> T createCopy(
UnsafeByteArrayOutputStream reusableOut,
- UnsafeReusableByteArrayInput reusableIn, T original) {
- T copy = (T) createWritable(original.getClass(), null);
+ UnsafeReusableByteArrayInput reusableIn, T original,
+ ImmutableClassesGiraphConfiguration conf) {
+ T copy = (T) createWritable(original.getClass(), conf);
try {
reusableOut.reset();
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
index 5238a07..916e7a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
@@ -18,6 +18,7 @@
package org.apache.giraph.worker;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.master.AggregatorBroadcast;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -64,6 +65,7 @@ public abstract class WorkerAggregatorDelegator<I extends WritableComparable,
@Override
public final <A extends Writable> A getAggregatedValue(String name) {
- return this.<A>getBroadcast(name);
+ AggregatorBroadcast<A> broadcast = workerGlobalCommUsage.getBroadcast(name);
+ return broadcast.getValue();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 05a13a7..ee47542 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -87,7 +87,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
B value = (B) broadcastedMap.get(name);
if (value == null) {
LOG.warn("getBroadcast: " +
- AggregatorUtils.getUnregisteredAggregatorMessage(name,
+ AggregatorUtils.getUnregisteredBroadcastMessage(name,
broadcastedMap.size() != 0, conf));
}
return value;
@@ -103,7 +103,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
}
} else {
throw new IllegalStateException("reduce: " +
- AggregatorUtils.getUnregisteredAggregatorMessage(name,
+ AggregatorUtils.getUnregisteredReducerMessage(name,
reducerMap.size() != 0, conf));
}
}
@@ -122,7 +122,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
}
} else {
throw new IllegalStateException("reduce: " +
- AggregatorUtils.getUnregisteredAggregatorMessage(name,
+ AggregatorUtils.getUnregisteredReducerMessage(name,
reducerMap.size() != 0, conf));
}
}
@@ -309,7 +309,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
entry.getValue().getReduceOp();
ReduceOperation<Object, Writable> threadLocalCopy =
- WritableUtils.createCopy(out, in, globalReduceOp);
+ WritableUtils.createCopy(out, in, globalReduceOp, conf);
threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java
new file mode 100644
index 0000000..9b4e160
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Methods on worker can access broadcasted values through this interface
+ */
+public interface WorkerBroadcastUsage {
+ /**
+ * Get value broadcasted from master
+ * @param name Name of the broadcasted value
+ * @return Broadcasted value
+ * @param <B> Broadcast value type
+ */
+ <B extends Writable> B getBroadcast(String name);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
index 39566f5..fa31bc2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
@@ -17,24 +17,11 @@
*/
package org.apache.giraph.worker;
-import org.apache.hadoop.io.Writable;
/**
* Methods on worker can access broadcasted values and provide
* values to reduce through this interface
*/
-public interface WorkerGlobalCommUsage {
- /**
- * Reduce given value.
- * @param name Name of the reducer
- * @param value Single value to reduce
- */
- void reduce(String name, Object value);
- /**
- * Get value broadcasted from master
- * @param name Name of the broadcasted value
- * @return Broadcasted value
- * @param <B> Broadcast value type
- */
- <B extends Writable> B getBroadcast(String name);
+public interface WorkerGlobalCommUsage
+ extends WorkerBroadcastUsage, WorkerReduceUsage {
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
new file mode 100644
index 0000000..9c2e90d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+/**
+ * Methods on worker can provide values to reduce through this interface
+ */
+public interface WorkerReduceUsage {
+ /**
+ * Reduce given value.
+ * @param name Name of the reducer
+ * @param value Single value to reduce
+ */
+ void reduce(String name, Object value);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
deleted file mode 100644
index 2898647..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.aggregators;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.giraph.utils.ArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.junit.Test;
-
-public class TestArrayAggregator {
- @Test
- public void testMaxAggregator() {
- Aggregator<ArrayWritable<LongWritable>> max = new ArrayAggregatorFactory<>(2, LongMaxAggregator.class).create();
-
- ArrayWritable<LongWritable> tmp = max.createInitialValue();
-
- tmp.get()[0].set(2);
- max.aggregate(tmp);
-
- tmp.get()[0].set(3);
- tmp.get()[1].set(1);
- max.aggregate(tmp);
-
- assertEquals(3L, max.getAggregatedValue().get()[0].get());
- assertEquals(1L, max.getAggregatedValue().get()[1].get());
-
- tmp.get()[0].set(-1);
- tmp.get()[1].set(-1);
- max.setAggregatedValue(tmp);
-
- assertEquals(-1L, max.getAggregatedValue().get()[0].get());
- assertEquals(-1L, max.getAggregatedValue().get()[1].get());
- }
-}