You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/03/21 23:49:44 UTC
git commit: GIRAPH-566: Make option for aggregators to be
configurable (majakabiljo)
Updated Branches:
refs/heads/trunk a802fef1c -> f4bd1996e
GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f4bd1996
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f4bd1996
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f4bd1996
Branch: refs/heads/trunk
Commit: f4bd1996e66fbe3f4daf8aca1e7be1e9cead5dee
Parents: a802fef
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Thu Mar 21 15:49:12 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Thu Mar 21 15:49:12 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 ++
.../java/org/apache/giraph/comm/ServerData.java | 4 ++--
.../giraph/comm/aggregators/AggregatorUtils.java | 15 +++++----------
.../comm/aggregators/AllAggregatorServerData.java | 12 +++++++++---
.../aggregators/OwnerAggregatorServerData.java | 10 ++++++++--
.../giraph/worker/WorkerAggregatorHandler.java | 3 ++-
6 files changed, 28 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3032873..a1446a7 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)
+
GIRAPH-575: update hive-io (nitay)
GIRAPH-576: BspServiceMaster.failureCleanup() shouldn't pass null in
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index e6dff8c..70dc156 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -107,8 +107,8 @@ public class ServerData<I extends WritableComparable,
new SimplePartitionStore<I, V, E, M>(configuration, context);
}
edgeStore = new EdgeStore<I, V, E, M>(service, configuration, context);
- ownerAggregatorData = new OwnerAggregatorServerData(context);
- allAggregatorData = new AllAggregatorServerData(context);
+ ownerAggregatorData = new OwnerAggregatorServerData(context, configuration);
+ allAggregatorData = new AllAggregatorServerData(context, configuration);
}
public EdgeStore<I, V, E, M> getEdgeStore() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
index 0abd7e1..ceb30a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm.aggregators;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
@@ -78,19 +79,13 @@ public class AggregatorUtils {
* catch all exceptions.
*
* @param aggregatorClass Class of aggregator
+ * @param conf Configuration
* @return New aggregator
*/
public static Aggregator<Writable> newAggregatorInstance(
- Class<Aggregator<Writable>> aggregatorClass) {
- try {
- return aggregatorClass.newInstance();
- } catch (InstantiationException e) {
- throw new IllegalStateException("createAggregator: " +
- "InstantiationException for aggregator class " + aggregatorClass, e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("createAggregator: " +
- "IllegalAccessException for aggregator class " + aggregatorClass, e);
- }
+ Class<Aggregator<Writable>> aggregatorClass,
+ ImmutableClassesGiraphConfiguration conf) {
+ return ReflectionUtils.newInstance(aggregatorClass, conf);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
index f38c6cd..177e738 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
@@ -19,6 +19,7 @@
package org.apache.giraph.comm.aggregators;
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.utils.TaskIdsPermitsBarrier;
import org.apache.hadoop.io.Writable;
@@ -83,14 +84,19 @@ public class AllAggregatorServerData {
private final TaskIdsPermitsBarrier workersBarrier;
/** Progressable used to report progress */
private final Progressable progressable;
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor
*
* @param progressable Progressable used to report progress
+ * @param conf Configuration
*/
- public AllAggregatorServerData(Progressable progressable) {
+ public AllAggregatorServerData(Progressable progressable,
+ ImmutableClassesGiraphConfiguration conf) {
this.progressable = progressable;
+ this.conf = conf;
workersBarrier = new TaskIdsPermitsBarrier(progressable);
masterBarrier = new TaskIdsPermitsBarrier(progressable);
}
@@ -106,7 +112,7 @@ public class AllAggregatorServerData {
aggregatorClassMap.put(name, aggregatorClass);
if (!aggregatorTypesMap.containsKey(aggregatorClass)) {
aggregatorTypesMap.putIfAbsent(aggregatorClass,
- AggregatorUtils.newAggregatorInstance(aggregatorClass));
+ AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
}
progressable.progress();
}
@@ -225,7 +231,7 @@ public class AllAggregatorServerData {
currentAggregatorMap.get(entry.getKey());
if (aggregator == null) {
currentAggregatorMap.put(entry.getKey(),
- AggregatorUtils.newAggregatorInstance(entry.getValue()));
+ AggregatorUtils.newAggregatorInstance(entry.getValue(), conf));
} else {
aggregator.reset();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
index bd6068a..eb25a2e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
@@ -19,6 +19,7 @@
package org.apache.giraph.comm.aggregators;
import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.TaskIdsPermitsBarrier;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
@@ -72,14 +73,19 @@ public class OwnerAggregatorServerData {
private final TaskIdsPermitsBarrier workersBarrier;
/** Progressable used to report progress */
private final Progressable progressable;
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor
*
* @param progressable Progressable used to report progress
+ * @param conf Configuration
*/
- public OwnerAggregatorServerData(Progressable progressable) {
+ public OwnerAggregatorServerData(Progressable progressable,
+ ImmutableClassesGiraphConfiguration conf) {
this.progressable = progressable;
+ this.conf = conf;
workersBarrier = new TaskIdsPermitsBarrier(progressable);
}
@@ -95,7 +101,7 @@ public class OwnerAggregatorServerData {
LOG.debug("registerAggregator: The first registration after a reset()");
}
myAggregatorMap.putIfAbsent(name,
- AggregatorUtils.newAggregatorInstance(aggregatorClass));
+ AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
progressable.progress();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 3c18449..9a8a8b8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -290,7 +290,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
threadAggregatorMap.put(entry.getKey(),
AggregatorUtils.newAggregatorInstance(
- (Class<Aggregator<Writable>>) entry.getValue().getClass()));
+ (Class<Aggregator<Writable>>) entry.getValue().getClass(),
+ conf));
}
}