You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:27 UTC

[44/47] git commit: updated refs/heads/release-1.1 to 4c139ee

Fix using aggregators before aggregation

Summary:
If we register an aggregator and immediately ask for its aggregated value,
the previous code returned the initial value, so we have to do the same.

Additionally, clean up errors/exceptions to be more understandable
(instead of a NullPointerException, for example).

Test Plan:
mvn install
AggregatorsBenchmark and ReducersBenchmark

Reviewers: majakabiljo, pavanka, sergey.edunov, maja.kabiljo

Differential Revision: https://reviews.facebook.net/D24951


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d32c429a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d32c429a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d32c429a

Branch: refs/heads/release-1.1
Commit: d32c429a1d475b322b3fe44738f0cc8f30a97b48
Parents: 7c61dcf
Author: Igor Kabiljo <ik...@fb.com>
Authored: Mon Oct 20 09:50:55 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Oct 20 09:54:37 2014 -0700

----------------------------------------------------------------------
 .../aggregators/ArrayAggregatorFactory.java     | 128 -------------------
 .../aggregators/ClassAggregatorFactory.java     |  72 -----------
 .../giraph/benchmark/ReducersBenchmark.java     |   3 +-
 .../comm/aggregators/AggregatorUtils.java       |  51 ++++++++
 .../requests/SendAggregatorsToOwnerRequest.java |   2 +-
 .../giraph/master/AggregatorBroadcast.java      |  75 +++++++++++
 .../master/AggregatorReduceOperation.java       |  66 +++++++---
 .../AggregatorToGlobalCommTranslation.java      | 113 +++++++++++-----
 .../apache/giraph/master/BspServiceMaster.java  |   2 +-
 .../giraph/master/MasterAggregatorHandler.java  |  29 ++++-
 .../giraph/master/MasterAggregatorUsage.java    |  16 ---
 .../org/apache/giraph/master/MasterCompute.java |   9 --
 .../giraph/reducers/OnSameReduceOperation.java  |   4 +-
 .../apache/giraph/reducers/ReduceOperation.java |  10 +-
 .../org/apache/giraph/reducers/Reducer.java     |  41 ++++--
 .../apache/giraph/utils/WritableFactory.java    |  28 ----
 .../org/apache/giraph/utils/WritableUtils.java  |   6 +-
 .../worker/WorkerAggregatorDelegator.java       |   4 +-
 .../giraph/worker/WorkerAggregatorHandler.java  |   8 +-
 .../giraph/worker/WorkerBroadcastUsage.java     |  33 +++++
 .../giraph/worker/WorkerGlobalCommUsage.java    |  17 +--
 .../apache/giraph/worker/WorkerReduceUsage.java |  30 +++++
 .../giraph/aggregators/TestArrayAggregator.java |  50 --------
 23 files changed, 396 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
