You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [14/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/example...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java Thu Feb 16 22:12:31 2012
@@ -27,101 +27,106 @@ import org.apache.hadoop.mapreduce.Mappe
*/
@SuppressWarnings("rawtypes")
public abstract class WorkerContext implements AggregatorUsage {
- /** Global graph state */
- private GraphState graphState;
+ /** Global graph state */
+ private GraphState graphState;
- public void setGraphState(GraphState graphState) {
- this.graphState = graphState;
- }
-
- /**
- * Initialize the WorkerContext.
- * This method is executed once on each Worker before the first
- * superstep starts.
- *
- * @throws IllegalAccessException
- * @throws InstantiationException
- */
- public abstract void preApplication() throws InstantiationException,
- IllegalAccessException;
-
- /**
- * Finalize the WorkerContext.
- * This method is executed once on each Worker after the last
- * superstep ends.
- */
- public abstract void postApplication();
-
- /**
- * Execute user code.
- * This method is executed once on each Worker before each
- * superstep starts.
- */
- public abstract void preSuperstep();
-
- /**
- * Execute user code.
- * This method is executed once on each Worker after each
- * superstep ends.
- */
- public abstract void postSuperstep();
-
- /**
- * Retrieves the current superstep.
- *
- * @return Current superstep
- */
- public long getSuperstep() {
- return graphState.getSuperstep();
- }
-
- /**
- * Get the total (all workers) number of vertices that
- * existed in the previous superstep.
- *
- * @return Total number of vertices (-1 if first superstep)
- */
- public long getNumVertices() {
- return graphState.getNumVertices();
- }
-
- /**
- * Get the total (all workers) number of edges that
- * existed in the previous superstep.
- *
- * @return Total number of edges (-1 if first superstep)
- */
- public long getNumEdges() {
- return graphState.getNumEdges();
- }
-
- /**
- * Get the mapper context
- *
- * @return Mapper context
- */
- public Mapper.Context getContext() {
- return graphState.getContext();
- }
-
- @Override
- public final <A extends Writable> Aggregator<A> registerAggregator(
- String name,
- Class<? extends Aggregator<A>> aggregatorClass)
- throws InstantiationException, IllegalAccessException {
- return graphState.getGraphMapper().getAggregatorUsage().
- registerAggregator(name, aggregatorClass);
- }
-
- @Override
- public final Aggregator<? extends Writable> getAggregator(String name) {
- return graphState.getGraphMapper().getAggregatorUsage().
- getAggregator(name);
- }
-
- @Override
- public final boolean useAggregator(String name) {
- return graphState.getGraphMapper().getAggregatorUsage().
- useAggregator(name);
- }
-}
\ No newline at end of file
+ /**
+ * Set the graph state.
+ *
+ * @param graphState Global graph state backing the accessor methods.
+ */
+ public void setGraphState(GraphState graphState) {
+ this.graphState = graphState;
+ }
+
+ /**
+ * Initialize the WorkerContext.
+ * This method is executed once on each Worker before the first
+ * superstep starts.
+ *
+ * @throws IllegalAccessException If a needed class/member is inaccessible
+ * @throws InstantiationException If a needed class cannot be instantiated
+ */
+ public abstract void preApplication() throws InstantiationException,
+ IllegalAccessException;
+
+ /**
+ * Finalize the WorkerContext.
+ * This method is executed once on each Worker after the last
+ * superstep ends.
+ */
+ public abstract void postApplication();
+
+ /**
+ * Execute user code.
+ * This method is executed once on each Worker before each
+ * superstep starts.
+ */
+ public abstract void preSuperstep();
+
+ /**
+ * Execute user code.
+ * This method is executed once on each Worker after each
+ * superstep ends.
+ */
+ public abstract void postSuperstep();
+
+ /**
+ * Retrieves the current superstep.
+ *
+ * @return Current superstep
+ */
+ public long getSuperstep() {
+ return graphState.getSuperstep();
+ }
+
+ /**
+ * Get the total (all workers) number of vertices that
+ * existed in the previous superstep.
+ *
+ * @return Total number of vertices (-1 if first superstep)
+ */
+ public long getNumVertices() {
+ return graphState.getNumVertices();
+ }
+
+ /**
+ * Get the total (all workers) number of edges that
+ * existed in the previous superstep.
+ *
+ * @return Total number of edges (-1 if first superstep)
+ */
+ public long getNumEdges() {
+ return graphState.getNumEdges();
+ }
+
+ /**
+ * Get the mapper context (taken from the graph state).
+ *
+ * @return Mapper context
+ */
+ public Mapper.Context getContext() {
+ return graphState.getContext();
+ }
+
+ @Override
+ public final <A extends Writable> Aggregator<A> registerAggregator(
+ String name,
+ Class<? extends Aggregator<A>> aggregatorClass)
+ throws InstantiationException, IllegalAccessException {
+ return graphState.getGraphMapper().getAggregatorUsage().
+ registerAggregator(name, aggregatorClass);
+ }
+
+ @Override
+ public final Aggregator<? extends Writable> getAggregator(String name) {
+ return graphState.getGraphMapper().getAggregatorUsage().
+ getAggregator(name);
+ }
+
+ @Override
+ public final boolean useAggregator(String name) {
+ return graphState.getGraphMapper().getAggregatorUsage().
+ useAggregator(name);
+ }
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Thu Feb 16 22:12:31 2012
@@ -28,84 +28,91 @@ import org.apache.hadoop.io.Writable;
* Information about a worker that is sent to the master and other workers.
*/
public class WorkerInfo implements Writable {
- /** Worker hostname */
- private String hostname;
- /** Partition id of this worker */
- private int partitionId = -1;
- /** Port that the RPC server is using */
- private int port = -1;
- /** Hostname + "_" + id for easier debugging */
- private String hostnameId;
-
- /**
- * Constructor for reflection
- */
- public WorkerInfo() {
- }
-
- public WorkerInfo(String hostname, int partitionId, int port) {
- this.hostname = hostname;
- this.partitionId = partitionId;
- this.port = port;
- this.hostnameId = hostname + "_" + partitionId;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public String getHostnameId() {
- return hostnameId;
- }
-
- public int getPort() {
- return port;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof WorkerInfo) {
- WorkerInfo workerInfo = (WorkerInfo) other;
- if (hostname.equals(workerInfo.getHostname()) &&
- (partitionId == workerInfo.getPartitionId()) &&
- (port == workerInfo.getPort())) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- int result = 17;
- result = 37 * result + port;
- result = 37 * result + hostname.hashCode();
- result = 37 * result + partitionId;
- return result;
- }
-
- @Override
- public String toString() {
- return "Worker(hostname=" + hostname + ", MRpartition=" +
- partitionId + ", port=" + port + ")";
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- hostname = input.readUTF();
- partitionId = input.readInt();
- port = input.readInt();
- hostnameId = hostname + "_" + partitionId;
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeUTF(hostname);
- output.writeInt(partitionId);
- output.writeInt(port);
- }
+ /** Worker hostname */
+ private String hostname;
+ /** Partition id of this worker */
+ private int partitionId = -1;
+ /** Port that the RPC server is using */
+ private int port = -1;
+ /** Hostname + "_" + id for easier debugging */
+ private String hostnameId;
+
+ /**
+ * Constructor for reflection
+ */
+ public WorkerInfo() {
+ }
+
+ /**
+ * Constructor with parameters.
+ *
+ * @param hostname Hostname of this worker.
+ * @param partitionId Partition id of this worker.
+ * @param port Port that the RPC server is using.
+ */
+ public WorkerInfo(String hostname, int partitionId, int port) {
+ this.hostname = hostname;
+ this.partitionId = partitionId;
+ this.port = port;
+ this.hostnameId = hostname + "_" + partitionId;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public String getHostnameId() {
+ return hostnameId;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof WorkerInfo) {
+ WorkerInfo workerInfo = (WorkerInfo) other;
+ if (hostname.equals(workerInfo.getHostname()) &&
+ (partitionId == workerInfo.getPartitionId()) &&
+ (port == workerInfo.getPort())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 37 * result + port;
+ result = 37 * result + hostname.hashCode();
+ result = 37 * result + partitionId;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Worker(hostname=" + hostname + ", MRpartition=" +
+ partitionId + ", port=" + port + ")";
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ hostname = input.readUTF();
+ partitionId = input.readInt();
+ port = input.readInt();
+ hostnameId = hostname + "_" + partitionId;
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeUTF(hostname);
+ output.writeInt(partitionId);
+ output.writeInt(port);
+ }
}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.giraph.bsp;
-
/**
- * State of the BSP application
+ * Package of all the graph-related objects, built on the
+ * org.apache.giraph.bsp package.
*/
-public enum ApplicationState {
- UNKNOWN, ///< Shouldn't be seen, just an initial state
- START_SUPERSTEP, ///< Start from a desired superstep
- FAILED, ///< Unrecoverable
- FINISHED ///< Successful completion
-}
+package org.apache.giraph.graph;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java Thu Feb 16 22:12:31 2012
@@ -31,116 +31,132 @@ import org.apache.hadoop.conf.Configurat
* owner implementations.
*/
public class BasicPartitionOwner implements PartitionOwner, Configurable {
- /** Configuration */
- private Configuration conf;
- /** Partition id */
- private int partitionId = -1;
- /** Owning worker information */
- private WorkerInfo workerInfo;
- /** Previous (if any) worker info */
- private WorkerInfo previousWorkerInfo;
- /** Checkpoint files prefix for this partition */
- private String checkpointFilesPrefix;
-
- public BasicPartitionOwner() {
- }
-
- public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) {
- this(partitionId, workerInfo, null, null);
- }
-
- public BasicPartitionOwner(int partitionId,
- WorkerInfo workerInfo,
- WorkerInfo previousWorkerInfo,
- String checkpointFilesPrefix) {
- this.partitionId = partitionId;
- this.workerInfo = workerInfo;
- this.previousWorkerInfo = previousWorkerInfo;
- this.checkpointFilesPrefix = checkpointFilesPrefix;
- }
-
- @Override
- public int getPartitionId() {
- return partitionId;
- }
-
- @Override
- public WorkerInfo getWorkerInfo() {
- return workerInfo;
- }
-
- @Override
- public void setWorkerInfo(WorkerInfo workerInfo) {
- this.workerInfo = workerInfo;
- }
-
- @Override
- public WorkerInfo getPreviousWorkerInfo() {
- return previousWorkerInfo;
- }
-
- @Override
- public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
- this.previousWorkerInfo = workerInfo;
- }
-
- @Override
- public String getCheckpointFilesPrefix() {
- return checkpointFilesPrefix;
- }
-
- @Override
- public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
- this.checkpointFilesPrefix = checkpointFilesPrefix;
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- partitionId = input.readInt();
- workerInfo = new WorkerInfo();
- workerInfo.readFields(input);
- boolean hasPreviousWorkerInfo = input.readBoolean();
- if (hasPreviousWorkerInfo) {
- previousWorkerInfo = new WorkerInfo();
- previousWorkerInfo.readFields(input);
- }
- boolean hasCheckpointFilePrefix = input.readBoolean();
- if (hasCheckpointFilePrefix) {
- checkpointFilesPrefix = input.readUTF();
- }
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeInt(partitionId);
- workerInfo.write(output);
- if (previousWorkerInfo != null) {
- output.writeBoolean(true);
- previousWorkerInfo.write(output);
- } else {
- output.writeBoolean(false);
- }
- if (checkpointFilesPrefix != null) {
- output.writeBoolean(true);
- output.writeUTF(checkpointFilesPrefix);
- } else {
- output.writeBoolean(false);
- }
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public String toString() {
- return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" +
- previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")";
- }
+ /** Configuration */
+ private Configuration conf;
+ /** Partition id */
+ private int partitionId = -1;
+ /** Owning worker information */
+ private WorkerInfo workerInfo;
+ /** Previous (if any) worker info */
+ private WorkerInfo previousWorkerInfo;
+ /** Checkpoint files prefix for this partition */
+ private String checkpointFilesPrefix;
+
+ /**
+ * Default constructor; fields stay unset until setters or readFields().
+ */
+ public BasicPartitionOwner() { }
+
+ /**
+ * Constructor with partition id and worker info.
+ *
+ * @param partitionId Partition id of this partition.
+ * @param workerInfo Owner of the partition.
+ */
+ public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) {
+ this(partitionId, workerInfo, null, null);
+ }
+
+ /**
+ * Constructor with all partition owner fields.
+ *
+ * @param partitionId Partition id of this partition.
+ * @param workerInfo Owner of the partition.
+ * @param previousWorkerInfo Previous owner of this partition (may be null).
+ * @param checkpointFilesPrefix Prefix of the checkpoint files (may be null).
+ */
+ public BasicPartitionOwner(int partitionId,
+ WorkerInfo workerInfo,
+ WorkerInfo previousWorkerInfo,
+ String checkpointFilesPrefix) {
+ this.partitionId = partitionId;
+ this.workerInfo = workerInfo;
+ this.previousWorkerInfo = previousWorkerInfo;
+ this.checkpointFilesPrefix = checkpointFilesPrefix;
+ }
+
+ @Override
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ @Override
+ public WorkerInfo getWorkerInfo() {
+ return workerInfo;
+ }
+
+ @Override
+ public void setWorkerInfo(WorkerInfo workerInfo) {
+ this.workerInfo = workerInfo;
+ }
+
+ @Override
+ public WorkerInfo getPreviousWorkerInfo() {
+ return previousWorkerInfo;
+ }
+
+ @Override
+ public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
+ this.previousWorkerInfo = workerInfo;
+ }
+
+ @Override
+ public String getCheckpointFilesPrefix() {
+ return checkpointFilesPrefix;
+ }
+
+ @Override
+ public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
+ this.checkpointFilesPrefix = checkpointFilesPrefix;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ partitionId = input.readInt();
+ workerInfo = new WorkerInfo();
+ workerInfo.readFields(input);
+ boolean hasPreviousWorkerInfo = input.readBoolean();
+ if (hasPreviousWorkerInfo) {
+ previousWorkerInfo = new WorkerInfo();
+ previousWorkerInfo.readFields(input);
+ }
+ boolean hasCheckpointFilePrefix = input.readBoolean();
+ if (hasCheckpointFilePrefix) {
+ checkpointFilesPrefix = input.readUTF();
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeInt(partitionId);
+ workerInfo.write(output);
+ if (previousWorkerInfo != null) {
+ output.writeBoolean(true);
+ previousWorkerInfo.write(output);
+ } else {
+ output.writeBoolean(false);
+ }
+ if (checkpointFilesPrefix != null) {
+ output.writeBoolean(true);
+ output.writeUTF(checkpointFilesPrefix);
+ } else {
+ output.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public String toString() {
+ return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" +
+ previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")";
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java Thu Feb 16 22:12:31 2012
@@ -31,20 +31,20 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public interface GraphPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Create the {@link MasterGraphPartitioner} used by the master.
- * Instantiated once by the master and reused.
- *
- * @return Instantiated master graph partitioner
- */
- MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Create the {@link MasterGraphPartitioner} used by the master.
+ * Instantiated once by the master and reused.
+ *
+ * @return Instantiated master graph partitioner
+ */
+ MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
- /**
- * Create the {@link WorkerGraphPartitioner} used by the worker.
- * Instantiated once by every worker and reused.
- *
- * @return Instantiated worker graph partitioner
- */
- WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
+ /**
+ * Create the {@link WorkerGraphPartitioner} used by the worker.
+ * Instantiated once by every worker and reused.
+ *
+ * @return Instantiated worker graph partitioner
+ */
+ WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java Thu Feb 16 22:12:31 2012
@@ -39,119 +39,119 @@ import org.apache.log4j.Logger;
*/
@SuppressWarnings("rawtypes")
public class HashMasterPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- MasterGraphPartitioner<I, V, E, M> {
- /** Provided configuration */
- private Configuration conf;
- /** Specified partition count (overrides calculation) */
- private final int userPartitionCount;
- /** Partition count (calculated in createInitialPartitionOwners) */
- private int partitionCount = -1;
- /** Save the last generated partition owner list */
- private List<PartitionOwner> partitionOwnerList;
- /** Class logger */
- private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
-
- /**
- * ZooKeeper has a limit of the data in a single znode of 1 MB and
- * each entry can go be on the average somewhat more than 300 bytes
- */
- private static final int MAX_PARTTIONS = 1024 * 1024 / 350;
-
- /**
- * Multiplier for the current workers squared
- */
- public static final String PARTITION_COUNT_MULTIPLIER =
- "hash.masterPartitionCountMultipler";
- public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
-
- /** Overrides default partition count calculation if not -1 */
- public static final String USER_PARTITION_COUNT =
- "hash.userPartitionCount";
- public static final int DEFAULT_USER_PARTITION_COUNT = -1;
-
- public HashMasterPartitioner(Configuration conf) {
- this.conf = conf;
- userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
- DEFAULT_USER_PARTITION_COUNT);
- }
-
- @Override
- public Collection<PartitionOwner> createInitialPartitionOwners(
- Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
- if (availableWorkerInfos.isEmpty()) {
- throw new IllegalArgumentException(
- "createInitialPartitionOwners: No available workers");
- }
- List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
- Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
- if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
- float multiplier = conf.getFloat(
- PARTITION_COUNT_MULTIPLIER,
- DEFAULT_PARTITION_COUNT_MULTIPLIER);
- partitionCount =
- Math.max((int) (multiplier * availableWorkerInfos.size() *
- availableWorkerInfos.size()),
- 1);
- } else {
- partitionCount = userPartitionCount;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("createInitialPartitionOwners: Creating " +
- partitionCount + ", default would have been " +
- (availableWorkerInfos.size() *
- availableWorkerInfos.size()) + " partitions.");
- }
- if (partitionCount > MAX_PARTTIONS) {
- LOG.warn("createInitialPartitionOwners: " +
- "Reducing the partitionCount to " + MAX_PARTTIONS +
- " from " + partitionCount);
- partitionCount = MAX_PARTTIONS;
- }
-
- for (int i = 0; i < partitionCount; ++i) {
- PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
- if (!workerIt.hasNext()) {
- workerIt = availableWorkerInfos.iterator();
- }
- ownerList.add(owner);
- }
- this.partitionOwnerList = ownerList;
- return ownerList;
- }
-
-
- @Override
- public Collection<PartitionOwner> getCurrentPartitionOwners() {
- return partitionOwnerList;
- }
-
- /**
- * Subclasses can set the partition owner list.
- *
- * @param partitionOwnerList New partition owner list.
- */
- protected void setPartitionOwnerList(List<PartitionOwner>
- partitionOwnerList) {
- this.partitionOwnerList = partitionOwnerList;
- }
-
- @Override
- public Collection<PartitionOwner> generateChangedPartitionOwners(
- Collection<PartitionStats> allPartitionStatsList,
- Collection<WorkerInfo> availableWorkerInfos,
- int maxWorkers,
- long superstep) {
- return PartitionBalancer.balancePartitionsAcrossWorkers(
- conf,
- partitionOwnerList,
- allPartitionStatsList,
- availableWorkerInfos);
- }
-
- @Override
- public PartitionStats createPartitionStats() {
- return new PartitionStats();
- }
-
+ V extends Writable, E extends Writable, M extends Writable> implements
+ MasterGraphPartitioner<I, V, E, M> {
+ /** Multiplier applied to the available worker count squared */
+ public static final String PARTITION_COUNT_MULTIPLIER =
+ "hash.masterPartitionCountMultipler";
+ /** Default multiplier for the available worker count squared */
+ public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
+ /** Overrides default partition count calculation if not -1 */
+ public static final String USER_PARTITION_COUNT =
+ "hash.userPartitionCount";
+ /** Default user partition count (-1 means compute from workers) */
+ public static final int DEFAULT_USER_PARTITION_COUNT = -1;
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
+ /**
+ * ZooKeeper limits the data in a single znode to 1 MB and
+ * each entry can be, on average, somewhat more than 300 bytes
+ */
+ private static final int MAX_PARTTIONS = 1024 * 1024 / 350;
+ /** Provided configuration */
+ private Configuration conf;
+ /** Specified partition count (overrides calculation) */
+ private final int userPartitionCount;
+ /** Partition count (calculated in createInitialPartitionOwners) */
+ private int partitionCount = -1;
+ /** Save the last generated partition owner list */
+ private List<PartitionOwner> partitionOwnerList;
+
+ /**
+ * Constructor.
+ *
+ * @param conf Configuration used.
+ */
+ public HashMasterPartitioner(Configuration conf) {
+ this.conf = conf;
+ userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
+ DEFAULT_USER_PARTITION_COUNT);
+ }
+
+ @Override
+ public Collection<PartitionOwner> createInitialPartitionOwners(
+ Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+ if (availableWorkerInfos.isEmpty()) {
+ throw new IllegalArgumentException(
+ "createInitialPartitionOwners: No available workers");
+ }
+ List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
+ Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
+ if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
+ float multiplier = conf.getFloat(
+ PARTITION_COUNT_MULTIPLIER,
+ DEFAULT_PARTITION_COUNT_MULTIPLIER);
+ partitionCount =
+ Math.max((int) (multiplier * availableWorkerInfos.size() *
+ availableWorkerInfos.size()),
+ 1);
+ } else {
+ partitionCount = userPartitionCount;
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("createInitialPartitionOwners: Creating " +
+ partitionCount + ", default would have been " +
+ (availableWorkerInfos.size() *
+ availableWorkerInfos.size()) + " partitions.");
+ }
+ if (partitionCount > MAX_PARTTIONS) {
+ LOG.warn("createInitialPartitionOwners: " +
+ "Reducing the partitionCount to " + MAX_PARTTIONS +
+ " from " + partitionCount);
+ partitionCount = MAX_PARTTIONS;
+ }
+
+ for (int i = 0; i < partitionCount; ++i) {
+ PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
+ if (!workerIt.hasNext()) {
+ workerIt = availableWorkerInfos.iterator();
+ }
+ ownerList.add(owner);
+ }
+ this.partitionOwnerList = ownerList;
+ return ownerList;
+ }
+
+ @Override
+ public Collection<PartitionOwner> getCurrentPartitionOwners() {
+ return partitionOwnerList;
+ }
+
+ /**
+ * Subclasses can set the partition owner list.
+ *
+ * @param partitionOwnerList New partition owner list.
+ */
+ protected void setPartitionOwnerList(List<PartitionOwner>
+ partitionOwnerList) {
+ this.partitionOwnerList = partitionOwnerList;
+ }
+
+ @Override
+ public Collection<PartitionOwner> generateChangedPartitionOwners(
+ Collection<PartitionStats> allPartitionStatsList,
+ Collection<WorkerInfo> availableWorkerInfos,
+ int maxWorkers,
+ long superstep) {
+ return PartitionBalancer.balancePartitionsAcrossWorkers(
+ conf,
+ partitionOwnerList,
+ allPartitionStatsList,
+ availableWorkerInfos);
+ }
+
+ @Override
+ public PartitionStats createPartitionStats() {
+ return new PartitionStats();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java Thu Feb 16 22:12:31 2012
@@ -34,28 +34,28 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public class HashPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements Configurable,
- GraphPartitionerFactory<I, V, E, M> {
- private Configuration conf;
+ V extends Writable, E extends Writable, M extends Writable>
+ implements Configurable, GraphPartitionerFactory<I, V, E, M> {
+ /** Saved configuration, passed to the master partitioner */
+ private Configuration conf;
- @Override
- public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
- return new HashMasterPartitioner<I, V, E, M>(getConf());
- }
+ @Override
+ public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+ return new HashMasterPartitioner<I, V, E, M>(getConf());
+ }
- @Override
- public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
- return new HashWorkerPartitioner<I, V, E, M>();
- }
+ @Override
+ public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+ return new HashWorkerPartitioner<I, V, E, M>();
+ }
- @Override
- public Configuration getConf() {
- return conf;
- }
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java Thu Feb 16 22:12:31 2012
@@ -34,27 +34,28 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public class HashRangePartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements Configurable, GraphPartitionerFactory<I, V, E, M> {
- private Configuration conf;
+ V extends Writable, E extends Writable, M extends Writable>
+ implements Configurable, GraphPartitionerFactory<I, V, E, M> {
+ /** Saved configuration */
+ private Configuration conf;
- @Override
- public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
- return new HashMasterPartitioner<I, V, E, M>(getConf());
- }
+ @Override
+ public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
+ return new HashMasterPartitioner<I, V, E, M>(getConf());
+ }
- @Override
- public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
- return new HashRangeWorkerPartitioner<I, V, E, M>();
- }
+ @Override
+ public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
+ return new HashRangeWorkerPartitioner<I, V, E, M>();
+ }
- @Override
- public Configuration getConf() {
- return conf;
- }
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java Thu Feb 16 22:12:31 2012
@@ -31,12 +31,12 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public class HashRangeWorkerPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends HashWorkerPartitioner<I, V, E, M> {
- @Override
- public PartitionOwner getPartitionOwner(I vertexId) {
- int rangeSize = Integer.MAX_VALUE / getPartitionOwners().size();
- int index = Math.abs(vertexId.hashCode()) / rangeSize;
- return partitionOwnerList.get(index);
- }
+ V extends Writable, E extends Writable, M extends Writable>
+ extends HashWorkerPartitioner<I, V, E, M> {
+ @Override
+ public PartitionOwner getPartitionOwner(I vertexId) {
+ int rangeSize = Integer.MAX_VALUE / getPartitionOwners().size();
+ int index = Math.abs(vertexId.hashCode()) / rangeSize;
+ return partitionOwnerList.get(index);
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Thu Feb 16 22:12:31 2012
@@ -40,77 +40,77 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public class HashWorkerPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements WorkerGraphPartitioner<I, V, E, M> {
- /** Mapping of the vertex ids to {@link PartitionOwner} */
- protected List<PartitionOwner> partitionOwnerList =
- new ArrayList<PartitionOwner>();
-
- @Override
- public PartitionOwner createPartitionOwner() {
- return new BasicPartitionOwner();
- }
-
- @Override
- public PartitionOwner getPartitionOwner(I vertexId) {
- return partitionOwnerList.get(Math.abs(vertexId.hashCode())
- % partitionOwnerList.size());
- }
-
- @Override
- public Collection<PartitionStats> finalizePartitionStats(
- Collection<PartitionStats> workerPartitionStats,
- Map<Integer, Partition<I, V, E, M>> partitionMap) {
- // No modification necessary
- return workerPartitionStats;
- }
-
- @Override
- public PartitionExchange updatePartitionOwners(
- WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners,
- Map<Integer, Partition<I, V, E, M>> partitionMap) {
- partitionOwnerList.clear();
- partitionOwnerList.addAll(masterSetPartitionOwners);
-
- Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
- Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
- new HashMap<WorkerInfo, List<Integer>>();
- for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
- if (partitionOwner.getPreviousWorkerInfo() == null) {
- continue;
- } else if (partitionOwner.getWorkerInfo().equals(
- myWorkerInfo) &&
- partitionOwner.getPreviousWorkerInfo().equals(
- myWorkerInfo)) {
- throw new IllegalStateException(
- "updatePartitionOwners: Impossible to have the same " +
- "previous and current worker info " + partitionOwner +
- " as me " + myWorkerInfo);
- } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
- dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
- } else if (partitionOwner.getPreviousWorkerInfo().equals(
- myWorkerInfo)) {
- if (workerPartitionOwnerMap.containsKey(
- partitionOwner.getWorkerInfo())) {
- workerPartitionOwnerMap.get(
- partitionOwner.getWorkerInfo()).add(
- partitionOwner.getPartitionId());
- } else {
- List<Integer> partitionOwnerList = new ArrayList<Integer>();
- partitionOwnerList.add(partitionOwner.getPartitionId());
- workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
- partitionOwnerList);
- }
- }
+ V extends Writable, E extends Writable, M extends Writable>
+ implements WorkerGraphPartitioner<I, V, E, M> {
+ /** Mapping of the vertex ids to {@link PartitionOwner} */
+ protected List<PartitionOwner> partitionOwnerList =
+ new ArrayList<PartitionOwner>();
+
+ @Override
+ public PartitionOwner createPartitionOwner() {
+ return new BasicPartitionOwner();
+ }
+
+ @Override
+ public PartitionOwner getPartitionOwner(I vertexId) {
+ return partitionOwnerList.get(Math.abs(vertexId.hashCode()) %
+ partitionOwnerList.size());
+ }
+
+ @Override
+ public Collection<PartitionStats> finalizePartitionStats(
+ Collection<PartitionStats> workerPartitionStats,
+ Map<Integer, Partition<I, V, E, M>> partitionMap) {
+ // No modification necessary
+ return workerPartitionStats;
+ }
+
+ @Override
+ public PartitionExchange updatePartitionOwners(
+ WorkerInfo myWorkerInfo,
+ Collection<? extends PartitionOwner> masterSetPartitionOwners,
+ Map<Integer, Partition<I, V, E, M>> partitionMap) {
+ partitionOwnerList.clear();
+ partitionOwnerList.addAll(masterSetPartitionOwners);
+
+ Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
+ Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
+ new HashMap<WorkerInfo, List<Integer>>();
+ for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+ if (partitionOwner.getPreviousWorkerInfo() == null) {
+ continue;
+ } else if (partitionOwner.getWorkerInfo().equals(
+ myWorkerInfo) &&
+ partitionOwner.getPreviousWorkerInfo().equals(
+ myWorkerInfo)) {
+ throw new IllegalStateException(
+ "updatePartitionOwners: Impossible to have the same " +
+ "previous and current worker info " + partitionOwner +
+ " as me " + myWorkerInfo);
+ } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) {
+ dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo());
+ } else if (partitionOwner.getPreviousWorkerInfo().equals(
+ myWorkerInfo)) {
+ if (workerPartitionOwnerMap.containsKey(
+ partitionOwner.getWorkerInfo())) {
+ workerPartitionOwnerMap.get(
+ partitionOwner.getWorkerInfo()).add(
+ partitionOwner.getPartitionId());
+ } else {
+ List<Integer> tmpPartitionOwnerList = new ArrayList<Integer>();
+ tmpPartitionOwnerList.add(partitionOwner.getPartitionId());
+ workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(),
+ tmpPartitionOwnerList);
}
-
- return new PartitionExchange(dependentWorkerSet,
- workerPartitionOwnerMap);
+ }
}
- @Override
- public Collection<? extends PartitionOwner> getPartitionOwners() {
- return partitionOwnerList;
- }
+ return new PartitionExchange(dependentWorkerSet,
+ workerPartitionOwnerMap);
+ }
+
+ @Override
+ public Collection<? extends PartitionOwner> getPartitionOwners() {
+ return partitionOwnerList;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java Thu Feb 16 22:12:31 2012
@@ -35,48 +35,49 @@ import org.apache.giraph.graph.WorkerInf
*/
@SuppressWarnings("rawtypes")
public interface MasterGraphPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Set some initial partition owners for the graph. Guaranteed to be called
- * prior to the graph being loaded (initial or restart).
- *
- * @param availableWorkerInfos Workers available for partition assignment
- * @param maxWorkers Maximum number of workers
- */
- Collection<PartitionOwner> createInitialPartitionOwners(
- Collection<WorkerInfo> availableWorkerInfos, int maxWorkers);
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Set some initial partition owners for the graph. Guaranteed to be called
+ * prior to the graph being loaded (initial or restart).
+ *
+ * @param availableWorkerInfos Workers available for partition assignment
+ * @param maxWorkers Maximum number of workers
+ * @return Collection of generated partition owners.
+ */
+ Collection<PartitionOwner> createInitialPartitionOwners(
+ Collection<WorkerInfo> availableWorkerInfos, int maxWorkers);
- /**
- * After the worker stats have been merged to a single list, the master can
- * use this information to send commands to the workers for any
- * {@link Partition} changes. This protocol is specific to the
- * {@link MasterGraphPartitioner} implementation.
- *
- * @param allPartitionStatsList All partition stats from all workers.
- * @param availableWorkers Workers available for partition assignment
- * @param maxWorkers Maximum number of workers
- * @param superstep Partition owners will be set for this superstep
- * @return Collection of {@link PartitionOwner} objects that changed from
- * the previous superstep, empty list if no change.
- */
- Collection<PartitionOwner> generateChangedPartitionOwners(
- Collection<PartitionStats> allPartitionStatsList,
- Collection<WorkerInfo> availableWorkers,
- int maxWorkers,
- long superstep);
+ /**
+ * After the worker stats have been merged to a single list, the master can
+ * use this information to send commands to the workers for any
+ * {@link Partition} changes. This protocol is specific to the
+ * {@link MasterGraphPartitioner} implementation.
+ *
+ * @param allPartitionStatsList All partition stats from all workers.
+ * @param availableWorkers Workers available for partition assignment
+ * @param maxWorkers Maximum number of workers
+ * @param superstep Partition owners will be set for this superstep
+ * @return Collection of {@link PartitionOwner} objects that changed from
+ * the previous superstep, empty list if no change.
+ */
+ Collection<PartitionOwner> generateChangedPartitionOwners(
+ Collection<PartitionStats> allPartitionStatsList,
+ Collection<WorkerInfo> availableWorkers,
+ int maxWorkers,
+ long superstep);
- /**
- * Get current partition owners at this time.
- *
- * @return Collection of current {@link PartitionOwner} objects
- */
- Collection<PartitionOwner> getCurrentPartitionOwners();
+ /**
+ * Get current partition owners at this time.
+ *
+ * @return Collection of current {@link PartitionOwner} objects
+ */
+ Collection<PartitionOwner> getCurrentPartitionOwners();
- /**
- * Instantiate the {@link PartitionStats} implementation used to read the
- * worker stats
- *
- * @return Instantiated {@link PartitionStats} object
- */
- PartitionStats createPartitionStats();
+ /**
+ * Instantiate the {@link PartitionStats} implementation used to read the
+ * worker stats
+ *
+ * @return Instantiated {@link PartitionStats} object
+ */
+ PartitionStats createPartitionStats();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Thu Feb 16 22:12:31 2012
@@ -31,115 +31,126 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-
/**
* A generic container that stores vertices. Vertex ids will map to exactly
* one partition.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class Partition<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements Writable {
- /** Configuration from the worker */
- private final Configuration conf;
- /** Partition id */
- private final int partitionId;
- /** Vertex map for this range (keyed by index) */
- private final Map<I, BasicVertex<I, V, E, M>> vertexMap =
- new HashMap<I, BasicVertex<I, V, E, M>>();
-
- public Partition(Configuration conf, int partitionId) {
- this.conf = conf;
- this.partitionId = partitionId;
- }
-
- /**
- * Get the vertex for this vertex index.
- *
- * @param vertexIndex Vertex index to search for
- * @return Vertex if it exists, null otherwise
- */
- public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
- return vertexMap.get(vertexIndex);
- }
-
- /**
- * Put a vertex into the Partition
- *
- * @param vertex Vertex to put in the Partition
- * @return old vertex value (i.e. null if none existed prior)
- */
- public BasicVertex<I, V, E, M> putVertex(BasicVertex<I, V, E, M> vertex) {
- return vertexMap.put(vertex.getVertexId(), vertex);
- }
-
- /**
- * Remove a vertex from the Partition
- *
- * @param vertexIndex Vertex index to remove
- */
- public BasicVertex<I, V, E, M> removeVertex(I vertexIndex) {
- return vertexMap.remove(vertexIndex);
- }
-
- /**
- * Get a collection of the vertices.
- *
- * @return Collection of the vertices
- */
- public Collection<BasicVertex<I, V, E , M>> getVertices() {
- return vertexMap.values();
- }
-
- /**
- * Get the number of edges in this partition. Computed on the fly.
- *
- * @return Number of edges.
- */
- public long getEdgeCount() {
- long edges = 0;
- for (BasicVertex<I, V, E, M> vertex : vertexMap.values()) {
- edges += vertex.getNumOutEdges();
- }
- return edges;
- }
-
- /**
- * Get the partition id.
- *
- * @return Partition id of this partition.
- */
- public int getPartitionId() {
- return partitionId;
- }
-
- @Override
- public String toString() {
- return "(id=" + getPartitionId() + ",V=" + vertexMap.size() +
- ",E=" + getEdgeCount() + ")";
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- int vertices = input.readInt();
- for (int i = 0; i < vertices; ++i) {
- BasicVertex<I, V, E, M> vertex =
- BspUtils.<I, V, E, M>createVertex(conf);
- vertex.readFields(input);
- if (vertexMap.put(vertex.getVertexId(),
- (BasicVertex<I, V, E, M>) vertex) != null) {
- throw new IllegalStateException(
- "readFields: " + this +
- " already has same id " + vertex);
- }
- }
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeInt(vertexMap.size());
- for (BasicVertex vertex : vertexMap.values()) {
- vertex.write(output);
- }
+ V extends Writable, E extends Writable, M extends Writable>
+ implements Writable {
+ /** Configuration from the worker */
+ private final Configuration conf;
+ /** Partition id */
+ private final int partitionId;
+ /** Vertex map for this range (keyed by index) */
+ private final Map<I, BasicVertex<I, V, E, M>> vertexMap =
+ new HashMap<I, BasicVertex<I, V, E, M>>();
+
+ /**
+ * Constructor.
+ *
+ * @param conf Configuration.
+ * @param partitionId Partition id.
+ */
+ public Partition(Configuration conf, int partitionId) {
+ this.conf = conf;
+ this.partitionId = partitionId;
+ }
+
+ /**
+ * Get the vertex for this vertex index.
+ *
+ * @param vertexIndex Vertex index to search for
+ * @return Vertex if it exists, null otherwise
+ */
+ public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
+ return vertexMap.get(vertexIndex);
+ }
+
+ /**
+ * Put a vertex into the Partition
+ *
+ * @param vertex Vertex to put in the Partition
+ * @return old vertex value (i.e. null if none existed prior)
+ */
+ public BasicVertex<I, V, E, M> putVertex(BasicVertex<I, V, E, M> vertex) {
+ return vertexMap.put(vertex.getVertexId(), vertex);
+ }
+
+ /**
+ * Remove a vertex from the Partition
+ *
+ * @param vertexIndex Vertex index to remove
+ * @return The removed vertex (i.e. null if none existed prior)
+ */
+ public BasicVertex<I, V, E, M> removeVertex(I vertexIndex) {
+ return vertexMap.remove(vertexIndex);
+ }
+
+ /**
+ * Get a collection of the vertices.
+ *
+ * @return Collection of the vertices
+ */
+ public Collection<BasicVertex<I, V, E , M>> getVertices() {
+ return vertexMap.values();
+ }
+
+ /**
+ * Get the number of edges in this partition. Computed on the fly.
+ *
+ * @return Number of edges.
+ */
+ public long getEdgeCount() {
+ long edges = 0;
+ for (BasicVertex<I, V, E, M> vertex : vertexMap.values()) {
+ edges += vertex.getNumOutEdges();
+ }
+ return edges;
+ }
+
+ /**
+ * Get the partition id.
+ *
+ * @return Partition id of this partition.
+ */
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ @Override
+ public String toString() {
+ return "(id=" + getPartitionId() + ",V=" + vertexMap.size() +
+ ",E=" + getEdgeCount() + ")";
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int vertices = input.readInt();
+ for (int i = 0; i < vertices; ++i) {
+ BasicVertex<I, V, E, M> vertex =
+ BspUtils.<I, V, E, M>createVertex(conf);
+ vertex.readFields(input);
+ if (vertexMap.put(vertex.getVertexId(),
+ (BasicVertex<I, V, E, M>) vertex) != null) {
+ throw new IllegalStateException(
+ "readFields: " + this +
+ " already has same id " + vertex);
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeInt(vertexMap.size());
+ for (BasicVertex vertex : vertexMap.values()) {
+ vertex.write(output);
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java Thu Feb 16 22:12:31 2012
@@ -35,234 +35,254 @@ import org.apache.log4j.Logger;
* Helper class for balancing partitions across a set of workers.
*/
public class PartitionBalancer {
- /** Partition balancing algorithm */
- public static final String PARTITION_BALANCE_ALGORITHM =
- "hash.partitionBalanceAlgorithm";
- public static final String STATIC_BALANCE_ALGORITHM =
- "static";
- public static final String EGDE_BALANCE_ALGORITHM =
- "edges";
- public static final String VERTICES_BALANCE_ALGORITHM =
- "vertices";
- /** Class logger */
- private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
-
- /**
- * What value to balance partitions with? Edges, vertices?
- */
- private enum BalanceValue {
- UNSET,
- EDGES,
- VERTICES
+ /** Partition balancing algorithm */
+ public static final String PARTITION_BALANCE_ALGORITHM =
+ "hash.partitionBalanceAlgorithm";
+ /** No rebalancing during the supersteps */
+ public static final String STATIC_BALANCE_ALGORITHM =
+ "static";
+ /** Rebalance across supersteps by edges */
+ public static final String EGDE_BALANCE_ALGORITHM =
+ "edges";
+ /** Rebalance across supersteps by vertices */
+ public static final String VERTICES_BALANCE_ALGORITHM =
+ "vertices";
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(PartitionBalancer.class);
+
+ /**
+ * What value to balance partitions with? Edges, vertices?
+ */
+ private enum BalanceValue {
+ /** Not chosen */
+ UNSET,
+ /** Balance with edges */
+ EDGES,
+ /** Balance with vertices */
+ VERTICES
+ }
+
+ /**
+ * Do not construct this class.
+ */
+ private PartitionBalancer() { }
+
+ /**
+ * Get the value used to balance.
+ *
+ * @param partitionStat Stats of this partition.
+ * @param balanceValue Type of the value to balance.
+ * @return Balance value.
+ */
+ private static long getBalanceValue(PartitionStats partitionStat,
+ BalanceValue balanceValue) {
+ switch (balanceValue) {
+ case EDGES:
+ return partitionStat.getEdgeCount();
+ case VERTICES:
+ return partitionStat.getVertexCount();
+ default:
+ throw new IllegalArgumentException(
+ "getBalanceValue: Illegal balance value " + balanceValue);
}
+ }
+
+ /**
+ * Used to sort the partition owners from lowest value to highest value
+ */
+ private static class PartitionOwnerComparator implements
+ Comparator<PartitionOwner> {
+ /** Map of owner to stats */
+ private final Map<PartitionOwner, PartitionStats> ownerStatMap;
+ /** Value type to compare on */
+ private final BalanceValue balanceValue;
+
/**
- * Get the value used to balance.
+ * Only constructor.
*
- * @param partitionStat
- * @param balanceValue
- * @return
+ * @param ownerStatMap Map of owners to stats.
+ * @param balanceValue Value to balance with.
*/
- private static long getBalanceValue(PartitionStats partitionStat,
- BalanceValue balanceValue) {
- switch (balanceValue) {
- case EDGES:
- return partitionStat.getEdgeCount();
- case VERTICES:
- return partitionStat.getVertexCount();
- default:
- throw new IllegalArgumentException(
- "getBalanceValue: Illegal balance value " + balanceValue);
- }
+ public PartitionOwnerComparator(
+ Map<PartitionOwner, PartitionStats> ownerStatMap,
+ BalanceValue balanceValue) {
+ this.ownerStatMap = ownerStatMap;
+ this.balanceValue = balanceValue;
+ }
+
+ @Override
+ public int compare(PartitionOwner owner1, PartitionOwner owner2) {
+ return (int)
+ (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
+ getBalanceValue(ownerStatMap.get(owner2), balanceValue));
}
+ }
+
+ /**
+ * Structure to keep track of how much value a {@link WorkerInfo} has
+ * been assigned.
+ */
+ private static class WorkerInfoAssignments implements
+ Comparable<WorkerInfoAssignments> {
+ /** Worker info associated */
+ private final WorkerInfo workerInfo;
+ /** Balance value */
+ private final BalanceValue balanceValue;
+ /** Map of owner to stats */
+ private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
+ /** Current value of this object */
+ private long value = 0;
/**
- * Used to sort the partition owners from lowest value to highest value
+ * Constructor with final values.
+ *
+ * @param workerInfo Worker info for assignment.
+ * @param balanceValue Value used to balance.
+ * @param ownerStatsMap Map of owner to stats.
*/
- private static class PartitionOwnerComparator implements
- Comparator<PartitionOwner> {
- /** Map of owner to stats */
- private final Map<PartitionOwner, PartitionStats> ownerStatMap;
- /** Value type to compare on */
- private final BalanceValue balanceValue;
-
-
- /**
- * Only constructor.
- *
- * @param comparatorValue What to compare with?
- */
- public PartitionOwnerComparator(
- Map<PartitionOwner, PartitionStats> ownerStatMap,
- BalanceValue balanceValue) {
- this.ownerStatMap = ownerStatMap;
- this.balanceValue = balanceValue;
- }
-
- @Override
- public int compare(PartitionOwner owner1, PartitionOwner owner2) {
- return (int)
- (getBalanceValue(ownerStatMap.get(owner1), balanceValue) -
- getBalanceValue(ownerStatMap.get(owner2), balanceValue));
- }
+ public WorkerInfoAssignments(
+ WorkerInfo workerInfo,
+ BalanceValue balanceValue,
+ Map<PartitionOwner, PartitionStats> ownerStatsMap) {
+ this.workerInfo = workerInfo;
+ this.balanceValue = balanceValue;
+ this.ownerStatsMap = ownerStatsMap;
}
/**
- * Structure to keep track of how much value a {@link WorkerInfo} has
- * been assigned.
+ * Get the total value of all partitions assigned to this worker.
+ *
+ * @return Total value of all partition assignments.
*/
- private static class WorkerInfoAssignments implements
- Comparable<WorkerInfoAssignments> {
- /** Worker info associated */
- private final WorkerInfo workerInfo;
- /** Balance value */
- private final BalanceValue balanceValue;
- /** Map of owner to stats */
- private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
- /** Current value of this object */
- private long value = 0;
-
- public WorkerInfoAssignments(
- WorkerInfo workerInfo,
- BalanceValue balanceValue,
- Map<PartitionOwner, PartitionStats> ownerStatsMap) {
- this.workerInfo = workerInfo;
- this.balanceValue = balanceValue;
- this.ownerStatsMap = ownerStatsMap;
- }
-
- /**
- * Get the total value of all partitions assigned to this worker.
- *
- * @return Total value of all partition assignments.
- */
- public long getValue() {
- return value;
- }
-
- /**
- * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
- *
- * @param partitionOwner PartitionOwner to assign.
- */
- public void assignPartitionOwner(
- PartitionOwner partitionOwner) {
- value += getBalanceValue(ownerStatsMap.get(partitionOwner),
- balanceValue);
- if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
- partitionOwner.setPreviousWorkerInfo(
- partitionOwner.getWorkerInfo());
- partitionOwner.setWorkerInfo(workerInfo);
- } else {
- partitionOwner.setPreviousWorkerInfo(null);
- }
- }
-
- @Override
- public int compareTo(WorkerInfoAssignments other) {
- return (int)
- (getValue() - ((WorkerInfoAssignments) other).getValue());
- }
+ public long getValue() {
+ return value;
}
/**
- * Balance the partitions with an algorithm based on a value.
+ * Assign a {@link PartitionOwner} to this {@link WorkerInfo}.
*
- * @param conf Configuration to find the algorithm
- * @param allPartitionStats All the partition stats
- * @param availableWorkerInfos All the available workers
- * @return Balanced partition owners
+ * @param partitionOwner PartitionOwner to assign.
*/
- public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
- Configuration conf,
- Collection<PartitionOwner> partitionOwners,
- Collection<PartitionStats> allPartitionStats,
- Collection<WorkerInfo> availableWorkerInfos) {
-
- String balanceAlgorithm =
- conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
- if (LOG.isInfoEnabled()) {
- LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
- balanceAlgorithm);
- }
- BalanceValue balanceValue = BalanceValue.UNSET;
- if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
- return partitionOwners;
- } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
- balanceValue = BalanceValue.EDGES;
- } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
- balanceValue = BalanceValue.VERTICES;
- } else {
- throw new IllegalArgumentException(
- "balancePartitionsAcrossWorkers: Illegal balance " +
- "algorithm - " + balanceAlgorithm);
- }
-
- // Join the partition stats and partition owners by partition id
- Map<Integer, PartitionStats> idStatMap =
- new HashMap<Integer, PartitionStats>();
- for (PartitionStats partitionStats : allPartitionStats) {
- if (idStatMap.put(partitionStats.getPartitionId(), partitionStats)
- != null) {
- throw new IllegalStateException(
- "balancePartitionsAcrossWorkers: Duplicate partition id " +
- "for " + partitionStats);
- }
- }
- Map<PartitionOwner, PartitionStats> ownerStatsMap =
- new HashMap<PartitionOwner, PartitionStats>();
- for (PartitionOwner partitionOwner : partitionOwners) {
- PartitionStats partitionStats =
- idStatMap.get(partitionOwner.getPartitionId());
- if (partitionStats == null) {
- throw new IllegalStateException(
- "balancePartitionsAcrossWorkers: Missing partition " +
- "stats for " + partitionOwner);
- }
- if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
- throw new IllegalStateException(
- "balancePartitionsAcrossWorkers: Duplicate partition " +
- "owner " + partitionOwner);
- }
- }
- if (ownerStatsMap.size() != partitionOwners.size()) {
- throw new IllegalStateException(
- "balancePartitionsAcrossWorkers: ownerStats count = " +
- ownerStatsMap.size() + ", partitionOwners count = " +
- partitionOwners.size() + " and should match.");
- }
-
- List<WorkerInfoAssignments> workerInfoAssignmentsList =
- new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
- for (WorkerInfo workerInfo : availableWorkerInfos) {
- workerInfoAssignmentsList.add(
- new WorkerInfoAssignments(
- workerInfo, balanceValue, ownerStatsMap));
- }
-
- // A simple heuristic for balancing the partitions across the workers
- // using a value (edges, vertices). An improvement would be to
- // take into account the already existing partition worker assignments.
- // 1. Sort the partitions by size
- // 2. Place the workers in a min heap sorted by their total balance
- // value.
- // 3. From largest partition to the smallest, take the partition
- // worker at the top of the heap, add the partition to it, and
- // then put it back in the heap
- List<PartitionOwner> partitionOwnerList =
- new ArrayList<PartitionOwner>(partitionOwners);
- Collections.sort(partitionOwnerList,
- Collections.reverseOrder(
- new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
- PriorityQueue<WorkerInfoAssignments> minQueue =
- new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
- for (PartitionOwner partitionOwner : partitionOwnerList) {
- WorkerInfoAssignments chosenWorker = minQueue.remove();
- chosenWorker.assignPartitionOwner(partitionOwner);
- minQueue.add(chosenWorker);
- }
+ public void assignPartitionOwner(
+ PartitionOwner partitionOwner) {
+ value += getBalanceValue(ownerStatsMap.get(partitionOwner),
+ balanceValue);
+ if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
+ partitionOwner.setPreviousWorkerInfo(
+ partitionOwner.getWorkerInfo());
+ partitionOwner.setWorkerInfo(workerInfo);
+ } else {
+ partitionOwner.setPreviousWorkerInfo(null);
+ }
+ }
- return partitionOwnerList;
+ @Override
+ public int compareTo(WorkerInfoAssignments other) {
+ return (int)
+ (getValue() - ((WorkerInfoAssignments) other).getValue());
}
+ }
+
+ /**
+ * Balance the partitions with an algorithm based on a value.
+ *
+ * @param conf Configuration to find the algorithm
+ * @param partitionOwners All the owners of all partitions
+ * @param allPartitionStats All the partition stats
+ * @param availableWorkerInfos All the available workers
+ * @return Balanced partition owners
+ */
+ public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
+ Configuration conf,
+ Collection<PartitionOwner> partitionOwners,
+ Collection<PartitionStats> allPartitionStats,
+ Collection<WorkerInfo> availableWorkerInfos) {
+
+ String balanceAlgorithm =
+ conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
+ balanceAlgorithm);
+ }
+ BalanceValue balanceValue = BalanceValue.UNSET;
+ if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
+ return partitionOwners;
+ } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) {
+ balanceValue = BalanceValue.EDGES;
+ } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
+ balanceValue = BalanceValue.VERTICES;
+ } else {
+ throw new IllegalArgumentException(
+ "balancePartitionsAcrossWorkers: Illegal balance " +
+ "algorithm - " + balanceAlgorithm);
+ }
+
+ // Join the partition stats and partition owners by partition id
+ Map<Integer, PartitionStats> idStatMap =
+ new HashMap<Integer, PartitionStats>();
+ for (PartitionStats partitionStats : allPartitionStats) {
+ if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
+ null) {
+ throw new IllegalStateException(
+ "balancePartitionsAcrossWorkers: Duplicate partition id " +
+ "for " + partitionStats);
+ }
+ }
+ Map<PartitionOwner, PartitionStats> ownerStatsMap =
+ new HashMap<PartitionOwner, PartitionStats>();
+ for (PartitionOwner partitionOwner : partitionOwners) {
+ PartitionStats partitionStats =
+ idStatMap.get(partitionOwner.getPartitionId());
+ if (partitionStats == null) {
+ throw new IllegalStateException(
+ "balancePartitionsAcrossWorkers: Missing partition " +
+ "stats for " + partitionOwner);
+ }
+ if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
+ throw new IllegalStateException(
+ "balancePartitionsAcrossWorkers: Duplicate partition " +
+ "owner " + partitionOwner);
+ }
+ }
+ if (ownerStatsMap.size() != partitionOwners.size()) {
+ throw new IllegalStateException(
+ "balancePartitionsAcrossWorkers: ownerStats count = " +
+ ownerStatsMap.size() + ", partitionOwners count = " +
+ partitionOwners.size() + " and should match.");
+ }
+
+ List<WorkerInfoAssignments> workerInfoAssignmentsList =
+ new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
+ for (WorkerInfo workerInfo : availableWorkerInfos) {
+ workerInfoAssignmentsList.add(
+ new WorkerInfoAssignments(
+ workerInfo, balanceValue, ownerStatsMap));
+ }
+
+ // A simple heuristic for balancing the partitions across the workers
+ // using a value (edges, vertices). An improvement would be to
+ // take into account the already existing partition worker assignments.
+ // 1. Sort the partitions by size
+ // 2. Place the workers in a min heap sorted by their total balance
+ // value.
+ // 3. From largest partition to the smallest, take the partition
+ // worker at the top of the heap, add the partition to it, and
+ // then put it back in the heap
+ List<PartitionOwner> partitionOwnerList =
+ new ArrayList<PartitionOwner>(partitionOwners);
+ Collections.sort(partitionOwnerList,
+ Collections.reverseOrder(
+ new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
+ PriorityQueue<WorkerInfoAssignments> minQueue =
+ new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
+ for (PartitionOwner partitionOwner : partitionOwnerList) {
+ WorkerInfoAssignments chosenWorker = minQueue.remove();
+ chosenWorker.assignPartitionOwner(partitionOwner);
+ minQueue.add(chosenWorker);
+ }
+
+ return partitionOwnerList;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java Thu Feb 16 22:12:31 2012
@@ -29,49 +29,49 @@ import org.apache.giraph.graph.WorkerInf
* exchange between workers.
*/
public class PartitionExchange {
- /** Workers that I am dependent on before I can continue */
- private final Set<WorkerInfo> myDependencyWorkerSet;
- /** Workers that I need to sent partitions to */
- private final Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap;
+ /** Workers that I am dependent on before I can continue */
+ private final Set<WorkerInfo> myDependencyWorkerSet;
+ /** Workers that I need to send partitions to */
+ private final Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap;
- /**
- * Only constructor.
- *
- * @param myDependencyWorkerSet All the workers I must wait for
- * @param sendWorkerPartitionMap Partitions I need to send to other workers
- */
- public PartitionExchange(
- Set<WorkerInfo> myDependencyWorkerSet,
- Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap) {
- this.myDependencyWorkerSet = myDependencyWorkerSet;
- this.sendWorkerPartitionMap = sendWorkerPartitionMap;
- }
+ /**
+ * Only constructor. Stores both arguments by reference (no copy is made).
+ *
+ * @param myDependencyWorkerSet All the workers I must wait for
+ * @param sendWorkerPartitionMap Partitions I need to send to other workers
+ */
+ public PartitionExchange(
+ Set<WorkerInfo> myDependencyWorkerSet,
+ Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap) {
+ this.myDependencyWorkerSet = myDependencyWorkerSet;
+ this.sendWorkerPartitionMap = sendWorkerPartitionMap;
+ }
- /**
- * Get the workers that I must wait for
- *
- * @return Set of workers I must wait for
- */
- public Set<WorkerInfo> getMyDependencyWorkerSet() {
- return myDependencyWorkerSet;
- }
+ /**
+ * Get the workers that I must wait for.
+ *
+ * @return Set of workers I must wait for
+ */
+ public Set<WorkerInfo> getMyDependencyWorkerSet() {
+ return myDependencyWorkerSet;
+ }
- /**
- * Get a mapping of worker to list of partition ids I need to send to.
- *
- * @return Mapping of worker to partition id list I will send to.
- */
- public Map<WorkerInfo, List<Integer>> getSendWorkerPartitionMap() {
- return sendWorkerPartitionMap;
- }
+ /**
+ * Get a mapping of worker to list of partition ids I need to send to.
+ *
+ * @return Mapping of worker to partition id list I will send to.
+ */
+ public Map<WorkerInfo, List<Integer>> getSendWorkerPartitionMap() {
+ return sendWorkerPartitionMap;
+ }
- /**
- * Is this worker involved in a partition exchange? Receiving or sending?
- *
- * @return True if needs to be involved in the exchange, false otherwise.
- */
- public boolean doExchange() {
- return !myDependencyWorkerSet.isEmpty() ||
- !sendWorkerPartitionMap.isEmpty();
- }
+ /**
+ * Is this worker involved in a partition exchange, either receiving or sending?
+ *
+ * @return True if there are workers to wait for or partitions to send, false otherwise.
+ */
+ public boolean doExchange() {
+ return !myDependencyWorkerSet.isEmpty() ||
+ !sendWorkerPartitionMap.isEmpty();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java Thu Feb 16 22:12:31 2012
@@ -25,57 +25,57 @@ import org.apache.hadoop.io.Writable;
* Metadata about ownership of a partition.
*/
public interface PartitionOwner extends Writable {
- /**
- * Get the partition id that maps to the relevant {@link Partition} object
- *
- * @return Partition id
- */
- int getPartitionId();
-
- /**
- * Get the worker information that is currently responsible for
- * the partition id.
- *
- * @return Owning worker information.
- */
- WorkerInfo getWorkerInfo();
-
- /**
- * Set the current worker info.
- *
- * @param workerInfo Worker info responsible for partition
- */
- void setWorkerInfo(WorkerInfo workerInfo);
-
- /**
- * Get the worker information that was previously responsible for the
- * partition id.
- *
- * @return Owning worker information or null if no previous worker info.
- */
- WorkerInfo getPreviousWorkerInfo();
-
- /**
- * Set the previous worker info.
- *
- * @param workerInfo Worker info that was previously responsible for the
- * partition.
- */
- void setPreviousWorkerInfo(WorkerInfo workerInfo);
-
- /**
- * If this is a restarted checkpoint, the worker will use this information
- * to determine where the checkpointed partition was stored on HDFS.
- *
- * @return Prefix of the checkpoint HDFS files for this partition, null if
- * this is not a restarted superstep.
- */
- String getCheckpointFilesPrefix();
-
- /**
- * Set the checkpoint files prefix. Master uses this.
- *
- * @param checkpointFilesPrefix HDFS checkpoint file prefix
- */
- void setCheckpointFilesPrefix(String checkpointFilesPrefix);
+ /**
+ * Get the partition id that maps to the relevant {@link Partition} object.
+ *
+ * @return Partition id
+ */
+ int getPartitionId();
+
+ /**
+ * Get the worker information that is currently responsible for
+ * the partition id.
+ *
+ * @return Owning worker information.
+ */
+ WorkerInfo getWorkerInfo();
+
+ /**
+ * Set the current worker info.
+ *
+ * @param workerInfo Worker info responsible for partition
+ */
+ void setWorkerInfo(WorkerInfo workerInfo);
+
+ /**
+ * Get the worker information that was previously responsible for the
+ * partition id.
+ *
+ * @return Previously owning worker information, or null if there is none.
+ */
+ WorkerInfo getPreviousWorkerInfo();
+
+ /**
+ * Set the previous worker info.
+ *
+ * @param workerInfo Worker info that was previously responsible for the
+ * partition.
+ */
+ void setPreviousWorkerInfo(WorkerInfo workerInfo);
+
+ /**
+ * If this is a restart from a checkpoint, the worker will use this information
+ * to determine where the checkpointed partition was stored on HDFS.
+ *
+ * @return Prefix of the checkpoint HDFS files for this partition, null if
+ * this is not a restarted superstep.
+ */
+ String getCheckpointFilesPrefix();
+
+ /**
+ * Set the checkpoint files prefix. Master uses this.
+ *
+ * @param checkpointFilesPrefix HDFS checkpoint file prefix
+ */
+ void setCheckpointFilesPrefix(String checkpointFilesPrefix);
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java Thu Feb 16 22:12:31 2012
@@ -29,74 +29,125 @@ import org.apache.hadoop.io.Writable;
* actual partition data, only the statistics.
*/
public class PartitionStats implements Writable {
- private int partitionId = -1;
- private long vertexCount = 0;
- private long finishedVertexCount = 0;
- private long edgeCount = 0;
-
- public PartitionStats() {}
-
- public PartitionStats(int partitionId,
- long vertexCount,
- long finishedVertexCount,
- long edgeCount) {
- this.partitionId = partitionId;
- this.vertexCount = vertexCount;
- this.finishedVertexCount = finishedVertexCount;
- this.edgeCount = edgeCount;
- }
-
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public void incrVertexCount() {
- ++vertexCount;
- }
-
- public long getVertexCount() {
- return vertexCount;
- }
-
- public void incrFinishedVertexCount() {
- ++finishedVertexCount;
- }
-
- public long getFinishedVertexCount() {
- return finishedVertexCount;
- }
-
- public void addEdgeCount(long edgeCount) {
- this.edgeCount += edgeCount;
- }
-
- public long getEdgeCount() {
- return edgeCount;
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- partitionId = input.readInt();
- vertexCount = input.readLong();
- finishedVertexCount = input.readLong();
- edgeCount = input.readLong();
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeInt(partitionId);
- output.writeLong(vertexCount);
- output.writeLong(finishedVertexCount);
- output.writeLong(edgeCount);
- }
-
- @Override
- public String toString() {
- return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
- finishedVertexCount + ",edges=" + edgeCount + ")";
- }
+ /** Id of partition to keep stats for (-1 until set) */
+ private int partitionId = -1;
+ /** Vertices in this partition */
+ private long vertexCount = 0;
+ /** Finished vertices in this partition */
+ private long finishedVertexCount = 0;
+ /** Edges in this partition */
+ private long edgeCount = 0;
+
+ /**
+ * Default constructor for reflection.
+ */
+ public PartitionStats() { }
+
+ /**
+ * Constructor with the initial stats.
+ *
+ * @param partitionId Partition id.
+ * @param vertexCount Vertex count.
+ * @param finishedVertexCount Finished vertex count.
+ * @param edgeCount Edge count.
+ */
+ public PartitionStats(int partitionId,
+ long vertexCount,
+ long finishedVertexCount,
+ long edgeCount) {
+ this.partitionId = partitionId;
+ this.vertexCount = vertexCount;
+ this.finishedVertexCount = finishedVertexCount;
+ this.edgeCount = edgeCount;
+ }
+
+ /**
+ * Set the partition id.
+ *
+ * @param partitionId New partition id.
+ */
+ public void setPartitionId(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ /**
+ * Get partition id.
+ *
+ * @return Partition id.
+ */
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ /**
+ * Increment the vertex count by one.
+ */
+ public void incrVertexCount() {
+ ++vertexCount;
+ }
+
+ /**
+ * Get the vertex count.
+ *
+ * @return Vertex count.
+ */
+ public long getVertexCount() {
+ return vertexCount;
+ }
+
+ /**
+ * Increment the finished vertex count by one.
+ */
+ public void incrFinishedVertexCount() {
+ ++finishedVertexCount;
+ }
+
+ /**
+ * Get the finished vertex count.
+ *
+ * @return Finished vertex count.
+ */
+ public long getFinishedVertexCount() {
+ return finishedVertexCount;
+ }
+
+ /**
+ * Add edges to the (running) edge count.
+ *
+ * @param edgeCount Number of edges to add.
+ */
+ public void addEdgeCount(long edgeCount) {
+ this.edgeCount += edgeCount;
+ }
+
+ /**
+ * Get the edge count.
+ *
+ * @return Edge count.
+ */
+ public long getEdgeCount() {
+ return edgeCount;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ partitionId = input.readInt();
+ vertexCount = input.readLong();
+ finishedVertexCount = input.readLong();
+ edgeCount = input.readLong();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeInt(partitionId);
+ output.writeLong(vertexCount);
+ output.writeLong(finishedVertexCount);
+ output.writeLong(edgeCount);
+ }
+
+ @Override
+ public String toString() {
+ return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
+ finishedVertexCount + ",edges=" + edgeCount + ")";
+ }
}