You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/11/06 05:35:26 UTC

svn commit: r1406040 - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/comm/aggregators/ giraph/src/main/java/org/apache/giraph/graph/

Author: maja
Date: Tue Nov  6 04:35:25 2012
New Revision: 1406040

URL: http://svn.apache.org/viewvc?rev=1406040&view=rev
Log:
GIRAPH-397: We should have copies of aggregators per thread to avoid synchronizing on aggregate()

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerContext.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Nov  6 04:35:25 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-397: We should have copies of aggregators per thread
+  to avoid synchronizing on aggregate() (majakabiljo)
+
   GIRAPH-406: Enforce partition ids in [0, n-1] (majakabiljo)
 
   GIRAPH-402: slf4j dependency bug (nitay via apresta)

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java Tue Nov  6 04:35:25 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.aggregators;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Aggregator;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.hadoop.io.Writable;
@@ -40,6 +41,17 @@ public class AggregatorUtils {
   /** Default max size of single aggregator request (1MB) */
   public static final int MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT =
       1024 * 1024;
+  /**
+   * Whether or not to have a copy of aggregators for each compute thread.
+   * Unless aggregators are very large and it would hurt the application to
+   * have that many copies of them, user should use thread-local aggregators
+   * to prevent synchronization when aggregate() is called (and get better
+   * performance because of it).
+   */
+  public static final String USE_THREAD_LOCAL_AGGREGATORS =
+      "giraph.useThreadLocalAggregators";
+  /** Default is not to have a copy of aggregators for each thread */
+  public static final boolean USE_THREAD_LOCAL_AGGREGATORS_DEFAULT = false;
 
   /** Do not instantiate */
   private AggregatorUtils() { }
@@ -93,4 +105,16 @@ public class AggregatorUtils {
     int index = Math.abs(aggregatorName.hashCode()) % workers.size();
     return workers.get(index);
   }
+
+  /**
+   * Check if we should use thread local aggregators.
+   *
+   * @param conf Giraph configuration
+   * @return True iff we should use thread local aggregators
+   */
+  public static boolean
+  useThreadLocalAggregators(ImmutableClassesGiraphConfiguration conf) {
+    return conf.getBoolean(USE_THREAD_LOCAL_AGGREGATORS,
+        USE_THREAD_LOCAL_AGGREGATORS_DEFAULT);
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Nov  6 04:35:25 2012
@@ -1465,7 +1465,7 @@ public class BspServiceMaster<I extends 
     // increase the superstep counter it uses by one
     GraphState<I, V, E, M> graphState =
         new GraphState<I, V, E, M>(superstep + 1, vertexCounter.getValue(),
-            edgeCounter.getValue(), getContext(), getGraphMapper(), null);
+            edgeCounter.getValue(), getContext(), getGraphMapper(), null, null);
     masterCompute.setGraphState(graphState);
     if (superstep == INPUT_SUPERSTEP) {
       try {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov  6 04:35:25 2012
@@ -289,7 +289,7 @@ public class BspServiceWorker<I extends 
 
     GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
         INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
-        null);
+        null, null);
 
     VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
         new VertexInputSplitsCallableFactory<I, V, E, M>(
@@ -316,7 +316,7 @@ public class BspServiceWorker<I extends 
 
     GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
         INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
-        null);
+        null, null);
 
     EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
         new EdgeInputSplitsCallableFactory<I, V, E, M>(
@@ -456,7 +456,7 @@ public class BspServiceWorker<I extends 
     // Add the partitions that this worker owns
     GraphState<I, V, E, M> graphState =
         new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
-            getContext(), getGraphMapper(), null);
+            getContext(), getGraphMapper(), null, null);
     Collection<? extends PartitionOwner> masterSetPartitionOwners =
         startSuperstep(graphState);
     workerGraphPartitioner.updatePartitionOwners(

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java Tue Nov  6 04:35:25 2012
@@ -122,9 +122,13 @@ public class ComputeCallable<I extends W
     this.workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E, M>(
             context, configuration, serviceWorker);
+    WorkerThreadAggregatorUsage aggregatorUsage =
+        serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
+
     this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
         graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
-        context, graphState.getGraphMapper(), workerClientRequestProcessor);
+        context, graphState.getGraphMapper(), workerClientRequestProcessor,
+        aggregatorUsage);
 
     List<PartitionStats> partitionStatsList = Lists.newArrayList();
     while (!partitionIdQueue.isEmpty()) {
@@ -159,6 +163,7 @@ public class ComputeCallable<I extends W
     }
     try {
       workerClientRequestProcessor.flush();
+      aggregatorUsage.finishThreadComputation();
     } catch (IOException e) {
       throw new IllegalStateException("call: Flushing failed.", e);
     }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Nov  6 04:35:25 2012
