You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/03/21 23:49:44 UTC

git commit: GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)

Updated Branches:
  refs/heads/trunk a802fef1c -> f4bd1996e


GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f4bd1996
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f4bd1996
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f4bd1996

Branch: refs/heads/trunk
Commit: f4bd1996e66fbe3f4daf8aca1e7be1e9cead5dee
Parents: a802fef
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Thu Mar 21 15:49:12 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Thu Mar 21 15:49:12 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 ++
 .../java/org/apache/giraph/comm/ServerData.java    |    4 ++--
 .../giraph/comm/aggregators/AggregatorUtils.java   |   15 +++++----------
 .../comm/aggregators/AllAggregatorServerData.java  |   12 +++++++++---
 .../aggregators/OwnerAggregatorServerData.java     |   10 ++++++++--
 .../giraph/worker/WorkerAggregatorHandler.java     |    3 ++-
 6 files changed, 28 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3032873..a1446a7 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-566: Make option for aggregators to be configurable (majakabiljo)
+
   GIRAPH-575: update hive-io (nitay)
 
   GIRAPH-576: BspServiceMaster.failureCleanup() shouldn't pass null in

http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index e6dff8c..70dc156 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -107,8 +107,8 @@ public class ServerData<I extends WritableComparable,
           new SimplePartitionStore<I, V, E, M>(configuration, context);
     }
     edgeStore = new EdgeStore<I, V, E, M>(service, configuration, context);
-    ownerAggregatorData = new OwnerAggregatorServerData(context);
-    allAggregatorData = new AllAggregatorServerData(context);
+    ownerAggregatorData = new OwnerAggregatorServerData(context, configuration);
+    allAggregatorData = new AllAggregatorServerData(context, configuration);
   }
 
   public EdgeStore<I, V, E, M> getEdgeStore() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
index 0abd7e1..ceb30a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm.aggregators;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 
@@ -78,19 +79,13 @@ public class AggregatorUtils {
    * catch all exceptions.
    *
    * @param aggregatorClass Class of aggregator
+   * @param conf Configuration
    * @return New aggregator
    */
   public static Aggregator<Writable> newAggregatorInstance(
-      Class<Aggregator<Writable>> aggregatorClass) {
-    try {
-      return aggregatorClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("createAggregator: " +
-          "InstantiationException for aggregator class " + aggregatorClass, e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("createAggregator: " +
-          "IllegalAccessException for aggregator class " + aggregatorClass, e);
-    }
+      Class<Aggregator<Writable>> aggregatorClass,
+      ImmutableClassesGiraphConfiguration conf) {
+    return ReflectionUtils.newInstance(aggregatorClass, conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
index f38c6cd..177e738 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.comm.aggregators;
 
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
 import org.apache.hadoop.io.Writable;
@@ -83,14 +84,19 @@ public class AllAggregatorServerData {
   private final TaskIdsPermitsBarrier workersBarrier;
   /** Progressable used to report progress */
   private final Progressable progressable;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor
    *
    * @param progressable Progressable used to report progress
+   * @param conf Configuration
    */
-  public AllAggregatorServerData(Progressable progressable) {
+  public AllAggregatorServerData(Progressable progressable,
+      ImmutableClassesGiraphConfiguration conf) {
     this.progressable = progressable;
+    this.conf = conf;
     workersBarrier = new TaskIdsPermitsBarrier(progressable);
     masterBarrier = new TaskIdsPermitsBarrier(progressable);
   }
@@ -106,7 +112,7 @@ public class AllAggregatorServerData {
     aggregatorClassMap.put(name, aggregatorClass);
     if (!aggregatorTypesMap.containsKey(aggregatorClass)) {
       aggregatorTypesMap.putIfAbsent(aggregatorClass,
-          AggregatorUtils.newAggregatorInstance(aggregatorClass));
+          AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
     }
     progressable.progress();
   }
@@ -225,7 +231,7 @@ public class AllAggregatorServerData {
           currentAggregatorMap.get(entry.getKey());
       if (aggregator == null) {
         currentAggregatorMap.put(entry.getKey(),
-            AggregatorUtils.newAggregatorInstance(entry.getValue()));
+            AggregatorUtils.newAggregatorInstance(entry.getValue(), conf));
       } else {
         aggregator.reset();
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
index bd6068a..eb25a2e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.comm.aggregators;
 
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
@@ -72,14 +73,19 @@ public class OwnerAggregatorServerData {
   private final TaskIdsPermitsBarrier workersBarrier;
   /** Progressable used to report progress */
   private final Progressable progressable;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor
    *
    * @param progressable Progressable used to report progress
+   * @param conf         Configuration
    */
-  public OwnerAggregatorServerData(Progressable progressable) {
+  public OwnerAggregatorServerData(Progressable progressable,
+      ImmutableClassesGiraphConfiguration conf) {
     this.progressable = progressable;
+    this.conf = conf;
     workersBarrier = new TaskIdsPermitsBarrier(progressable);
   }
 
@@ -95,7 +101,7 @@ public class OwnerAggregatorServerData {
       LOG.debug("registerAggregator: The first registration after a reset()");
     }
     myAggregatorMap.putIfAbsent(name,
-        AggregatorUtils.newAggregatorInstance(aggregatorClass));
+        AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
     progressable.progress();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f4bd1996/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 3c18449..9a8a8b8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -290,7 +290,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
           WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
         threadAggregatorMap.put(entry.getKey(),
             AggregatorUtils.newAggregatorInstance(
-                (Class<Aggregator<Writable>>) entry.getValue().getClass()));
+                (Class<Aggregator<Writable>>) entry.getValue().getClass(),
+                conf));
       }
     }