deleted file mode 100644
index c977c57..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.aggregators;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Array;
-
-import org.apache.giraph.utils.ArrayWritable;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Generic array aggregator factory, used to aggregate elements
- * of ArrayWritable via passed element aggregator.
- *
- * @param <A> Type of individual element
- */
-public class ArrayAggregatorFactory<A extends Writable>
-    implements WritableFactory<Aggregator<ArrayWritable<A>>> {
-  /** number of elements in array */
-  private int n;
-  /** element aggregator class */
-  private WritableFactory<? extends Aggregator<A>> elementAggregatorFactory;
-
-  /**
-   * Constructor
-   * @param n Number of elements in array
-   * @param elementAggregatorClass Type of element aggregator
-   */
-  public ArrayAggregatorFactory(
-      int n, Class<? extends Aggregator<A>> elementAggregatorClass) {
-    this(n, new ClassAggregatorFactory<>(elementAggregatorClass));
-  }
-
-  /**
-   * Constructor
-   * @param n Number of elements in array
-   * @param elementAggregatorFactory Element aggregator factory
-   */
-  public ArrayAggregatorFactory(int n,
-      WritableFactory<? extends Aggregator<A>> elementAggregatorFactory) {
-    this.n = n;
-    this.elementAggregatorFactory = elementAggregatorFactory;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    n = in.readInt();
-    elementAggregatorFactory = WritableUtils.readWritableObject(in, null);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(n);
-    WritableUtils.writeWritableObject(elementAggregatorFactory, out);
-  }
-
-  @Override
-  public Aggregator<ArrayWritable<A>> create() {
-    return new ArrayAggregator<>(
-        n, elementAggregatorFactory.create());
-  }
-
-  /**
-   * Stateful aggregator that aggregates ArrayWritable by
-   * aggregating individual elements
-   *
-   * @param <A> Type of individual element
-   */
-  public static class ArrayAggregator<A extends Writable>
-      extends BasicAggregator<ArrayWritable<A>> {
-    /** number of elements in array */
-    private final int n;
-    /** element aggregator */
-    private final Aggregator<A> elementAggregator;
-
-    /**
-     * Constructor
-     * @param n Number of elements in array
-     * @param elementAggregator Element aggregator
-     */
-    public ArrayAggregator(int n, Aggregator<A> elementAggregator) {
-      super(null);
-      this.n = n;
-      this.elementAggregator = elementAggregator;
-      reset();
-    }
-
-    @Override
-    public void aggregate(ArrayWritable<A> other) {
-      A[] array = getAggregatedValue().get();
-      for (int i = 0; i < n; i++) {
-        elementAggregator.setAggregatedValue(array[i]);
-        elementAggregator.aggregate(other.get()[i]);
-        array[i] = elementAggregator.getAggregatedValue();
-      }
-    }
-
-    @Override
-    public ArrayWritable<A> createInitialValue() {
-      Class<A> elementClass =
-          (Class) elementAggregator.createInitialValue().getClass();
-      A[] array = (A[]) Array.newInstance(elementClass, n);
-      for (int i = 0; i < n; i++) {
-        array[i] = elementAggregator.createInitialValue();
-      }
-      return new ArrayWritable<>(elementClass, array);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
deleted file mode 100644
index a022480..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.aggregators;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Aggregator factory based on aggregatorClass.
- *
- * @param <T> Aggregated value type
- */
-public class ClassAggregatorFactory<T extends Writable>
-    implements WritableFactory<Aggregator<T>> {
-  /** Aggregator class */
-  private Class<? extends Aggregator<T>> aggregatorClass;
-
-  /** Constructor */
-  public ClassAggregatorFactory() {
-  }
-
-  /**
-   * Constructor
-   * @param aggregatorClass Aggregator class
-   */
-  public ClassAggregatorFactory(
-      Class<? extends Aggregator<T>> aggregatorClass) {
-    Preconditions.checkNotNull(aggregatorClass,
-        "aggregatorClass cannot be null in ClassAggregatorFactory");
-    this.aggregatorClass = aggregatorClass;
-  }
-
-  @Override
-  public Aggregator<T> create() {
-    return ReflectionUtils.newInstance(aggregatorClass, null);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    aggregatorClass = WritableUtils.readClass(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    Preconditions.checkNotNull(aggregatorClass,
-        "aggregatorClass cannot be null in ClassAggregatorFactory");
-    WritableUtils.writeClass(aggregatorClass, out);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
index ce5c96e..263274d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
@@ -64,9 +64,10 @@ public class ReducersBenchmark extends GiraphBenchmark {
     }
 
     @Override
-    public void reduceSingle(
+    public LongWritable reduceSingle(
         LongWritable curValue, LongWritable valueToReduce) {
       curValue.set(curValue.get() + valueToReduce.get());
+      return curValue;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
index ecb3a6b..dc0ceed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
@@ -99,4 +99,55 @@ public class AggregatorUtils {
     }
     return message;
   }
+
+  /**
+   * Get the warning message about usage of unregistered reducer to be
+   * printed to user. If user didn't register any reducers also provide
+   * the explanation on how to do so.
+   *
+   * @param reducerName The name of the aggregator which user tried to
+   *                       access
+   * @param hasRegisteredReducers True iff user registered some aggregators
+   * @param conf Giraph configuration
+   * @return Warning message
+   */
+  public static String getUnregisteredReducerMessage(
+      String reducerName, boolean hasRegisteredReducers,
+      ImmutableClassesGiraphConfiguration conf) {
+    String message = "Tried to access reducer which wasn't registered " +
+        reducerName;
+    if (!hasRegisteredReducers) {
+      message = message + "; Aggregators can be registered from " +
+          "MasterCompute by calling registerReducer function. " +
+          "Also be sure that you are correctly setting MasterCompute class, " +
+          "currently using " + conf.getMasterComputeClass().getName();
+    }
+    return message;
+  }
+
+  /**
+   * Get the warning message when user tries to access broadcast, without
+   * previously setting it, to be printed to user.
+   * If user didn't broadcast any value also provide
+   * the explanation on how to do so.
+   *
+   * @param broadcastName The name of the broadcast which user tried to
+   *                       access
+   * @param hasBroadcasted True iff user has broadcasted value before
+   * @param conf Giraph configuration
+   * @return Warning message
+   */
+  public static String getUnregisteredBroadcastMessage(
+      String broadcastName, boolean hasBroadcasted,
+      ImmutableClassesGiraphConfiguration conf) {
+    String message = "Tried to access broadcast which wasn't set before " +
+        broadcastName;
+    if (!hasBroadcasted) {
+      message = message + "; Values can be broadcasted from " +
+          "MasterCompute by calling broadcast function. " +
+          "Also be sure that you are correctly setting MasterCompute class, " +
+          "currently using " + conf.getMasterComputeClass().getName();
+    }
+    return message;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
index 2d5cc51..8f168a2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
@@ -77,7 +77,7 @@ public class SendAggregatorsToOwnerRequest
           if (type == GlobalCommType.REDUCE_OPERATIONS) {
             ReduceOperation<Object, Writable> reduceOpCopy =
                 (ReduceOperation<Object, Writable>)
-                WritableUtils.createCopy(reusedOut, reusedIn, value);
+                WritableUtils.createCopy(reusedOut, reusedIn, value, conf);
 
             serverData.getOwnerAggregatorData().registerReducer(
                 name, reduceOpCopy);

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java
new file mode 100644
index 0000000..81ea654
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable representation of aggregated value
+ *
+ * @param <A> Aggregation object type
+ */
+public class AggregatorBroadcast<A extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable
+  implements Writable {
+  /** Aggregator class */
+  private Class<? extends Aggregator<A>> aggregatorClass;
+  /** Aggregated value */
+  private A value;
+
+  /** Constructor */
+  public AggregatorBroadcast() {
+  }
+
+  /**
+   * Constructor
+   * @param aggregatorClass Aggregator class
+   * @param value Aggregated value
+   */
+  public AggregatorBroadcast(
+      Class<? extends Aggregator<A>> aggregatorClass, A value) {
+    this.aggregatorClass = aggregatorClass;
+    this.value = value;
+  }
+
+  public A getValue() {
+    return value;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeClass(aggregatorClass, out);
+    value.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    aggregatorClass = WritableUtils.readClass(in);
+    value = ReflectionUtils.newInstance(aggregatorClass, getConf())
+        .createInitialValue();
+    value.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
index 1673f6d..54d421b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
@@ -22,8 +22,10 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.reducers.OnSameReduceOperation;
-import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 
@@ -33,11 +35,13 @@ import org.apache.hadoop.io.Writable;
  * @param <A> Aggregation object type
  */
 public class AggregatorReduceOperation<A extends Writable>
-    extends OnSameReduceOperation<A> {
-  /** Aggregator factory */
-  private WritableFactory<? extends Aggregator<A>> aggregatorFactory;
+    extends OnSameReduceOperation<A> implements GiraphConfigurationSettable {
+  /** Aggregator class */
+  private Class<? extends Aggregator<A>> aggregatorClass;
   /** Aggregator */
   private Aggregator<A> aggregator;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
 
   /** Constructor */
   public AggregatorReduceOperation() {
@@ -45,18 +49,32 @@ public class AggregatorReduceOperation<A extends Writable>
 
   /**
    * Constructor
-   * @param aggregatorFactory Aggregator factory
+   * @param aggregatorClass Aggregator class
+   * @param conf Configuration
    */
   public AggregatorReduceOperation(
-      WritableFactory<? extends Aggregator<A>> aggregatorFactory) {
-    this.aggregatorFactory = aggregatorFactory;
-    this.aggregator = aggregatorFactory.create();
-    this.aggregator.setAggregatedValue(null);
+      Class<? extends Aggregator<A>> aggregatorClass,
+      ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
+    this.aggregatorClass = aggregatorClass;
+    this.conf = conf;
+    initAggregator();
+  }
+
+  /** Initialize aggregator */
+  private void initAggregator() {
+    aggregator = ReflectionUtils.newInstance(aggregatorClass, conf);
+    aggregator.setAggregatedValue(null);
   }
 
   @Override
   public A createInitialValue() {
-    return aggregator.createInitialValue();
+    A agg = aggregator.createInitialValue();
+    if (agg == null) {
+      throw new IllegalStateException(
+          "Aggregators initial value must not be null, but is for " +
+          aggregator);
+    }
+    return agg;
   }
 
   /**
@@ -64,29 +82,37 @@ public class AggregatorReduceOperation<A extends Writable>
    * @return copy
    */
   public AggregatorReduceOperation<A> createCopy() {
-    return new AggregatorReduceOperation<>(aggregatorFactory);
+    return new AggregatorReduceOperation<>(aggregatorClass, conf);
+  }
+
+  public Class<? extends Aggregator<A>> getAggregatorClass() {
+    return aggregatorClass;
   }
 
   @Override
-  public synchronized void reduceSingle(A curValue, A valueToReduce) {
+  public synchronized A reduceSingle(A curValue, A valueToReduce) {
     aggregator.setAggregatedValue(curValue);
     aggregator.aggregate(valueToReduce);
-    if (curValue != aggregator.getAggregatedValue()) {
-      throw new IllegalStateException(
-          "Aggregator " + aggregator + " aggregates by creating new value");
-    }
+    A aggregated = aggregator.getAggregatedValue();
     aggregator.setAggregatedValue(null);
+    return aggregated;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    WritableUtils.writeWritableObject(aggregatorFactory, out);
+    WritableUtils.writeClass(aggregatorClass, out);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    aggregatorFactory = WritableUtils.readWritableObject(in, null);
-    aggregator = aggregatorFactory.create();
-    this.aggregator.setAggregatedValue(null);
+    aggregatorClass = WritableUtils.readClass(in);
+    initAggregator();
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
index 7492fc7..36a4553 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
@@ -24,10 +24,10 @@ import java.util.HashMap;
 import java.util.Map.Entry;
 
 import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.aggregators.ClassAggregatorFactory;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
 
 import com.google.common.base.Preconditions;
 
@@ -36,8 +36,11 @@ import com.google.common.base.Preconditions;
  * reduce and broadcast operations supported by the MasterAggregatorHandler.
  */
 public class AggregatorToGlobalCommTranslation
-    extends DefaultImmutableClassesGiraphConfigurable
     implements MasterAggregatorUsage, Writable {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(AggregatorToGlobalCommTranslation.class);
+
   /** Class providing reduce and broadcast interface to use */
   private final MasterGlobalCommUsage globalComm;
   /** List of registered aggregators */
@@ -45,21 +48,64 @@ public class AggregatorToGlobalCommTranslation
   registeredAggregators = new HashMap<>();
 
   /**
+   * List of init aggregator values, in case someone tries to
+   * access aggregator immediatelly after registering it.
+   *
+   * Instead of simply returning value, we need to store it during
+   * that superstep, so consecutive calls will return identical object,
+   * which they can modify.
+   */
+  private final HashMap<String, Writable>
+  initAggregatorValues = new HashMap<>();
+
+  /** Conf */
+  private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
+
+  /**
    * Constructor
+   * @param conf Configuration
    * @param globalComm Global communication interface
    */
-  public AggregatorToGlobalCommTranslation(MasterGlobalCommUsage globalComm) {
+  public AggregatorToGlobalCommTranslation(
+      ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
+      MasterGlobalCommUsage globalComm) {
+    this.conf = conf;
     this.globalComm = globalComm;
   }
 
   @Override
   public <A extends Writable> A getAggregatedValue(String name) {
-    return globalComm.getReduced(name);
+    AggregatorWrapper<Writable> agg = registeredAggregators.get(name);
+    if (agg == null) {
+      LOG.warn("getAggregatedValue: " +
+        AggregatorUtils.getUnregisteredAggregatorMessage(name,
+            registeredAggregators.size() != 0, conf));
+      // to make sure we are not accessing reducer of the same name.
+      return null;
+    }
+
+    A value = globalComm.getReduced(name);
+    if (value == null) {
+      value = (A) initAggregatorValues.get(name);
+    }
+
+    if (value == null) {
+      value = (A) agg.getReduceOp().createInitialValue();
+      initAggregatorValues.put(name, value);
+    }
+
+    Preconditions.checkState(value != null);
+    return value;
   }
 
   @Override
   public <A extends Writable> void setAggregatedValue(String name, A value) {
     AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name);
+    if (aggregator == null) {
+      throw new IllegalArgumentException("setAggregatedValue: "  +
+          AggregatorUtils.getUnregisteredAggregatorMessage(name,
+              registeredAggregators.size() != 0, conf));
+    }
     aggregator.setCurrentValue(value);
   }
 
@@ -72,14 +118,15 @@ public class AggregatorToGlobalCommTranslation
     // register reduce with the same value
     for (Entry<String, AggregatorWrapper<Writable>> entry :
         registeredAggregators.entrySet()) {
-      Writable value = entry.getValue().currentValue != null ?
-          entry.getValue().getCurrentValue() :
-            globalComm.getReduced(entry.getKey());
+      Writable value = entry.getValue().getCurrentValue();
       if (value == null) {
-        value = entry.getValue().getReduceOp().createInitialValue();
+        value = globalComm.getReduced(entry.getKey());
       }
+      Preconditions.checkState(value != null);
+
+      globalComm.broadcast(entry.getKey(), new AggregatorBroadcast<>(
+          entry.getValue().getReduceOp().getAggregatorClass(), value));
 
-      globalComm.broadcast(entry.getKey(), value);
       // Always register clean instance of reduceOp, not to conflict with
       // reduceOp from previous superstep.
       AggregatorReduceOperation<Writable> cleanReduceOp =
@@ -93,31 +140,21 @@ public class AggregatorToGlobalCommTranslation
       }
       entry.getValue().setCurrentValue(null);
     }
+    initAggregatorValues.clear();
   }
 
   @Override
   public <A extends Writable> boolean registerAggregator(String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
-    ClassAggregatorFactory<A> aggregatorFactory =
-        new ClassAggregatorFactory<A>(aggregatorClass);
-    return registerAggregator(name, aggregatorFactory, false) != null;
-  }
-
-  @Override
-  public <A extends Writable> boolean registerAggregator(String name,
-      WritableFactory<? extends Aggregator<A>> aggregator) throws
-      InstantiationException, IllegalAccessException {
-    return registerAggregator(name, aggregator, false) != null;
+    return registerAggregator(name, aggregatorClass, false) != null;
   }
 
   @Override
   public <A extends Writable> boolean registerPersistentAggregator(String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
-    ClassAggregatorFactory<A> aggregatorFactory =
-        new ClassAggregatorFactory<A>(aggregatorClass);
-    return registerAggregator(name, aggregatorFactory, true) != null;
+    return registerAggregator(name, aggregatorClass, true) != null;
   }
 
   @Override
@@ -140,27 +177,35 @@ public class AggregatorToGlobalCommTranslation
       agg.readFields(in);
       registeredAggregators.put(name, agg);
     }
+    initAggregatorValues.clear();
   }
 
   /**
    * Helper function for registering aggregators.
    *
-   * @param name              Name of the aggregator
-   * @param aggregatorFactory Aggregator factory
-   * @param persistent        Whether aggregator is persistent or not
-   * @param <A>               Aggregated value type
+   * @param name            Name of the aggregator
+   * @param aggregatorClass Aggregator class
+   * @param persistent      Whether aggregator is persistent or not
+   * @param <A>             Aggregated value type
    * @return Newly registered aggregator or aggregator which was previously
    *         created with selected name, if any
    */
   private <A extends Writable> AggregatorWrapper<A> registerAggregator
-  (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+  (String name, Class<? extends Aggregator<A>> aggregatorClass,
       boolean persistent) throws InstantiationException,
       IllegalAccessException {
     AggregatorWrapper<A> aggregatorWrapper =
         (AggregatorWrapper<A>) registeredAggregators.get(name);
     if (aggregatorWrapper == null) {
       aggregatorWrapper =
-          new AggregatorWrapper<A>(aggregatorFactory, persistent);
+          new AggregatorWrapper<A>(aggregatorClass, persistent);
+      // postMasterCompute uses previously reduced value to broadcast,
+      // unless current value is set. After aggregator is registered,
+      // there was no previously reduced value, so set current value
+      // to default to avoid calling getReduced() on unregistered reducer.
+      // (which logs unnecessary warnings)
+      aggregatorWrapper.setCurrentValue(
+          aggregatorWrapper.getReduceOp().createInitialValue());
       registeredAggregators.put(
           name, (AggregatorWrapper<Writable>) aggregatorWrapper);
     }
@@ -171,7 +216,7 @@ public class AggregatorToGlobalCommTranslation
    * Object holding all needed data related to single Aggregator
    * @param <A> Aggregated value type
    */
-  private static class AggregatorWrapper<A extends Writable>
+  private class AggregatorWrapper<A extends Writable>
       implements Writable {
     /** False iff aggregator should be reset at the end of each super step */
     private boolean persistent;
@@ -186,14 +231,14 @@ public class AggregatorToGlobalCommTranslation
 
     /**
      * Constructor
-     * @param aggregatorFactory Aggregator factory
+     * @param aggregatorClass Aggregator class
      * @param persistent Is persistent
      */
     public AggregatorWrapper(
-        WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+        Class<? extends Aggregator<A>> aggregatorClass,
         boolean persistent) {
       this.persistent = persistent;
-      this.reduceOp = new AggregatorReduceOperation<>(aggregatorFactory);
+      this.reduceOp = new AggregatorReduceOperation<>(aggregatorClass, conf);
     }
 
     public AggregatorReduceOperation<A> getReduceOp() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index ab1289d..af7e5fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -895,7 +895,7 @@ public class BspServiceMaster<I extends WritableComparable,
           globalCommHandler = new MasterAggregatorHandler(
               getConfiguration(), getContext());
           aggregatorTranslation = new AggregatorToGlobalCommTranslation(
-              globalCommHandler);
+              getConfiguration(), globalCommHandler);
 
           globalCommHandler.initialize(this);
           masterCompute = getConfiguration().createMasterCompute();

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 5f7bd73..ccee656 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -28,6 +28,7 @@ import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.SuperstepState;
 import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.reducers.ReduceOperation;
 import org.apache.giraph.reducers.Reducer;
@@ -61,6 +62,9 @@ public class MasterAggregatorHandler
   /** Progressable used to report progress */
   private final Progressable progressable;
 
+  /** Conf */
+  private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
+
   /**
    * Constructor
    *
@@ -71,6 +75,7 @@ public class MasterAggregatorHandler
       ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
       Progressable progressable) {
     this.progressable = progressable;
+    this.conf = conf;
     aggregatorWriter = conf.createAggregatorWriter();
   }
 
@@ -86,10 +91,18 @@ public class MasterAggregatorHandler
       R globalInitialValue) {
     if (reducerMap.containsKey(name)) {
       throw new IllegalArgumentException(
-          "Reducer with name " + name + " was already registered");
+          "Reducer with name " + name + " was already registered, " +
+          " and is " + reducerMap.get(name) + ", and we are trying to " +
+          " register " + reduceOp);
     }
     if (reduceOp == null) {
-      throw new IllegalArgumentException("null reduce cannot be registered");
+      throw new IllegalArgumentException(
+          "null reducer cannot be registered, with name " + name);
+    }
+    if (globalInitialValue == null) {
+      throw new IllegalArgumentException(
+          "global initial value for reducer cannot be null, but is for " +
+          reduceOp + " with name " + name);
     }
 
     Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
@@ -98,7 +111,13 @@ public class MasterAggregatorHandler
 
   @Override
   public <T extends Writable> T getReduced(String name) {
-    return (T) reducedMap.get(name);
+    T value = (T) reducedMap.get(name);
+    if (value == null) {
+      LOG.warn("getReduced: " +
+        AggregatorUtils.getUnregisteredReducerMessage(name,
+            reducedMap.size() != 0, conf));
+    }
+    return value;
   }
 
   @Override
@@ -310,14 +329,14 @@ public class MasterAggregatorHandler
     for (int i = 0; i < numReducers; i++) {
       String name = in.readUTF();
       Reducer<Object, Writable> reducer = new Reducer<>();
-      reducer.readFields(in);
+      reducer.readFields(in, conf);
       reducerMap.put(name, reducer);
     }
 
     int numBroadcast = in.readInt();
     for (int i = 0; i < numBroadcast; i++) {
       String name = in.readUTF();
-      Writable value = WritableUtils.readWritableObject(in, null);
+      Writable value = WritableUtils.readWritableObject(in, conf);
       broadcastMap.put(name, value);
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
index 91f5d24..cadae67 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
@@ -20,7 +20,6 @@ package org.apache.giraph.master;
 
 import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.aggregators.AggregatorUsage;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -41,21 +40,6 @@ public interface MasterAggregatorUsage extends AggregatorUsage {
       InstantiationException, IllegalAccessException;
 
   /**
-   * Register an aggregator in preSuperstep() and/or preApplication(). This
-   * aggregator will have its value reset at the end of each super step.
-   *
-   * Aggregator should either implement Writable, or have no-arg constructor.
-   *
-   * @param name of aggregator
-   * @param aggregatorFactory aggregator factory
-   * @param <A> Aggregator type
-   * @return True iff aggregator wasn't already registered
-   */
-  <A extends Writable> boolean registerAggregator(String name,
-      WritableFactory<? extends Aggregator<A>> aggregatorFactory) throws
-      InstantiationException, IllegalAccessException;
-
-  /**
    * Register persistent aggregator in preSuperstep() and/or
    * preApplication(). This aggregator will not reset value at the end of
    * super step.

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 72e4d0a..68eb416 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -25,7 +25,6 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.reducers.ReduceOperation;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
 
@@ -222,14 +221,6 @@ public abstract class MasterCompute
   }
 
   @Override
-  public final <A extends Writable> boolean registerAggregator(
-    String name, WritableFactory<? extends Aggregator<A>> aggregator)
-    throws InstantiationException, IllegalAccessException {
-    return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
-        name, aggregator);
-  }
-
-  @Override
   public final <A extends Writable> boolean registerPersistentAggregator(
       String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
index a675f4d..cb9f6e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
 public abstract class OnSameReduceOperation<R extends Writable>
     implements ReduceOperation<R, R> {
   @Override
-  public final void reducePartial(R curValue, R valueToReduce) {
-    reduceSingle(curValue, valueToReduce);
+  public final R reducePartial(R curValue, R valueToReduce) {
+    return reduceSingle(curValue, valueToReduce);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
index 434e21a..adbc4d8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
@@ -43,15 +43,21 @@ public interface ReduceOperation<S, R extends Writable> extends Writable {
    * Add a new value.
    * Needs to be commutative and associative
    *
+   * Commonly, returned value should be same as curValue argument.
+   *
    * @param curValue Partial value into which to reduce and store the result
    * @param valueToReduce Single value to be reduced
+   * @return reduced value
    */
-  void reduceSingle(R curValue, S valueToReduce);
+  R reduceSingle(R curValue, S valueToReduce);
   /**
    * Add partially reduced value to current partially reduced value.
    *
+   * Commonly, returned value should be same as curValue argument.
+   *
    * @param curValue Partial value into which to reduce and store the result
    * @param valueToReduce Partial value to be reduced
+   * @return reduced value
    */
-  void reducePartial(R curValue, R valueToReduce);
+  R reducePartial(R curValue, R valueToReduce);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
index 9f821b4..6759276 100644
--- a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 
@@ -32,7 +33,7 @@ import org.apache.hadoop.io.Writable;
  * @param <S> Single value type, objects passed on workers
  * @param <R> Reduced value type
  */
-public class Reducer<S, R extends Writable> implements Writable {
+public class Reducer<S, R extends Writable> {
   /** Reduce operations */
   private ReduceOperation<S, R> reduceOp;
   /** Current (partially) reduced value*/
@@ -49,7 +50,7 @@ public class Reducer<S, R extends Writable> implements Writable {
    */
   public Reducer(ReduceOperation<S, R> reduceOp) {
     this.reduceOp = reduceOp;
-    this.currentValue = reduceOp.createInitialValue();
+    this.currentValue = createInitialValue();
   }
   /**
    * Constructor
@@ -66,21 +67,26 @@ public class Reducer<S, R extends Writable> implements Writable {
    * @param valueToReduce Single value to reduce
    */
   public void reduceSingle(S valueToReduce) {
-    reduceOp.reduceSingle(currentValue, valueToReduce);
+    currentValue = reduceOp.reduceSingle(currentValue, valueToReduce);
   }
   /**
    * Reduce given partially reduced value into current reduced value.
    * @param valueToReduce Partial value to reduce
    */
   public void reducePartial(R valueToReduce) {
-    reduceOp.reducePartial(currentValue, valueToReduce);
+    currentValue = reduceOp.reducePartial(currentValue, valueToReduce);
   }
   /**
    * Return new initial reduced value.
    * @return New initial reduced value
    */
   public R createInitialValue() {
-    return reduceOp.createInitialValue();
+    R value = reduceOp.createInitialValue();
+    if (value == null) {
+      throw new IllegalStateException(
+          "Initial value for reducer cannot be null, but is for " + reduceOp);
+    }
+    return value;
   }
 
   public ReduceOperation<S, R> getReduceOp() {
@@ -95,16 +101,31 @@ public class Reducer<S, R extends Writable> implements Writable {
     this.currentValue = currentValue;
   }
 
-  @Override
+  /**
+   * Serialize the fields of this object to <code>out</code>.
+   *
+   * @param out <code>DataOutput</code> to serialize this object into.
+   * @throws IOException
+   */
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeWritableObject(reduceOp, out);
     currentValue.write(out);
   }
 
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    reduceOp = WritableUtils.readWritableObject(in, null);
-    currentValue = reduceOp.createInitialValue();
+  /**
+   * Deserialize the fields of this object from <code>in</code>.
+   *
+   * <p>For efficiency, implementations should attempt to re-use storage in the
+   * existing object where possible.</p>
+   *
+   * @param in <code>DataInput</code> to deserialize this object from.
+   * @param conf Configuration
+   * @throws IOException
+   */
+  public void readFields(DataInput in,
+      ImmutableClassesGiraphConfiguration conf) throws IOException {
+    reduceOp = WritableUtils.readWritableObject(in, conf);
+    currentValue = createInitialValue();
     currentValue.readFields(in);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
deleted file mode 100644
index 43bed7e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Factory that can be serialized.
- * @param <T> Type of object factory creates
- */
-public interface WritableFactory<T> extends Writable, Factory<T> {
-
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 923d369..8c24697 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -745,13 +745,15 @@ public class WritableUtils {
    * @param reusableOut Reusable output stream to serialize into
    * @param reusableIn Reusable input stream to deserialize out of
    * @param original Original value of which to make a copy
+   * @param conf Configuration
    * @param <T> Type of the object
    * @return Copy of the original value
    */
   public static <T extends Writable> T createCopy(
       UnsafeByteArrayOutputStream reusableOut,
-      UnsafeReusableByteArrayInput reusableIn, T original) {
-    T copy = (T) createWritable(original.getClass(), null);
+      UnsafeReusableByteArrayInput reusableIn, T original,
+      ImmutableClassesGiraphConfiguration conf) {
+    T copy = (T) createWritable(original.getClass(), conf);
 
     try {
       reusableOut.reset();

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
index 5238a07..916e7a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.master.AggregatorBroadcast;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -64,6 +65,7 @@ public abstract class WorkerAggregatorDelegator<I extends WritableComparable,
 
   @Override
   public final <A extends Writable> A getAggregatedValue(String name) {
-    return this.<A>getBroadcast(name);
+    AggregatorBroadcast<A> broadcast = workerGlobalCommUsage.getBroadcast(name);
+    return broadcast.getValue();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 05a13a7..ee47542 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -87,7 +87,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
     B value = (B) broadcastedMap.get(name);
     if (value == null) {
       LOG.warn("getBroadcast: " +
-          AggregatorUtils.getUnregisteredAggregatorMessage(name,
+          AggregatorUtils.getUnregisteredBroadcastMessage(name,
               broadcastedMap.size() != 0, conf));
     }
     return value;
@@ -103,7 +103,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
       }
     } else {
       throw new IllegalStateException("reduce: " +
-          AggregatorUtils.getUnregisteredAggregatorMessage(name,
+          AggregatorUtils.getUnregisteredReducerMessage(name,
               reducerMap.size() != 0, conf));
     }
   }
@@ -122,7 +122,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
       }
     } else {
       throw new IllegalStateException("reduce: " +
-          AggregatorUtils.getUnregisteredAggregatorMessage(name,
+          AggregatorUtils.getUnregisteredReducerMessage(name,
               reducerMap.size() != 0, conf));
     }
   }
@@ -309,7 +309,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
             entry.getValue().getReduceOp();
 
         ReduceOperation<Object, Writable> threadLocalCopy =
-            WritableUtils.createCopy(out, in, globalReduceOp);
+            WritableUtils.createCopy(out, in, globalReduceOp, conf);
 
         threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy));
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java
new file mode 100644
index 0000000..9b4e160
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Methods on worker can access broadcasted values through this interface
+ */
+public interface WorkerBroadcastUsage {
+  /**
+   * Get value broadcasted from master
+   * @param name Name of the broadcasted value
+   * @return Broadcasted value
+   * @param <B> Broadcast value type
+   */
+  <B extends Writable> B getBroadcast(String name);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
index 39566f5..fa31bc2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
@@ -17,24 +17,11 @@
  */
 package org.apache.giraph.worker;
 
-import org.apache.hadoop.io.Writable;
 
 /**
  * Methods on worker can access broadcasted values and provide
  * values to reduce through this interface
  */
-public interface WorkerGlobalCommUsage {
-  /**
-   * Reduce given value.
-   * @param name Name of the reducer
-   * @param value Single value to reduce
-   */
-  void reduce(String name, Object value);
-  /**
-   * Get value broadcasted from master
-   * @param name Name of the broadcasted value
-   * @return Broadcasted value
-   * @param <B> Broadcast value type
-   */
-  <B extends Writable> B getBroadcast(String name);
+public interface WorkerGlobalCommUsage
+    extends WorkerBroadcastUsage, WorkerReduceUsage {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
new file mode 100644
index 0000000..9c2e90d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+/**
+ * Methods on worker can provide values to reduce through this interface
+ */
+public interface WorkerReduceUsage {
+  /**
+   * Reduce given value.
+   * @param name Name of the reducer
+   * @param value Single value to reduce
+   */
+  void reduce(String name, Object value);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
deleted file mode 100644
index 2898647..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.aggregators;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.giraph.utils.ArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.junit.Test;
-
-public class TestArrayAggregator {
-  @Test
-  public void testMaxAggregator() {
-    Aggregator<ArrayWritable<LongWritable>> max = new ArrayAggregatorFactory<>(2, LongMaxAggregator.class).create();
-
-    ArrayWritable<LongWritable> tmp = max.createInitialValue();
-
-    tmp.get()[0].set(2);
-    max.aggregate(tmp);
-
-    tmp.get()[0].set(3);
-    tmp.get()[1].set(1);
-    max.aggregate(tmp);
-
-    assertEquals(3L, max.getAggregatedValue().get()[0].get());
-    assertEquals(1L, max.getAggregatedValue().get()[1].get());
-
-    tmp.get()[0].set(-1);
-    tmp.get()[1].set(-1);
-    max.setAggregatedValue(tmp);
-
-    assertEquals(-1L, max.getAggregatedValue().get()[0].get());
-    assertEquals(-1L, max.getAggregatedValue().get()[1].get());
-  }
-}