@@ -144,15 +144,6 @@ public class GraphMapper<I extends Writa
   }
 
   /**
-   * Get worker aggregator usage, a subset of the functionality
-   *
-   * @return Worker aggregator usage interface
-   */
-  public final WorkerAggregatorUsage getWorkerAggregatorUsage() {
-    return serviceWorker.getAggregatorHandler();
-  }
-
-  /**
    * Get master aggregator usage, a subset of the functionality
    *
    * @return Master aggregator usage interface
@@ -504,9 +495,11 @@ public class GraphMapper<I extends Writa
       return;
     }
 
+    WorkerAggregatorUsage aggregatorUsage =
+        serviceWorker.getAggregatorHandler();
     serviceWorker.getWorkerContext().setGraphState(
         new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
-            numVertices, numEdges, context, this, null));
+            numVertices, numEdges, context, this, null, aggregatorUsage));
     try {
       serviceWorker.getWorkerContext().preApplication();
     } catch (InstantiationException e) {
@@ -531,7 +524,7 @@ public class GraphMapper<I extends Writa
 
       GraphState<I, V, E, M> graphState =
           new GraphState<I, V, E, M>(superstep, numVertices, numEdges,
-              context, this, null);
+              context, this, null, aggregatorUsage);
 
       Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
           serviceWorker.startSuperstep(graphState);
@@ -558,7 +551,7 @@ public class GraphMapper<I extends Writa
         numVertices = vertexEdgeCount.getVertexCount();
         numEdges = vertexEdgeCount.getEdgeCount();
         graphState = new GraphState<I, V, E, M>(superstep, numVertices,
-            numEdges, context, this, null);
+            numEdges, context, this, null, aggregatorUsage);
       } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
         serviceWorker.storeCheckpoint();
       }
@@ -632,7 +625,7 @@ public class GraphMapper<I extends Writa
 
     serviceWorker.getWorkerContext().setGraphState(
         new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
-            numVertices, numEdges, context, this, null));
+            numVertices, numEdges, context, this, null, aggregatorUsage));
     serviceWorker.getWorkerContext().postApplication();
     context.progress();
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java Tue Nov  6 04:35:25 2012
@@ -46,6 +46,8 @@ E extends Writable, M extends Writable> 
   /** Handles requests */
   private final WorkerClientRequestProcessor<I, V, E, M>
   workerClientRequestProcessor;
+  /** Worker aggregator usage */
+  private final WorkerAggregatorUsage workerAggregatorUsage;
 
   /**
    * Constructor
@@ -56,18 +58,22 @@ E extends Writable, M extends Writable> 
    * @param context Context
    * @param graphMapper Graph mapper
    * @param workerClientRequestProcessor Handles all communication
+   * @param workerAggregatorUsage Aggregator usage
+   *
    */
   public GraphState(
       long superstep, long numVertices,
       long numEdges, Mapper.Context context,
       GraphMapper<I, V, E, M> graphMapper,
-      WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor) {
+      WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor,
+      WorkerAggregatorUsage workerAggregatorUsage) {
     this.superstep = superstep;
     this.numVertices = numVertices;
     this.numEdges = numEdges;
     this.context = context;
     this.graphMapper = graphMapper;
     this.workerClientRequestProcessor = workerClientRequestProcessor;
+    this.workerAggregatorUsage = workerAggregatorUsage;
   }
 
   public long getSuperstep() {
@@ -95,6 +101,10 @@ E extends Writable, M extends Writable> 
     return workerClientRequestProcessor;
   }
 
+  public WorkerAggregatorUsage getWorkerAggregatorUsage() {
+    return workerAggregatorUsage;
+  }
+
   @Override
   public String toString() {
     return "(superstep=" + superstep + ",numVertices=" + numVertices + "," +

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Tue Nov  6 04:35:25 2012
@@ -119,7 +119,8 @@ public abstract class InputSplitsCallabl
             context, configuration, bspServiceWorker);
     this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
         graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
-        context, graphState.getGraphMapper(), workerClientRequestProcessor);
+        context, graphState.getGraphMapper(), workerClientRequestProcessor,
+        null);
     try {
       splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
           inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort());

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java Tue Nov  6 04:35:25 2012
@@ -331,13 +331,13 @@ public abstract class Vertex<I extends W
 
   @Override
   public <A extends Writable> void aggregate(String name, A value) {
-    getGraphState().getGraphMapper().getWorkerAggregatorUsage().
+    getGraphState().getWorkerAggregatorUsage().
         aggregate(name, value);
   }
 
   @Override
   public <A extends Writable> A getAggregatedValue(String name) {
-    return getGraphState().getGraphMapper().getWorkerAggregatorUsage().
+    return getGraphState().getWorkerAggregatorUsage().
         <A>getAggregatedValue(name);
   }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java Tue Nov  6 04:35:25 2012
@@ -50,7 +50,7 @@ import java.util.Map;
  * which propagates non-owned partial aggregates to the owner workers,
  * and sends the final aggregate from the owner worker to the master.
  */
