You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/10/26 03:00:55 UTC

svn commit: r1402363 [2/2] - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/benchmark/ giraph/src/main/java/org/apache/giraph/bsp/ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/aggregators/ giraph...

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java?rev=1402363&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java Fri Oct 26 01:00:54 2012
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.graph.Aggregator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * Request to send final aggregated values from master to worker which owns
+ * the aggregators
+ */
+public class SendAggregatorsToOwnerRequest extends ByteArrayRequest
+    implements WorkerRequest {
+
+  /**
+   * Constructor
+   *
+   * @param data Serialized aggregator data
+   */
+  public SendAggregatorsToOwnerRequest(byte[] data) {
+    super(data);
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendAggregatorsToOwnerRequest() {
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    DataInput input = getDataInput();
+    AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
+    try {
+      int numAggregators = input.readInt();
+      for (int i = 0; i < numAggregators; i++) {
+        String aggregatorName = input.readUTF();
+        String aggregatorClassName = input.readUTF();
+        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
+          LongWritable count = new LongWritable(0);
+          count.readFields(input);
+          aggregatorData.receivedRequestCountFromMaster(count.get());
+        } else {
+          Class<Aggregator<Writable>> aggregatorClass =
+              AggregatorUtils.getAggregatorClass(aggregatorClassName);
+          aggregatorData.registerAggregatorClass(aggregatorName,
+              aggregatorClass);
+          Writable aggregatorValue =
+              aggregatorData.createAggregatorInitialValue(aggregatorName);
+          aggregatorValue.readFields(input);
+          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
+          serverData.getOwnerAggregatorData().registerAggregator(
+              aggregatorName, aggregatorClass);
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: " +
+          "IOException occurred while processing request", e);
+    }
+    aggregatorData.receivedRequestFromMaster(getData());
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST;
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java?rev=1402363&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java Fri Oct 26 01:00:54 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.graph.Aggregator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * Request to send final aggregated values from worker which owns them to
+ * other workers
+ */
+public class SendAggregatorsToWorkerRequest extends
+    ByteArrayRequest implements WorkerRequest {
+
+  /**
+   * Constructor
+   *
+   * @param data Serialized aggregator data
+   */
+  public SendAggregatorsToWorkerRequest(byte[] data) {
+    super(data);
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendAggregatorsToWorkerRequest() {
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    DataInput input = getDataInput();
+    AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
+    try {
+      int numAggregators = input.readInt();
+      for (int i = 0; i < numAggregators; i++) {
+        String aggregatorName = input.readUTF();
+        String aggregatorClassName = input.readUTF();
+        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
+          LongWritable count = new LongWritable(0);
+          count.readFields(input);
+          aggregatorData.receivedRequestCountFromWorker(count.get());
+        } else {
+          Class<Aggregator<Writable>> aggregatorClass =
+              AggregatorUtils.getAggregatorClass(aggregatorClassName);
+          aggregatorData.registerAggregatorClass(aggregatorName,
+              aggregatorClass);
+          Writable aggregatorValue =
+              aggregatorData.createAggregatorInitialValue(aggregatorName);
+          aggregatorValue.readFields(input);
+          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: " +
+          "IOException occurred while processing request", e);
+    }
+    aggregatorData.receivedRequestFromWorker();
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST;
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java?rev=1402363&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java Fri Oct 26 01:00:54 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * Request to send partial aggregated values for current superstep (values
+ * which were computed by one worker's vertices)
+ */
+public class SendWorkerAggregatorsRequest extends
+    ByteArrayRequest implements WorkerRequest {
+
+  /**
+   * Constructor
+   *
+   * @param data Serialized aggregator data
+   */
+  public SendWorkerAggregatorsRequest(byte[] data) {
+    super(data);
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendWorkerAggregatorsRequest() {
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    DataInput input = getDataInput();
+    OwnerAggregatorServerData aggregatorData =
+        serverData.getOwnerAggregatorData();
+    try {
+      int numAggregators = input.readInt();
+      for (int i = 0; i < numAggregators; i++) {
+        String aggregatorName = input.readUTF();
+        if (aggregatorName.equals(
+            AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
+          LongWritable count = new LongWritable(0);
+          count.readFields(input);
+          aggregatorData.receivedRequestCountFromWorker(count.get());
+        } else {
+          Writable aggregatedValue =
+              aggregatorData.createAggregatorInitialValue(aggregatorName);
+          aggregatedValue.readFields(input);
+          aggregatorData.aggregate(aggregatorName, aggregatedValue);
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: " +
+          "IOException occurred while processing request", e);
+    }
+    aggregatorData.receivedRequestFromWorker();
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_WORKER_AGGREGATORS_REQUEST;
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java Fri Oct 26 01:00:54 2012
@@ -40,6 +40,10 @@ public class AggregatorsTestVertex exten
   private static final String MASTER_WRITE_AGG = "master";
   /** Value which master compute will use */
   private static final long MASTER_VALUE = 12345;
+  /** Prefix for name of aggregators in array */
+  private static final String ARRAY_PREFIX_AGG = "array";
+  /** Number of aggregators to use in array */
+  private static final int NUM_OF_AGGREGATORS_IN_ARRAY = 100;
 
   @Override
   public void compute(Iterable<DoubleWritable> messages) throws IOException {
@@ -62,6 +66,12 @@ public class AggregatorsTestVertex exten
     assertEquals(MASTER_VALUE * (1L << superstep),
         ((LongWritable) getAggregatedValue(MASTER_WRITE_AGG)).get());
 
+    for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+      aggregate(ARRAY_PREFIX_AGG + i, new LongWritable((superstep + 1) * i));
+      assertEquals(superstep * getTotalNumVertices() * i,
+          ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
+    }
+
     if (getSuperstep() == 10) {
       voteToHalt();
     }
@@ -88,6 +98,11 @@ public class AggregatorsTestVertex exten
       }
       assertEquals(nv * ((1L << superstep) - 1),
           ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+
+      for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+        assertEquals(superstep * getTotalNumVertices() * i,
+            ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
+      }
     }
 
     @Override
@@ -97,6 +112,10 @@ public class AggregatorsTestVertex exten
       registerPersistentAggregator(PERSISTENT_AGG,
           LongSumAggregator.class);
       registerAggregator(MASTER_WRITE_AGG, LongSumAggregator.class);
+
+      for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+        registerAggregator(ARRAY_PREFIX_AGG + i, LongSumAggregator.class);
+      }
     }
   }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Fri Oct 26 01:00:54 2012
@@ -117,9 +117,6 @@ public abstract class BspService<I exten
   public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
   /** Denotes which workers have been cleaned up */
   public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
-  /** JSON aggregator value array key */
-  public static final String JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY =
-      "_aggregatorValueArrayKey";
   /** JSON partition stats key */
   public static final String JSONOBJ_PARTITION_STATS_KEY =
       "_partitionStatsKey";

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Fri Oct 26 01:00:54 2012
@@ -611,7 +611,7 @@ public class BspServiceMaster<I extends 
   }
 
   @Override
-  public MasterAggregatorUsage getAggregatorUsage() {
+  public MasterAggregatorHandler getAggregatorHandler() {
     return aggregatorHandler;
   }
 
@@ -650,7 +650,6 @@ public class BspServiceMaster<I extends 
     }
 
     aggregatorHandler.readFields(finalizedStream);
-    aggregatorHandler.finishSuperstep(superstep - 1, this);
     masterCompute.readFields(finalizedStream);
     finalizedStream.close();
 
@@ -768,7 +767,8 @@ public class BspServiceMaster<I extends 
               getTaskPartition() -
               currentMasterTaskPartitionCounter.getValue());
           masterCompute = getConfiguration().createMasterCompute();
-          aggregatorHandler = new MasterAggregatorHandler(getConfiguration());
+          aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
+              getContext());
           aggregatorHandler.initialize(this);
 
           commService = new NettyMasterClientServer(
@@ -1288,6 +1288,12 @@ public class BspServiceMaster<I extends 
         chosenWorkerInfoList,
         masterGraphPartitioner);
 
+    // We need to finalize aggregators from previous superstep (send them to
+    // worker owners) after new worker assignments
+    if (getSuperstep() >= 0) {
+      aggregatorHandler.finishSuperstep(commService);
+    }
+
     // Finalize the valid checkpoint file prefixes and possibly
     // the aggregators.
     if (checkpointFrequencyMet(getSuperstep())) {
@@ -1347,9 +1353,8 @@ public class BspServiceMaster<I extends 
 
     // Collect aggregator values, then run the master.compute() and
     // finally save the aggregator values
-    aggregatorHandler.prepareSuperstep(getSuperstep(), this);
+    aggregatorHandler.prepareSuperstep(commService);
     runMasterCompute(getSuperstep());
-    aggregatorHandler.finishSuperstep(getSuperstep(), this);
 
     // If the master is halted or all the vertices voted to halt and there
     // are no more messages in the system, stop the computation

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Fri Oct 26 01:00:54 2012
@@ -28,9 +28,11 @@ import org.apache.giraph.GiraphConfigura
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClient;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerServer;
@@ -105,6 +107,9 @@ public class BspServiceWorker<I extends 
   private final WorkerClient<I, V, E, M> workerClient;
   /** IPC Server */
   private final WorkerServer<I, V, E, M> workerServer;
+  /** Request processor for aggregator requests */
+  private final WorkerAggregatorRequestProcessor
+  workerAggregatorRequestProcessor;
   /** Master info */
   private WorkerInfo masterInfo = new WorkerInfo();
   /** List of workers */
@@ -149,14 +154,17 @@ public class BspServiceWorker<I extends 
     workerClient = new NettyWorkerClient<I, V, E, M>(context,
         getConfiguration(), this);
 
-
+    workerAggregatorRequestProcessor =
+        new NettyWorkerAggregatorRequestProcessor(getContext(),
+            getConfiguration(), this);
 
     workerInfo = new WorkerInfo(
         getHostname(), getTaskPartition(), workerServer.getPort());
     this.workerContext =
         getConfiguration().createWorkerContext(null);
 
-    aggregatorHandler = new WorkerAggregatorHandler();
+    aggregatorHandler =
+        new WorkerAggregatorHandler(this, getConfiguration(), context);
   }
 
   @Override
@@ -640,9 +648,6 @@ else[HADOOP_NON_SECURE]*/
           addressesAndPartitionsPath);
     }
 
-    if (getSuperstep() != INPUT_SUPERSTEP) {
-      aggregatorHandler.prepareSuperstep(getSuperstep(), this);
-    }
     getContext().setStatus("startSuperstep: " +
         getGraphMapper().getMapFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
@@ -684,14 +689,14 @@ else[HADOOP_NON_SECURE]*/
       getContext().progress();
     }
 
+    aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+
     if (LOG.isInfoEnabled()) {
       LOG.info("finishSuperstep: Superstep " + getSuperstep() +
           ", messages = " + workerSentMessages + " " +
           MemoryUtils.getRuntimeMemoryStats());
     }
 
-    byte[] aggregatorArray =
-        aggregatorHandler.finishSuperstep(getSuperstep());
     Collection<PartitionStats> finalizedPartitionStats =
         workerGraphPartitioner.finalizePartitionStats(
             partitionStatsList, getPartitionStore());
@@ -701,8 +706,6 @@ else[HADOOP_NON_SECURE]*/
         WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
     JSONObject workerFinishedInfoObj = new JSONObject();
     try {
-      workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
-          Base64.encodeBytes(aggregatorArray));
       workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
           Base64.encodeBytes(partitionStatsBytes));
       workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
@@ -1295,7 +1298,14 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public WorkerAggregatorUsage getAggregatorUsage() {
+  public WorkerAggregatorHandler getAggregatorHandler() {
     return aggregatorHandler;
   }
+
+  @Override
+  public void prepareSuperstep() {
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+    }
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Fri Oct 26 01:00:54 2012
@@ -129,7 +129,7 @@ public class GraphMapper<I extends Writa
    * @return Worker aggregator usage interface
    */
   public final WorkerAggregatorUsage getWorkerAggregatorUsage() {
-    return serviceWorker.getAggregatorUsage();
+    return serviceWorker.getAggregatorHandler();
   }
 
   /**
@@ -138,7 +138,7 @@ public class GraphMapper<I extends Writa
    * @return Master aggregator usage interface
    */
   public final MasterAggregatorUsage getMasterAggregatorUsage() {
-    return serviceMaster.getAggregatorUsage();
+    return serviceMaster.getAggregatorHandler();
   }
 
   public final WorkerContext getWorkerContext() {
@@ -488,6 +488,8 @@ public class GraphMapper<I extends Writa
         serviceWorker.storeCheckpoint();
       }
 
+      serviceWorker.prepareSuperstep();
+
       serviceWorker.getWorkerContext().setGraphState(graphState);
       serviceWorker.getWorkerContext().preSuperstep();
       context.progress();

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java Fri Oct 26 01:00:54 2012
@@ -18,51 +18,68 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.SuperstepState;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.comm.MasterClientServer;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.json.JSONException;
-import org.json.JSONObject;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
-import net.iharder.Base64;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.AbstractMap;
-import java.util.List;
 import java.util.Map;
 
-/** Master implementation of {@link AggregatorHandler} */
-public class MasterAggregatorHandler extends AggregatorHandler implements
-    MasterAggregatorUsage, Writable {
+/** Handler for aggregators on master */
+public class MasterAggregatorHandler implements MasterAggregatorUsage,
+    Writable {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(MasterAggregatorHandler.class);
+  /**
+   * Map of aggregators.
+   * This map is used to store final aggregated values received from worker
+   * owners, and also to read and write values provided during master.compute.
+   */
+  private final Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+      Maps.newHashMap();
   /** Aggregator writer */
   private final AggregatorWriter aggregatorWriter;
+  /** Progressable used to report progress */
+  private final Progressable progressable;
 
   /**
-   * @param config Hadoop configuration
+   * Constructor
+   *
+   * @param conf         Giraph configuration
+   * @param progressable Progressable used for reporting progress
    */
-  public MasterAggregatorHandler(Configuration config) {
-    aggregatorWriter = BspUtils.createAggregatorWriter(config);
+  public MasterAggregatorHandler(
+      ImmutableClassesGiraphConfiguration<?, ?, ?, ?> conf,
+      Progressable progressable) {
+    this.progressable = progressable;
+    aggregatorWriter = conf.createAggregatorWriter();
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
+    if (aggregator == null) {
+      return null;
+    } else {
+      return (A) aggregator.getPreviousAggregatedValue();
+    }
   }
 
   @Override
   public <A extends Writable> void setAggregatedValue(String name, A value) {
-    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
     if (aggregator == null) {
       throw new IllegalStateException(
           "setAggregatedValue: Tried to set value of aggregator which wasn't" +
@@ -75,6 +92,7 @@ public class MasterAggregatorHandler ext
   public <A extends Writable> boolean registerAggregator(String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
+    checkAggregatorName(name);
     return registerAggregator(name, aggregatorClass, false) != null;
   }
 
@@ -82,115 +100,81 @@ public class MasterAggregatorHandler ext
   public <A extends Writable> boolean registerPersistentAggregator(String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
+    checkAggregatorName(name);
     return registerAggregator(name, aggregatorClass, true) != null;
   }
 
   /**
-   * Get aggregator values supplied by workers for a particular superstep and
-   * aggregate them
+   * Make sure user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR as
+   * the name of an aggregator. Throw an exception if they try to use it.
    *
-   * @param superstep Superstep which we are preparing for
-   * @param service BspService to get zookeeper info from
+   * @param name Name of the aggregator to check.
    */
-  public void prepareSuperstep(long superstep, BspService service) {
-    String workerFinishedPath =
-        service.getWorkerFinishedPath(
-            service.getApplicationAttempt(), superstep);
-    List<String> hostnameIdPathList = null;
-    try {
-      hostnameIdPathList =
-          service.getZkExt().getChildrenExt(
-              workerFinishedPath, false, false, true);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "collectAndProcessAggregatorValues: KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "collectAndProcessAggregatorValues: InterruptedException", e);
+  private void checkAggregatorName(String name) {
+    if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
+      throw new IllegalStateException("checkAggregatorName: " +
+          AggregatorUtils.SPECIAL_COUNT_AGGREGATOR +
+          " is not allowed for the name of aggregator");
     }
+  }
 
-    for (String hostnameIdPath : hostnameIdPathList) {
-      JSONObject workerFinishedInfoObj = null;
-      byte[] aggregatorArray = null;
-      try {
-        byte[] zkData =
-            service.getZkExt().getData(hostnameIdPath, false, null);
-        workerFinishedInfoObj = new JSONObject(new String(zkData));
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: KeeperException", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: InterruptedException",
-            e);
-      } catch (JSONException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: JSONException", e);
-      }
-      try {
-        aggregatorArray = Base64.decode(workerFinishedInfoObj.getString(
-            service.JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY));
-      } catch (JSONException e) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("collectAndProcessAggregatorValues: " +
-              "No aggregators" + " for " + hostnameIdPath);
-        }
-        continue;
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: IOException", e);
-      }
-
-      DataInputStream input =
-          new DataInputStream(new ByteArrayInputStream(aggregatorArray));
-      try {
-        while (input.available() > 0) {
-          String aggregatorName = input.readUTF();
-          AggregatorWrapper<Writable> aggregator =
-              getAggregatorMap().get(aggregatorName);
-          if (aggregator == null) {
-            throw new IllegalStateException(
-                "collectAndProcessAggregatorValues: " +
-                    "Master received aggregator which isn't registered: " +
-                    aggregatorName);
-          }
-          Writable aggregatorValue = aggregator.createInitialValue();
-          aggregatorValue.readFields(input);
-          aggregator.aggregateCurrent(aggregatorValue);
-        }
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: " +
-                "IOException when reading aggregator data", e);
-      }
+  /**
+   * Helper function for registering aggregators.
+   *
+   * @param name            Name of the aggregator
+   * @param aggregatorClass Class of the aggregator
+   * @param persistent      Whether aggregator is persistent or not
+   * @param <A>             Aggregated value type
+   * @return Newly registered aggregator or aggregator which was previously
+   *         created with selected name, if any
+   */
+  private <A extends Writable> AggregatorWrapper<A> registerAggregator
+  (String name, Class<? extends Aggregator<A>> aggregatorClass,
+      boolean persistent) throws InstantiationException,
+      IllegalAccessException {
+    AggregatorWrapper<A> aggregatorWrapper =
+        (AggregatorWrapper<A>) aggregatorMap.get(name);
+    if (aggregatorWrapper == null) {
+      aggregatorWrapper =
+          new AggregatorWrapper<A>(aggregatorClass, persistent);
+      aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
     }
+    return aggregatorWrapper;
+  }
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("collectAndProcessAggregatorValues: Processed aggregators");
+  /**
+   * Prepare aggregators for current superstep
+   *
+   * @param commService Communication service
+   */
+  public void prepareSuperstep(MasterClientServer commService) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Start preapring aggregators");
     }
-
     // prepare aggregators for master compute
-    for (AggregatorWrapper<Writable> aggregator :
-        getAggregatorMap().values()) {
+    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
       if (aggregator.isPersistent()) {
         aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
       }
       aggregator.setPreviousAggregatedValue(
           aggregator.getCurrentAggregatedValue());
       aggregator.resetCurrentAggregator();
+      progressable.progress();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Aggregators prepared");
     }
   }
 
   /**
-   * Save the supplied aggregator values.
+   * Finalize aggregators for current superstep and share them with workers
    *
-   * @param superstep Superstep which we are finishing.
-   * @param service BspService to get zookeeper info from
+   * @param commService Communication service
    */
-  public void finishSuperstep(long superstep, BspService service) {
-    Map<String, AggregatorWrapper<Writable>> aggregatorMap =
-        getAggregatorMap();
-
+  public void finishSuperstep(MasterClientServer commService) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Start finishing aggregators");
+    }
     for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
       if (aggregator.isChanged()) {
         // if master compute changed the value, use the one he chose
@@ -199,74 +183,86 @@ public class MasterAggregatorHandler ext
         // reset aggregator for the next superstep
         aggregator.resetCurrentAggregator();
       }
+      progressable.progress();
     }
 
-    if (aggregatorMap.size() > 0) {
-      String mergedAggregatorPath =
-          service.getMergedAggregatorPath(
-              service.getApplicationAttempt(),
-              superstep);
-
-      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-      DataOutput output = new DataOutputStream(outputStream);
-      try {
-        output.writeInt(aggregatorMap.size());
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
+    // send aggregators to their owners
+    // TODO: if aggregator owner and its value didn't change,
+    //       we don't need to resend it
+    try {
       for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
           aggregatorMap.entrySet()) {
-        try {
-          output.writeUTF(entry.getKey());
-          output.writeUTF(entry.getValue().getAggregatorClass().getName());
-          entry.getValue().getPreviousAggregatedValue().write(output);
-        } catch (IOException e) {
-          throw new IllegalStateException("saveAggregatorValues: " +
-              "IllegalStateException", e);
-        }
+        commService.sendAggregator(entry.getKey(),
+            entry.getValue().getAggregatorClass(),
+            entry.getValue().getPreviousAggregatedValue());
+        progressable.progress();
       }
+      commService.finishSendingAggregatedValues();
+    } catch (IOException e) {
+      throw new IllegalStateException("finishSuperstep: " +
+          "IOException occurred while sending aggregators", e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Aggregators finished");
+    }
+  }
 
-      try {
-        service.getZkExt().createExt(mergedAggregatorPath,
-            outputStream.toByteArray(),
-            ZooDefs.Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT,
-            true);
-      } catch (KeeperException.NodeExistsException e) {
-        LOG.warn("saveAggregatorValues: " +
-            mergedAggregatorPath + " already exists!");
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "saveAggregatorValues: KeeperException", e);
-      } catch (InterruptedException e) {
+  /**
+   * Accept aggregated values sent by worker. Every aggregator will be sent
+   * only once, by its owner.
+   * We don't need to count the number of these requests because global
+   * superstep barrier will happen after workers ensure all requests of this
+   * type have been received and processed by master.
+   *
+   * @param aggregatedValuesInput Input in which aggregated values are
+   *                              written in the following format:
+   *                              number_of_aggregators
+   *                              name_1  value_1
+   *                              name_2  value_2
+   *                              ...
+   * @throws IOException
+   */
+  public void acceptAggregatedValues(
+      DataInput aggregatedValuesInput) throws IOException {
+    int numAggregators = aggregatedValuesInput.readInt();
+    for (int i = 0; i < numAggregators; i++) {
+      String aggregatorName = aggregatedValuesInput.readUTF();
+      AggregatorWrapper<Writable> aggregator =
+          aggregatorMap.get(aggregatorName);
+      if (aggregator == null) {
         throw new IllegalStateException(
-            "saveAggregatorValues: IllegalStateException",
-            e);
-      }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("saveAggregatorValues: Finished loading " +
-            mergedAggregatorPath);
+            "acceptAggregatedValues: " +
+                "Master received aggregator which isn't registered: " +
+                aggregatorName);
       }
+      Writable aggregatorValue = aggregator.createInitialValue();
+      aggregatorValue.readFields(aggregatedValuesInput);
+      aggregator.setCurrentAggregatedValue(aggregatorValue);
+      progressable.progress();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("acceptAggregatedValues: Accepted one set with " +
+          numAggregators + " aggregated values");
     }
   }
 
   /**
    * Write aggregators to {@link AggregatorWriter}
    *
-   * @param superstep Superstep which just finished
+   * @param superstep      Superstep which just finished
    * @param superstepState State of the superstep which just finished
    */
-  public void writeAggregators(long superstep,
-      SuperstepState superstepState) {
+  public void writeAggregators(long superstep, SuperstepState superstepState) {
     try {
       Iterable<Map.Entry<String, Writable>> iter =
           Iterables.transform(
-              getAggregatorMap().entrySet(),
+              aggregatorMap.entrySet(),
               new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
                   Map.Entry<String, Writable>>() {
                 @Override
                 public Map.Entry<String, Writable> apply(
                     Map.Entry<String, AggregatorWrapper<Writable>> entry) {
+                  progressable.progress();
                   return new AbstractMap.SimpleEntry<String,
                       Writable>(entry.getKey(),
                       entry.getValue().getPreviousAggregatedValue());
@@ -292,7 +288,7 @@ public class MasterAggregatorHandler ext
       aggregatorWriter.initialize(service.getContext(),
           service.getApplicationAttempt());
     } catch (IOException e) {
-      throw new IllegalStateException("MasterAggregatorHandler: " +
+      throw new IllegalStateException("initialize: " +
           "Couldn't initialize aggregatorWriter", e);
     }
   }
@@ -308,8 +304,6 @@ public class MasterAggregatorHandler ext
 
   @Override
   public void write(DataOutput out) throws IOException {
-    Map<String, AggregatorWrapper<Writable>> aggregatorMap =
-        getAggregatorMap();
     out.writeInt(aggregatorMap.size());
     for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
         aggregatorMap.entrySet()) {
@@ -317,25 +311,34 @@ public class MasterAggregatorHandler ext
       out.writeUTF(entry.getValue().getAggregatorClass().getName());
       out.writeBoolean(entry.getValue().isPersistent());
       entry.getValue().getPreviousAggregatedValue().write(out);
+      progressable.progress();
     }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    Map<String, AggregatorWrapper<Writable>> aggregatorMap =
-        getAggregatorMap();
     aggregatorMap.clear();
     int numAggregators = in.readInt();
-    for (int i = 0; i < numAggregators; i++) {
-      String aggregatorName = in.readUTF();
-      String aggregatorClassName = in.readUTF();
-      boolean isPersistent = in.readBoolean();
-      AggregatorWrapper<Writable> aggregator =
-          registerAggregator(aggregatorName, aggregatorClassName,
-              isPersistent);
-      Writable value = aggregator.createInitialValue();
-      value.readFields(in);
-      aggregator.setPreviousAggregatedValue(value);
+    try {
+      for (int i = 0; i < numAggregators; i++) {
+        String aggregatorName = in.readUTF();
+        String aggregatorClassName = in.readUTF();
+        boolean isPersistent = in.readBoolean();
+        AggregatorWrapper<Writable> aggregator = registerAggregator(
+            aggregatorName,
+            AggregatorUtils.getAggregatorClass(aggregatorClassName),
+            isPersistent);
+        Writable value = aggregator.createInitialValue();
+        value.readFields(in);
+        aggregator.setPreviousAggregatedValue(value);
+        progressable.progress();
+      }
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("readFields: " +
+          "InstantiationException occurred", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("readFields: " +
+          "IllegalAccessException occurred", e);
     }
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java Fri Oct 26 01:00:54 2012
@@ -18,134 +18,199 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import com.google.common.collect.Maps;
+
 import java.io.IOException;
 import java.util.Map;
 
 /**
- * Worker implementation of {@link AggregatorHandler}
+ * Handler for aggregators on worker. Provides the aggregated values and
+ * performs aggregations from user vertex code (thread-safe). Also has
+ * methods for all superstep coordination related to aggregators.
+ *
+ * At the beginning of any superstep any worker calls prepareSuperstep(),
+ * which blocks until the final aggregates from the previous superstep have
+ * been delivered to the worker.
+ * Next, during the superstep the worker can call aggregate() and
+ * getAggregatedValue() (both methods are thread-safe): the former
+ * computes partial aggregates for this superstep from the worker,
+ * the latter returns (read-only) final aggregates from the previous superstep.
+ * Finally, at the end of the superstep, the worker calls finishSuperstep(),
+ * which propagates non-owned partial aggregates to the owner workers,
+ * and sends the final aggregate from the owner worker to the master.
  */
-public class WorkerAggregatorHandler extends AggregatorHandler implements
-    WorkerAggregatorUsage {
+public class WorkerAggregatorHandler implements WorkerAggregatorUsage {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(WorkerAggregatorHandler.class);
+  /** Map of values from previous superstep */
+  private Map<String, Writable> previousAggregatedValueMap =
+      Maps.newHashMap();
+  /** Map of aggregators for current superstep */
+  private Map<String, Aggregator<Writable>> currentAggregatorMap =
+      Maps.newHashMap();
+  /** Service worker */
+  private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
+  /** Progressable for reporting progress */
+  private final Progressable progressable;
+  /** How big a single aggregator request can be */
+  private final int maxBytesPerAggregatorRequest;
+
+  /**
+   * Constructor
+   *
+   * @param serviceWorker Service worker
+   * @param conf          Giraph configuration
+   * @param progressable  Progressable for reporting progress
+   */
+  public WorkerAggregatorHandler(
+      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
+      ImmutableClassesGiraphConfiguration conf,
+      Progressable progressable) {
+    this.serviceWorker = serviceWorker;
+    this.progressable = progressable;
+    maxBytesPerAggregatorRequest = conf.getInt(
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
+  }
 
   @Override
   public <A extends Writable> void aggregate(String name, A value) {
-    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+    Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
     if (aggregator != null) {
-      ((AggregatorWrapper<A>) aggregator).aggregateCurrent(value);
+      // TODO we can later improve this for multithreading to have local
+      // copies of aggregators per thread
+      synchronized (aggregator) {
+        aggregator.aggregate(value);
+      }
     } else {
       throw new IllegalStateException("aggregate: Tried to aggregate value " +
           "to unregistered aggregator " + name);
     }
   }
 
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return (A) previousAggregatedValueMap.get(name);
+  }
+
   /**
-   * Get aggregator values aggregated by master in previous superstep
+   * Prepare aggregators for current superstep
    *
-   * @param superstep Superstep which we are preparing for
-   * @param service BspService to get zookeeper info from
+   * @param requestProcessor Request processor for aggregators
    */
-  public void prepareSuperstep(long superstep, BspService service) {
-    // prepare aggregators for reading and next superstep
-    for (AggregatorWrapper<Writable> aggregator :
-        getAggregatorMap().values()) {
-      aggregator.setPreviousAggregatedValue(aggregator.createInitialValue());
-      aggregator.resetCurrentAggregator();
-    }
-    String mergedAggregatorPath =
-        service.getMergedAggregatorPath(service.getApplicationAttempt(),
-            superstep - 1);
-
-    byte[] aggregatorArray;
-    try {
-      aggregatorArray =
-          service.getZkExt().getData(mergedAggregatorPath, false, null);
-    } catch (KeeperException.NoNodeException e) {
-      LOG.info("getAggregatorValues: no aggregators in " +
-          mergedAggregatorPath + " on superstep " + superstep);
-      return;
-    } catch (KeeperException e) {
-      throw new IllegalStateException("Failed to get data for " +
-          mergedAggregatorPath + " with KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("Failed to get data for " +
-          mergedAggregatorPath + " with InterruptedException", e);
-    }
-
-    DataInput input =
-        new DataInputStream(new ByteArrayInputStream(aggregatorArray));
-    int numAggregators = 0;
-
+  public void prepareSuperstep(
+      WorkerAggregatorRequestProcessor requestProcessor) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Start preparing aggregators");
+    }
+    AllAggregatorServerData allAggregatorData =
+        serviceWorker.getServerData().getAllAggregatorData();
+    // Wait for my aggregators
+    Iterable<byte[]> dataToDistribute =
+        allAggregatorData.getDataFromMasterWhenReady();
     try {
-      numAggregators = input.readInt();
+      // Distribute my aggregators
+      requestProcessor.distributeAggregators(dataToDistribute);
     } catch (IOException e) {
-      throw new IllegalStateException("getAggregatorValues: " +
-          "Failed to decode data", e);
+      throw new IllegalStateException("prepareSuperstep: " +
+          "IOException occurred while trying to distribute aggregators", e);
+    }
+    // Wait for all other aggregators and store them
+    allAggregatorData.fillNextSuperstepMapsWhenReady(
+        serviceWorker.getWorkerInfoList().size(), previousAggregatedValueMap,
+        currentAggregatorMap);
+    allAggregatorData.reset();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Aggregators prepared");
     }
+  }
 
-    for (int i = 0; i < numAggregators; i++) {
+  /**
+   * Send aggregators to their owners and in the end to the master
+   *
+   * @param requestProcessor Request processor for aggregators
+   */
+  public void finishSuperstep(
+      WorkerAggregatorRequestProcessor requestProcessor) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Start finishing aggregators");
+    }
+    OwnerAggregatorServerData ownerAggregatorData =
+        serviceWorker.getServerData().getOwnerAggregatorData();
+    // First send partial aggregated values to their owners and determine
+    // which aggregators belong to this worker
+    for (Map.Entry<String, Aggregator<Writable>> entry :
+        currentAggregatorMap.entrySet()) {
       try {
-        String aggregatorName = input.readUTF();
-        String aggregatorClassName = input.readUTF();
-        AggregatorWrapper<Writable> aggregator =
-            registerAggregator(aggregatorName, aggregatorClassName, false);
-        Writable aggregatorValue = aggregator.createInitialValue();
-        aggregatorValue.readFields(input);
-        aggregator.setPreviousAggregatedValue(aggregatorValue);
+        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
+            entry.getValue().getAggregatedValue());
+        if (!sent) {
+          // If it's my aggregator, add it directly
+          ownerAggregatorData.aggregate(entry.getKey(),
+              entry.getValue().getAggregatedValue());
+        }
       } catch (IOException e) {
-        throw new IllegalStateException(
-            "Failed to decode data for index " + i, e);
+        throw new IllegalStateException("finishSuperstep: " +
+            "IOException occurred while sending aggregator " +
+            entry.getKey() + " to its owner", e);
       }
+      progressable.progress();
     }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getAggregatorValues: Finished loading " +
-          mergedAggregatorPath);
+    try {
+      // Flush
+      requestProcessor.flush();
+    } catch (IOException e) {
+      throw new IllegalStateException("finishSuperstep: " +
+          "IOException occurred while sending aggregators to owners", e);
     }
-  }
 
-  /**
-   * Put aggregator values of the worker to a byte array that will later be
-   * aggregated by master.
-   *
-   * @param superstep Superstep which we are finishing.
-   * @return Byte array of the aggreagtor values
-   */
-  public byte[] finishSuperstep(long superstep) {
-    if (superstep == BspService.INPUT_SUPERSTEP) {
-      return new byte[0];
-    }
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    DataOutputStream output = new DataOutputStream(outputStream);
-    for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-        getAggregatorMap().entrySet()) {
-      if (entry.getValue().isChanged()) {
-        try {
-          output.writeUTF(entry.getKey());
-          entry.getValue().getCurrentAggregatedValue().write(output);
-        } catch (IOException e) {
-          throw new IllegalStateException("Failed to marshall aggregator " +
-              "with IOException " + entry.getKey(), e);
+    // Wait to receive partial aggregated values from all other workers
+    Iterable<Map.Entry<String, Writable>> myAggregators =
+        ownerAggregatorData.getMyAggregatorValuesWhenReady(
+            serviceWorker.getWorkerInfoList().size());
+
+    // Send final aggregated values to master
+    AggregatedValueOutputStream aggregatorOutput =
+        new AggregatedValueOutputStream();
+    for (Map.Entry<String, Writable> entry : myAggregators) {
+      try {
+        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
+            entry.getValue());
+        if (currentSize > maxBytesPerAggregatorRequest) {
+          requestProcessor.sendAggregatedValuesToMaster(
+              aggregatorOutput.flush());
         }
+        progressable.progress();
+      } catch (IOException e) {
+        throw new IllegalStateException("finishSuperstep: " +
+            "IOException occurred while writing aggregator " +
+            entry.getKey(), e);
       }
     }
+    try {
+      requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
+    } catch (IOException e) {
+      throw new IllegalStateException("finishSuperstep: " +
+          "IOException occured while sending aggregators to master", e);
+    }
+    // Wait for master to receive aggregated values before proceeding
+    serviceWorker.getWorkerClient().waitAllRequests();
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info(
-          "marshalAggregatorValues: Finished assembling aggregator values");
+    ownerAggregatorData.reset();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Aggregators finished");
     }
-    return outputStream.toByteArray();
   }
 }

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java?rev=1402363&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java Fri Oct 26 01:00:54 2012
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+/**
+ * User must follow this protocol for concurrent access:
+ *
+ * (1) an object instance is constructed
+ * (2) arbitrarily many times
+ *     (2a) concurrent calls to requirePermits(), releasePermits() and
+ *          waitForRequiredPermits() are issued
+ *     (2b) waitForRequiredPermits() returns
+ *
+ * Note that the next cycle of calls to requirePermits() or releasePermits()
+ * cannot start until the previous call to waitForRequiredPermits()
+ * has returned.
+ *
+ * Methods of this class are thread-safe.
+ */
+public class ExpectedBarrier {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(ExpectedBarrier.class);
+  /** Msecs to refresh the progress meter */
+  private static final int MSEC_PERIOD = 10000;
+  /** Progressable for reporting progress */
+  private final Progressable progressable;
+  /** Number of times permits were added */
+  private long timesRequired = 0;
+  /** Number of permits we are currently waiting for */
+  private long waitingOnPermits = 0;
+  /** Logger */
+  private final TimedLogger logger;
+
+  /**
+   * Constructor
+   *
+   * @param progressable Progressable for reporting progress
+   */
+  public ExpectedBarrier(Progressable progressable) {
+    this.progressable = progressable;
+    logger = new TimedLogger(MSEC_PERIOD, LOG);
+  }
+
+  /**
+   * Wait until permits have been required the desired number of times
+   * and no required permits remain outstanding; then reset both counters.
+   *
+   * @param desiredTimesRequired How many times should permits have been
+   *                             required
+   */
+  public synchronized void waitForRequiredPermits(
+      long desiredTimesRequired) {
+    while (timesRequired < desiredTimesRequired || waitingOnPermits > 0) {
+      try {
+        wait(MSEC_PERIOD);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("waitForRequiredPermits: " +
+            "InterruptedException occurred", e);
+      }
+      progressable.progress();
+      if (LOG.isInfoEnabled()) {
+        if (timesRequired < desiredTimesRequired) {
+          logger.info("waitForRequiredPermits: " +
+              "Waiting for times required to be " + desiredTimesRequired +
+              " (currently " + timesRequired + ") ");
+        } else {
+          logger.info("waitForRequiredPermits: " +
+              "Waiting for " + waitingOnPermits + " more permits.");
+        }
+      }
+    }
+
+    // Reset for the next time to use
+    timesRequired = 0;
+    waitingOnPermits = 0;
+  }
+
+  /**
+   * Require more permits. This will increase the number of times permits
+   * were required. Doesn't wait for permits to become available.
+   *
+   * @param permits Number of permits to require
+   */
+  public synchronized void requirePermits(long permits) {
+    timesRequired++;
+    waitingOnPermits += permits;
+    notifyAll();
+  }
+
+  /**
+   * Release one permit.
+   */
+  public synchronized void releaseOnePermit() {
+    releasePermits(1);
+  }
+
+  /**
+   * Release some permits.
+   *
+   * @param permits Number of permits to release
+   */
+  public synchronized void releasePermits(long permits) {
+    waitingOnPermits -= permits;
+    notifyAll();
+  }
+}

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java?rev=1402363&r1=1402362&r2=1402363&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java Fri Oct 26 01:00:54 2012
@@ -20,8 +20,10 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.BspCase;
 import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.examples.AggregatorsTestVertex;
 import org.apache.giraph.examples.SimpleCheckpointVertex;
 import org.apache.giraph.examples.SimplePageRankVertex;
@@ -30,7 +32,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -41,6 +47,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
 
 /** Tests if aggregators are handled on a proper way */
 public class TestAggregatorsHandling extends BspCase {
@@ -49,6 +57,21 @@ public class TestAggregatorsHandling ext
     super(TestAggregatorsHandling.class.getName());
   }
 
+  private Map<String, AggregatorWrapper<Writable>> getAggregatorMap
+      (MasterAggregatorHandler aggregatorHandler) {
+    try {
+      Field aggregtorMapField = aggregatorHandler.getClass().getDeclaredField
+          ("aggregatorMap");
+      aggregtorMapField.setAccessible(true);
+      return (Map<String, AggregatorWrapper<Writable>>)
+          aggregtorMapField.get(aggregatorHandler);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException(e);
+    } catch (NoSuchFieldException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
   /** Tests if aggregators are handled on a proper way during supersteps */
   @Test
   public void testAggregatorsHandling() throws IOException,
@@ -58,6 +81,9 @@ public class TestAggregatorsHandling ext
         SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
     job.getConfiguration().setMasterComputeClass(
         AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    // test with aggregators split in a few requests
+    job.getConfiguration().setInt(
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST, 50);
     assertTrue(job.run(true));
   }
 
@@ -65,8 +91,13 @@ public class TestAggregatorsHandling ext
   @Test
   public void testMasterAggregatorsSerialization() throws
       IllegalAccessException, InstantiationException, IOException {
+    ImmutableClassesGiraphConfiguration conf =
+        Mockito.mock(ImmutableClassesGiraphConfiguration.class);
+    Mockito.when(conf.getAggregatorWriterClass()).thenReturn(
+        TextAggregatorWriter.class);
+    Progressable progressable = Mockito.mock(Progressable.class);
     MasterAggregatorHandler handler =
-        new MasterAggregatorHandler(new Configuration());
+        new MasterAggregatorHandler(conf, progressable);
 
     String regularAggName = "regular";
     LongWritable regularValue = new LongWritable(5);
@@ -80,7 +111,7 @@ public class TestAggregatorsHandling ext
     handler.setAggregatedValue(persistentAggName, persistentValue);
 
     for (AggregatorWrapper<Writable> aggregator :
-        handler.getAggregatorMap().values()) {
+        getAggregatorMap(handler).values()) {
       aggregator.setPreviousAggregatedValue(
           aggregator.getCurrentAggregatedValue());
     }
@@ -89,14 +120,14 @@ public class TestAggregatorsHandling ext
     handler.write(new DataOutputStream(out));
 
     MasterAggregatorHandler restartedHandler =
-        new MasterAggregatorHandler(new Configuration());
+        new MasterAggregatorHandler(conf, progressable);
     restartedHandler.readFields(
         new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
 
-    assertEquals(2, restartedHandler.getAggregatorMap().size());
+    assertEquals(2, getAggregatorMap(restartedHandler).size());
 
     AggregatorWrapper<Writable> regularAgg =
-        restartedHandler.getAggregatorMap().get(regularAggName);
+        getAggregatorMap(restartedHandler).get(regularAggName);
     assertTrue(
         regularAgg.getAggregatorClass().equals(LongSumAggregator.class));
     assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
@@ -105,7 +136,7 @@ public class TestAggregatorsHandling ext
     assertFalse(regularAgg.isPersistent());
 
     AggregatorWrapper<Writable> persistentAgg =
-        restartedHandler.getAggregatorMap().get(persistentAggName);
+        getAggregatorMap(restartedHandler).get(persistentAggName);
     assertTrue(persistentAgg.getAggregatorClass().equals
         (DoubleOverwriteAggregator.class));
     assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());