You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/08/01 17:20:53 UTC
git commit: updated refs/heads/trunk to ce97134
Repository: giraph
Updated Branches:
refs/heads/trunk 930352220 -> ce97134d2
GIRAPH-934: Allow having state in aggregators (ikabiljo via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ce97134d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ce97134d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ce97134d
Branch: refs/heads/trunk
Commit: ce97134d253c4e9fca48b7cede2048e60f36ff79
Parents: 9303522
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Aug 1 08:20:10 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Aug 1 08:20:10 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../giraph/aggregators/AggregatorWrapper.java | 22 ++--
.../aggregators/ArrayAggregatorFactory.java | 128 +++++++++++++++++++
.../giraph/aggregators/BasicAggregator.java | 9 ++
.../aggregators/ClassAggregatorFactory.java | 87 +++++++++++++
.../org/apache/giraph/comm/MasterClient.java | 9 +-
.../java/org/apache/giraph/comm/ServerData.java | 14 +-
.../aggregators/AggregatorOutputStream.java | 12 +-
.../comm/aggregators/AggregatorUtils.java | 39 +-----
.../aggregators/AllAggregatorServerData.java | 69 ++++------
.../aggregators/OwnerAggregatorServerData.java | 26 ++--
.../comm/aggregators/SendAggregatorCache.java | 15 ++-
.../giraph/comm/netty/NettyMasterClient.java | 17 +--
.../requests/SendAggregatorsToOwnerRequest.java | 19 +--
.../SendAggregatorsToWorkerRequest.java | 17 +--
.../giraph/comm/requests/WritableRequest.java | 2 +-
.../giraph/master/MasterAggregatorHandler.java | 66 ++++++----
.../giraph/master/MasterAggregatorUsage.java | 16 +++
.../org/apache/giraph/master/MasterCompute.java | 8 ++
.../org/apache/giraph/utils/ArrayWritable.java | 100 +++++++++++++++
.../apache/giraph/utils/WritableFactory.java | 28 ++++
.../org/apache/giraph/utils/WritableUtils.java | 84 ++++++++++--
.../giraph/worker/WorkerAggregatorHandler.java | 52 +++++---
.../giraph/aggregators/TestArrayAggregator.java | 50 ++++++++
24 files changed, 686 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 54ed3a3..dbb134a 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-934: Allow having state in aggregators (ikabiljo via majakabiljo)
+
GIRAPH-932: Adding .arcconfig to GIRAPH for Arcanist support (aching)
GIRAPH-927: Decouple netty server threads from message processing (edunov via pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
index 7150402..fa18a64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
@@ -19,7 +19,7 @@
package org.apache.giraph.aggregators;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.Writable;
/**
@@ -34,21 +34,25 @@ public class AggregatorWrapper<A extends Writable> {
private final boolean persistent;
/** Value aggregated in previous super step */
private A previousAggregatedValue;
+ /** Aggregator factory */
+ private final WritableFactory<? extends Aggregator<A>> aggregatorFactory;
/** Aggregator for next super step */
private final Aggregator<A> currentAggregator;
/** Whether anyone changed current value since the moment it was reset */
private boolean changed;
/**
- * @param aggregatorClass Class type of the aggregator
- * @param persistent False iff aggregator should be reset at the end of
- * each super step
- * @param conf Configuration
+ * @param aggregatorFactory Aggregator Factory
+ * @param persistent False iff aggregator should be reset at the end
+ * of each super step
+ * @param conf Configuration
*/
- public AggregatorWrapper(Class<? extends Aggregator<A>> aggregatorClass,
+ public AggregatorWrapper(
+ WritableFactory<? extends Aggregator<A>> aggregatorFactory,
boolean persistent, ImmutableClassesGiraphConfiguration conf) {
this.persistent = persistent;
- currentAggregator = ReflectionUtils.newInstance(aggregatorClass, conf);
+ this.aggregatorFactory = aggregatorFactory;
+ currentAggregator = aggregatorFactory.create();
changed = false;
previousAggregatedValue = currentAggregator.createInitialValue();
}
@@ -140,7 +144,7 @@ public class AggregatorWrapper<A extends Writable> {
*
* @return Aggregator class
*/
- public Class<? extends Aggregator> getAggregatorClass() {
- return currentAggregator.getClass();
+ public WritableFactory<? extends Aggregator<A>> getAggregatorFactory() {
+ return aggregatorFactory;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
new file mode 100644
index 0000000..c977c57
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+
+import org.apache.giraph.utils.ArrayWritable;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Generic array aggregator factory, used to aggregate elements
+ * of ArrayWritable via passed element aggregator.
+ *
+ * @param <A> Type of individual element
+ */
+public class ArrayAggregatorFactory<A extends Writable>
+ implements WritableFactory<Aggregator<ArrayWritable<A>>> {
+ /** number of elements in array */
+ private int n;
+ /** element aggregator factory */
+ private WritableFactory<? extends Aggregator<A>> elementAggregatorFactory;
+
+ /**
+ * Constructor
+ * @param n Number of elements in array
+ * @param elementAggregatorClass Type of element aggregator
+ */
+ public ArrayAggregatorFactory(
+ int n, Class<? extends Aggregator<A>> elementAggregatorClass) {
+ this(n, new ClassAggregatorFactory<>(elementAggregatorClass));
+ }
+
+ /**
+ * Constructor
+ * @param n Number of elements in array
+ * @param elementAggregatorFactory Element aggregator factory
+ */
+ public ArrayAggregatorFactory(int n,
+ WritableFactory<? extends Aggregator<A>> elementAggregatorFactory) {
+ this.n = n;
+ this.elementAggregatorFactory = elementAggregatorFactory;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ n = in.readInt();
+ elementAggregatorFactory = WritableUtils.readWritableObject(in, null);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(n);
+ WritableUtils.writeWritableObject(elementAggregatorFactory, out);
+ }
+
+ @Override
+ public Aggregator<ArrayWritable<A>> create() {
+ return new ArrayAggregator<>(
+ n, elementAggregatorFactory.create());
+ }
+
+ /**
+ * Stateful aggregator that aggregates ArrayWritable by
+ * aggregating individual elements
+ *
+ * @param <A> Type of individual element
+ */
+ public static class ArrayAggregator<A extends Writable>
+ extends BasicAggregator<ArrayWritable<A>> {
+ /** number of elements in array */
+ private final int n;
+ /** element aggregator */
+ private final Aggregator<A> elementAggregator;
+
+ /**
+ * Constructor
+ * @param n Number of elements in array
+ * @param elementAggregator Element aggregator
+ */
+ public ArrayAggregator(int n, Aggregator<A> elementAggregator) {
+ super(null);
+ this.n = n;
+ this.elementAggregator = elementAggregator;
+ reset();
+ }
+
+ @Override
+ public void aggregate(ArrayWritable<A> other) {
+ A[] array = getAggregatedValue().get();
+ for (int i = 0; i < n; i++) {
+ elementAggregator.setAggregatedValue(array[i]);
+ elementAggregator.aggregate(other.get()[i]);
+ array[i] = elementAggregator.getAggregatedValue();
+ }
+ }
+
+ @Override
+ public ArrayWritable<A> createInitialValue() {
+ Class<A> elementClass =
+ (Class) elementAggregator.createInitialValue().getClass();
+ A[] array = (A[]) Array.newInstance(elementClass, n);
+ for (int i = 0; i < n; i++) {
+ array[i] = elementAggregator.createInitialValue();
+ }
+ return new ArrayWritable<>(elementClass, array);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
index 07a4100..c351846 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
@@ -40,6 +40,15 @@ public abstract class BasicAggregator<A extends Writable> implements
value = createInitialValue();
}
+ /**
+ * Constructor
+ * @param initialValue initial value
+ */
+ public BasicAggregator(A initialValue) {
+ value = initialValue;
+ }
+
+
@Override
public A getAggregatedValue() {
return value;
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
new file mode 100644
index 0000000..944656e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Aggregator factory based on aggregatorClass.
+ *
+ * @param <T> Aggregated value type
+ */
+public class ClassAggregatorFactory<T extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable
+ implements WritableFactory<Aggregator<T>> {
+ /** Aggregator class */
+ private Class<? extends Aggregator<T>> aggregatorClass;
+
+ /** Constructor */
+ public ClassAggregatorFactory() {
+ }
+
+ /**
+ * Constructor
+ * @param aggregatorClass Aggregator class
+ */
+ public ClassAggregatorFactory(
+ Class<? extends Aggregator<T>> aggregatorClass) {
+ this(aggregatorClass, null);
+
+ }
+
+ /**
+ * Constructor
+ * @param aggregatorClass Aggregator class
+ * @param conf Configuration
+ */
+ public ClassAggregatorFactory(Class<? extends Aggregator<T>> aggregatorClass,
+ ImmutableClassesGiraphConfiguration conf) {
+ Preconditions.checkNotNull(aggregatorClass,
+ "aggregatorClass cannot be null in ClassAggregatorFactory");
+ this.aggregatorClass = aggregatorClass;
+ setConf(conf);
+ }
+
+ @Override
+ public Aggregator<T> create() {
+ return ReflectionUtils.newInstance(aggregatorClass, getConf());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ aggregatorClass = WritableUtils.readClass(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Preconditions.checkNotNull(aggregatorClass,
+ "aggregatorClass cannot be null in ClassAggregatorFactory");
+ WritableUtils.writeClass(aggregatorClass, out);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
index 793d059..b7718a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -18,11 +18,12 @@
package org.apache.giraph.comm;
+import java.io.IOException;
+
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.Writable;
-import java.io.IOException;
-
/**
* Interface for master to send messages to workers
*/
@@ -36,12 +37,12 @@ public interface MasterClient {
* Sends aggregator to its owner
*
* @param aggregatorName Name of the aggregator
- * @param aggregatorClass Class of the aggregator
+ * @param aggregatorFactory Aggregator factory
* @param aggregatedValue Value of the aggregator
* @throws IOException
*/
void sendAggregator(String aggregatorName,
- Class<? extends Aggregator> aggregatorClass,
+ WritableFactory<? extends Aggregator> aggregatorFactory,
Writable aggregatedValue) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 29488fc..a92cd1c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -18,6 +18,12 @@
package org.apache.giraph.comm;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
@@ -36,12 +42,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Anything that the server stores
*
@@ -123,7 +123,7 @@ public class ServerData<I extends WritableComparable,
EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
edgeStoreFactory.initialize(service, conf, context);
edgeStore = edgeStoreFactory.newStore();
- ownerAggregatorData = new OwnerAggregatorServerData(context, conf);
+ ownerAggregatorData = new OwnerAggregatorServerData(context);
allAggregatorData = new AllAggregatorServerData(context, conf);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
index 627b4cc..79bc08a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
@@ -18,11 +18,13 @@
package org.apache.giraph.comm.aggregators;
+import java.io.IOException;
+
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
-import java.io.IOException;
-
/**
* Implementation of {@link CountingOutputStream} which allows writing of
* aggregators in the form of triple (name, classname, value)
@@ -32,17 +34,17 @@ public class AggregatorOutputStream extends CountingOutputStream {
* Write aggregator to the stream and increment internal counter
*
* @param aggregatorName Name of the aggregator
- * @param aggregatorClass Class of aggregator
+ * @param aggregatorFactory Aggregator factory
* @param aggregatedValue Value of aggregator
* @return Number of bytes occupied by the stream
* @throws IOException
*/
public int addAggregator(String aggregatorName,
- Class<? extends Aggregator> aggregatorClass,
+ WritableFactory<? extends Aggregator> aggregatorFactory,
Writable aggregatedValue) throws IOException {
incrementCounter();
dataOutput.writeUTF(aggregatorName);
- dataOutput.writeUTF(aggregatorClass.getName());
+ WritableUtils.writeWritableObject(aggregatorFactory, dataOutput);
aggregatedValue.write(dataOutput);
return getSize();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
index ceb30a8..a94ab38 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
@@ -18,13 +18,10 @@
package org.apache.giraph.comm.aggregators;
+import java.util.List;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-
-import java.util.List;
/**
* Class for aggregator constants and utility methods
@@ -36,6 +33,7 @@ public class AggregatorUtils {
*/
public static final String SPECIAL_COUNT_AGGREGATOR =
"__aggregatorRequestCount";
+
/** How big a single aggregator request can be (in bytes) */
public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST =
"giraph.maxBytesPerAggregatorRequest";
@@ -58,37 +56,6 @@ public class AggregatorUtils {
private AggregatorUtils() { }
/**
- * Get aggregator class from class name, catch all exceptions.
- *
- * @param aggregatorClassName Class nam of aggregator class
- * @return Aggregator class
- */
- public static Class<Aggregator<Writable>> getAggregatorClass(String
- aggregatorClassName) {
- try {
- return (Class<Aggregator<Writable>>) Class.forName(aggregatorClassName);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException("getAggregatorClass: " +
- "ClassNotFoundException for aggregator class " + aggregatorClassName,
- e);
- }
- }
-
- /**
- * Create new aggregator instance from aggregator class,
- * catch all exceptions.
- *
- * @param aggregatorClass Class of aggregator
- * @param conf Configuration
- * @return New aggregator
- */
- public static Aggregator<Writable> newAggregatorInstance(
- Class<Aggregator<Writable>> aggregatorClass,
- ImmutableClassesGiraphConfiguration conf) {
- return ReflectionUtils.newInstance(aggregatorClass, conf);
- }
-
- /**
* Get owner of aggregator with selected name from the list of workers
*
* @param aggregatorName Name of the aggregators
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
index 177e738..effc9bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
@@ -18,10 +18,18 @@
package org.apache.giraph.comm.aggregators;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.utils.Factory;
import org.apache.giraph.utils.TaskIdsPermitsBarrier;
+import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
@@ -29,12 +37,6 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
/**
* Accepts aggregators and their values from previous superstep from master
* and workers which own aggregators. Keeps data received from master so it
@@ -49,16 +51,9 @@ public class AllAggregatorServerData {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(AllAggregatorServerData.class);
- /**
- * Map from aggregator class to aggregator object which we need in order
- * to create initial aggregated values
- */
- private final
- ConcurrentMap<Class<Aggregator<Writable>>, Aggregator<Writable>>
- aggregatorTypesMap = Maps.newConcurrentMap();
- /** Map of aggregator classes */
- private final ConcurrentMap<String, Class<Aggregator<Writable>>>
- aggregatorClassMap = Maps.newConcurrentMap();
+ /** Map of aggregator factories */
+ private final ConcurrentMap<String, WritableFactory<Aggregator<Writable>>>
+ aggregatorFactoriesMap = Maps.newConcurrentMap();
/** Map of values of aggregators from previous superstep */
private final ConcurrentMap<String, Writable>
aggregatedValuesMap = Maps.newConcurrentMap();
@@ -104,16 +99,12 @@ public class AllAggregatorServerData {
/**
* Register the class of the aggregator, received by master or worker.
*
- * @param name Aggregator name
- * @param aggregatorClass Class of the aggregator
+ * @param name Aggregator name
+ * @param aggregatorFactory Aggregator factory
*/
public void registerAggregatorClass(String name,
- Class<Aggregator<Writable>> aggregatorClass) {
- aggregatorClassMap.put(name, aggregatorClass);
- if (!aggregatorTypesMap.containsKey(aggregatorClass)) {
- aggregatorTypesMap.putIfAbsent(aggregatorClass,
- AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
- }
+ WritableFactory<Aggregator<Writable>> aggregatorFactory) {
+ aggregatorFactoriesMap.put(name, aggregatorFactory);
progressable.progress();
}
@@ -139,10 +130,10 @@ public class AllAggregatorServerData {
* @return Empty aggregated value for this aggregator
*/
public Writable createAggregatorInitialValue(String name) {
- Class<Aggregator<Writable>> aggregatorClass = aggregatorClassMap.get(name);
- Aggregator<Writable> aggregator = aggregatorTypesMap.get(aggregatorClass);
- synchronized (aggregator) {
- return aggregator.createInitialValue();
+ WritableFactory<Aggregator<Writable>> aggregatorFactory =
+ aggregatorFactoriesMap.get(name);
+ synchronized (aggregatorFactory) {
+ return aggregatorFactory.create().createInitialValue();
}
}
@@ -211,29 +202,25 @@ public class AllAggregatorServerData {
* @param workerIds All workers in the job apart from the current one
* @param previousAggregatedValuesMap Map of values from previous
* superstep to fill out
- * @param currentAggregatorMap Map of aggregators for current superstep to
- * fill out. All aggregators in this map will
- * be set to initial value.
+ * @param currentAggregatorFactoryMap Map of aggregator factories for
+ * current superstep to fill out.
*/
public void fillNextSuperstepMapsWhenReady(
Set<Integer> workerIds,
Map<String, Writable> previousAggregatedValuesMap,
- Map<String, Aggregator<Writable>> currentAggregatorMap) {
+ Map<String, Factory<Aggregator<Writable>>> currentAggregatorFactoryMap) {
workersBarrier.waitForRequiredPermits(workerIds);
if (LOG.isDebugEnabled()) {
LOG.debug("fillNextSuperstepMapsWhenReady: Aggregators ready");
}
previousAggregatedValuesMap.clear();
previousAggregatedValuesMap.putAll(aggregatedValuesMap);
- for (Map.Entry<String, Class<Aggregator<Writable>>> entry :
- aggregatorClassMap.entrySet()) {
- Aggregator<Writable> aggregator =
- currentAggregatorMap.get(entry.getKey());
- if (aggregator == null) {
- currentAggregatorMap.put(entry.getKey(),
- AggregatorUtils.newAggregatorInstance(entry.getValue(), conf));
- } else {
- aggregator.reset();
+ for (Map.Entry<String, WritableFactory<Aggregator<Writable>>> entry :
+ aggregatorFactoriesMap.entrySet()) {
+ Factory<Aggregator<Writable>> aggregatorFactory =
+ currentAggregatorFactoryMap.get(entry.getKey());
+ if (aggregatorFactory == null) {
+ currentAggregatorFactoryMap.put(entry.getKey(), entry.getValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
index eb25a2e..2f3d5e5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
@@ -18,9 +18,14 @@
package org.apache.giraph.comm.aggregators;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.TaskIdsPermitsBarrier;
+import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
@@ -29,11 +34,6 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-import java.util.AbstractMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
/**
* Class for holding aggregators which current worker owns,
* and aggregating partial aggregator values from workers.
@@ -73,19 +73,14 @@ public class OwnerAggregatorServerData {
private final TaskIdsPermitsBarrier workersBarrier;
/** Progressable used to report progress */
private final Progressable progressable;
- /** Configuration */
- private final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor
*
* @param progressable Progressable used to report progress
- * @param conf Configuration
*/
- public OwnerAggregatorServerData(Progressable progressable,
- ImmutableClassesGiraphConfiguration conf) {
+ public OwnerAggregatorServerData(Progressable progressable) {
this.progressable = progressable;
- this.conf = conf;
workersBarrier = new TaskIdsPermitsBarrier(progressable);
}
@@ -93,15 +88,14 @@ public class OwnerAggregatorServerData {
* Register an aggregator which current worker owns. Thread-safe.
*
* @param name Name of aggregator
- * @param aggregatorClass Aggregator class
+ * @param aggregatorFactory Aggregator factory
*/
public void registerAggregator(String name,
- Class<Aggregator<Writable>> aggregatorClass) {
+ WritableFactory<Aggregator<Writable>> aggregatorFactory) {
if (LOG.isDebugEnabled() && myAggregatorMap.isEmpty()) {
LOG.debug("registerAggregator: The first registration after a reset()");
}
- myAggregatorMap.putIfAbsent(name,
- AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
+ myAggregatorMap.putIfAbsent(name, aggregatorFactory.create());
progressable.progress();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
index adc2aa8..8f880b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
@@ -18,15 +18,16 @@
package org.apache.giraph.comm.aggregators;
+import java.io.IOException;
+import java.util.Map;
+
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.util.Map;
-
/**
* Takes and serializes aggregators and keeps them grouped by owner
* partition id, to be sent in bulk.
@@ -41,20 +42,20 @@ public class SendAggregatorCache extends CountingCache {
*
* @param taskId Task id of worker which owns the aggregator
* @param aggregatorName Name of the aggregator
- * @param aggregatorClass Class of the aggregator
+ * @param aggregatorFactory Aggregator factory
* @param aggregatedValue Value of the aggregator
* @return Number of bytes in serialized data for this worker
* @throws IOException
*/
public int addAggregator(Integer taskId, String aggregatorName,
- Class<? extends Aggregator> aggregatorClass,
+ WritableFactory<? extends Aggregator> aggregatorFactory,
Writable aggregatedValue) throws IOException {
AggregatorOutputStream out = aggregatorMap.get(taskId);
if (out == null) {
out = new AggregatorOutputStream();
aggregatorMap.put(taskId, out);
}
- return out.addAggregator(aggregatorName, aggregatorClass,
+ return out.addAggregator(aggregatorName, aggregatorFactory,
aggregatedValue);
}
@@ -86,6 +87,6 @@ public class SendAggregatorCache extends CountingCache {
// current number of requests, plus one for the last flush
long totalCount = getCount(taskId) + 1;
addAggregator(taskId, AggregatorUtils.SPECIAL_COUNT_AGGREGATOR,
- Aggregator.class, new LongWritable(totalCount));
+ null, new LongWritable(totalCount));
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index 1218d29..51277c9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -18,20 +18,21 @@
package org.apache.giraph.comm.netty;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.SendAggregatorCache;
import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.WritableFactory;
import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Progressable;
-import java.io.IOException;
-
/**
* Netty implementation of {@link MasterClient}
*/
@@ -39,7 +40,7 @@ public class NettyMasterClient implements MasterClient {
/** Netty client that does the actual I/O */
private final NettyClient nettyClient;
/** Worker information for current superstep */
- private CentralizedServiceMaster<?, ?, ?> service;
+ private final CentralizedServiceMaster<?, ?, ?> service;
/** Cached map of partition ids to serialized aggregator data */
private final SendAggregatorCache sendAggregatorCache =
new SendAggregatorCache();
@@ -78,12 +79,12 @@ public class NettyMasterClient implements MasterClient {
@Override
public void sendAggregator(String aggregatorName,
- Class<? extends Aggregator> aggregatorClass,
+ WritableFactory<? extends Aggregator> aggregatorFactory,
Writable aggregatedValue) throws IOException {
WorkerInfo owner =
AggregatorUtils.getOwner(aggregatorName, service.getWorkerInfoList());
int currentSize = sendAggregatorCache.addAggregator(owner.getTaskId(),
- aggregatorName, aggregatorClass, aggregatedValue);
+ aggregatorName, aggregatorFactory, aggregatedValue);
if (currentSize >= maxBytesPerAggregatorRequest) {
flushAggregatorsToWorker(owner);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
index e2681ee..10d8d02 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
@@ -18,16 +18,18 @@
package org.apache.giraph.comm.requests;
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import java.io.DataInput;
-import java.io.IOException;
-
/**
* Request to send final aggregated values from master to worker which owns
* the aggregators
@@ -59,23 +61,22 @@ public class SendAggregatorsToOwnerRequest
int numAggregators = input.readInt();
for (int i = 0; i < numAggregators; i++) {
String aggregatorName = input.readUTF();
- String aggregatorClassName = input.readUTF();
+ WritableFactory<Aggregator<Writable>> aggregatorFactory =
+ WritableUtils.readWritableObject(input, conf);
if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
LongWritable count = new LongWritable(0);
count.readFields(input);
aggregatorData.receivedRequestCountFromMaster(count.get(),
getSenderTaskId());
} else {
- Class<Aggregator<Writable>> aggregatorClass =
- AggregatorUtils.getAggregatorClass(aggregatorClassName);
aggregatorData.registerAggregatorClass(aggregatorName,
- aggregatorClass);
+ aggregatorFactory);
Writable aggregatorValue =
aggregatorData.createAggregatorInitialValue(aggregatorName);
aggregatorValue.readFields(input);
aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
serverData.getOwnerAggregatorData().registerAggregator(
- aggregatorName, aggregatorClass);
+ aggregatorName, aggregatorFactory);
}
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
index 52e4cba..d469e96 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
@@ -18,16 +18,18 @@
package org.apache.giraph.comm.requests;
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import java.io.DataInput;
-import java.io.IOException;
-
/**
* Request to send final aggregated values from worker which owns them to
* other workers
@@ -59,17 +61,16 @@ public class SendAggregatorsToWorkerRequest extends
int numAggregators = input.readInt();
for (int i = 0; i < numAggregators; i++) {
String aggregatorName = input.readUTF();
- String aggregatorClassName = input.readUTF();
+ WritableFactory<Aggregator<Writable>> aggregatorFactory =
+ WritableUtils.readWritableObject(input, conf);
if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
LongWritable count = new LongWritable(0);
count.readFields(input);
aggregatorData.receivedRequestCountFromWorker(count.get(),
getSenderTaskId());
} else {
- Class<Aggregator<Writable>> aggregatorClass =
- AggregatorUtils.getAggregatorClass(aggregatorClassName);
aggregatorData.registerAggregatorClass(aggregatorName,
- aggregatorClass);
+ aggregatorFactory);
Writable aggregatorValue =
aggregatorData.createAggregatorInitialValue(aggregatorName);
aggregatorValue.readFields(input);
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
index 14c8c0d..62ab7f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
@@ -44,7 +44,7 @@ public abstract class WritableRequest<I extends WritableComparable,
public static final int UNKNOWN_SIZE = -1;
/** Configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E> conf;
+ protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Client id */
private int clientId = -1;
/** Request id */
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 325d91f..2bc08e9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -18,15 +18,24 @@
package org.apache.giraph.master;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Map;
+
import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.aggregators.AggregatorWrapper;
import org.apache.giraph.aggregators.AggregatorWriter;
+import org.apache.giraph.aggregators.ClassAggregatorFactory;
import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.MasterLoggingAggregator;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
@@ -35,12 +44,6 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Map;
-
/** Handler for aggregators on master */
public class MasterAggregatorHandler implements MasterAggregatorUsage,
Writable {
@@ -106,7 +109,17 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
Class<? extends Aggregator<A>> aggregatorClass) throws
InstantiationException, IllegalAccessException {
checkAggregatorName(name);
- return registerAggregator(name, aggregatorClass, false) != null;
+ ClassAggregatorFactory<A> aggregatorFactory =
+ new ClassAggregatorFactory<A>(aggregatorClass, conf);
+ return registerAggregator(name, aggregatorFactory, false) != null;
+ }
+
+ @Override
+ public <A extends Writable> boolean registerAggregator(String name,
+ WritableFactory<? extends Aggregator<A>> aggregator) throws
+ InstantiationException, IllegalAccessException {
+ checkAggregatorName(name);
+ return registerAggregator(name, aggregator, false) != null;
}
@Override
@@ -114,7 +127,9 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
Class<? extends Aggregator<A>> aggregatorClass) throws
InstantiationException, IllegalAccessException {
checkAggregatorName(name);
- return registerAggregator(name, aggregatorClass, true) != null;
+ ClassAggregatorFactory<A> aggregatorFactory =
+ new ClassAggregatorFactory<A>(aggregatorClass, conf);
+ return registerAggregator(name, aggregatorFactory, true) != null;
}
/**
@@ -134,22 +149,22 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
/**
* Helper function for registering aggregators.
*
- * @param name Name of the aggregator
- * @param aggregatorClass Class of the aggregator
- * @param persistent Whether aggregator is persistent or not
- * @param <A> Aggregated value type
+ * @param name Name of the aggregator
+ * @param aggregatorFactory Aggregator factory
+ * @param persistent Whether aggregator is persistent or not
+ * @param <A> Aggregated value type
* @return Newly registered aggregator or aggregator which was previously
* created with selected name, if any
*/
private <A extends Writable> AggregatorWrapper<A> registerAggregator
- (String name, Class<? extends Aggregator<A>> aggregatorClass,
+ (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
boolean persistent) throws InstantiationException,
IllegalAccessException {
AggregatorWrapper<A> aggregatorWrapper =
(AggregatorWrapper<A>) aggregatorMap.get(name);
if (aggregatorWrapper == null) {
aggregatorWrapper =
- new AggregatorWrapper<A>(aggregatorClass, persistent, conf);
+ new AggregatorWrapper<A>(aggregatorFactory, persistent, conf);
aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
}
return aggregatorWrapper;
@@ -207,7 +222,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
aggregatorMap.entrySet()) {
masterClient.sendAggregator(entry.getKey(),
- entry.getValue().getAggregatorClass(),
+ entry.getValue().getAggregatorFactory(),
entry.getValue().getPreviousAggregatedValue());
progressable.progress();
}
@@ -322,7 +337,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
aggregatorMap.entrySet()) {
out.writeUTF(entry.getKey());
- out.writeUTF(entry.getValue().getAggregatorClass().getName());
+ entry.getValue().getAggregatorFactory().write(out);
out.writeBoolean(entry.getValue().isPersistent());
entry.getValue().getPreviousAggregatedValue().write(out);
progressable.progress();
@@ -336,15 +351,16 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
try {
for (int i = 0; i < numAggregators; i++) {
String aggregatorName = in.readUTF();
- String aggregatorClassName = in.readUTF();
+ WritableFactory<Aggregator<Writable>> aggregatorFactory =
+ WritableUtils.readWritableObject(in, conf);
boolean isPersistent = in.readBoolean();
- AggregatorWrapper<Writable> aggregator = registerAggregator(
+ AggregatorWrapper<Writable> aggregatorWrapper = registerAggregator(
aggregatorName,
- AggregatorUtils.getAggregatorClass(aggregatorClassName),
+ aggregatorFactory,
isPersistent);
- Writable value = aggregator.createInitialValue();
+ Writable value = aggregatorWrapper.createInitialValue();
value.readFields(in);
- aggregator.setPreviousAggregatedValue(value);
+ aggregatorWrapper.setPreviousAggregatedValue(value);
progressable.progress();
}
} catch (InstantiationException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
index cadae67..91f5d24 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
@@ -20,6 +20,7 @@ package org.apache.giraph.master;
import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.Writable;
/**
@@ -40,6 +41,21 @@ public interface MasterAggregatorUsage extends AggregatorUsage {
InstantiationException, IllegalAccessException;
/**
+ * Register an aggregator in preSuperstep() and/or preApplication(). This
+ * aggregator will have its value reset at the end of each super step.
+ *
+ * Aggregator should either implement Writable, or have no-arg constructor.
+ *
+ * @param name of aggregator
+ * @param aggregatorFactory aggregator factory
+ * @param <A> Aggregated value type
+ * @return True iff aggregator wasn't already registered
+ */
+ <A extends Writable> boolean registerAggregator(String name,
+ WritableFactory<? extends Aggregator<A>> aggregatorFactory) throws
+ InstantiationException, IllegalAccessException;
+
+ /**
* Register persistent aggregator in preSuperstep() and/or
* preApplication(). This aggregator will not reset value at the end of
* super step.
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index d77a9b5..c2a1f9a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -23,6 +23,7 @@ import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.utils.WritableFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -195,6 +196,13 @@ public abstract class MasterCompute
}
@Override
+ public final <A extends Writable> boolean registerAggregator(
+ String name, WritableFactory<? extends Aggregator<A>> aggregator)
+ throws InstantiationException, IllegalAccessException {
+ return masterAggregatorUsage.registerAggregator(name, aggregator);
+ }
+
+ @Override
public final <A extends Writable> boolean registerPersistentAggregator(
String name,
Class<? extends Aggregator<A>> aggregatorClass) throws
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java
new file mode 100644
index 0000000..9ea24c3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A Writable for arrays containing instances of a class. The elements of this
+ * writable must all be instances of the same class.
+ *
+ * @param <T> element type
+ */
+public class ArrayWritable<T extends Writable> implements Writable {
+ /** Element type class */
+ private Class<T> valueClass;
+ /** Array */
+ private T[] values;
+
+ /** Constructor */
+ public ArrayWritable() {
+ }
+
+ /**
+ * Constructor
+ * @param valueClass Element type class
+ * @param values Array of elements
+ */
+ public ArrayWritable(Class<T> valueClass, T[] values) {
+ Preconditions.checkNotNull(valueClass,
+ "valueClass cannot be null in ArrayWritable");
+ this.valueClass = valueClass;
+ this.values = values;
+ }
+
+ /**
+ * Get element type class
+ * @return element type class
+ */
+ public Class<T> getValueClass() {
+ return valueClass;
+ }
+
+ /**
+ * Set array
+ * @param values array
+ */
+ public void set(T[] values) { this.values = values; }
+
+ /**
+ * Get array
+ * @return array
+ */
+ public T[] get() { return values; }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ valueClass = WritableUtils.readClass(in);
+ values = (T[]) Array.newInstance(valueClass, in.readInt());
+
+ for (int i = 0; i < values.length; i++) {
+ T value = (T) WritableFactories.newInstance(valueClass);
+ value.readFields(in); // read a value
+ values[i] = value; // store it in values
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Preconditions.checkNotNull(valueClass,
+ "valueClass cannot be null in ArrayWritable");
+ WritableUtils.writeClass(valueClass, out);
+ out.writeInt(values.length); // write values
+ for (int i = 0; i < values.length; i++) {
+ values[i].write(out);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
new file mode 100644
index 0000000..43bed7e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Factory that can be serialized.
+ * @param <T> Type of object factory creates
+ */
+public interface WritableFactory<T> extends Writable, Factory<T> {
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 3f8382e..763f59d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -18,6 +18,18 @@
package org.apache.giraph.utils;
+import static org.apache.hadoop.util.ReflectionUtils.newInstance;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.OutEdges;
@@ -33,18 +45,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.hadoop.util.ReflectionUtils.newInstance;
-
/**
* Helper static methods for working with Writable objects.
*/
@@ -70,6 +70,23 @@ public class WritableUtils {
}
/**
+ * Instantiate a new Writable, checking for NullWritable, and configure it.
+ *
+ * @param klass Class
+ * @param configuration Configuration
+ * @param <W> type
+ * @return new instance of class
+ */
+ public static <W extends Writable> W createWritable(
+ Class<W> klass,
+ ImmutableClassesGiraphConfiguration configuration) {
+ W result = createWritable(klass);
+ ConfigurationUtils.configureIfPossible(result, configuration);
+ return result;
+ }
+
+
+ /**
* Read fields from byteArray to a Writeable object.
*
* @param byteArray Byte array to find the fields in.
@@ -616,4 +633,47 @@ public class WritableUtils {
return null;
}
}
+
+ /**
+ * Write object to output stream
+ * @param object Object
+ * @param output Output stream
+ * @throws IOException
+ */
+ public static void writeWritableObject(
+ Writable object, DataOutput output)
+ throws IOException {
+ output.writeBoolean(object != null);
+ if (object != null) {
+ output.writeUTF(object.getClass().getName());
+ object.write(output);
+ }
+ }
+
+ /**
+ * Reads object from input stream, as written by writeWritableObject
+ * @param input Input stream
+ * @param conf Configuration handed to the newly created instance
+ * @param <T> Object type
+ * @return Object, or null if the stream marks the object as absent
+ * @throws IOException
+ */
+ public static <T extends Writable>
+ T readWritableObject(DataInput input,
+ ImmutableClassesGiraphConfiguration conf) throws IOException {
+ if (input.readBoolean()) {
+ String className = input.readUTF();
+ try {
+ T object =
+ (T) ReflectionUtils.newInstance(Class.forName(className), conf);
+ object.readFields(input);
+ return object;
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("readWritableObject: No class found " +
+ className, e);
+ }
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 9bfd7b5..45ca665 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -18,14 +18,19 @@
package org.apache.giraph.worker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.Factory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
@@ -33,10 +38,6 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
/**
* Handler for aggregators on worker. Provides the aggregated values and
* performs aggregations from user vertex code (thread-safe). Also has
@@ -58,10 +59,13 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
private static final Logger LOG =
Logger.getLogger(WorkerAggregatorHandler.class);
/** Map of values from previous superstep */
- private Map<String, Writable> previousAggregatedValueMap =
+ private final Map<String, Writable> previousAggregatedValueMap =
Maps.newHashMap();
+ /** Map of aggregator factories for current superstep */
+ private final Map<String, Factory<Aggregator<Writable>>>
+ currentAggregatorFactoryMap = Maps.newHashMap();
/** Map of aggregators for current superstep */
- private Map<String, Aggregator<Writable>> currentAggregatorMap =
+ private final Map<String, Aggregator<Writable>> currentAggregatorMap =
Maps.newHashMap();
/** Service worker */
private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
@@ -143,7 +147,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
// Wait for all other aggregators and store them
allAggregatorData.fillNextSuperstepMapsWhenReady(
getOtherWorkerIdsSet(), previousAggregatedValueMap,
- currentAggregatorMap);
+ currentAggregatorFactoryMap);
+ fillAndInitAggregatorsMap(currentAggregatorMap);
allAggregatorData.reset();
if (LOG.isDebugEnabled()) {
LOG.debug("prepareSuperstep: Aggregators prepared");
@@ -151,6 +156,25 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
}
/**
+ * Fills aggregators map from currentAggregatorFactoryMap.
+ * All aggregators in this map will be set to initial value.
+ * @param aggregatorMap Map to fill.
+ */
+ private void fillAndInitAggregatorsMap(
+ Map<String, Aggregator<Writable>> aggregatorMap) {
+ for (Map.Entry<String, Factory<Aggregator<Writable>>> entry :
+ currentAggregatorFactoryMap.entrySet()) {
+ Aggregator<Writable> aggregator =
+ aggregatorMap.get(entry.getKey());
+ if (aggregator == null) {
+ aggregatorMap.put(entry.getKey(), entry.getValue().create());
+ } else {
+ aggregator.reset();
+ }
+ }
+ }
+
+ /**
* Send aggregators to their owners and in the end to the master
*
* @param requestProcessor Request processor for aggregators
@@ -286,13 +310,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
public ThreadLocalWorkerAggregatorUsage() {
threadAggregatorMap = Maps.newHashMapWithExpectedSize(
WorkerAggregatorHandler.this.currentAggregatorMap.size());
- for (Map.Entry<String, Aggregator<Writable>> entry :
- WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
- threadAggregatorMap.put(entry.getKey(),
- AggregatorUtils.newAggregatorInstance(
- (Class<Aggregator<Writable>>) entry.getValue().getClass(),
- conf));
- }
+ fillAndInitAggregatorsMap(threadAggregatorMap);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
new file mode 100644
index 0000000..2898647
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+/** Test of the aggregator created by ArrayAggregatorFactory. */
+public class TestArrayAggregator {
+ @Test
+ public void testMaxAggregator() {
+ Aggregator<ArrayWritable<LongWritable>> max = new ArrayAggregatorFactory<>(2, LongMaxAggregator.class).create();
+ // Scratch value; it is mutated and handed to max again at each step.
+ ArrayWritable<LongWritable> tmp = max.createInitialValue();
+ // First contribution: only slot 0 is changed from its initial value.
+ tmp.get()[0].set(2);
+ max.aggregate(tmp);
+ // Second contribution: a larger slot 0 and the first explicit slot 1.
+ tmp.get()[0].set(3);
+ tmp.get()[1].set(1);
+ max.aggregate(tmp);
+ // Each slot is expected to hold the largest value aggregated into it.
+ assertEquals(3L, max.getAggregatedValue().get()[0].get());
+ assertEquals(1L, max.getAggregatedValue().get()[1].get());
+ // Values below the current maxima, to tell a set from an aggregate.
+ tmp.get()[0].set(-1);
+ tmp.get()[1].set(-1);
+ max.setAggregatedValue(tmp);
+ // setAggregatedValue is expected to overwrite, so -1 must come back.
+ assertEquals(-1L, max.getAggregatedValue().get()[0].get());
+ assertEquals(-1L, max.getAggregatedValue().get()[1].get());
+ }
+}