You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/15 01:54:22 UTC

svn commit: r1201987 [5/5] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/...

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Per-{@link Partition} statistics: the partition id together with the
+ * vertex, finished vertex and edge counters.  Holds no partition data,
+ * just the numbers.
+ */
+public class PartitionStats implements Writable {
+    /** Id of the partition described by these stats (-1 by default) */
+    private int partitionId = -1;
+    /** Number of vertices counted */
+    private long vertexCount = 0;
+    /** Number of finished vertices counted */
+    private long finishedVertexCount = 0;
+    /** Number of edges counted */
+    private long edgeCount = 0;
+
+    /**
+     * Default constructor: partition id of -1 and all counters at zero.
+     */
+    public PartitionStats() {
+    }
+
+    /**
+     * Constructor with every field supplied.
+     *
+     * @param partitionId Id of the partition
+     * @param vertexCount Initial vertex count
+     * @param finishedVertexCount Initial finished vertex count
+     * @param edgeCount Initial edge count
+     */
+    public PartitionStats(int partitionId,
+                          long vertexCount,
+                          long finishedVertexCount,
+                          long edgeCount) {
+        this.partitionId = partitionId;
+        this.vertexCount = vertexCount;
+        this.finishedVertexCount = finishedVertexCount;
+        this.edgeCount = edgeCount;
+    }
+
+    /**
+     * Set the partition id.
+     *
+     * @param partitionId New partition id
+     */
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    /**
+     * Get the partition id.
+     *
+     * @return Partition id
+     */
+    public int getPartitionId() {
+        return this.partitionId;
+    }
+
+    /**
+     * Add one to the vertex count.
+     */
+    public void incrVertexCount() {
+        this.vertexCount += 1;
+    }
+
+    /**
+     * Get the vertex count.
+     *
+     * @return Vertex count
+     */
+    public long getVertexCount() {
+        return this.vertexCount;
+    }
+
+    /**
+     * Add one to the finished vertex count.
+     */
+    public void incrFinishedVertexCount() {
+        this.finishedVertexCount += 1;
+    }
+
+    /**
+     * Get the finished vertex count.
+     *
+     * @return Finished vertex count
+     */
+    public long getFinishedVertexCount() {
+        return this.finishedVertexCount;
+    }
+
+    /**
+     * Add a number of edges to the edge count.
+     *
+     * @param edgeCount Number of edges to add
+     */
+    public void addEdgeCount(long edgeCount) {
+        this.edgeCount = this.edgeCount + edgeCount;
+    }
+
+    /**
+     * Get the edge count.
+     *
+     * @return Edge count
+     */
+    public long getEdgeCount() {
+        return this.edgeCount;
+    }
+
+    /**
+     * Read the fields in the order: partition id (int), vertex count,
+     * finished vertex count, edge count (longs).
+     *
+     * @param input Input to read from
+     * @throws IOException If reading from the input fails
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        this.partitionId = input.readInt();
+        this.vertexCount = input.readLong();
+        this.finishedVertexCount = input.readLong();
+        this.edgeCount = input.readLong();
+    }
+
+    /**
+     * Write the fields in the same order that
+     * {@link #readFields(DataInput)} reads them.
+     *
+     * @param output Output to write to
+     * @throws IOException If writing to the output fails
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeInt(this.partitionId);
+        output.writeLong(this.vertexCount);
+        output.writeLong(this.finishedVertexCount);
+        output.writeLong(this.edgeCount);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder description = new StringBuilder("(id=");
+        description.append(this.partitionId);
+        description.append(",vtx=").append(this.vertexCount);
+        description.append(",finVtx=").append(this.finishedVertexCount);
+        description.append(",edges=").append(this.edgeCount);
+        description.append(")");
+        return description.toString();
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Helper class for {@link Partition} related operations.
+ */
+public class PartitionUtils {
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(PartitionUtils.class);
+
+    /**
+     * Three-way comparison of two longs.  Used instead of casting the
+     * difference to an int, which can truncate or overflow and report the
+     * wrong sign for large counts.
+     *
+     * @param left First value
+     * @param right Second value
+     * @return -1, 0 or 1 if left is less than, equal to or greater than
+     *         right
+     */
+    private static int compareLongs(long left, long right) {
+        if (left < right) {
+            return -1;
+        }
+        return (left == right) ? 0 : 1;
+    }
+
+    /**
+     * Orders worker entries by ascending edge count.
+     */
+    private static class EdgeCountComparator implements
+            Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+
+        @Override
+        public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+                           Entry<WorkerInfo, VertexEdgeCount> worker2) {
+            return compareLongs(worker1.getValue().getEdgeCount(),
+                                worker2.getValue().getEdgeCount());
+        }
+    }
+
+    /**
+     * Orders worker entries by ascending vertex count.
+     */
+    private static class VertexCountComparator implements
+            Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+
+        @Override
+        public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+                           Entry<WorkerInfo, VertexEdgeCount> worker2) {
+            return compareLongs(worker1.getValue().getVertexCount(),
+                                worker2.getValue().getVertexCount());
+        }
+    }
+
+    /**
+     * Check for imbalances on a per worker basis, by calculating the
+     * mean, high and low workers by edges and vertices.  The results are
+     * only logged (at info level); nothing is returned.
+     *
+     * @param partitionOwnerList Owners of the partitions, one per partition
+     *        id
+     * @param allPartitionStats Stats of the partitions to aggregate per
+     *        worker
+     * @throws IllegalStateException If two owners share a partition id or
+     *         a partition stat has no matching owner
+     */
+    public static void analyzePartitionStats(
+            Collection<PartitionOwner> partitionOwnerList,
+            List<PartitionStats> allPartitionStats) {
+        Map<Integer, PartitionOwner> idOwnerMap =
+            new HashMap<Integer, PartitionOwner>();
+        for (PartitionOwner partitionOwner : partitionOwnerList) {
+            if (idOwnerMap.put(partitionOwner.getPartitionId(),
+                               partitionOwner) != null) {
+                throw new IllegalStateException(
+                    "analyzePartitionStats: Duplicate partition " +
+                    partitionOwner);
+            }
+        }
+
+        // Sum up the vertices and edges per worker and overall
+        Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
+        VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
+        for (PartitionStats partitionStats : allPartitionStats) {
+            PartitionOwner partitionOwner =
+                idOwnerMap.get(partitionStats.getPartitionId());
+            if (partitionOwner == null) {
+                throw new IllegalStateException(
+                    "analyzePartitionStats: No partition owner for " +
+                    partitionStats);
+            }
+            WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
+            VertexEdgeCount vertexEdgeCount =
+                workerStatsMap.get(workerInfo);
+            if (vertexEdgeCount == null) {
+                workerStatsMap.put(
+                    workerInfo,
+                    new VertexEdgeCount(partitionStats.getVertexCount(),
+                                        partitionStats.getEdgeCount()));
+            } else {
+                workerStatsMap.put(
+                    workerInfo,
+                    vertexEdgeCount.incrVertexEdgeCount(
+                        partitionStats.getVertexCount(),
+                        partitionStats.getEdgeCount()));
+            }
+            totalVertexEdgeCount =
+                totalVertexEdgeCount.incrVertexEdgeCount(
+                    partitionStats.getVertexCount(),
+                    partitionStats.getEdgeCount());
+        }
+
+        if (!LOG.isInfoEnabled()) {
+            return;
+        }
+        // Without any worker there is no mean (division by zero) and no
+        // min/max entry to report
+        if (workerStatsMap.isEmpty()) {
+            LOG.info("analyzePartitionStats: No partition stats to analyze");
+            return;
+        }
+
+        List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
+            Lists.newArrayList(workerStatsMap.entrySet());
+        int lastIndex = workerEntryList.size() - 1;
+
+        Collections.sort(workerEntryList, new VertexCountComparator());
+        LOG.info("analyzePartitionStats: Vertices - Mean: " +
+                 (totalVertexEdgeCount.getVertexCount() /
+                     workerStatsMap.size()) +
+                 ", Min: " +
+                 workerEntryList.get(0).getKey() + " - " +
+                 workerEntryList.get(0).getValue().getVertexCount() +
+                 ", Max: " +
+                 workerEntryList.get(lastIndex).getKey() +
+                 " - " +
+                 workerEntryList.get(lastIndex).getValue().getVertexCount());
+
+        Collections.sort(workerEntryList, new EdgeCountComparator());
+        LOG.info("analyzePartitionStats: Edges - Mean: " +
+                 (totalVertexEdgeCount.getEdgeCount() /
+                     workerStatsMap.size()) +
+                 ", Min: " +
+                 workerEntryList.get(0).getKey() + " - " +
+                 workerEntryList.get(0).getValue().getEdgeCount() +
+                 ", Max: " +
+                 workerEntryList.get(lastIndex).getKey() +
+                 " - " +
+                 workerEntryList.get(lastIndex).getValue().getEdgeCount());
+    }
+}

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Some functionality is provided, but this is meant for developers to
+ * determine the partitioning based on the actual types of data.  The
+ * implementation of several methods are left to the developer who is trying
+ * to control the amount of messages sent from one worker to another.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangeMasterPartitioner<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable> implements
+        MasterGraphPartitioner<I, V, E, M> {
+
+    @Override
+    public PartitionStats createPartitionStats() {
+        return new RangePartitionStats<I>();
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A {@link BasicPartitionOwner} that additionally carries the maximum
+ * vertex index of its partition.
+ *
+ * @param <I> Vertex index type
+ */
+@SuppressWarnings("rawtypes")
+public class RangePartitionOwner<I extends WritableComparable>
+        extends BasicPartitionOwner {
+    /** Max index for this partition */
+    private I maxIndex;
+
+    /**
+     * Default constructor; the max index stays unset until
+     * {@link #readFields(DataInput)} is called.
+     */
+    public RangePartitionOwner() {
+        super();
+    }
+
+    /**
+     * Constructor with the max index supplied.
+     *
+     * @param maxIndex Max index for this partition
+     */
+    public RangePartitionOwner(I maxIndex) {
+        super();
+        this.maxIndex = maxIndex;
+    }
+
+    /**
+     * Get the max index for this partition.
+     *
+     * @return Max index for this partition
+     */
+    public I getMaxIndex() {
+        return this.maxIndex;
+    }
+
+    /**
+     * Read the parent fields first, then a freshly created max index.
+     *
+     * @param input Input to read from
+     * @throws IOException If reading from the input fails
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        // The index instance is created from this owner's configuration
+        this.maxIndex = BspUtils.<I>createVertexIndex(getConf());
+        this.maxIndex.readFields(input);
+    }
+
+    /**
+     * Write the parent fields first, then the max index.
+     *
+     * @param output Output to write to
+     * @throws IOException If writing to the output fails
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        this.maxIndex.write(output);
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * {@link PartitionStats} extended with an optional hint for range-based
+ * partitioning.
+ *
+ * @param <I> Vertex index type
+ */
+@SuppressWarnings("rawtypes")
+public class RangePartitionStats<I extends WritableComparable>
+        extends PartitionStats {
+    /** Splitting hint, or null when there is no hint */
+    private RangeSplitHint<I> hint;
+
+    /**
+     * Get the range split hint (if any)
+     *
+     * @return Hint of how to split the range if desired, null otherwise
+     */
+    public RangeSplitHint<I> getRangeSplitHint() {
+        return this.hint;
+    }
+
+    /**
+     * Read the parent fields, a boolean telling whether a hint follows,
+     * and then the hint itself when present.
+     *
+     * @param input Input to read from
+     * @throws IOException If reading from the input fails
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        if (!input.readBoolean()) {
+            this.hint = null;
+            return;
+        }
+        // NOTE(review): no Configuration is set on the new hint before
+        // readFields() is called on it - confirm the hint can create its
+        // split index without one.
+        this.hint = new RangeSplitHint<I>();
+        this.hint.readFields(input);
+    }
+
+    /**
+     * Write the parent fields, a boolean telling whether a hint follows,
+     * and then the hint itself when present.
+     *
+     * @param output Output to write to
+     * @throws IOException If writing to the output fails
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        boolean hintExists = this.hint != null;
+        output.writeBoolean(hintExists);
+        if (hintExists) {
+            this.hint.write(output);
+        }
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Base factory for range partitioning, which splits the vertices by a key
+ * range based on a generic type.  This allows vertices that have some
+ * locality with the vertex ids to reduce the amount of messages sent.  The
+ * tradeoffs are that range partitioning is more susceptible to hot spots
+ * if the keys are not randomly distributed.  Another negative is that the
+ * user must implement some of the functionality around how to split the
+ * key range.
+ *
+ * This class declares no members of its own; everything required by
+ * {@link GraphPartitionerFactory} is left to the subclass.
+ *
+ * See {@link RangeWorkerPartitioner}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangePartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GraphPartitionerFactory<I, V, E, M> {
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Hint to the {@link RangeMasterPartitioner} about how a
+ * {@link RangePartitionOwner} can be split.  Consists of a split index and
+ * the vertex counts before and after the split.
+ *
+ * @param <I> Vertex index to split around
+ */
+@SuppressWarnings("rawtypes")
+public class RangeSplitHint<I extends WritableComparable>
+        implements Writable, Configurable {
+    /** Hinted split index */
+    private I splitIndex;
+    /** Number of vertices in this range before the split */
+    private long preSplitVertexCount;
+    /** Number of vertices in this range after the split */
+    private long postSplitVertexCount;
+    /** Configuration */
+    private Configuration conf;
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return this.conf;
+    }
+
+    /**
+     * Read the split index followed by the pre and post split vertex
+     * counts.  The split index instance is created from the stored
+     * configuration.
+     *
+     * @param input Input to read from
+     * @throws IOException If reading from the input fails
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        this.splitIndex = BspUtils.<I>createVertexIndex(this.conf);
+        this.splitIndex.readFields(input);
+        this.preSplitVertexCount = input.readLong();
+        this.postSplitVertexCount = input.readLong();
+    }
+
+    /**
+     * Write the split index followed by the pre and post split vertex
+     * counts.
+     *
+     * @param output Output to write to
+     * @throws IOException If writing to the output fails
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        this.splitIndex.write(output);
+        output.writeLong(this.preSplitVertexCount);
+        output.writeLong(this.postSplitVertexCount);
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Range partitioning will split the vertices by a key range based on a generic
+ * type.  This allows vertices that have some locality with the vertex ids
+ * to reduce the amount of messages sent.  The tradeoffs are that
+ * range partitioning is more susceptible to hot spots if the keys
+ * are not randomly distributed.  Another negative is the user must implement
+ * some of the functionality around how to split the key range.
+ *
+ * Note:  This implementation is incomplete, the developer must implement the
+ * various methods based on their index type.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangeWorkerPartitioner<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable> implements
+        WorkerGraphPartitioner<I, V, E, M> {
+    /**
+     * Mapping of the maximum vertex id of each range to the
+     * {@link PartitionOwner} of that range
+     */
+    private NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
+        new TreeMap<I, RangePartitionOwner<I>>();
+
+    /**
+     * Create an empty owner used for range partitioning.
+     *
+     * @return New {@link RangePartitionOwner}
+     */
+    @Override
+    public PartitionOwner createPartitionOwner() {
+        return new RangePartitionOwner<I>();
+    }
+
+    /**
+     * Find the owner of the range containing the vertex id, i.e. the owner
+     * stored under the smallest key that is greater than or equal to the
+     * vertex id.  A vertex id beyond every key goes to the last owner.
+     *
+     * @param vertexId Vertex id to get the partition for
+     * @return Owner of the range that contains the vertex id
+     * @throws IllegalArgumentException If the vertex id is null
+     * @throws IllegalStateException If no partition owners are known
+     */
+    @Override
+    public PartitionOwner getPartitionOwner(I vertexId) {
+        if (vertexId == null) {
+            throw new IllegalArgumentException(
+                "getPartitionOwner: Illegal null vertex id");
+        }
+        if (vertexRangeMap.isEmpty()) {
+            throw new IllegalStateException(
+                "getPartitionOwner: No partition owners for vertex id " +
+                vertexId);
+        }
+        // The owner must be looked up with the ceiling key, not with the
+        // vertex id itself, which is only a key when it happens to be the
+        // maximum of its range.
+        I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
+        if (maxVertexIndex == null) {
+            // Exceeds every range maximum, give it to the last one
+            maxVertexIndex = vertexRangeMap.lastKey();
+        }
+        return vertexRangeMap.get(maxVertexIndex);
+    }
+
+    /**
+     * Get the owners of all the ranges.
+     *
+     * @return Values of the range map
+     */
+    @Override
+    public Collection<? extends PartitionOwner> getPartitionOwners() {
+        return vertexRangeMap.values();
+    }
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Stores the {@link PartitionOwner} objects from the master and provides the
+ * mapping of vertex to {@link PartitionOwner}. Also generates the partition
+ * owner implementation.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public interface WorkerGraphPartitioner<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable> {
+    /**
+     * Instantiate the {@link PartitionOwner} implementation used to read the
+     * master assignments.
+     *
+     * @return Instantiated {@link PartitionOwner} object
+     */
+    PartitionOwner createPartitionOwner();
+
+    /**
+     * Figure out the owner of a vertex
+     *
+     * @param vertexId Vertex id to get the partition for
+     * @return Correct partition owner
+     */
+    PartitionOwner getPartitionOwner(I vertexId);
+
+    /**
+     * At the end of a superstep, workers have {@link PartitionStats} generated
+     * for each of their partitions.  This method will allow the user to
+     * modify or create their own {@link PartitionStats} objects to send to
+     * the master.
+     *
+     * @param workerPartitionStats Stats generated by the infrastructure during
+     *        the superstep
+     * @param partitionMap Map of all the partitions owned by this worker
+     *        (could be used to provide more useful stat information)
+     * @return Final partition stats
+     */
+    Collection<PartitionStats> finalizePartitionStats(
+            Collection<PartitionStats> workerPartitionStats,
+            Map<Integer, Partition<I, V, E, M>> partitionMap);
+
+    /**
+     * Get the partition owners and update locally.  Returns the partitions
+     * to send to other workers and other dependencies.
+     *
+     * @param myWorkerInfo Worker info.
+     * @param masterSetPartitionOwners Master set partition owners, received
+     *        prior to beginning the superstep
+     * @param partitionMap Map of all the partitions owned by this worker
+     *        (can be used to fill the return map of partitions to send)
+     * @return Information for the partition exchange.
+     */
+    PartitionExchange updatePartitionOwners(
+            WorkerInfo myWorkerInfo,
+            Collection<? extends PartitionOwner> masterSetPartitionOwners,
+            Map<Integer, Partition<I, V, E, M>> partitionMap);
+
+    /**
+     * Get a collection of the {@link PartitionOwner} objects.
+     *
+     * @return Collection of owners for every partition.
+     */
+    Collection<? extends PartitionOwner> getPartitionOwners();
+}

Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.zk.ZooKeeperExt.PathStat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Helper static methods for working with Writable objects.
+ */
+public class WritableUtils {
+    /**
+     * Fill in the fields of a writable from its serialized bytes.
+     *
+     * @param byteArray Serialized form to read from
+     * @param writableObject Object whose readFields() consumes the bytes
+     * @throws IllegalStateException wrapping any IOException raised while
+     *         reading
+     */
+    public static void readFieldsFromByteArray(
+            byte[] byteArray, Writable writableObject) {
+        ByteArrayInputStream rawBytes = new ByteArrayInputStream(byteArray);
+        DataInputStream dataInput = new DataInputStream(rawBytes);
+        try {
+            writableObject.readFields(dataInput);
+        } catch (IOException ioe) {
+            throw new IllegalStateException(
+                "readFieldsFromByteArray: IOException", ioe);
+        }
+    }
+
+    /**
+     * Read the data stored at a znode and deserialize it into a writable.
+     *
+     * @param zkExt ZooKeeper instance to read from
+     * @param zkPath Path of the znode to read
+     * @param watch Watch flag handed to the ZooKeeper getData() call
+     * @param stat Stat handed to the ZooKeeper getData() call
+     * @param writableObject Object whose fields are filled from the znode
+     *        data
+     * @throws IllegalStateException wrapping a KeeperException or
+     *         InterruptedException from ZooKeeper
+     */
+    public static void readFieldsFromZnode(ZooKeeperExt zkExt,
+                                           String zkPath,
+                                           boolean watch,
+                                           Stat stat,
+                                           Writable writableObject) {
+        try {
+            // Honor the caller's watch request rather than always
+            // passing false
+            byte[] zkData = zkExt.getData(zkPath, watch, stat);
+            readFieldsFromByteArray(zkData, writableObject);
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+               "readFieldsFromZnode: KeeperException on " + zkPath, e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can
+            // still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+               "readFieldsFromZnode: InterruptedException on " + zkPath,
+               e);
+        }
+    }
+
+    /**
+     * Serialize a writable into a freshly allocated byte array.
+     *
+     * @param writableObject Object whose write() produces the bytes
+     * @return Everything the object wrote, as a byte array
+     * @throws IllegalStateException wrapping any IOException raised while
+     *         writing
+     */
+    public static byte[] writeToByteArray(Writable writableObject) {
+        ByteArrayOutputStream outputStream =
+            new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(outputStream);
+        try {
+            writableObject.write(output);
+        } catch (IOException e) {
+            // Message names the exception type actually caught
+            throw new IllegalStateException(
+                "writeToByteArray: IOException", e);
+        }
+        return outputStream.toByteArray();
+    }
+
+    /**
+     * Serialize a writable and store it at a znode through
+     * createOrSetExt(), using a persistent znode with an open ACL and
+     * recursive creation enabled.
+     *
+     * @param zkExt ZooKeeper instance to write to
+     * @param zkPath Path of the znode to create or set
+     * @param version Version handed to createOrSetExt()
+     * @param writableObject Object to serialize into the znode
+     * @return Result of createOrSetExt()
+     * @throws IllegalStateException wrapping a KeeperException or
+     *         InterruptedException from ZooKeeper
+     */
+    public static PathStat writeToZnode(ZooKeeperExt zkExt,
+                                        String zkPath,
+                                        int version,
+                                        Writable writableObject) {
+        try {
+            byte[] byteArray = writeToByteArray(writableObject);
+            return zkExt.createOrSetExt(zkPath,
+                                        byteArray,
+                                        Ids.OPEN_ACL_UNSAFE,
+                                        CreateMode.PERSISTENT,
+                                        true,
+                                        version);
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+               "writeToZnode: KeeperException on " + zkPath, e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can
+            // still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+                "writeToZnode: InterruptedException on " + zkPath, e);
+        }
+    }
+
+    /**
+     * Serialize a list of writables: first the element count as an int,
+     * then each element in list order.
+     *
+     * @param writableList Elements to serialize
+     * @return The count followed by every element's serialized form
+     * @throws IllegalStateException wrapping any IOException raised while
+     *         writing
+     */
+    public static byte[] writeListToByteArray(
+            List<? extends Writable> writableList) {
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        DataOutput dataOutput = new DataOutputStream(byteStream);
+        try {
+            // Length prefix lets the reader know how many elements follow
+            dataOutput.writeInt(writableList.size());
+            for (Writable element : writableList) {
+                element.write(dataOutput);
+            }
+        } catch (IOException ioe) {
+            throw new IllegalStateException(
+                "writeListToByteArray: IOException", ioe);
+        }
+        return byteStream.toByteArray();
+    }
+
+    /**
+     * Serialize a list of writables and store it at a znode through
+     * createOrSetExt(), using a persistent znode with an open ACL and
+     * recursive creation enabled.
+     *
+     * @param zkExt ZooKeeper instance to write to
+     * @param zkPath Path of the znode to create or set
+     * @param version Version handed to createOrSetExt()
+     * @param writableList Elements to serialize into the znode
+     * @return Result of createOrSetExt()
+     * @throws IllegalStateException wrapping a KeeperException or
+     *         InterruptedException from ZooKeeper
+     */
+    public static PathStat writeListToZnode(
+            ZooKeeperExt zkExt,
+            String zkPath,
+            int version,
+            List<? extends Writable> writableList) {
+        try {
+            return zkExt.createOrSetExt(
+                zkPath,
+                writeListToByteArray(writableList),
+                Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT,
+                true,
+                version);
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+               "writeListToZnode: KeeperException on " + zkPath, e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can
+            // still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+                "writeListToZnode: InterruptedException on " + zkPath, e);
+        }
+    }
+
+    /**
+     * Deserialize a list of writables: reads an int element count, then
+     * that many elements, each created reflectively from the given class.
+     *
+     * @param byteArray Serialized form to read from
+     * @param writableClass Class instantiated for every element
+     * @param conf Configuration handed to ReflectionUtils.newInstance()
+     * @return The deserialized elements, in the order they were read
+     * @throws IllegalStateException wrapping any IOException raised while
+     *         reading
+     */
+    public static List<? extends Writable> readListFieldsFromByteArray(
+            byte[] byteArray,
+            Class<? extends Writable> writableClass,
+            Configuration conf) {
+        try {
+            DataInputStream inputStream =
+                new DataInputStream(new ByteArrayInputStream(byteArray));
+            int size = inputStream.readInt();
+            List<Writable> writableList = new ArrayList<Writable>(size);
+            for (int i = 0; i < size; ++i) {
+                Writable writable =
+                    ReflectionUtils.newInstance(writableClass, conf);
+                writable.readFields(inputStream);
+                writableList.add(writable);
+            }
+            return writableList;
+        } catch (IOException e) {
+            // Name this method (not the znode variant) so the failure is
+            // attributed to the right place
+            throw new IllegalStateException(
+                    "readListFieldsFromByteArray: IOException", e);
+        }
+    }
+
+    /**
+     * Read the data stored at a znode and deserialize it as a list of
+     * writables.
+     *
+     * @param zkExt ZooKeeper instance to read from
+     * @param zkPath Path of the znode to read
+     * @param watch Watch flag handed to the ZooKeeper getData() call
+     * @param stat Stat handed to the ZooKeeper getData() call
+     * @param writableClass Class instantiated for every element
+     * @param conf Configuration used when instantiating the elements
+     * @return The deserialized elements
+     * @throws IllegalStateException wrapping a KeeperException or
+     *         InterruptedException from ZooKeeper
+     */
+    public static List<? extends Writable> readListFieldsFromZnode(
+            ZooKeeperExt zkExt,
+            String zkPath,
+            boolean watch,
+            Stat stat,
+            Class<? extends Writable> writableClass,
+            Configuration conf) {
+        try {
+            // Honor the caller's watch request rather than always
+            // passing false
+            byte[] zkData = zkExt.getData(zkPath, watch, stat);
+            return readListFieldsFromByteArray(zkData, writableClass, conf);
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "readListFieldsFromZnode: KeeperException on " + zkPath, e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can
+            // still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+                "readListFieldsFromZnode: InterruptedException on " + zkPath,
+                e);
+        }
+    }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java Tue Nov 15 00:54:20 2011
@@ -124,10 +124,10 @@ public class ZooKeeperExt extends ZooKee
     public class PathStat {
         private String path;
         private Stat stat;
-        
+
         /**
          * Put in results from createOrSet()
-         * 
+         *
          * @param path Path to created znode (or null)
          * @param stat Stat from set znode (if set)
          */
@@ -135,44 +135,44 @@ public class ZooKeeperExt extends ZooKee
             this.path = path;
             this.stat = stat;
         }
-        
+
         /**
          * Get the path of the created znode if it was created.
-         * 
+         *
          * @return Path of created znode or null if not created
          */
         public String getPath() {
             return path;
         }
-        
+
         /**
          * Get the stat of the set znode if set
-         * 
+         *
          * @return Stat of set znode or null if not set
          */
         public Stat getStat() {
             return stat;
         }
     }
-    
+
     /**
      * Create a znode.  Set the znode if the created znode already exists.
-     * 
+     *
      * @param path path to create
      * @param data data to set on the final znode
      * @param acl acls on each znode created
      * @param createMode only affects the final znode
      * @param recursive if true, creates all ancestors
      * @return Path of created znode or Stat of set znode
-     * @throws InterruptedException 
-     * @throws KeeperException 
+     * @throws InterruptedException
+     * @throws KeeperException
      */
     public PathStat createOrSetExt(final String path,
                                    byte data[],
                                    List<ACL> acl,
                                    CreateMode createMode,
                                    boolean recursive,
-                                   int version) 
+                                   int version)
             throws KeeperException, InterruptedException {
         String createdPath = null;
         Stat setStat = null;
@@ -188,6 +188,36 @@ public class ZooKeeperExt extends ZooKee
     }
 
     /**
+     * Create a znode only if no znode already exists at the path.  When
+     * the znode already exists, nothing is set and the existing znode is
+     * left as is.
+     *
+     * @param path path to create
+     * @param data data handed to createExt() for the new znode
+     * @param acl acls on each znode created
+     * @param createMode only affects the final znode
+     * @param recursive if true, creates all ancestors
+     * @return PathStat whose path is the created znode, or null if the
+     *         znode already existed; its Stat is always null because this
+     *         method never sets an existing znode
+     * @throws InterruptedException
+     * @throws KeeperException on any ZooKeeper error other than the node
+     *         already existing
+     */
+    public PathStat createOnceExt(final String path,
+                                  byte data[],
+                                  List<ACL> acl,
+                                  CreateMode createMode,
+                                  boolean recursive)
+            throws KeeperException, InterruptedException {
+        String createdPath = null;
+        try {
+            createdPath = createExt(path, data, acl, createMode, recursive);
+        } catch (KeeperException.NodeExistsException e) {
+            // Expected when another party created the znode first; the
+            // caller learns of it through the null path in the result
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("createOnceExt: Node already exists on path " + path);
+            }
+        }
+        return new PathStat(createdPath, null);
+    }
+
+    /**
      * Delete a path recursively.  When the deletion is recursive, it is a
      * non-atomic operation, hence, not part of ZooKeeper.
      * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.SimpleCheckpointVertex;
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.BasicPartitionOwner;
+import org.apache.giraph.graph.partition.HashMasterPartitioner;
+import org.apache.giraph.graph.partition.HashPartitionerFactory;
+import org.apache.giraph.graph.partition.HashRangePartitionerFactory;
+import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.PartitionBalancer;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for the graph partitioners and partition balancing algorithms
+ */
+public class TestGraphPartitioner extends BspCase {
+    /**
+     * Create the test case
+     *
+     * @param testName name of the test case
+     */
+    public TestGraphPartitioner(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(TestGraphPartitioner.class);
+    }
+
+    /**
+     * Example graph partitioner that builds on {@link HashMasterPartitioner} to
+     * send the partitions to the worker that matches the superstep.
+     */
+    @SuppressWarnings("rawtypes")
+    private static class SuperstepHashPartitionerFactory<
+            I extends WritableComparable,
+            V extends Writable, E extends Writable, M extends Writable>
+            extends HashPartitionerFactory<I, V, E, M> {
+
+        /**
+         * Changes the {@link HashMasterPartitioner} to make ownership of the
+         * partitions based on a superstep.  For testing only as it is totally
+         * unbalanced.
+         *
+         * @param <I> vertex id
+         * @param <V> vertex data
+         * @param <E> edge data
+         * @param <M> message data
+         */
+        private static class SuperstepMasterPartition<
+                I extends WritableComparable,
+                V extends Writable, E extends Writable, M extends Writable>
+                extends HashMasterPartitioner<I, V, E, M> {
+            /** Class logger */
+            private static final Logger LOG =
+                Logger.getLogger(SuperstepMasterPartition.class);
+
+            /**
+             * Constructor, hands the configuration to the hash partitioner.
+             *
+             * @param conf Configuration passed to the superclass
+             */
+            public SuperstepMasterPartition(Configuration conf) {
+                super(conf);
+            }
+
+            /**
+             * Reassign every current partition to a single worker picked
+             * by the superstep number.
+             *
+             * @param allPartitionStatsList Unused by this implementation
+             * @param availableWorkerInfos Workers to choose the owner from
+             * @param maxWorkers Unused by this implementation
+             * @param superstep Superstep used to select the owning worker
+             * @return New owners, one per current partition, all pointing at
+             *         the chosen worker
+             */
+            @Override
+            public Collection<PartitionOwner> generateChangedPartitionOwners(
+                    Collection<PartitionStats> allPartitionStatsList,
+                    Collection<WorkerInfo> availableWorkerInfos,
+                    int maxWorkers,
+                    long superstep) {
+                // Assign all the partitions to
+                // superstep mod availableWorkerInfos
+                // Guaranteed to be different if the workers (and their order)
+                // do not change
+                long workerIndex = superstep % availableWorkerInfos.size();
+                int i = 0;
+                WorkerInfo chosenWorkerInfo = null;
+                for (WorkerInfo workerInfo : availableWorkerInfos) {
+                    if (workerIndex == i) {
+                        chosenWorkerInfo = workerInfo;
+                        // Only one position can match, stop scanning
+                        break;
+                    }
+                    ++i;
+                }
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("generateChangedPartitionOwners: Chosen worker " +
+                             "for superstep " + superstep + " is " +
+                             chosenWorkerInfo);
+                }
+
+                List<PartitionOwner> partitionOwnerList =
+                    new ArrayList<PartitionOwner>();
+                for (PartitionOwner partitionOwner :
+                        getCurrentPartitionOwners()) {
+                    // A partition that already lives on the chosen worker
+                    // has no previous owner to record
+                    WorkerInfo prevWorkerinfo =
+                        partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ?
+                            null : partitionOwner.getWorkerInfo();
+                    PartitionOwner tmpPartitionOwner =
+                        new BasicPartitionOwner(partitionOwner.getPartitionId(),
+                                                chosenWorkerInfo,
+                                                prevWorkerinfo,
+                                                null);
+                    partitionOwnerList.add(tmpPartitionOwner);
+                    if (LOG.isInfoEnabled()) {
+                        LOG.info("partition owner was " + partitionOwner +
+                                ", new " + tmpPartitionOwner);
+                    }
+                }
+                setPartitionOwnerList(partitionOwnerList);
+                return partitionOwnerList;
+            }
+        }
+
+        @Override
+        public MasterGraphPartitioner<I, V, E, M>
+                createMasterGraphPartitioner() {
+            return new SuperstepMasterPartition<I, V, E, M>(getConf());
+        }
+    }
+
+    /**
+     * Run a sample BSP job locally and test various partitioners and
+     * partition algorithms.
+     *
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    public void testPartitioners()
+            throws IOException, InterruptedException, ClassNotFoundException {
+        final int correctLen = 123;
+
+        GiraphJob job = new GiraphJob("testVertexBalancer");
+        setupConfiguration(job);
+        job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setWorkerContextClass(
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+        job.getConfiguration().set(
+            PartitionBalancer.PARTITION_BALANCE_ALGORITHM,
+            PartitionBalancer.VERTICES_BALANCE_ALGORITHM);
+        Path outputPath = new Path("/tmp/testVertexBalancer");
+        removeAndSetOutput(job, outputPath);
+        assertTrue(job.run(true));
+        FileSystem hdfs = FileSystem.get(job.getConfiguration());
+        if (getJobTracker() != null) {
+            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+            int totalLen = 0;
+            for (FileStatus fileStatus : fileStatusArr) {
+                if (fileStatus.getPath().toString().contains("/part-m-")) {
+                    totalLen += fileStatus.getLen();
+                }
+            }
+            assertTrue(totalLen == correctLen);
+        }
+
+        job = new GiraphJob("testHashPartitioner");
+        setupConfiguration(job);
+        job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setWorkerContextClass(
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+        outputPath = new Path("/tmp/testHashPartitioner");
+        removeAndSetOutput(job, outputPath);
+        assertTrue(job.run(true));
+        if (getJobTracker() != null) {
+            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+            int totalLen = 0;
+            for (FileStatus fileStatus : fileStatusArr) {
+                if (fileStatus.getPath().toString().contains("/part-m-")) {
+                    totalLen += fileStatus.getLen();
+                }
+            }
+            assertTrue(totalLen == correctLen);
+        }
+
+        job = new GiraphJob("testSuperstepHashPartitioner");
+        setupConfiguration(job);
+        job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setWorkerContextClass(
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+        job.setGraphPartitionerFactoryClass(
+            SuperstepHashPartitionerFactory.class);
+        outputPath = new Path("/tmp/testSuperstepHashPartitioner");
+        removeAndSetOutput(job, outputPath);
+        assertTrue(job.run(true));
+        if (getJobTracker() != null) {
+            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+            int totalLen = 0;
+            for (FileStatus fileStatus : fileStatusArr) {
+                if (fileStatus.getPath().toString().contains("/part-m-")) {
+                    totalLen += fileStatus.getLen();
+                }
+            }
+            assertTrue(totalLen == correctLen);
+        }
+
+        job = new GiraphJob("testHashRangePartitioner");
+        setupConfiguration(job);
+        job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setWorkerContextClass(
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+        job.setGraphPartitionerFactoryClass(
+            HashRangePartitionerFactory.class);
+        outputPath = new Path("/tmp/testHashRangePartitioner");
+        removeAndSetOutput(job, outputPath);
+        assertTrue(job.run(true));
+        if (getJobTracker() != null) {
+            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+            int totalLen = 0;
+            for (FileStatus fileStatus : fileStatusArr) {
+                if (fileStatus.getPath().toString().contains("/part-m-")) {
+                    totalLen += fileStatus.getLen();
+                }
+            }
+            assertTrue(totalLen == correctLen);
+        }
+
+        job = new GiraphJob("testReverseIdSuperstepHashPartitioner");
+        setupConfiguration(job);
+        job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setWorkerContextClass(
+            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+        job.setGraphPartitionerFactoryClass(
+            SuperstepHashPartitionerFactory.class);
+        job.getConfiguration().setBoolean(
+            GeneratedVertexReader.REVERSE_ID_ORDER,
+            true);
+        outputPath = new Path("/tmp/testReverseIdSuperstepHashPartitioner");
+        removeAndSetOutput(job, outputPath);
+        assertTrue(job.run(true));
+        if (getJobTracker() != null) {
+            FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+            int totalLen = 0;
+            for (FileStatus fileStatus : fileStatusArr) {
+                if (fileStatus.getPath().toString().contains("/part-m-")) {
+                    totalLen += fileStatus.getLen();
+                }
+            }
+            assertTrue(totalLen == correctLen);
+        }
+    }
+}

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java Tue Nov 15 00:54:20 2011
@@ -25,6 +25,7 @@ import org.apache.giraph.examples.Simple
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat;
 import org.apache.giraph.graph.GiraphJob;
+
 import junit.framework.Test;
 import junit.framework.TestSuite;
 
@@ -61,7 +62,7 @@ public class TestMutateGraphVertex exten
         setupConfiguration(job);
         job.setVertexClass(SimpleMutateGraphVertex.class);
         job.setWorkerContextClass(
-        		SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class);
+            SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class);
         job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         Path outputPath = new Path("/tmp/" + getCallingMethodName());