You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/15 01:54:22 UTC
svn commit: r1201987 [5/5] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/...
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Statistics describing a single {@link Partition}: its id plus vertex,
+ * finished-vertex and edge counters. No partition data is held here, only
+ * the numbers. The serialized form is the partition id (int) followed by the
+ * vertex, finished-vertex and edge counts (long each), in that order.
+ */
+public class PartitionStats implements Writable {
+    /** Id of the partition being described, -1 until one is assigned */
+    private int partitionId = -1;
+    /** Number of vertices counted so far */
+    private long vertexCount = 0;
+    /** Number of vertices counted as finished so far */
+    private long finishedVertexCount = 0;
+    /** Number of edges counted so far */
+    private long edgeCount = 0;
+
+    /**
+     * Create empty statistics: the partition id is -1 and every counter is
+     * zero.
+     */
+    public PartitionStats() {
+    }
+
+    /**
+     * Create statistics with every value supplied up front.
+     *
+     * @param partitionId Id of the partition
+     * @param vertexCount Initial vertex count
+     * @param finishedVertexCount Initial finished vertex count
+     * @param edgeCount Initial edge count
+     */
+    public PartitionStats(int partitionId,
+                          long vertexCount,
+                          long finishedVertexCount,
+                          long edgeCount) {
+        this.partitionId = partitionId;
+        this.vertexCount = vertexCount;
+        this.finishedVertexCount = finishedVertexCount;
+        this.edgeCount = edgeCount;
+    }
+
+    /**
+     * @return Id of the partition these stats belong to
+     */
+    public int getPartitionId() {
+        return this.partitionId;
+    }
+
+    /**
+     * @param partitionId New partition id
+     */
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    /**
+     * @return Current vertex count
+     */
+    public long getVertexCount() {
+        return this.vertexCount;
+    }
+
+    /**
+     * Add one to the vertex count.
+     */
+    public void incrVertexCount() {
+        this.vertexCount += 1;
+    }
+
+    /**
+     * @return Current finished vertex count
+     */
+    public long getFinishedVertexCount() {
+        return this.finishedVertexCount;
+    }
+
+    /**
+     * Add one to the finished vertex count.
+     */
+    public void incrFinishedVertexCount() {
+        this.finishedVertexCount += 1;
+    }
+
+    /**
+     * @return Current edge count
+     */
+    public long getEdgeCount() {
+        return this.edgeCount;
+    }
+
+    /**
+     * Add the given number of edges to the edge count.
+     *
+     * @param edgeCount Number of edges to add
+     */
+    public void addEdgeCount(long edgeCount) {
+        this.edgeCount = this.edgeCount + edgeCount;
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        // Field order here must mirror readFields()
+        output.writeInt(this.partitionId);
+        output.writeLong(this.vertexCount);
+        output.writeLong(this.finishedVertexCount);
+        output.writeLong(this.edgeCount);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        // Field order here must mirror write()
+        this.partitionId = input.readInt();
+        this.vertexCount = input.readLong();
+        this.finishedVertexCount = input.readLong();
+        this.edgeCount = input.readLong();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append("(id=").append(partitionId);
+        text.append(",vtx=").append(vertexCount);
+        text.append(",finVtx=").append(finishedVertexCount);
+        text.append(",edges=").append(edgeCount);
+        text.append(")");
+        return text.toString();
+    }
+}
Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Helper class for {@link Partition} related operations.
+ */
+public class PartitionUtils {
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(PartitionUtils.class);
+
+    /**
+     * Three-way comparison of two counts. Subtracting the longs and casting
+     * the difference to int can overflow or truncate and flip the sign, so
+     * the values are compared explicitly instead.
+     *
+     * @param count1 First count
+     * @param count2 Second count
+     * @return -1, 0 or 1 when count1 is less than, equal to or greater than
+     *         count2
+     */
+    private static int compareCounts(long count1, long count2) {
+        if (count1 < count2) {
+            return -1;
+        }
+        return (count1 == count2) ? 0 : 1;
+    }
+
+    /**
+     * Orders worker entries by ascending edge count.
+     */
+    private static class EdgeCountComparator implements
+            Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+
+        @Override
+        public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+                           Entry<WorkerInfo, VertexEdgeCount> worker2) {
+            return compareCounts(worker1.getValue().getEdgeCount(),
+                                 worker2.getValue().getEdgeCount());
+        }
+    }
+
+    /**
+     * Orders worker entries by ascending vertex count.
+     */
+    private static class VertexCountComparator implements
+            Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+
+        @Override
+        public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+                           Entry<WorkerInfo, VertexEdgeCount> worker2) {
+            return compareCounts(worker1.getValue().getVertexCount(),
+                                 worker2.getValue().getVertexCount());
+        }
+    }
+
+    /**
+     * Check for imbalances on a per worker basis, by calculating the
+     * mean, high and low workers by edges and vertices. The results are only
+     * written to the log (at info level); nothing is returned.
+     *
+     * @param partitionOwnerList Owners of all the partitions, one per
+     *        partition id
+     * @param allPartitionStats Stats of the partitions to aggregate per
+     *        worker
+     * @throws IllegalStateException If two owners share a partition id or a
+     *         stats entry refers to a partition id that has no owner
+     */
+    public static void analyzePartitionStats(
+            Collection<PartitionOwner> partitionOwnerList,
+            List<PartitionStats> allPartitionStats) {
+        Map<Integer, PartitionOwner> idOwnerMap =
+            new HashMap<Integer, PartitionOwner>();
+        for (PartitionOwner partitionOwner : partitionOwnerList) {
+            if (idOwnerMap.put(partitionOwner.getPartitionId(),
+                               partitionOwner) != null) {
+                throw new IllegalStateException(
+                    "analyzePartitionStats: Duplicate partition " +
+                    partitionOwner);
+            }
+        }
+
+        // Sum the vertex and edge counts per worker and overall
+        Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
+        VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
+        for (PartitionStats partitionStats : allPartitionStats) {
+            PartitionOwner partitionOwner =
+                idOwnerMap.get(partitionStats.getPartitionId());
+            if (partitionOwner == null) {
+                // Fail with context instead of a bare NullPointerException
+                throw new IllegalStateException(
+                    "analyzePartitionStats: No partition owner for " +
+                    partitionStats);
+            }
+            WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
+            long vertexCount = partitionStats.getVertexCount();
+            long edgeCount = partitionStats.getEdgeCount();
+
+            VertexEdgeCount workerCount = workerStatsMap.get(workerInfo);
+            if (workerCount == null) {
+                workerCount = new VertexEdgeCount(vertexCount, edgeCount);
+            } else {
+                workerCount =
+                    workerCount.incrVertexEdgeCount(vertexCount, edgeCount);
+            }
+            workerStatsMap.put(workerInfo, workerCount);
+            totalVertexEdgeCount =
+                totalVertexEdgeCount.incrVertexEdgeCount(vertexCount,
+                                                         edgeCount);
+        }
+
+        if (!LOG.isInfoEnabled()) {
+            return;
+        }
+        if (workerStatsMap.isEmpty()) {
+            // Without any worker the mean would divide by zero and there is
+            // no min/max entry to report
+            LOG.info("analyzePartitionStats: No partition stats to analyze");
+            return;
+        }
+
+        List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
+            Lists.newArrayList(workerStatsMap.entrySet());
+        int lastIndex = workerEntryList.size() - 1;
+
+        Collections.sort(workerEntryList, new VertexCountComparator());
+        Entry<WorkerInfo, VertexEdgeCount> minWorker = workerEntryList.get(0);
+        Entry<WorkerInfo, VertexEdgeCount> maxWorker =
+            workerEntryList.get(lastIndex);
+        LOG.info("analyzePartitionStats: Vertices - Mean: " +
+                 (totalVertexEdgeCount.getVertexCount() /
+                     workerStatsMap.size()) +
+                 ", Min: " +
+                 minWorker.getKey() + " - " +
+                 minWorker.getValue().getVertexCount() +
+                 ", Max: " +
+                 maxWorker.getKey() + " - " +
+                 maxWorker.getValue().getVertexCount());
+
+        Collections.sort(workerEntryList, new EdgeCountComparator());
+        minWorker = workerEntryList.get(0);
+        maxWorker = workerEntryList.get(lastIndex);
+        LOG.info("analyzePartitionStats: Edges - Mean: " +
+                 (totalVertexEdgeCount.getEdgeCount() /
+                     workerStatsMap.size()) +
+                 ", Min: " +
+                 minWorker.getKey() + " - " +
+                 minWorker.getValue().getEdgeCount() +
+                 ", Max: " +
+                 maxWorker.getKey() + " - " +
+                 maxWorker.getValue().getEdgeCount());
+    }
+}
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Abstract starting point for a master-side range partitioner. The only
+ * piece supplied here is the creation of {@link RangePartitionStats}
+ * objects; every other {@link MasterGraphPartitioner} method is left to the
+ * developer, who is expected to decide the partitioning from the actual
+ * vertex index type (for instance to control how many messages travel
+ * between workers).
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangeMasterPartitioner<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable>
+        implements MasterGraphPartitioner<I, V, E, M> {
+
+    /**
+     * Create the stats object used with range partitioning.
+     *
+     * @return New, empty {@link RangePartitionStats}
+     */
+    @Override
+    public PartitionStats createPartitionStats() {
+        RangePartitionStats<I> rangeStats = new RangePartitionStats<I>();
+        return rangeStats;
+    }
+}
Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A {@link BasicPartitionOwner} that additionally carries the maximum vertex
+ * index of its partition. The index is serialized directly after the
+ * superclass fields.
+ *
+ * @param <I> Vertex index type
+ */
+@SuppressWarnings("rawtypes")
+public class RangePartitionOwner<I extends WritableComparable>
+        extends BasicPartitionOwner {
+    /** Max index for this partition */
+    private I maxIndex;
+
+    /**
+     * Create an owner without a max index; the index stays null until
+     * {@link #readFields(DataInput)} fills it in.
+     */
+    public RangePartitionOwner() {
+        super();
+    }
+
+    /**
+     * Create an owner with a known max index.
+     *
+     * @param maxIndex Max index for this partition
+     */
+    public RangePartitionOwner(I maxIndex) {
+        super();
+        this.maxIndex = maxIndex;
+    }
+
+    /**
+     * @return Max index for this partition (may be null if never set)
+     */
+    public I getMaxIndex() {
+        return this.maxIndex;
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        // Superclass fields first, then the max index
+        super.write(output);
+        this.maxIndex.write(output);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        // Same order as write(): superclass fields, then the max index
+        super.readFields(input);
+        // A fresh index object is created from the configuration returned
+        // by getConf() and then populated from the input
+        this.maxIndex = BspUtils.<I>createVertexIndex(getConf());
+        this.maxIndex.readFields(input);
+    }
+}
Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * {@link PartitionStats} extended with an optional {@link RangeSplitHint}
+ * for range-based partitioning. On the wire the superclass fields are
+ * followed by a boolean flag and, only when the flag is true, the hint.
+ *
+ * @param <I> Vertex index type
+ */
+@SuppressWarnings("rawtypes")
+public class RangePartitionStats<I extends WritableComparable>
+        extends PartitionStats {
+    /** Can be null if no hint, otherwise a splitting hint */
+    private RangeSplitHint<I> hint;
+
+    /**
+     * Get the range split hint (if any)
+     *
+     * @return Hint of how to split the range if desired, null otherwise
+     */
+    public RangeSplitHint<I> getRangeSplitHint() {
+        return this.hint;
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        // Presence flag first so the reader knows whether a hint follows
+        boolean hasHint = this.hint != null;
+        output.writeBoolean(hasHint);
+        if (hasHint) {
+            this.hint.write(output);
+        }
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        if (input.readBoolean()) {
+            // NOTE(review): no configuration is set on the new hint before
+            // readFields() is called on it - confirm that RangeSplitHint
+            // can create its vertex index without one
+            this.hint = new RangeSplitHint<I>();
+            this.hint.readFields(input);
+        } else {
+            this.hint = null;
+        }
+    }
+}
Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Range partitioning will split the vertices by a key range based on a generic
+ * type. This allows vertices that have some locality with the vertex ids
+ * to reduce the amount of messages sent. The tradeoffs are that
+ * range partitioning is more susceptible to hot spots if the keys
+ * are not randomly distributed, and that the user must implement
+ * some of the functionality around how to split the key range.
+ *
+ * This abstract class declares no members of its own; it only marks a
+ * {@link GraphPartitionerFactory} as range based.
+ *
+ * See {@link RangeWorkerPartitioner}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangePartitionerFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements GraphPartitionerFactory<I, V, E, M> {
+}
Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Hint to the {@link RangeMasterPartitioner} about how a
+ * {@link RangePartitionOwner} can be split. The serialized form is the split
+ * index followed by the pre-split and post-split vertex counts.
+ *
+ * @param <I> Vertex index to split around
+ */
+@SuppressWarnings("rawtypes")
+public class RangeSplitHint<I extends WritableComparable>
+        implements Writable, Configurable {
+    /** Hinted split index */
+    private I splitIndex;
+    /** Number of vertices in this range before the split */
+    private long preSplitVertexCount;
+    /** Number of vertices in this range after the split */
+    private long postSplitVertexCount;
+    /** Configuration */
+    private Configuration conf;
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return this.conf;
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        // Order must mirror readFields(): index, pre-split, post-split
+        this.splitIndex.write(output);
+        output.writeLong(this.preSplitVertexCount);
+        output.writeLong(this.postSplitVertexCount);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        // The index object is created from the stored configuration before
+        // being filled from the input
+        this.splitIndex = BspUtils.<I>createVertexIndex(this.conf);
+        this.splitIndex.readFields(input);
+        this.preSplitVertexCount = input.readLong();
+        this.postSplitVertexCount = input.readLong();
+    }
+}
Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Range partitioning will split the vertices by a key range based on a generic
+ * type. This allows vertices that have some locality with the vertex ids
+ * to reduce the amount of messages sent. The tradeoffs are that
+ * range partitioning is more susceptible to hot spots if the keys
+ * are not randomly distributed. Another negative is the user must implement
+ * some of the functionality around how to split the key range.
+ *
+ * Note: This implementation is incomplete, the developer must implement the
+ * various methods based on their index type.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangeWorkerPartitioner<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable>
+        implements WorkerGraphPartitioner<I, V, E, M> {
+    /**
+     * Mapping of vertex indices to their {@link PartitionOwner}
+     * (presumably keyed by the max index of each range - confirm against
+     * the code that fills the map)
+     */
+    private NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
+        new TreeMap<I, RangePartitionOwner<I>>();
+
+    @Override
+    public PartitionOwner createPartitionOwner() {
+        return new RangePartitionOwner<I>();
+    }
+
+    /**
+     * Find the owner of a vertex: the owner stored under the smallest key
+     * that is greater than or equal to the vertex id, or the owner under
+     * the largest key when the vertex id is beyond every key.
+     *
+     * @param vertexId Vertex id to get the partition owner for
+     * @return Owner of the range the vertex id falls in
+     * @throws IllegalArgumentException If the vertex id is null
+     * @throws IllegalStateException If no ranges are known yet
+     */
+    @Override
+    public PartitionOwner getPartitionOwner(I vertexId) {
+        if (vertexId == null) {
+            throw new IllegalArgumentException(
+                "getPartitionOwner: Illegal null vertex id");
+        }
+        if (vertexRangeMap.isEmpty()) {
+            // lastEntry() would be null here; fail with a clear message
+            throw new IllegalStateException(
+                "getPartitionOwner: No partition owners available for " +
+                "vertex id " + vertexId);
+        }
+        I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
+        if (maxVertexIndex == null) {
+            // The vertex id exceeds every key, give it to the last range
+            return vertexRangeMap.lastEntry().getValue();
+        }
+        // Look up by the ceiling key, not the vertex id: the id itself is
+        // only a key of the map when it happens to equal a range boundary
+        return vertexRangeMap.get(maxVertexIndex);
+    }
+
+    @Override
+    public Collection<? extends PartitionOwner> getPartitionOwners() {
+        return vertexRangeMap.values();
+    }
+}
Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Stores the {@link PartitionOwner} objects from the master and provides the
+ * mapping of vertex to {@link PartitionOwner}. Also generates the partition
+ * owner implementation.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public interface WorkerGraphPartitioner<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Instantiate the {@link PartitionOwner} implementation used to read the
+ * master assignments.
+ *
+ * @return Instantiated {@link PartitionOwner} object
+ */
+ PartitionOwner createPartitionOwner();
+
+ /**
+ * Figure out the owner of a vertex
+ *
+ * @param vertexId Vertex id to get the partition for
+ * @return Correct partition owner
+ */
+ PartitionOwner getPartitionOwner(I vertexId);
+
+ /**
+ * At the end of a superstep, workers have {@link PartitionStats} generated
+ * for each of their partitions. This method will allow the user to
+ * modify or create their own {@link PartitionStats} objects to send to
+ * the master.
+ *
+ * @param workerPartitionStats Stats generated by the infrastructure during
+ * the superstep
+ * @param partitionMap Map of all the partitions owned by this worker
+ * (could be used to provide more useful stat information)
+ * @return Final partition stats
+ */
+ Collection<PartitionStats> finalizePartitionStats(
+ Collection<PartitionStats> workerPartitionStats,
+ Map<Integer, Partition<I, V, E, M>> partitionMap);
+
+ /**
+ * Get the partition owners and update them locally. Returns the partitions
+ * to send to other workers and other dependencies.
+ *
+ * @param myWorkerInfo Worker info.
+ * @param masterSetPartitionOwners Master set partition owners, received
+ * prior to beginning the superstep
+ * @param partitionMap Map of all the partitions owned by this worker
+ * (can be used to fill the return map of partitions to send)
+ * @return Information for the partition exchange.
+ */
+ PartitionExchange updatePartitionOwners(
+ WorkerInfo myWorkerInfo,
+ Collection<? extends PartitionOwner> masterSetPartitionOwners,
+ Map<Integer, Partition<I, V, E, M>> partitionMap);
+
+ /**
+ * Get a collection of the {@link PartitionOwner} objects.
+ *
+ * @return Collection of owners for every partition.
+ */
+ Collection<? extends PartitionOwner> getPartitionOwners();
+}
Propchange: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.zk.ZooKeeperExt.PathStat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Helper static methods for working with Writable objects.
+ */
+public class WritableUtils {
+ public static void readFieldsFromByteArray(
+ byte[] byteArray, Writable writableObject) {
+ DataInputStream inputStream =
+ new DataInputStream(new ByteArrayInputStream(byteArray));
+ try {
+ writableObject.readFields(inputStream);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "readFieldsFromByteArray: IOException", e);
+ }
+ }
+
+ public static void readFieldsFromZnode(ZooKeeperExt zkExt,
+ String zkPath,
+ boolean watch,
+ Stat stat,
+ Writable writableObject) {
+ try {
+ byte[] zkData = zkExt.getData(zkPath, false, stat);
+ readFieldsFromByteArray(zkData, writableObject);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "readFieldsFromZnode: KeeperException on " + zkPath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "readFieldsFromZnode: InterrruptedStateException on " + zkPath,
+ e);
+ }
+ }
+
+ public static byte[] writeToByteArray(Writable writableObject) {
+ ByteArrayOutputStream outputStream =
+ new ByteArrayOutputStream();
+ DataOutput output = new DataOutputStream(outputStream);
+ try {
+ writableObject.write(output);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "writeToByteArray: IOStateException", e);
+ }
+ return outputStream.toByteArray();
+ }
+
+ public static PathStat writeToZnode(ZooKeeperExt zkExt,
+ String zkPath,
+ int version,
+ Writable writableObject) {
+ try {
+ byte[] byteArray = writeToByteArray(writableObject);
+ return zkExt.createOrSetExt(zkPath,
+ byteArray,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true,
+ version);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "writeToZnode: KeeperException on " + zkPath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "writeToZnode: InterruptedException on " + zkPath, e);
+ }
+ }
+
+ public static byte[] writeListToByteArray(
+ List<? extends Writable> writableList) {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutput output = new DataOutputStream(outputStream);
+ try {
+ output.writeInt(writableList.size());
+ for (Writable writable : writableList) {
+ writable.write(output);
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "writeListToByteArray: IOException", e);
+ }
+ return outputStream.toByteArray();
+ }
+
+ public static PathStat writeListToZnode(
+ ZooKeeperExt zkExt,
+ String zkPath,
+ int version,
+ List<? extends Writable> writableList) {
+ try {
+ return zkExt.createOrSetExt(
+ zkPath,
+ writeListToByteArray(writableList),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true,
+ version);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "writeListToZnode: KeeperException on " + zkPath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "writeListToZnode: InterruptedException on " + zkPath, e);
+ }
+ }
+
+ public static List<? extends Writable> readListFieldsFromByteArray(
+ byte[] byteArray,
+ Class<? extends Writable> writableClass,
+ Configuration conf) {
+ try {
+ DataInputStream inputStream =
+ new DataInputStream(new ByteArrayInputStream(byteArray));
+ int size = inputStream.readInt();
+ List<Writable> writableList = new ArrayList<Writable>(size);
+ for (int i = 0; i < size; ++i) {
+ Writable writable =
+ ReflectionUtils.newInstance(writableClass, conf);
+ writable.readFields(inputStream);
+ writableList.add(writable);
+ }
+ return writableList;
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "readListFieldsFromZnode: IOException", e);
+ }
+ }
+
+ public static List<? extends Writable> readListFieldsFromZnode(
+ ZooKeeperExt zkExt,
+ String zkPath,
+ boolean watch,
+ Stat stat,
+ Class<? extends Writable> writableClass,
+ Configuration conf) {
+ try {
+ byte[] zkData = zkExt.getData(zkPath, false, stat);
+ return readListFieldsFromByteArray(zkData, writableClass, conf);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "readListFieldsFromZnode: KeeperException on " + zkPath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "readListFieldsFromZnode: InterruptedException on " + zkPath,
+ e);
+ }
+ }
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java Tue Nov 15 00:54:20 2011
@@ -124,10 +124,10 @@ public class ZooKeeperExt extends ZooKee
public class PathStat {
private String path;
private Stat stat;
-
+
/**
* Put in results from createOrSet()
- *
+ *
* @param path Path to created znode (or null)
* @param stat Stat from set znode (if set)
*/
@@ -135,44 +135,44 @@ public class ZooKeeperExt extends ZooKee
this.path = path;
this.stat = stat;
}
-
+
/**
* Get the path of the created znode if it was created.
- *
+ *
* @return Path of created znode or null if not created
*/
public String getPath() {
return path;
}
-
+
/**
* Get the stat of the set znode if set
- *
+ *
* @return Stat of set znode or null if not set
*/
public Stat getStat() {
return stat;
}
}
-
+
/**
* Create a znode. Set the znode if the created znode already exists.
- *
+ *
* @param path path to create
* @param data data to set on the final znode
* @param acl acls on each znode created
* @param createMode only affects the final znode
* @param recursive if true, creates all ancestors
* @return Path of created znode or Stat of set znode
- * @throws InterruptedException
- * @throws KeeperException
+ * @throws InterruptedException
+ * @throws KeeperException
*/
public PathStat createOrSetExt(final String path,
byte data[],
List<ACL> acl,
CreateMode createMode,
boolean recursive,
- int version)
+ int version)
throws KeeperException, InterruptedException {
String createdPath = null;
Stat setStat = null;
@@ -188,6 +188,36 @@ public class ZooKeeperExt extends ZooKee
}
/**
+      * Create a znode only if no znode exists at the path yet. When the znode
+      * already exists it is left untouched: the NodeExistsException is
+      * logged at debug level and nothing is set.
+      *
+      * @param path path to create
+      * @param data data to set on the final znode
+      * @param acl acls on each znode created
+      * @param createMode only affects the final znode
+      * @param recursive if true, creates all ancestors
+      * @return PathStat holding the path of the created znode (null when the
+      *         znode already existed); its Stat is always null because this
+      *         method never sets an existing znode
+      * @throws InterruptedException propagated from createExt()
+      * @throws KeeperException on any ZooKeeper error other than the znode
+      *         already existing
+      */
+     public PathStat createOnceExt(final String path,
+                                   byte data[],
+                                   List<ACL> acl,
+                                   CreateMode createMode,
+                                   boolean recursive)
+             throws KeeperException, InterruptedException {
+         String createdPath = null;
+         try {
+             createdPath = createExt(path, data, acl, createMode, recursive);
+         } catch (KeeperException.NodeExistsException e) {
+             // An existing znode is the expected "already created" case
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug("createOnceExt: Node already exists on path " + path);
+             }
+         }
+         return new PathStat(createdPath, null);
+     }
+
+ /**
* Delete a path recursively. When the deletion is recursive, it is a
* non-atomic operation, hence, not part of ZooKeeper.
* @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.SimpleCheckpointVertex;
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.BasicPartitionOwner;
+import org.apache.giraph.graph.partition.HashMasterPartitioner;
+import org.apache.giraph.graph.partition.HashPartitionerFactory;
+import org.apache.giraph.graph.partition.HashRangePartitionerFactory;
+import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.PartitionBalancer;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for manual checkpoint restarting
+ */
+public class TestGraphPartitioner extends BspCase {
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public TestGraphPartitioner(String testName) {
+ super(testName);
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite() {
+ return new TestSuite(TestGraphPartitioner.class);
+ }
+
+ /**
+ * Example graph partitioner that builds on {@link HashMasterPartitioner} to
+ * send the partitions to the worker that matches the superstep.
+ */
+ @SuppressWarnings("rawtypes")
+ private static class SuperstepHashPartitionerFactory<
+ I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends HashPartitionerFactory<I, V, E, M> {
+
+ /**
+ * Changes the {@link HashMasterPartitioner} to make ownership of the
+ * partitions based on a superstep. For testing only as it is totally
+ * unbalanced.
+ *
+ * @param <I> vertex id
+ * @param <V> vertex data
+ * @param <E> edge data
+ * @param <M> message data
+ */
+ private static class SuperstepMasterPartition<
+ I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends HashMasterPartitioner<I, V, E, M> {
+ /** Class logger */
+ private static Logger LOG =
+ Logger.getLogger(SuperstepMasterPartition.class);
+
+ public SuperstepMasterPartition(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public Collection<PartitionOwner> generateChangedPartitionOwners(
+ Collection<PartitionStats> allPartitionStatsList,
+ Collection<WorkerInfo> availableWorkerInfos,
+ int maxWorkers,
+ long superstep) {
+ // Assign all the partitions to
+ // superstep mod availableWorkerInfos
+ // Guaranteed to be different if the workers (and their order)
+ // do not change
+ long workerIndex = superstep % availableWorkerInfos.size();
+ int i = 0;
+ WorkerInfo chosenWorkerInfo = null;
+ for (WorkerInfo workerInfo : availableWorkerInfos) {
+ if (workerIndex == i) {
+ chosenWorkerInfo = workerInfo;
+ }
+ ++i;
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("generateChangedPartitionOwners: Chosen worker " +
+ "for superstep " + superstep + " is " +
+ chosenWorkerInfo);
+ }
+
+ List<PartitionOwner> partitionOwnerList =
+ new ArrayList<PartitionOwner>();
+ for (PartitionOwner partitionOwner :
+ getCurrentPartitionOwners()) {
+ WorkerInfo prevWorkerinfo =
+ partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ?
+ null : partitionOwner.getWorkerInfo();
+ PartitionOwner tmpPartitionOwner =
+ new BasicPartitionOwner(partitionOwner.getPartitionId(),
+ chosenWorkerInfo,
+ prevWorkerinfo,
+ null);
+ partitionOwnerList.add(tmpPartitionOwner);
+ LOG.info("partition owner was " + partitionOwner +
+ ", new " + tmpPartitionOwner);
+ }
+ setPartitionOwnerList(partitionOwnerList);
+ return partitionOwnerList;
+ }
+ }
+
+ @Override
+ public MasterGraphPartitioner<I, V, E, M>
+ createMasterGraphPartitioner() {
+ return new SuperstepMasterPartition<I, V, E, M>(getConf());
+ }
+ }
+
+ /**
+ * Run a sample BSP job locally and test various partitioners and
+ * partition algorithms.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ public void testPartitioners()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ final int correctLen = 123;
+
+ GiraphJob job = new GiraphJob("testVertexBalancer");
+ setupConfiguration(job);
+ job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ job.getConfiguration().set(
+ PartitionBalancer.PARTITION_BALANCE_ALGORITHM,
+ PartitionBalancer.VERTICES_BALANCE_ALGORITHM);
+ Path outputPath = new Path("/tmp/testVertexBalancer");
+ removeAndSetOutput(job, outputPath);
+ assertTrue(job.run(true));
+ FileSystem hdfs = FileSystem.get(job.getConfiguration());
+ if (getJobTracker() != null) {
+ FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+ int totalLen = 0;
+ for (FileStatus fileStatus : fileStatusArr) {
+ if (fileStatus.getPath().toString().contains("/part-m-")) {
+ totalLen += fileStatus.getLen();
+ }
+ }
+ assertTrue(totalLen == correctLen);
+ }
+
+ job = new GiraphJob("testHashPartitioner");
+ setupConfiguration(job);
+ job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ outputPath = new Path("/tmp/testHashPartitioner");
+ removeAndSetOutput(job, outputPath);
+ assertTrue(job.run(true));
+ if (getJobTracker() != null) {
+ FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+ int totalLen = 0;
+ for (FileStatus fileStatus : fileStatusArr) {
+ if (fileStatus.getPath().toString().contains("/part-m-")) {
+ totalLen += fileStatus.getLen();
+ }
+ }
+ assertTrue(totalLen == correctLen);
+ }
+
+ job = new GiraphJob("testSuperstepHashPartitioner");
+ setupConfiguration(job);
+ job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ job.setGraphPartitionerFactoryClass(
+ SuperstepHashPartitionerFactory.class);
+ outputPath = new Path("/tmp/testSuperstepHashPartitioner");
+ removeAndSetOutput(job, outputPath);
+ assertTrue(job.run(true));
+ if (getJobTracker() != null) {
+ FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+ int totalLen = 0;
+ for (FileStatus fileStatus : fileStatusArr) {
+ if (fileStatus.getPath().toString().contains("/part-m-")) {
+ totalLen += fileStatus.getLen();
+ }
+ }
+ assertTrue(totalLen == correctLen);
+ }
+
+ job = new GiraphJob("testHashRangePartitioner");
+ setupConfiguration(job);
+ job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ job.setGraphPartitionerFactoryClass(
+ HashRangePartitionerFactory.class);
+ outputPath = new Path("/tmp/testHashRangePartitioner");
+ removeAndSetOutput(job, outputPath);
+ assertTrue(job.run(true));
+ if (getJobTracker() != null) {
+ FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+ int totalLen = 0;
+ for (FileStatus fileStatus : fileStatusArr) {
+ if (fileStatus.getPath().toString().contains("/part-m-")) {
+ totalLen += fileStatus.getLen();
+ }
+ }
+ assertTrue(totalLen == correctLen);
+ }
+
+ job = new GiraphJob("testReverseIdSuperstepHashPartitioner");
+ setupConfiguration(job);
+ job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ job.setGraphPartitionerFactoryClass(
+ SuperstepHashPartitionerFactory.class);
+ job.getConfiguration().setBoolean(
+ GeneratedVertexReader.REVERSE_ID_ORDER,
+ true);
+ outputPath = new Path("/tmp/testReverseIdSuperstepHashPartitioner");
+ removeAndSetOutput(job, outputPath);
+ assertTrue(job.run(true));
+ if (getJobTracker() != null) {
+ FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
+ int totalLen = 0;
+ for (FileStatus fileStatus : fileStatusArr) {
+ if (fileStatus.getPath().toString().contains("/part-m-")) {
+ totalLen += fileStatus.getLen();
+ }
+ }
+ assertTrue(totalLen == correctLen);
+ }
+ }
+}
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java Tue Nov 15 00:54:20 2011
@@ -25,6 +25,7 @@ import org.apache.giraph.examples.Simple
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat;
import org.apache.giraph.graph.GiraphJob;
+
import junit.framework.Test;
import junit.framework.TestSuite;
@@ -61,7 +62,7 @@ public class TestMutateGraphVertex exten
setupConfiguration(job);
job.setVertexClass(SimpleMutateGraphVertex.class);
job.setWorkerContextClass(
- SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class);
+ SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class);
job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
Path outputPath = new Path("/tmp/" + getCallingMethodName());