-public class WorkerAggregatorHandler implements WorkerAggregatorUsage {
+public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(WorkerAggregatorHandler.class);
@@ -66,6 +66,8 @@ public class WorkerAggregatorHandler imp
   private final Progressable progressable;
   /** How big a single aggregator request can be */
   private final int maxBytesPerAggregatorRequest;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor
@@ -80,6 +82,7 @@ public class WorkerAggregatorHandler imp
       Progressable progressable) {
     this.serviceWorker = serviceWorker;
     this.progressable = progressable;
+    this.conf = conf;
     maxBytesPerAggregatorRequest = conf.getInt(
         AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
         AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
@@ -89,8 +92,6 @@ public class WorkerAggregatorHandler imp
   public <A extends Writable> void aggregate(String name, A value) {
     Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
     if (aggregator != null) {
-      // TODO we can later improve this for mutlithreading to have local
-      // copies of aggregators per thread
       synchronized (aggregator) {
         aggregator.aggregate(value);
       }
@@ -213,4 +214,81 @@ public class WorkerAggregatorHandler imp
       LOG.debug("finishSuperstep: Aggregators finished");
     }
   }
+
+  /**
+   * Create new aggregator usage which will be used by one of the compute
+   * threads.
+   *
+   * @return New aggregator usage
+   */
+  public WorkerThreadAggregatorUsage newThreadAggregatorUsage() {
+    if (AggregatorUtils.useThreadLocalAggregators(conf)) {
+      return new ThreadLocalWorkerAggregatorUsage();
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public void finishThreadComputation() {
+    // If we don't use thread-local aggregators, all the aggregated values
+    // are already in this object
+  }
+
+  /**
+   * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
+   * We can use one instance of this object per thread to prevent
+   * synchronizing on each aggregate() call. In the end of superstep,
+   * values from each of these will be aggregated back to {@link
+   * WorkerAggregatorHandler}
+   */
+  public class ThreadLocalWorkerAggregatorUsage
+      implements WorkerThreadAggregatorUsage {
+    /** Thread-local aggregator map */
+    private final Map<String, Aggregator<Writable>> threadAggregatorMap;
+
+    /**
+     * Constructor
+     *
+     * Creates new instances of all aggregators from
+     * {@link WorkerAggregatorHandler}
+     */
+    public ThreadLocalWorkerAggregatorUsage() {
+      threadAggregatorMap = Maps.newHashMapWithExpectedSize(
+          WorkerAggregatorHandler.this.currentAggregatorMap.size());
+      for (Map.Entry<String, Aggregator<Writable>> entry :
+          WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
+        threadAggregatorMap.put(entry.getKey(),
+            AggregatorUtils.newAggregatorInstance(
+                (Class<Aggregator<Writable>>) entry.getValue().getClass()));
+      }
+    }
+
+    @Override
+    public <A extends Writable> void aggregate(String name, A value) {
+      Aggregator<Writable> aggregator = threadAggregatorMap.get(name);
+      if (aggregator != null) {
+        aggregator.aggregate(value);
+      } else {
+        throw new IllegalStateException("aggregate: " +
+            "Tried to aggregate value to unregistered aggregator " + name);
+      }
+    }
+
+    @Override
+    public <A extends Writable> A getAggregatedValue(String name) {
+      return WorkerAggregatorHandler.this.<A>getAggregatedValue(name);
+    }
+
+    @Override
+    public void finishThreadComputation() {
+      // Aggregate the values this thread's vertices provided back to
+      // WorkerAggregatorHandler
+      for (Map.Entry<String, Aggregator<Writable>> entry :
+          threadAggregatorMap.entrySet()) {
+        WorkerAggregatorHandler.this.aggregate(entry.getKey(),
+            entry.getValue().getAggregatedValue());
+      }
+    }
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerContext.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1406040&r1=1406039&r2=1406040&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerContext.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerContext.java Tue Nov  6 04:35:25 2012
@@ -111,13 +111,11 @@ public abstract class WorkerContext impl
 
   @Override
   public <A extends Writable> void aggregate(String name, A value) {
-    graphState.getGraphMapper().getWorkerAggregatorUsage().
-        aggregate(name, value);
+    graphState.getWorkerAggregatorUsage().aggregate(name, value);
   }
 
   @Override
   public <A extends Writable> A getAggregatedValue(String name) {
-    return graphState.getGraphMapper().getWorkerAggregatorUsage().
-        <A>getAggregatedValue(name);
+    return graphState.getWorkerAggregatorUsage().<A>getAggregatedValue(name);
   }
 }

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java?rev=1406040&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java Tue Nov  6 04:35:25 2012
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+/**
+ * {@link WorkerAggregatorUsage} which can be used in each of the
+ * computation threads.
+ */
+public interface WorkerThreadAggregatorUsage extends WorkerAggregatorUsage {
+  /**
+   * Call this after thread's computation is finished,
+   * i.e. when all vertices have provided their values to aggregators
+   */
+  void finishThreadComputation();
+}