Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [14/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java Thu Feb 16 22:12:31 2012
@@ -27,101 +27,106 @@ import org.apache.hadoop.mapreduce.Mappe
  */
 @SuppressWarnings("rawtypes")
 public abstract class WorkerContext implements AggregatorUsage {
-    /** Global graph state */
-	private GraphState graphState;
+  /** Global graph state */
+  private GraphState graphState;
 
-	public void setGraphState(GraphState graphState) {
-		this.graphState = graphState;
-	}
-
-    /**
-     * Initialize the WorkerContext.
-     * This method is executed once on each Worker before the first
-     * superstep starts.
-     *
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     */
-	public abstract void preApplication() throws InstantiationException,
-		IllegalAccessException;
-
-    /**
-     * Finalize the WorkerContext.
-     * This method is executed once on each Worker after the last
-     * superstep ends.
-     */
-    public abstract void postApplication();
-
-    /**
-     * Execute user code.
-     * This method is executed once on each Worker before each
-     * superstep starts.
-     */
-    public abstract void preSuperstep();
-
-    /**
-     * Execute user code.
-     * This method is executed once on each Worker after each
-     * superstep ends.
-     */
-    public abstract void postSuperstep();
-
-    /**
-     * Retrieves the current superstep.
-     *
-     * @return Current superstep
-     */
-    public long getSuperstep() {
-        return graphState.getSuperstep();
-    }
-
-    /**
-     * Get the total (all workers) number of vertices that
-     * existed in the previous superstep.
-     *
-     * @return Total number of vertices (-1 if first superstep)
-     */
-    public long getNumVertices() {
-    	return graphState.getNumVertices();
-    }
-
-    /**
-     * Get the total (all workers) number of edges that
-     * existed in the previous superstep.
-     *
-     * @return Total number of edges (-1 if first superstep)
-     */
-    public long getNumEdges() {
-    	return graphState.getNumEdges();
-    }
-
-    /**
-     * Get the mapper context
-     *
-     * @return Mapper context
-     */
-    public Mapper.Context getContext() {
-        return graphState.getContext();
-    }
-
-    @Override
-    public final <A extends Writable> Aggregator<A> registerAggregator(
-            String name,
-            Class<? extends Aggregator<A>> aggregatorClass)
-            throws InstantiationException, IllegalAccessException {
-        return graphState.getGraphMapper().getAggregatorUsage().
-            registerAggregator(name, aggregatorClass);
-    }
-
-    @Override
-    public final Aggregator<? extends Writable> getAggregator(String name) {
-        return graphState.getGraphMapper().getAggregatorUsage().
-            getAggregator(name);
-    }
-
-    @Override
-    public final boolean useAggregator(String name) {
-        return graphState.getGraphMapper().getAggregatorUsage().
-            useAggregator(name);
-    }
-}
\ No newline at end of file
+  /**
+   * Set the graph state.
+   *
+   * @param graphState Global graph state to set.
+   */
+  public void setGraphState(GraphState graphState) {
+    this.graphState = graphState;
+  }
+
+  /**
+   * Initialize the WorkerContext.
+   * This method is executed once on each Worker before the first
+   * superstep starts.
+   *
+   * @throws IllegalAccessException Thrown if a class cannot be accessed.
+   * @throws InstantiationException Thrown if a class cannot be instantiated.
+   */
+  public abstract void preApplication() throws InstantiationException,
+    IllegalAccessException;
+
+  /**
+   * Finalize the WorkerContext.
+   * This method is executed once on each Worker after the last
+   * superstep ends.
+   */
+  public abstract void postApplication();
+
+  /**
+   * Execute user code.
+   * This method is executed once on each Worker before each
+   * superstep starts.
+   */
+  public abstract void preSuperstep();
+
+  /**
+   * Execute user code.
+   * This method is executed once on each Worker after each
+   * superstep ends.
+   */
+  public abstract void postSuperstep();
+
+  /**
+   * Retrieves the current superstep.
+   *
+   * @return Current superstep
+   */
+  public long getSuperstep() {
+    return graphState.getSuperstep();
+  }
+
+  /**
+   * Get the total (all workers) number of vertices that
+   * existed in the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  public long getNumVertices() {
+    return graphState.getNumVertices();
+  }
+
+  /**
+   * Get the total (all workers) number of edges that
+   * existed in the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  public long getNumEdges() {
+    return graphState.getNumEdges();
+  }
+
+  /**
+   * Get the mapper context
+   *
+   * @return Mapper context
+   */
+  public Mapper.Context getContext() {
+    return graphState.getContext();
+  }
+
+  @Override
+  public final <A extends Writable> Aggregator<A> registerAggregator(
+    String name,
+    Class<? extends Aggregator<A>> aggregatorClass)
+    throws InstantiationException, IllegalAccessException {
+    return graphState.getGraphMapper().getAggregatorUsage().
+        registerAggregator(name, aggregatorClass);
+  }
+
+  @Override
+  public final Aggregator<? extends Writable> getAggregator(String name) {
+    return graphState.getGraphMapper().getAggregatorUsage().
+        getAggregator(name);
+  }
+
+  @Override
+  public final boolean useAggregator(String name) {
+    return graphState.getGraphMapper().getAggregatorUsage().
+        useAggregator(name);
+  }
+}
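
To illustrate the lifecycle these hooks define, a minimal WorkerContext
subclass might look like the sketch below (illustrative only, not part of
this commit; the class name and the logging are made up). preApplication()
runs once per worker before the first superstep, preSuperstep() and
postSuperstep() bracket every superstep, and postApplication() runs once
after the last superstep.

    import org.apache.giraph.graph.WorkerContext;
    import org.apache.log4j.Logger;

    public class LoggingWorkerContext extends WorkerContext {
      /** Class logger */
      private static final Logger LOG =
          Logger.getLogger(LoggingWorkerContext.class);

      @Override
      public void preApplication()
        throws InstantiationException, IllegalAccessException {
        // Runs once per worker before the first superstep starts.
      }

      @Override
      public void preSuperstep() {
        // Runs once per worker before every superstep; the counts below
        // are -1 during the first superstep.
        LOG.info("superstep " + getSuperstep() + ": " +
            getNumVertices() + " vertices, " + getNumEdges() + " edges");
      }

      @Override
      public void postSuperstep() {
        // Runs once per worker after every superstep.
      }

      @Override
      public void postApplication() {
        // Runs once per worker after the last superstep.
      }
    }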

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Thu Feb 16 22:12:31 2012
@@ -28,84 +28,91 @@ import org.apache.hadoop.io.Writable;
  * Information about a worker that is sent to the master and other workers.
  */
 public class WorkerInfo implements Writable {
-    /** Worker hostname */
-    private String hostname;
-    /** Partition id of this worker */
-    private int partitionId = -1;
-    /** Port that the RPC server is using */
-    private int port = -1;
-    /** Hostname + "_" + id for easier debugging */
-    private String hostnameId;
-
-    /**
-     * Constructor for reflection
-     */
-    public WorkerInfo() {
-    }
-
-    public WorkerInfo(String hostname, int partitionId, int port) {
-        this.hostname = hostname;
-        this.partitionId = partitionId;
-        this.port = port;
-        this.hostnameId = hostname + "_" + partitionId;
-    }
-
-    public String getHostname() {
-        return hostname;
-    }
-
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    public String getHostnameId() {
-        return hostnameId;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (other instanceof WorkerInfo) {
-            WorkerInfo workerInfo = (WorkerInfo) other;
-            if (hostname.equals(workerInfo.getHostname()) &&
-                    (partitionId == workerInfo.getPartitionId()) &&
-                    (port == workerInfo.getPort())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = 17;
-        result = 37 * result + port;
-        result = 37 * result + hostname.hashCode();
-        result = 37 * result + partitionId;
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "Worker(hostname=" + hostname + ", MRpartition=" +
-            partitionId + ", port=" + port + ")";
-    }
-
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        hostname = input.readUTF();
-        partitionId = input.readInt();
-        port = input.readInt();
-        hostnameId = hostname + "_" + partitionId;
-    }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        output.writeUTF(hostname);
-        output.writeInt(partitionId);
-        output.writeInt(port);
-    }
+  /** Worker hostname */
+  private String hostname;
+  /** Partition id of this worker */
+  private int partitionId = -1;
+  /** Port that the RPC server is using */
+  private int port = -1;
+  /** Hostname + "_" + id for easier debugging */
+  private String hostnameId;
+
+  /**
+   * Constructor for reflection
+   */
+  public WorkerInfo() {
+  }
+
+  /**
+   * Constructor with parameters.
+   *
+   * @param hostname Hostname of this worker.
+   * @param partitionId Partition id of this worker.
+   * @param port Port of the service.
+   */
+  public WorkerInfo(String hostname, int partitionId, int port) {
+    this.hostname = hostname;
+    this.partitionId = partitionId;
+    this.port = port;
+    this.hostnameId = hostname + "_" + partitionId;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  public String getHostnameId() {
+    return hostnameId;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof WorkerInfo) {
+      WorkerInfo workerInfo = (WorkerInfo) other;
+      if (hostname.equals(workerInfo.getHostname()) &&
+          (partitionId == workerInfo.getPartitionId()) &&
+          (port == workerInfo.getPort())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 37 * result + port;
+    result = 37 * result + hostname.hashCode();
+    result = 37 * result + partitionId;
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "Worker(hostname=" + hostname + ", MRpartition=" +
+        partitionId + ", port=" + port + ")";
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    hostname = input.readUTF();
+    partitionId = input.readInt();
+    port = input.readInt();
+    hostnameId = hostname + "_" + partitionId;
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeUTF(hostname);
+    output.writeInt(partitionId);
+    output.writeInt(port);
+  }
 }
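
Because write() emits hostname, partition id and port in that order and
readFields() reads them back in the same order (recomputing hostnameId),
a WorkerInfo survives a round trip through a byte stream. A small sketch
of that round trip (a code fragment only; the hostname and port are made
up, the java.io stream classes are assumed to be imported and IOException
handled by the caller):

    WorkerInfo original = new WorkerInfo("worker1.example.com", 3, 30001);

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    WorkerInfo copy = new WorkerInfo();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buffer.toByteArray())));

    // copy.equals(original) holds, since equals() compares hostname,
    // partition id and port; getHostnameId() is rebuilt as
    // "worker1.example.com_3".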

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of all the graph-related objects, built on the
+ * org.apache.giraph.bsp package.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.graph;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java Thu Feb 16 22:12:31 2012
@@ -31,116 +31,132 @@ import org.apache.hadoop.conf.Configurat
  * owner implementations.
  */
 public class BasicPartitionOwner implements PartitionOwner, Configurable {
-    /** Configuration */
-    private Configuration conf;
-    /** Partition id */
-    private int partitionId = -1;
-    /** Owning worker information */
-    private WorkerInfo workerInfo;
-    /** Previous (if any) worker info */
-    private WorkerInfo previousWorkerInfo;
-    /** Checkpoint files prefix for this partition */
-    private String checkpointFilesPrefix;
-
-    public BasicPartitionOwner() {
-    }
-
-    public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) {
-        this(partitionId, workerInfo, null, null);
-    }
-
-    public BasicPartitionOwner(int partitionId,
-                               WorkerInfo workerInfo,
-                               WorkerInfo previousWorkerInfo,
-                               String checkpointFilesPrefix) {
-        this.partitionId = partitionId;
-        this.workerInfo = workerInfo;
-        this.previousWorkerInfo = previousWorkerInfo;
-        this.checkpointFilesPrefix = checkpointFilesPrefix;
-    }
-
-    @Override
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    @Override
-    public WorkerInfo getWorkerInfo() {
-        return workerInfo;
-    }
-
-    @Override
-    public void setWorkerInfo(WorkerInfo workerInfo) {
-        this.workerInfo = workerInfo;
-    }
-
-    @Override
-    public WorkerInfo getPreviousWorkerInfo() {
-        return previousWorkerInfo;
-    }
-
-    @Override
-    public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
-        this.previousWorkerInfo = workerInfo;
-    }
-
-    @Override
-    public String getCheckpointFilesPrefix() {
-        return checkpointFilesPrefix;
-    }
-
-    @Override
-    public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
-        this.checkpointFilesPrefix = checkpointFilesPrefix;
-    }
-
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        partitionId = input.readInt();
-        workerInfo = new WorkerInfo();
-        workerInfo.readFields(input);
-        boolean hasPreviousWorkerInfo = input.readBoolean();
-        if (hasPreviousWorkerInfo) {
-            previousWorkerInfo = new WorkerInfo();
-            previousWorkerInfo.readFields(input);
-        }
-        boolean hasCheckpointFilePrefix = input.readBoolean();
-        if (hasCheckpointFilePrefix) {
-            checkpointFilesPrefix = input.readUTF();
-        }
-    }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        output.writeInt(partitionId);
-        workerInfo.write(output);
-        if (previousWorkerInfo != null) {
-            output.writeBoolean(true);
-            previousWorkerInfo.write(output);
-        } else {
-            output.writeBoolean(false);
-        }
-        if (checkpointFilesPrefix != null) {
-            output.writeBoolean(true);
-            output.writeUTF(checkpointFilesPrefix);
-        } else {
-            output.writeBoolean(false);
-        }
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    @Override
-    public String toString() {
-        return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" +
-               previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")";
-    }
+  /** Configuration */
+  private Configuration conf;
+  /** Partition id */
+  private int partitionId = -1;
+  /** Owning worker information */
+  private WorkerInfo workerInfo;
+  /** Previous (if any) worker info */
+  private WorkerInfo previousWorkerInfo;
+  /** Checkpoint files prefix for this partition */
+  private String checkpointFilesPrefix;
+
+  /**
+   * Default constructor.
+   */
+  public BasicPartitionOwner() { }
+
+  /**
+   * Constructor with partition id and worker info.
+   *
+   * @param partitionId Partition id of this partition.
+   * @param workerInfo Owner of the partition.
+   */
+  public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) {
+    this(partitionId, workerInfo, null, null);
+  }
+
+  /**
+   * Constructor with all parameters.
+   *
+   * @param partitionId Partition id of this partition.
+   * @param workerInfo Owner of the partition.
+   * @param previousWorkerInfo Previous owner of this partition.
+   * @param checkpointFilesPrefix Prefix of the checkpoint files.
+   */
+  public BasicPartitionOwner(int partitionId,
+                             WorkerInfo workerInfo,
+                             WorkerInfo previousWorkerInfo,
+                             String checkpointFilesPrefix) {
+    this.partitionId = partitionId;
+    this.workerInfo = workerInfo;
+    this.previousWorkerInfo = previousWorkerInfo;
+    this.checkpointFilesPrefix = checkpointFilesPrefix;
+  }
+
+  @Override
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  @Override
+  public WorkerInfo getWorkerInfo() {
+    return workerInfo;
+  }
+
+  @Override
+  public void setWorkerInfo(WorkerInfo workerInfo) {
+    this.workerInfo = workerInfo;
+  }
+
+  @Override
+  public WorkerInfo getPreviousWorkerInfo() {
+    return previousWorkerInfo;
+  }
+
+  @Override
+  public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
+    this.previousWorkerInfo = workerInfo;
+  }
+
+  @Override
+  public String getCheckpointFilesPrefix() {
+    return checkpointFilesPrefix;
+  }
+
+  @Override
+  public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
+    this.checkpointFilesPrefix = checkpointFilesPrefix;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    partitionId = input.readInt();
+    workerInfo = new WorkerInfo();
+    workerInfo.readFields(input);
+    boolean hasPreviousWorkerInfo = input.readBoolean();
+    if (hasPreviousWorkerInfo) {
+      previousWorkerInfo = new WorkerInfo();
+      previousWorkerInfo.readFields(input);
+    }
+    boolean hasCheckpointFilePrefix = input.readBoolean();
+    if (hasCheckpointFilePrefix) {
+      checkpointFilesPrefix = input.readUTF();
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(partitionId);
+    workerInfo.write(output);
+    if (previousWorkerInfo != null) {
+      output.writeBoolean(true);
+      previousWorkerInfo.write(output);
+    } else {
+      output.writeBoolean(false);
+    }
+    if (checkpointFilesPrefix != null) {
+      output.writeBoolean(true);
+      output.writeUTF(checkpointFilesPrefix);
+    } else {
+      output.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String toString() {
+    return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" +
+        previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")";
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java Thu Feb 16 22:12:31 2012
@@ -31,20 +31,20 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public interface GraphPartitionerFactory<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable> {
-    /**
-     * Create the {@link MasterGraphPartitioner} used by the master.
-     * Instantiated once by the master and reused.
-     *
-     * @return Instantiated master graph partitioner
-     */
-    MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Create the {@link MasterGraphPartitioner} used by the master.
+   * Instantiated once by the master and reused.
+   *
+   * @return Instantiated master graph partitioner
+   */
+  MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
 
-    /**
-     * Create the {@link WorkerGraphPartitioner} used by the worker.
-     * Instantiated once by every worker and reused.
-     *
-     * @return Instantiated worker graph partitioner
-     */
-    WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
+  /**
+   * Create the {@link WorkerGraphPartitioner} used by the worker.
+   * Instantiated once by every worker and reused.
+   *
+   * @return Instantiated worker graph partitioner
+   */
+  WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java Thu Feb 16 22:12:31 2012
@@ -39,119 +39,119 @@ import org.apache.log4j.Logger;
  */
 @SuppressWarnings("rawtypes")
 public class HashMasterPartitioner<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable> implements
-        MasterGraphPartitioner<I, V, E, M> {
-    /** Provided configuration */
-    private Configuration conf;
-    /** Specified partition count (overrides calculation) */
-    private final int userPartitionCount;
-    /** Partition count (calculated in createInitialPartitionOwners) */
-    private int partitionCount = -1;
-    /** Save the last generated partition owner list */
-    private List<PartitionOwner> partitionOwnerList;
-    /** Class logger */
-    private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
-
-    /**
-     * ZooKeeper has a limit of the data in a single znode of 1 MB and
-     * each entry can go be on the average somewhat more than 300 bytes
-     */
-    private static final int MAX_PARTTIONS = 1024 * 1024 / 350;
-
-    /**
-     * Multiplier for the current workers squared
-     */
-    public static final String PARTITION_COUNT_MULTIPLIER =
-        "hash.masterPartitionCountMultipler";
-    public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
-
-    /** Overrides default partition count calculation if not -1 */
-    public static final String USER_PARTITION_COUNT =
-        "hash.userPartitionCount";
-    public static final int DEFAULT_USER_PARTITION_COUNT = -1;
-
-    public HashMasterPartitioner(Configuration conf) {
-        this.conf = conf;
-        userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
-                                         DEFAULT_USER_PARTITION_COUNT);
-    }
-
-    @Override
-    public Collection<PartitionOwner> createInitialPartitionOwners(
-            Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
-        if (availableWorkerInfos.isEmpty()) {
-            throw new IllegalArgumentException(
-                "createInitialPartitionOwners: No available workers");
-        }
-        List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
-        Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
-        if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
-            float multiplier = conf.getFloat(
-                PARTITION_COUNT_MULTIPLIER,
-                DEFAULT_PARTITION_COUNT_MULTIPLIER);
-            partitionCount =
-                Math.max((int) (multiplier * availableWorkerInfos.size() *
-                         availableWorkerInfos.size()),
-                         1);
-        } else {
-            partitionCount = userPartitionCount;
-        }
-        if (LOG.isInfoEnabled()) {
-            LOG.info("createInitialPartitionOwners: Creating " +
-                     partitionCount + ", default would have been " +
-                     (availableWorkerInfos.size() *
-                      availableWorkerInfos.size()) + " partitions.");
-        }
-        if (partitionCount > MAX_PARTTIONS) {
-            LOG.warn("createInitialPartitionOwners: " +
-                    "Reducing the partitionCount to " + MAX_PARTTIONS +
-                    " from " + partitionCount);
-            partitionCount = MAX_PARTTIONS;
-        }
-
-        for (int i = 0; i < partitionCount; ++i) {
-            PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
-            if (!workerIt.hasNext()) {
-                workerIt = availableWorkerInfos.iterator();
-            }
-            ownerList.add(owner);
-        }
-        this.partitionOwnerList = ownerList;
-        return ownerList;
-    }
-
-
-    @Override
-    public Collection<PartitionOwner> getCurrentPartitionOwners() {
-        return partitionOwnerList;
-    }
-
-    /**
-     * Subclasses can set the partition owner list.
-     *
-     * @param partitionOwnerList New partition owner list.
-     */
-    protected void setPartitionOwnerList(List<PartitionOwner>
-            partitionOwnerList) {
-        this.partitionOwnerList = partitionOwnerList;
-    }
-
-    @Override
-    public Collection<PartitionOwner> generateChangedPartitionOwners(
-            Collection<PartitionStats> allPartitionStatsList,
-            Collection<WorkerInfo> availableWorkerInfos,
-            int maxWorkers,
-            long superstep) {
-        return PartitionBalancer.balancePartitionsAcrossWorkers(
-            conf,
-            partitionOwnerList,
-            allPartitionStatsList,
-            availableWorkerInfos);
-    }
-
-    @Override
-    public PartitionStats createPartitionStats() {
-        return new PartitionStats();
-    }
-
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MasterGraphPartitioner<I, V, E, M> {
+  /** Multiplier for the current workers squared */
+  public static final String PARTITION_COUNT_MULTIPLIER =
+    "hash.masterPartitionCountMultipler";
+  /** Default multiplier for current workers squared */
+  public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
+  /** Overrides default partition count calculation if not -1 */
+  public static final String USER_PARTITION_COUNT =
+    "hash.userPartitionCount";
+  /** Default user partition count */
+  public static final int DEFAULT_USER_PARTITION_COUNT = -1;
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
+  /**
+   * ZooKeeper limits the data in a single znode to 1 MB, and each
+   * entry averages somewhat more than 300 bytes
+   */
+  private static final int MAX_PARTTIONS = 1024 * 1024 / 350;
+  /** Provided configuration */
+  private Configuration conf;
+  /** Specified partition count (overrides calculation) */
+  private final int userPartitionCount;
+  /** Partition count (calculated in createInitialPartitionOwners) */
+  private int partitionCount = -1;
+  /** Save the last generated partition owner list */
+  private List<PartitionOwner> partitionOwnerList;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration used.
+   */
+  public HashMasterPartitioner(Configuration conf) {
+    this.conf = conf;
+    userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
+        DEFAULT_USER_PARTITION_COUNT);
+  }
+
+  @Override
+  public Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+    if (availableWorkerInfos.isEmpty()) {
+      throw new IllegalArgumentException(
+          "createInitialPartitionOwners: No available workers");
+    }
+    List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
+    Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
+    if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
+      float multiplier = conf.getFloat(
+          PARTITION_COUNT_MULTIPLIER,
+          DEFAULT_PARTITION_COUNT_MULTIPLIER);
+      partitionCount =
+          Math.max((int) (multiplier * availableWorkerInfos.size() *
+              availableWorkerInfos.size()),
+              1);
+    } else {
+      partitionCount = userPartitionCount;
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("createInitialPartitionOwners: Creating " +
+        partitionCount + ", default would have been " +
+        (availableWorkerInfos.size() *
+         availableWorkerInfos.size()) + " partitions.");
+    }
+    if (partitionCount > MAX_PARTTIONS) {
+      LOG.warn("createInitialPartitionOwners: " +
+          "Reducing the partitionCount to " + MAX_PARTTIONS +
+          " from " + partitionCount);
+      partitionCount = MAX_PARTTIONS;
+    }
+
+    for (int i = 0; i < partitionCount; ++i) {
+      PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
+      if (!workerIt.hasNext()) {
+        workerIt = availableWorkerInfos.iterator();
+      }
+      ownerList.add(owner);
+    }
+    this.partitionOwnerList = ownerList;
+    return ownerList;
+  }
+
+  @Override
+  public Collection<PartitionOwner> getCurrentPartitionOwners() {
+    return partitionOwnerList;
+  }
+
+  /**
+   * Subclasses can set the partition owner list.
+   *
+   * @param partitionOwnerList New partition owner list.
+   */
+  protected void setPartitionOwnerList(List<PartitionOwner>
+  partitionOwnerList) {
+    this.partitionOwnerList = partitionOwnerList;
+  }
+
+  @Override
+  public Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkerInfos,
+      int maxWorkers,
+      long superstep) {
+    return PartitionBalancer.balancePartitionsAcrossWorkers(
+        conf,
+        partitionOwnerList,
+        allPartitionStatsList,
+        availableWorkerInfos);
+  }
+
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new PartitionStats();
+  }
 }
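
To make the partition-count arithmetic above concrete: with 20 workers and
the default multiplier of 1.0, createInitialPartitionOwners() picks
max((int) (1.0 * 20 * 20), 1) = 400 partitions and assigns them round-robin,
20 per worker. MAX_PARTTIONS = 1024 * 1024 / 350 = 2995, so the znode-size
cap only kicks in once the default workers-squared count exceeds it, at
roughly 55 workers (54 * 54 = 2916, 55 * 55 = 3025).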

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java Thu Feb 16 22:12:31 2012
@@ -34,28 +34,28 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public class HashPartitionerFactory<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        implements Configurable,
-        GraphPartitionerFactory<I, V, E, M> {
-    private Configuration conf;
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Configurable, GraphPartitionerFactory<I, V, E, M> {
+  /** Saved configuration */
+  private Configuration conf;
 
-    @Override
-    public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
-        return new HashMasterPartitioner<I, V, E, M>(getConf());
-    }
+  @Override
+  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+    return new HashMasterPartitioner<I, V, E, M>(getConf());
+  }
 
-    @Override
-    public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
-        return new HashWorkerPartitioner<I, V, E, M>();
-    }
+  @Override
+  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+    return new HashWorkerPartitioner<I, V, E, M>();
+  }
 
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java Thu Feb 16 22:12:31 2012
@@ -34,27 +34,28 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public class HashRangePartitionerFactory<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        implements Configurable, GraphPartitionerFactory<I, V, E, M> {
-    private Configuration conf;
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Configurable, GraphPartitionerFactory<I, V, E, M> {
+  /** Saved configuration */
+  private Configuration conf;
 
-    @Override
-    public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
-        return new HashMasterPartitioner<I, V, E, M>(getConf());
-    }
+  @Override
+  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+    return new HashMasterPartitioner<I, V, E, M>(getConf());
+  }
 
-    @Override
-    public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
-        return new HashRangeWorkerPartitioner<I, V, E, M>();
-    }
+  @Override
+  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+    return new HashRangeWorkerPartitioner<I, V, E, M>();
+  }
 
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java Thu Feb 16 22:12:31 2012
@@ -31,12 +31,12 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public class HashRangeWorkerPartitioner<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        extends HashWorkerPartitioner<I, V, E, M> {
-    @Override
-    public PartitionOwner getPartitionOwner(I vertexId) {
-        int rangeSize = Integer.MAX_VALUE / getPartitionOwners().size();
-        int index = Math.abs(vertexId.hashCode()) / rangeSize;
-        return partitionOwnerList.get(index);
-    }
+    V extends Writable, E extends Writable, M extends Writable>
+    extends HashWorkerPartitioner<I, V, E, M> {
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    int rangeSize = Integer.MAX_VALUE / getPartitionOwners().size();
+    int index = Math.abs(vertexId.hashCode()) / rangeSize;
+    return partitionOwnerList.get(index);
+  }
 }
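
A worked example of the range computation above: with 100 partition owners,
rangeSize = Integer.MAX_VALUE / 100 = 21474836, so a vertex id whose
hashCode() is 1000000000 maps to index 1000000000 / 21474836 = 46. Vertices
are therefore bucketed by contiguous hash ranges, in contrast to the modulo
bucketing of the parent HashWorkerPartitioner.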

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Thu Feb 16 22:12:31 2012
@@ -40,77 +40,77 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public class HashWorkerPartitioner<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        implements WorkerGraphPartitioner<I, V, E, M> {
-    /** Mapping of the vertex ids to {@link PartitionOwner} */
-    protected List<PartitionOwner> partitionOwnerList =
-        new ArrayList<PartitionOwner>();
-
-    @Override
-    public PartitionOwner createPartitionOwner() {
-        return new BasicPartitionOwner();
-    }
-
-    @Override
-    public PartitionOwner getPartitionOwner(I vertexId) {
-        return partitionOwnerList.get(Math.abs(vertexId.hashCode())
-                % partitionOwnerList.size());
-    }
-
-    @Override
-    public Collection<PartitionStats> finalizePartitionStats(
-            Collection<PartitionStats> workerPartitionStats,
-            Map<Integer, Partition<I, V, E, M>> partitionMap) {
-        // No modification necessary
-        return workerPartitionStats;
-    }
-
-    @Override
-    public PartitionExchange updatePartitionOwners(
-            WorkerInfo myWorkerInfo,
-            Collection<? extends PartitionOwner> masterSetPartitionOwners,
-            Map<Integer, Partition<I, V, E, M>> partitionMap) {
-        partitionOwnerList.clear();
-        partitionOwnerList.addAll(masterSetPartitionOwners);
-
-        Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
-        Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
-            new HashMap<WorkerInfo, List<Integer>>();
-        for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
-            if (partitionOwner.getPreviousWorkerInfo() == null) {
-                continue;
-            } else if (partitionOwner.getWorkerInfo().equals(
-                       myWorkerInfo) &&
-                       partitionOwner.getPreviousWorkerInfo().equals(
-                       myWorkerInfo)) {
-                throw new IllegalStateException(
-                    "updatePartitionOwners: Impossible to have the same " +
-                    "previous and current worker info " + partitionOwner +
-                    " as me " + myWorkerInfo);
-            } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
-                dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
-            } else if (partitionOwner.getPreviousWorkerInfo().equals(
-                    myWorkerInfo)) {
-                if (workerPartitionOwnerMap.containsKey(
-                        partitionOwner.getWorkerInfo())) {
-                    workerPartitionOwnerMap.get(
-                        partitionOwner.getWorkerInfo()).add(
-                            partitionOwner.getPartitionId());
-                } else {
-                    List<Integer> partitionOwnerList = new ArrayList<Integer>();
-                    partitionOwnerList.add(partitionOwner.getPartitionId());
-                    workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
-                                                partitionOwnerList);
-                }
-            }
+    V extends Writable, E extends Writable, M extends Writable>
+    implements WorkerGraphPartitioner<I, V, E, M> {
+  /** Mapping of the vertex ids to {@link PartitionOwner} */
+  protected List<PartitionOwner> partitionOwnerList =
+      new ArrayList<PartitionOwner>();
+
+  @Override
+  public PartitionOwner createPartitionOwner() {
+    return new BasicPartitionOwner();
+  }
+
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    return partitionOwnerList.get(Math.abs(vertexId.hashCode()) %
+      partitionOwnerList.size());
+  }
+
+  @Override
+  public Collection<PartitionStats> finalizePartitionStats(
+      Collection<PartitionStats> workerPartitionStats,
+      Map<Integer, Partition<I, V, E, M>> partitionMap) {
+    // No modification necessary
+    return workerPartitionStats;
+  }
+
+  @Override
+  public PartitionExchange updatePartitionOwners(
+      WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners,
+      Map<Integer, Partition<I, V, E, M>> partitionMap) {
+    partitionOwnerList.clear();
+    partitionOwnerList.addAll(masterSetPartitionOwners);
+
+    Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
+    Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
+        new HashMap<WorkerInfo, List<Integer>>();
+    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+      if (partitionOwner.getPreviousWorkerInfo() == null) {
+        continue;
+      } else if (partitionOwner.getWorkerInfo().equals(
+          myWorkerInfo) &&
+          partitionOwner.getPreviousWorkerInfo().equals(
+              myWorkerInfo)) {
+        throw new IllegalStateException(
+            "updatePartitionOwners: Impossible to have the same " +
+                "previous and current worker info " + partitionOwner +
+                " as me " + myWorkerInfo);
+      } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
+        dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
+      } else if (partitionOwner.getPreviousWorkerInfo().equals(
+          myWorkerInfo)) {
+        if (workerPartitionOwnerMap.containsKey(
+            partitionOwner.getWorkerInfo())) {
+          workerPartitionOwnerMap.get(
+              partitionOwner.getWorkerInfo()).add(
+                  partitionOwner.getPartitionId());
+        } else {
+          List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
+          tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
+          workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
+                                      tmpPartitionOwnerList);
         }
-
-        return new PartitionExchange(dependentWorkerSet,
-                                     workerPartitionOwnerMap);
+      }
     }
 
-    @Override
-    public Collection<? extends PartitionOwner> getPartitionOwners() {
-        return partitionOwnerList;
-    }
+    return new PartitionExchange(dependentWorkerSet,
+        workerPartitionOwnerMap);
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> getPartitionOwners() {
+    return partitionOwnerList;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java Thu Feb 16 22:12:31 2012
@@ -35,48 +35,49 @@ import org.apache.giraph.graph.WorkerInf
  */
 @SuppressWarnings("rawtypes")
 public interface MasterGraphPartitioner<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable> {
-    /**
-     * Set some initial partition owners for the graph. Guaranteed to be called
-     * prior to the graph being loaded (initial or restart).
-     *
-     * @param availableWorkerInfos Workers available for partition assignment
-     * @param maxWorkers Maximum number of workers
-     */
-    Collection<PartitionOwner> createInitialPartitionOwners(
-            Collection<WorkerInfo> availableWorkerInfos, int maxWorkers);
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Set some initial partition owners for the graph. Guaranteed to be called
+   * prior to the graph being loaded (initial or restart).
+   *
+   * @param availableWorkerInfos Workers available for partition assignment
+   * @param maxWorkers Maximum number of workers
+   * @return Collection of generated partition owners.
+   */
+  Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers);
 
-    /**
-     * After the worker stats have been merged to a single list, the master can
-     * use this information to send commands to the workers for any
-     * {@link Partition} changes. This protocol is specific to the
-     * {@link MasterGraphPartitioner} implementation.
-     *
-     * @param allPartitionStatsList All partition stats from all workers.
-     * @param availableWorkers Workers available for partition assignment
-     * @param maxWorkers Maximum number of workers
-     * @param superstep Partition owners will be set for this superstep
-     * @return Collection of {@link PartitionOwner} objects that changed from
-     *         the previous superstep, empty list if no change.
-     */
-    Collection<PartitionOwner> generateChangedPartitionOwners(
-            Collection<PartitionStats> allPartitionStatsList,
-            Collection<WorkerInfo> availableWorkers,
-            int maxWorkers,
-            long superstep);
+  /**
+   * After the worker stats have been merged to a single list, the master can
+   * use this information to send commands to the workers for any
+   * {@link Partition} changes. This protocol is specific to the
+   * {@link MasterGraphPartitioner} implementation.
+   *
+   * @param allPartitionStatsList All partition stats from all workers.
+   * @param availableWorkers Workers available for partition assignment
+   * @param maxWorkers Maximum number of workers
+   * @param superstep Partition owners will be set for this superstep
+   * @return Collection of {@link PartitionOwner} objects that changed from
+   *         the previous superstep, empty list if no change.
+   */
+  Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkers,
+      int maxWorkers,
+      long superstep);
 
-    /**
-     * Get current partition owners at this time.
-     *
-     * @return Collection of current {@link PartitionOwner} objects
-     */
-    Collection<PartitionOwner> getCurrentPartitionOwners();
+  /**
+   * Get current partition owners at this time.
+   *
+   * @return Collection of current {@link PartitionOwner} objects
+   */
+  Collection<PartitionOwner> getCurrentPartitionOwners();
 
-    /**
-     * Instantiate the {@link PartitionStats} implementation used to read the
-     * worker stats
-     *
-     * @return Instantiated {@link PartitionStats} object
-     */
-    PartitionStats createPartitionStats();
+  /**
+   * Instantiate the {@link PartitionStats} implementation used to read the
+   * worker stats
+   *
+   * @return Instantiated {@link PartitionStats} object
+   */
+  PartitionStats createPartitionStats();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Thu Feb 16 22:12:31 2012
@@ -31,115 +31,126 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-
 /**
  * A generic container that stores vertices.  Vertex ids will map to exactly
  * one partition.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class Partition<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        implements Writable {
-    /** Configuration from the worker */
-    private final Configuration conf;
-    /** Partition id */
-    private final int partitionId;
-    /** Vertex map for this range (keyed by index) */
-    private final Map<I, BasicVertex<I, V, E, M>> vertexMap =
-        new HashMap<I, BasicVertex<I, V, E, M>>();
-
-    public Partition(Configuration conf, int partitionId) {
-        this.conf = conf;
-        this.partitionId = partitionId;
-    }
-
-    /**
-     * Get the vertex for this vertex index.
-     *
-     * @param vertexIndex Vertex index to search for
-     * @return Vertex if it exists, null otherwise
-     */
-    public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
-        return vertexMap.get(vertexIndex);
-    }
-
-    /**
-     * Put a vertex into the Partition
-     *
-     * @param vertex Vertex to put in the Partition
-     * @return old vertex value (i.e. null if none existed prior)
-     */
-    public BasicVertex<I, V, E, M> putVertex(BasicVertex<I, V, E, M> vertex) {
-        return vertexMap.put(vertex.getVertexId(), vertex);
-    }
-
-    /**
-     * Remove a vertex from the Partition
-     *
-     * @param vertexIndex Vertex index to remove
-     */
-    public BasicVertex<I, V, E, M> removeVertex(I vertexIndex) {
-        return vertexMap.remove(vertexIndex);
-    }
-
-    /**
-     * Get a collection of the vertices.
-     *
-     * @return Collection of the vertices
-     */
-    public Collection<BasicVertex<I, V, E , M>> getVertices() {
-        return vertexMap.values();
-    }
-
-    /**
-     * Get the number of edges in this partition.  Computed on the fly.
-     *
-     * @return Number of edges.
-     */
-    public long getEdgeCount() {
-        long edges = 0;
-        for (BasicVertex<I, V, E, M> vertex : vertexMap.values()) {
-            edges += vertex.getNumOutEdges();
-        }
-        return edges;
-    }
-
-    /**
-     * Get the partition id.
-     *
-     * @return Partition id of this partition.
-     */
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    @Override
-    public String toString() {
-        return "(id=" + getPartitionId() + ",V=" + vertexMap.size() +
-            ",E=" + getEdgeCount() + ")";
-    }
-
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        int vertices = input.readInt();
-        for (int i = 0; i < vertices; ++i) {
-            BasicVertex<I, V, E, M> vertex =
-                BspUtils.<I, V, E, M>createVertex(conf);
-            vertex.readFields(input);
-            if (vertexMap.put(vertex.getVertexId(),
-                              (BasicVertex<I, V, E, M>) vertex) != null) {
-                throw new IllegalStateException(
-                    "readFields: " + this +
-                    " already has same id " + vertex);
-            }
-        }
-    }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        output.writeInt(vertexMap.size());
-        for (BasicVertex vertex : vertexMap.values()) {
-            vertex.write(output);
-        }
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Writable {
+  /** Configuration from the worker */
+  private final Configuration conf;
+  /** Partition id */
+  private final int partitionId;
+  /** Vertex map for this range (keyed by index) */
+  private final Map<I, BasicVertex<I, V, E, M>> vertexMap =
+      new HashMap<I, BasicVertex<I, V, E, M>>();
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration.
+   * @param partitionId Partition id.
+   */
+  public Partition(Configuration conf, int partitionId) {
+    this.conf = conf;
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * Get the vertex for this vertex index.
+   *
+   * @param vertexIndex Vertex index to search for
+   * @return Vertex if it exists, null otherwise
+   */
+  public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
+    return vertexMap.get(vertexIndex);
+  }
+
+  /**
+   * Put a vertex into the Partition
+   *
+   * @param vertex Vertex to put in the Partition
+   * @return old vertex value (i.e. null if none existed prior)
+   */
+  public BasicVertex<I, V, E, M> putVertex(BasicVertex<I, V, E, M> vertex) {
+    return vertexMap.put(vertex.getVertexId(), vertex);
+  }
+
+  /**
+   * Remove a vertex from the Partition
+   *
+   * @param vertexIndex Vertex index to remove
+   * @return The removed vertex.
+   */
+  public BasicVertex<I, V, E, M> removeVertex(I vertexIndex) {
+    return vertexMap.remove(vertexIndex);
+  }
+
+  /**
+   * Get a collection of the vertices.
+   *
+   * @return Collection of the vertices
+   */
+  public Collection<BasicVertex<I, V, E , M>> getVertices() {
+    return vertexMap.values();
+  }
+
+  /**
+   * Get the number of edges in this partition.  Computed on the fly.
+   *
+   * @return Number of edges.
+   */
+  public long getEdgeCount() {
+    long edges = 0;
+    for (BasicVertex<I, V, E, M> vertex : vertexMap.values()) {
+      edges += vertex.getNumOutEdges();
+    }
+    return edges;
+  }
+
+  /**
+   * Get the partition id.
+   *
+   * @return Partition id of this partition.
+   */
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  @Override
+  public String toString() {
+    return "(id=" + getPartitionId() + ",V=" + vertexMap.size() +
+        ",E=" + getEdgeCount() + ")";
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    int vertices = input.readInt();
+    for (int i = 0; i < vertices; ++i) {
+      BasicVertex<I, V, E, M> vertex =
+        BspUtils.<I, V, E, M>createVertex(conf);
+      vertex.readFields(input);
+      if (vertexMap.put(vertex.getVertexId(),
+          (BasicVertex<I, V, E, M>) vertex) != null) {
+        throw new IllegalStateException(
+            "readFields: " + this +
+            " already has same id " + vertex);
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(vertexMap.size());
+    for (BasicVertex vertex : vertexMap.values()) {
+      vertex.write(output);
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java Thu Feb 16 22:12:31 2012
@@ -35,234 +35,254 @@ import org.apache.log4j.Logger;
  * Helper class for balancing partitions across a set of workers.
  */
 public class PartitionBalancer {
-    /** Partition balancing algorithm */
-    public static final String PARTITION_BALANCE_ALGORITHM =
-        "hash.partitionBalanceAlgorithm";
-    public static final String STATIC_BALANCE_ALGORITHM =
-        "static";
-    public static final String EGDE_BALANCE_ALGORITHM =
-        "edges";
-    public static final String VERTICES_BALANCE_ALGORITHM =
-        "vertices";
-    /** Class logger */
-    private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
-
-    /**
-     * What value to balance partitions with?  Edges, vertices?
-     */
-    private enum BalanceValue {
-        UNSET,
-        EDGES,
-        VERTICES
+  /** Partition balancing algorithm */
+  public static final String PARTITION_BALANCE_ALGORITHM =
+    "hash.partitionBalanceAlgorithm";
+  /** No rebalancing during the supersteps */
+  public static final String STATIC_BALANCE_ALGORITHM =
+    "static";
+  /** Rebalance across supersteps by edges */
+  public static final String EGDE_BALANCE_ALGORITHM =
+    "edges";
+  /** Rebalance across supersteps by vertices */
+  public static final String VERTICES_BALANCE_ALGORITHM =
+    "vertices";
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
+
+  /**
+   * What value to balance partitions with?  Edges, vertices?
+   */
+  private enum BalanceValue {
+    /** Not chosen */
+    UNSET,
+    /** Balance with edges */
+    EDGES,
+    /** Balance with vertices */
+    VERTICES
+  }
+
+  /**
+   * Do not construct this class.
+   */
+  private PartitionBalancer() { }
+
+  /**
+   * Get the value used to balance.
+   *
+   * @param partitionStat Stats of this partition.
+   * @param balanceValue Type of the value to balance.
+   * @return Balance value.
+   */
+  private static long getBalanceValue(PartitionStats partitionStat,
+      BalanceValue balanceValue) {
+    switch (balanceValue) {
+    case EDGES:
+      return partitionStat.getEdgeCount();
+    case VERTICES:
+      return partitionStat.getVertexCount();
+    default:
+      throw new IllegalArgumentException(
+          "getBalanceValue: Illegal balance value " + balanceValue);
     }
+  }
+
+  /**
+   * Used to sort the partition owners from lowest value to highest value
+   */
+  private static class PartitionOwnerComparator implements
+      Comparator<PartitionOwner> {
+    /** Map of owner to stats */
+    private final Map<PartitionOwner, PartitionStats> ownerStatMap;
+    /** Value type to compare on */
+    private final BalanceValue balanceValue;
+
 
     /**
-     * Get the value used to balance.
+     * Only constructor.
      *
-     * @param partitionStat
-     * @param balanceValue
-     * @return
+     * @param ownerStatMap Map of owners to stats.
+     * @param balanceValue Value to balance with.
      */
-    private static long getBalanceValue(PartitionStats partitionStat,
-                                        BalanceValue balanceValue) {
-        switch (balanceValue) {
-            case EDGES:
-                return partitionStat.getEdgeCount();
-            case VERTICES:
-                return partitionStat.getVertexCount();
-            default:
-                throw new IllegalArgumentException(
-                    "getBalanceValue: Illegal balance value " + balanceValue);
-        }
+    public PartitionOwnerComparator(
+        Map<PartitionOwner, PartitionStats> ownerStatMap,
+        BalanceValue balanceValue) {
+      this.ownerStatMap = ownerStatMap;
+      this.balanceValue = balanceValue;
+    }
+
+    @Override
+    public int compare(PartitionOwner owner1, PartitionOwner owner2) {
+      return (int)
+          (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
+              getBalanceValue(ownerStatMap.get(owner2), balanceValue));
     }
+  }
+
+  /**
+   * Structure to keep track of how much value a {@link WorkerInfo} has
+   * been assigned.
+   */
+  private static class WorkerInfoAssignments implements
+      Comparable<WorkerInfoAssignments> {
+    /** Worker info associated */
+    private final WorkerInfo workerInfo;
+    /** Balance value */
+    private final BalanceValue balanceValue;
+    /** Map of owner to stats */
+    private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
+    /** Current value of this object */
+    private long value = 0;
 
     /**
-     * Used to sort the partition owners from lowest value to highest value
+     * Constructor with final values.
+     *
+     * @param workerInfo Worker info for assignment.
+     * @param balanceValue Value used to balance.
+     * @param ownerStatsMap Map of owner to stats.
      */
-    private static class PartitionOwnerComparator implements
-            Comparator<PartitionOwner> {
-        /** Map of owner to stats */
-        private final Map<PartitionOwner, PartitionStats> ownerStatMap;
-        /** Value type to compare on */
-        private final BalanceValue balanceValue;
-
-
-        /**
-         * Only constructor.
-         *
-         * @param comparatorValue What to compare with?
-         */
-        public PartitionOwnerComparator(
-                Map<PartitionOwner, PartitionStats> ownerStatMap,
-                BalanceValue balanceValue) {
-            this.ownerStatMap = ownerStatMap;
-            this.balanceValue = balanceValue;
-        }
-
-        @Override
-        public int compare(PartitionOwner owner1, PartitionOwner owner2) {
-            return (int)
-                (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
-                 getBalanceValue(ownerStatMap.get(owner2), balanceValue));
-        }
+    public WorkerInfoAssignments(
+        WorkerInfo workerInfo,
+        BalanceValue balanceValue,
+        Map<PartitionOwner, PartitionStats> ownerStatsMap) {
+      this.workerInfo = workerInfo;
+      this.balanceValue = balanceValue;
+      this.ownerStatsMap = ownerStatsMap;
     }
 
     /**
-     * Structure to keep track of how much value a {@link WorkerInfo} has
-     * been assigned.
+     * Get the total value of all partitions assigned to this worker.
+     *
+     * @return Total value of all partition assignments.
      */
-    private static class WorkerInfoAssignments implements
-            Comparable<WorkerInfoAssignments> {
-        /** Worker info associated */
-        private final WorkerInfo workerInfo;
-        /** Balance value */
-        private final BalanceValue balanceValue;
-        /** Map of owner to stats */
-        private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
-        /** Current value of this object */
-        private long value = 0;
-
-        public WorkerInfoAssignments(
-                WorkerInfo workerInfo,
-                BalanceValue balanceValue,
-                Map<PartitionOwner, PartitionStats> ownerStatsMap) {
-            this.workerInfo = workerInfo;
-            this.balanceValue = balanceValue;
-            this.ownerStatsMap = ownerStatsMap;
-        }
-
-        /**
-         * Get the total value of all partitions assigned to this worker.
-         *
-         * @return Total value of all partition assignments.
-         */
-        public long getValue() {
-            return value;
-        }
-
-        /**
-         * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
-         *
-         * @param partitionOwner PartitionOwner to assign.
-         */
-        public void assignPartitionOwner(
-                PartitionOwner partitionOwner) {
-            value += getBalanceValue(ownerStatsMap.get(partitionOwner),
-                                     balanceValue);
-            if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
-                partitionOwner.setPreviousWorkerInfo(
-                    partitionOwner.getWorkerInfo());
-                partitionOwner.setWorkerInfo(workerInfo);
-            } else {
-                partitionOwner.setPreviousWorkerInfo(null);
-            }
-        }
-
-        @Override
-        public int compareTo(WorkerInfoAssignments other) {
-            return (int)
-                (getValue() - ((WorkerInfoAssignments) other).getValue());
-        }
+    public long getValue() {
+      return value;
     }
 
     /**
-     * Balance the partitions with an algorithm based on a value.
+     * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
      *
-     * @param conf Configuration to find the algorithm
-     * @param allPartitionStats All the partition stats
-     * @param availableWorkerInfos All the available workers
-     * @return Balanced partition owners
+     * @param partitionOwner PartitionOwner to assign.
      */
-    public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
-        Configuration conf,
-        Collection<PartitionOwner> partitionOwners,
-        Collection<PartitionStats> allPartitionStats,
-        Collection<WorkerInfo> availableWorkerInfos) {
-
-        String balanceAlgorithm =
-            conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
-        if (LOG.isInfoEnabled()) {
-            LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
-                     balanceAlgorithm);
-        }
-        BalanceValue balanceValue = BalanceValue.UNSET;
-        if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
-            return partitionOwners;
-        } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
-            balanceValue = BalanceValue.EDGES;
-        } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
-            balanceValue = BalanceValue.VERTICES;
-        } else {
-            throw new IllegalArgumentException(
-                "balancePartitionsAcrossWorkers: Illegal balance " +
-                "algorithm - " + balanceAlgorithm);
-        }
-
-        // Join the partition stats and partition owners by partition id
-        Map<Integer, PartitionStats> idStatMap =
-            new HashMap<Integer, PartitionStats>();
-        for (PartitionStats partitionStats : allPartitionStats) {
-            if (idStatMap.put(partitionStats.getPartitionId(), partitionStats)
-                    != null) {
-                throw new IllegalStateException(
-                    "balancePartitionsAcrossWorkers: Duplicate partition id " +
-                    "for " + partitionStats);
-            }
-        }
-        Map<PartitionOwner, PartitionStats> ownerStatsMap =
-            new HashMap<PartitionOwner, PartitionStats>();
-        for (PartitionOwner partitionOwner : partitionOwners) {
-            PartitionStats partitionStats =
-                idStatMap.get(partitionOwner.getPartitionId());
-            if (partitionStats == null) {
-                throw new IllegalStateException(
-                    "balancePartitionsAcrossWorkers: Missing partition " +
-                    "stats for " + partitionOwner);
-            }
-            if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
-                throw new IllegalStateException(
-                    "balancePartitionsAcrossWorkers: Duplicate partition " +
-                    "owner " + partitionOwner);
-            }
-        }
-        if (ownerStatsMap.size() != partitionOwners.size()) {
-            throw new IllegalStateException(
-                "balancePartitionsAcrossWorkers: ownerStats count = " +
-                ownerStatsMap.size() + ", partitionOwners count = " +
-                partitionOwners.size() + " and should match.");
-        }
-
-        List<WorkerInfoAssignments> workerInfoAssignmentsList =
-            new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
-        for (WorkerInfo workerInfo : availableWorkerInfos) {
-            workerInfoAssignmentsList.add(
-                new WorkerInfoAssignments(
-                    workerInfo, balanceValue, ownerStatsMap));
-        }
-
-        // A simple heuristic for balancing the partitions across the workers
-        // using a value (edges, vertices).  An improvement would be to
-        // take into account the already existing partition worker assignments.
-        // 1.  Sort the partitions by size
-        // 2.  Place the workers in a min heap sorted by their total balance
-        //     value.
-        // 3.  From largest partition to the smallest, take the partition
-        //     worker at the top of the heap, add the partition to it, and
-        //     then put it back in the heap
-        List<PartitionOwner> partitionOwnerList =
-            new ArrayList<PartitionOwner>(partitionOwners);
-        Collections.sort(partitionOwnerList,
-            Collections.reverseOrder(
-                new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
-        PriorityQueue<WorkerInfoAssignments> minQueue =
-            new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
-        for (PartitionOwner partitionOwner : partitionOwnerList) {
-            WorkerInfoAssignments chosenWorker = minQueue.remove();
-            chosenWorker.assignPartitionOwner(partitionOwner);
-            minQueue.add(chosenWorker);
-        }
+    public void assignPartitionOwner(
+        PartitionOwner partitionOwner) {
+      value += getBalanceValue(ownerStatsMap.get(partitionOwner),
+          balanceValue);
+      if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
+        partitionOwner.setPreviousWorkerInfo(
+            partitionOwner.getWorkerInfo());
+        partitionOwner.setWorkerInfo(workerInfo);
+      } else {
+        partitionOwner.setPreviousWorkerInfo(null);
+      }
+    }
 
-        return partitionOwnerList;
+    @Override
+    public int compareTo(WorkerInfoAssignments other) {
+      return (int)
+          (getValue() - ((WorkerInfoAssignments) other).getValue());
     }
+  }
+
+  /**
+   * Balance the partitions with an algorithm based on a value.
+   *
+   * @param conf Configuration to find the algorithm
+   * @param partitionOwners All the owners of all partitions
+   * @param allPartitionStats All the partition stats
+   * @param availableWorkerInfos All the available workers
+   * @return Balanced partition owners
+   */
+  public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
+      Configuration conf,
+      Collection<PartitionOwner> partitionOwners,
+      Collection<PartitionStats> allPartitionStats,
+      Collection<WorkerInfo> availableWorkerInfos) {
+
+    String balanceAlgorithm =
+        conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
+          balanceAlgorithm);
+    }
+    BalanceValue balanceValue = BalanceValue.UNSET;
+    if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
+      return partitionOwners;
+    } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
+      balanceValue = BalanceValue.EDGES;
+    } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
+      balanceValue = BalanceValue.VERTICES;
+    } else {
+      throw new IllegalArgumentException(
+          "balancePartitionsAcrossWorkers: Illegal balance " +
+              "algorithm - " + balanceAlgorithm);
+    }
+
+    // Join the partition stats and partition owners by partition id
+    Map<Integer, PartitionStats> idStatMap =
+        new HashMap<Integer, PartitionStats>();
+    for (PartitionStats partitionStats : allPartitionStats) {
+      if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
+          null) {
+        throw new IllegalStateException(
+            "balancePartitionsAcrossWorkers: Duplicate partition id " +
+                "for " + partitionStats);
+      }
+    }
+    Map<PartitionOwner, PartitionStats> ownerStatsMap =
+        new HashMap<PartitionOwner, PartitionStats>();
+    for (PartitionOwner partitionOwner : partitionOwners) {
+      PartitionStats partitionStats =
+          idStatMap.get(partitionOwner.getPartitionId());
+      if (partitionStats == null) {
+        throw new IllegalStateException(
+            "balancePartitionsAcrossWorkers: Missing partition " +
+                "stats for " + partitionOwner);
+      }
+      if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
+        throw new IllegalStateException(
+            "balancePartitionsAcrossWorkers: Duplicate partition " +
+                "owner " + partitionOwner);
+      }
+    }
+    if (ownerStatsMap.size() != partitionOwners.size()) {
+      throw new IllegalStateException(
+          "balancePartitionsAcrossWorkers: ownerStats count = " +
+              ownerStatsMap.size() + ", partitionOwners count = " +
+              partitionOwners.size() + " and should match.");
+    }
+
+    List<WorkerInfoAssignments> workerInfoAssignmentsList =
+        new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
+    for (WorkerInfo workerInfo : availableWorkerInfos) {
+      workerInfoAssignmentsList.add(
+          new WorkerInfoAssignments(
+              workerInfo, balanceValue, ownerStatsMap));
+    }
+
+    // A simple heuristic for balancing the partitions across the workers
+    // using a value (edges, vertices).  An improvement would be to
+    // take into account the already existing partition worker assignments.
+    // 1.  Sort the partitions by size
+    // 2.  Place the workers in a min heap sorted by their total balance
+    //     value.
+    // 3.  From largest partition to the smallest, take the partition
+    //     worker at the top of the heap, add the partition to it, and
+    //     then put it back in the heap
+    List<PartitionOwner> partitionOwnerList =
+        new ArrayList<PartitionOwner>(partitionOwners);
+    Collections.sort(partitionOwnerList,
+        Collections.reverseOrder(
+            new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
+    PriorityQueue<WorkerInfoAssignments> minQueue =
+        new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
+    for (PartitionOwner partitionOwner : partitionOwnerList) {
+      WorkerInfoAssignments chosenWorker = minQueue.remove();
+      chosenWorker.assignPartitionOwner(partitionOwner);
+      minQueue.add(chosenWorker);
+    }
+
+    return partitionOwnerList;
+  }
 }
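
The comment block above describes the greedy heuristic: sort the partitions by value, keep the workers in a min-heap keyed by their assigned total, and always give the next (largest remaining) partition to the least-loaded worker. The standalone sketch below reproduces just that heuristic on plain numbers so the assignment order is easy to follow; the GreedyBalanceSketch class is an illustrative assumption and is not part of this commit.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class GreedyBalanceSketch {
      /** Assign partition values to workers greedily; returns total value per worker. */
      public static long[] balance(List<Long> partitionValues, int numWorkers) {
        // 1. Sort the partition values, largest first.
        List<Long> sorted = new ArrayList<Long>(partitionValues);
        Collections.sort(sorted, Collections.reverseOrder());
        // 2. Min-heap of {current load, worker index}.
        PriorityQueue<long[]> minQueue = new PriorityQueue<long[]>(numWorkers,
            new Comparator<long[]>() {
              @Override
              public int compare(long[] a, long[] b) {
                return a[0] < b[0] ? -1 : (a[0] > b[0] ? 1 : 0);
              }
            });
        for (int i = 0; i < numWorkers; ++i) {
          minQueue.add(new long[] {0L, i});
        }
        // 3. Hand each partition to the currently least-loaded worker.
        long[] loads = new long[numWorkers];
        for (long value : sorted) {
          long[] least = minQueue.remove();
          least[0] += value;
          loads[(int) least[1]] = least[0];
          minQueue.add(least);
        }
        return loads;
      }

      public static void main(String[] args) {
        // Partitions worth 9, 7, 5, 3, 1 across 2 workers: one ends at 13, the other at 12.
        System.out.println(Arrays.toString(balance(Arrays.asList(9L, 7L, 5L, 3L, 1L), 2)));
      }
    }

In the code above, PartitionOwnerComparator supplies the descending sort and WorkerInfoAssignments plays the role of the heap entries, with the chosen owner reassigned via assignPartitionOwner().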
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java Thu Feb 16 22:12:31 2012
@@ -29,49 +29,49 @@ import org.apache.giraph.graph.WorkerInf
  * exchange between workers.
  */
 public class PartitionExchange {
-    /** Workers that I am dependent on before I can continue */
-    private final Set<WorkerInfo> myDependencyWorkerSet;
-    /** Workers that I need to sent partitions to */
-    private final Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap;
+  /** Workers that I am dependent on before I can continue */
+  private final Set<WorkerInfo> myDependencyWorkerSet;
+  /** Workers that I need to send partitions to */
+  private final Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap;
 
-    /**
-     * Only constructor.
-     *
-     * @param myDependencyWorkerSet All the workers I must wait for
-     * @param sendWorkerPartitionMap Partitions I need to send to other workers
-     */
-    public PartitionExchange(
-            Set<WorkerInfo> myDependencyWorkerSet,
-            Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap) {
-        this.myDependencyWorkerSet = myDependencyWorkerSet;
-        this.sendWorkerPartitionMap = sendWorkerPartitionMap;
-    }
+  /**
+   * Only constructor.
+   *
+   * @param myDependencyWorkerSet All the workers I must wait for
+   * @param sendWorkerPartitionMap Partitions I need to send to other workers
+   */
+  public PartitionExchange(
+      Set<WorkerInfo> myDependencyWorkerSet,
+      Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap) {
+    this.myDependencyWorkerSet = myDependencyWorkerSet;
+    this.sendWorkerPartitionMap = sendWorkerPartitionMap;
+  }
 
-    /**
-     * Get the workers that I must wait for
-     *
-     * @return Set of workers I must wait for
-     */
-    public Set<WorkerInfo> getMyDependencyWorkerSet() {
-        return myDependencyWorkerSet;
-    }
+  /**
+   * Get the workers that I must wait for
+   *
+   * @return Set of workers I must wait for
+   */
+  public Set<WorkerInfo> getMyDependencyWorkerSet() {
+    return myDependencyWorkerSet;
+  }
 
-    /**
-     * Get a mapping of worker to list of partition ids I need to send to.
-     *
-     * @return Mapping of worker to partition id list I will send to.
-     */
-    public Map<WorkerInfo, List<Integer>> getSendWorkerPartitionMap() {
-        return sendWorkerPartitionMap;
-    }
+  /**
+   * Get a mapping of worker to list of partition ids I need to send to.
+   *
+   * @return Mapping of worker to partition id list I will send to.
+   */
+  public Map<WorkerInfo, List<Integer>> getSendWorkerPartitionMap() {
+    return sendWorkerPartitionMap;
+  }
 
-    /**
-     * Is this worker involved in a partition exchange?  Receiving or sending?
-     *
-     * @return True if needs to be involved in the exchange, false otherwise.
-     */
-    public boolean doExchange() {
-        return !myDependencyWorkerSet.isEmpty() ||
-               !sendWorkerPartitionMap.isEmpty();
-    }
+  /**
+   * Is this worker involved in a partition exchange?  Receiving or sending?
+   *
+   * @return True if this worker is involved in the exchange, false otherwise.
+   */
+  public boolean doExchange() {
+    return !myDependencyWorkerSet.isEmpty() ||
+        !sendWorkerPartitionMap.isEmpty();
+  }
 }
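
A minimal sketch of how a worker might walk this exchange plan; the ExchangeSketch class is an illustrative assumption standing in for the real messaging code, and only the accessors defined above are used.

    import java.util.List;
    import java.util.Map;
    import org.apache.giraph.graph.WorkerInfo;
    import org.apache.giraph.graph.partition.PartitionExchange;

    public class ExchangeSketch {
      /** Print the send/wait plan captured by a PartitionExchange. */
      public static void describe(PartitionExchange exchange) {
        if (!exchange.doExchange()) {
          return;  // neither sending nor receiving this superstep
        }
        for (Map.Entry<WorkerInfo, List<Integer>> entry :
            exchange.getSendWorkerPartitionMap().entrySet()) {
          System.out.println("send partitions " + entry.getValue() +
              " to " + entry.getKey());
        }
        for (WorkerInfo dependency : exchange.getMyDependencyWorkerSet()) {
          System.out.println("wait for partitions from " + dependency);
        }
      }
    }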

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java Thu Feb 16 22:12:31 2012
@@ -25,57 +25,57 @@ import org.apache.hadoop.io.Writable;
  * Metadata about ownership of a partition.
  */
 public interface PartitionOwner extends Writable {
-    /**
-     * Get the partition id that maps to the relevant {@link Partition} object
-     *
-     * @return Partition id
-     */
-    int getPartitionId();
-
-    /**
-     * Get the worker information that is currently responsible for
-     * the partition id.
-     *
-     * @return Owning worker information.
-     */
-    WorkerInfo getWorkerInfo();
-
-    /**
-     * Set the current worker info.
-     *
-     * @param workerInfo Worker info responsible for partition
-     */
-    void setWorkerInfo(WorkerInfo workerInfo);
-
-    /**
-     * Get the worker information that was previously responsible for the
-     * partition id.
-     *
-     * @return Owning worker information or null if no previous worker info.
-     */
-    WorkerInfo getPreviousWorkerInfo();
-
-    /**
-     * Set the previous worker info.
-     *
-     * @param workerInfo Worker info that was previously responsible for the
-     *        partition.
-     */
-    void setPreviousWorkerInfo(WorkerInfo workerInfo);
-
-    /**
-     * If this is a restarted checkpoint, the worker will use this information
-     * to determine where the checkpointed partition was stored on HDFS.
-     *
-     * @return Prefix of the checkpoint HDFS files for this partition, null if
-     *         this is not a restarted superstep.
-     */
-    String getCheckpointFilesPrefix();
-
-    /**
-     * Set the checkpoint files prefix.  Master uses this.
-     *
-     * @param checkpointFilesPrefix HDFS checkpoint file prefix
-     */
-    void setCheckpointFilesPrefix(String checkpointFilesPrefix);
+  /**
+   * Get the partition id that maps to the relevant {@link Partition} object
+   *
+   * @return Partition id
+   */
+  int getPartitionId();
+
+  /**
+   * Get the worker information that is currently responsible for
+   * the partition id.
+   *
+   * @return Owning worker information.
+   */
+  WorkerInfo getWorkerInfo();
+
+  /**
+   * Set the current worker info.
+   *
+   * @param workerInfo Worker info responsible for partition
+   */
+  void setWorkerInfo(WorkerInfo workerInfo);
+
+  /**
+   * Get the worker information that was previously responsible for the
+   * partition id.
+   *
+   * @return Owning worker information or null if no previous worker info.
+   */
+  WorkerInfo getPreviousWorkerInfo();
+
+  /**
+   * Set the previous worker info.
+   *
+   * @param workerInfo Worker info that was previously responsible for the
+   *        partition.
+   */
+  void setPreviousWorkerInfo(WorkerInfo workerInfo);
+
+  /**
+   * If this is a restarted checkpoint, the worker will use this information
+   * to determine where the checkpointed partition was stored on HDFS.
+   *
+   * @return Prefix of the checkpoint HDFS files for this partition, null if
+   *         this is not a restarted superstep.
+   */
+  String getCheckpointFilesPrefix();
+
+  /**
+   * Set the checkpoint files prefix.  Master uses this.
+   *
+   * @param checkpointFilesPrefix HDFS checkpoint file prefix
+   */
+  void setCheckpointFilesPrefix(String checkpointFilesPrefix);
 }
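
To make the contract concrete, a bare-bones, hypothetical implementation follows; it is not part of this commit, and unlike the framework's real owner classes it serializes only the partition id.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.giraph.graph.WorkerInfo;
    import org.apache.giraph.graph.partition.PartitionOwner;

    public class SimplePartitionOwner implements PartitionOwner {
      /** Partition id owned */
      private int partitionId;
      /** Current owner */
      private WorkerInfo workerInfo;
      /** Previous owner, if the partition just moved */
      private WorkerInfo previousWorkerInfo;
      /** Checkpoint file prefix, if restarted */
      private String checkpointFilesPrefix;

      @Override
      public int getPartitionId() {
        return partitionId;
      }

      @Override
      public WorkerInfo getWorkerInfo() {
        return workerInfo;
      }

      @Override
      public void setWorkerInfo(WorkerInfo workerInfo) {
        this.workerInfo = workerInfo;
      }

      @Override
      public WorkerInfo getPreviousWorkerInfo() {
        return previousWorkerInfo;
      }

      @Override
      public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
        this.previousWorkerInfo = workerInfo;
      }

      @Override
      public String getCheckpointFilesPrefix() {
        return checkpointFilesPrefix;
      }

      @Override
      public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
        this.checkpointFilesPrefix = checkpointFilesPrefix;
      }

      @Override
      public void write(DataOutput output) throws IOException {
        output.writeInt(partitionId);
      }

      @Override
      public void readFields(DataInput input) throws IOException {
        partitionId = input.readInt();
      }
    }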

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java Thu Feb 16 22:12:31 2012
@@ -29,74 +29,125 @@ import org.apache.hadoop.io.Writable;
  * actual partition data, only the statistics.
  */
 public class PartitionStats implements Writable {
-    private int partitionId = -1;
-    private long vertexCount = 0;
-    private long finishedVertexCount = 0;
-    private long edgeCount = 0;
-
-    public PartitionStats() {}
-
-    public PartitionStats(int partitionId,
-                          long vertexCount,
-                          long finishedVertexCount,
-                          long edgeCount) {
-        this.partitionId = partitionId;
-        this.vertexCount = vertexCount;
-        this.finishedVertexCount = finishedVertexCount;
-        this.edgeCount = edgeCount;
-    }
-
-    public void setPartitionId(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    public void incrVertexCount() {
-        ++vertexCount;
-    }
-
-    public long getVertexCount() {
-        return vertexCount;
-    }
-
-    public void incrFinishedVertexCount() {
-        ++finishedVertexCount;
-    }
-
-    public long getFinishedVertexCount() {
-        return finishedVertexCount;
-    }
-
-    public void addEdgeCount(long edgeCount) {
-        this.edgeCount += edgeCount;
-    }
-
-    public long getEdgeCount() {
-        return edgeCount;
-    }
-
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        partitionId = input.readInt();
-        vertexCount = input.readLong();
-        finishedVertexCount = input.readLong();
-        edgeCount = input.readLong();
-    }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        output.writeInt(partitionId);
-        output.writeLong(vertexCount);
-        output.writeLong(finishedVertexCount);
-        output.writeLong(edgeCount);
-    }
-
-    @Override
-    public String toString() {
-        return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
-               finishedVertexCount + ",edges=" + edgeCount + ")";
-    }
+  /** Id of partition to keep stats for */
+  private int partitionId = -1;
+  /** Vertices in this partition */
+  private long vertexCount = 0;
+  /** Finished vertices in this partition */
+  private long finishedVertexCount = 0;
+  /** Edges in this partition */
+  private long edgeCount = 0;
+
+  /**
+   * Default constructor for reflection.
+   */
+  public PartitionStats() { }
+
+  /**
+   * Constructor with the initial stats.
+   *
+   * @param partitionId Partition id.
+   * @param vertexCount Vertex count.
+   * @param finishedVertexCount Finished vertex count.
+   * @param edgeCount Edge count.
+   */
+  public PartitionStats(int partitionId,
+      long vertexCount,
+      long finishedVertexCount,
+      long edgeCount) {
+    this.partitionId = partitionId;
+    this.vertexCount = vertexCount;
+    this.finishedVertexCount = finishedVertexCount;
+    this.edgeCount = edgeCount;
+  }
+
+  /**
+   * Set the partition id.
+   *
+   * @param partitionId New partition id.
+   */
+  public void setPartitionId(int partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * Get partition id.
+   *
+   * @return Partition id.
+   */
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  /**
+   * Increment the vertex count by one.
+   */
+  public void incrVertexCount() {
+    ++vertexCount;
+  }
+
+  /**
+   * Get the vertex count.
+   *
+   * @return Vertex count.
+   */
+  public long getVertexCount() {
+    return vertexCount;
+  }
+
+  /**
+   * Increment the finished vertex count by one.
+   */
+  public void incrFinishedVertexCount() {
+    ++finishedVertexCount;
+  }
+
+  /**
+   * Get the finished vertex count.
+   *
+   * @return Finished vertex count.
+   */
+  public long getFinishedVertexCount() {
+    return finishedVertexCount;
+  }
+
+  /**
+   * Add edges to the edge count.
+   *
+   * @param edgeCount Number of edges to add.
+   */
+  public void addEdgeCount(long edgeCount) {
+    this.edgeCount += edgeCount;
+  }
+
+  /**
+   * Get the edge count.
+   *
+   * @return Edge count.
+   */
+  public long getEdgeCount() {
+    return edgeCount;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    partitionId = input.readInt();
+    vertexCount = input.readLong();
+    finishedVertexCount = input.readLong();
+    edgeCount = input.readLong();
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(partitionId);
+    output.writeLong(vertexCount);
+    output.writeLong(finishedVertexCount);
+    output.writeLong(edgeCount);
+  }
+
+  @Override
+  public String toString() {
+    return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
+        finishedVertexCount + ",edges=" + edgeCount + ")";
+  }
 }
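
As a quick illustration of how these stats accumulate on a worker and travel through the Writable methods above, here is a self-contained sketch; the PartitionStatsSketch class and the specific counts are illustrative assumptions, and the real reporting path to the master is not shown.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.giraph.graph.partition.PartitionStats;

    public class PartitionStatsSketch {
      public static void main(String[] args) throws IOException {
        // Accumulate stats the way a worker would while computing one partition.
        PartitionStats stats = new PartitionStats(3, 0, 0, 0);
        for (int i = 0; i < 100; ++i) {
          stats.incrVertexCount();
          stats.addEdgeCount(5);
          if (i % 2 == 0) {
            stats.incrFinishedVertexCount();
          }
        }
        // Round-trip through write()/readFields().
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        stats.write(new DataOutputStream(bytes));
        PartitionStats copy = new PartitionStats();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        // Prints "(id=3,vtx=100,finVtx=50,edges=500)".
        System.out.println(copy);
      }
    }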