You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [15/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/example...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java Thu Feb 16 22:12:31 2012
@@ -37,102 +37,115 @@ import com.google.common.collect.Maps;
* Helper class for {@link Partition} related operations.
*/
public class PartitionUtils {
- /** Class logger */
- private static Logger LOG = Logger.getLogger(PartitionUtils.class);
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(PartitionUtils.class);
- private static class EdgeCountComparator implements
- Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+ /**
+ * Do not construct this object.
+ */
+ private PartitionUtils() { }
+
+ /**
+ * Orders {@code Entry<WorkerInfo, VertexEdgeCount>} objects by edge count.
+ */
+ private static class EdgeCountComparator implements
+ Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+ @Override
+ public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+ Entry<WorkerInfo, VertexEdgeCount> worker2) {
+ return Long.signum(worker1.getValue().getEdgeCount() -
+ worker2.getValue().getEdgeCount());
+ }
+ }
- @Override
- public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
- Entry<WorkerInfo, VertexEdgeCount> worker2) {
- return (int) (worker1.getValue().getEdgeCount() -
- worker2.getValue().getEdgeCount());
- }
+ /**
+ * Orders {@code Entry<WorkerInfo, VertexEdgeCount>} objects by vertex
+ * count.
+ */
+ private static class VertexCountComparator implements
+ Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+ @Override
+ public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+ Entry<WorkerInfo, VertexEdgeCount> worker2) {
+ return Long.signum(worker1.getValue().getVertexCount() -
+ worker2.getValue().getVertexCount());
}
+ }
- private static class VertexCountComparator implements
- Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+ /**
+ * Check for imbalances on a per worker basis, by calculating the
+ * mean, high and low workers by edges and vertices (logged at INFO).
+ *
+ * @param partitionOwnerList List of partition owners.
+ * @param allPartitionStats All the partition stats.
+ */
+ public static void analyzePartitionStats(
+ Collection<PartitionOwner> partitionOwnerList,
+ List<PartitionStats> allPartitionStats) {
+ Map<Integer, PartitionOwner> idOwnerMap =
+ new HashMap<Integer, PartitionOwner>();
+ for (PartitionOwner partitionOwner : partitionOwnerList) {
+ if (idOwnerMap.put(partitionOwner.getPartitionId(),
+ partitionOwner) != null) {
+ throw new IllegalStateException(
+ "analyzePartitionStats: Duplicate partition " +
+ partitionOwner);
+ }
+ }
- @Override
- public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
- Entry<WorkerInfo, VertexEdgeCount> worker2) {
- return (int) (worker1.getValue().getEdgeCount() -
- worker2.getValue().getEdgeCount());
- }
+ Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
+ VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
+ for (PartitionStats partitionStats : allPartitionStats) {
+ WorkerInfo workerInfo =
+ idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
+ VertexEdgeCount vertexEdgeCount =
+ workerStatsMap.get(workerInfo);
+ if (vertexEdgeCount == null) {
+ workerStatsMap.put(
+ workerInfo,
+ new VertexEdgeCount(partitionStats.getVertexCount(),
+ partitionStats.getEdgeCount()));
+ } else {
+ workerStatsMap.put(
+ workerInfo,
+ vertexEdgeCount.incrVertexEdgeCount(
+ partitionStats.getVertexCount(),
+ partitionStats.getEdgeCount()));
+ }
+ totalVertexEdgeCount =
+ totalVertexEdgeCount.incrVertexEdgeCount(
+ partitionStats.getVertexCount(),
+ partitionStats.getEdgeCount());
}
- /**
- * Check for imbalances on a per worker basis, by calculating the
- * mean, high and low workers by edges and vertices.
- */
- public static void analyzePartitionStats(
- Collection<PartitionOwner> partitionOwnerList,
- List<PartitionStats> allPartitionStats) {
- Map<Integer, PartitionOwner> idOwnerMap =
- new HashMap<Integer, PartitionOwner>();
- for (PartitionOwner partitionOwner : partitionOwnerList) {
- if (idOwnerMap.put(partitionOwner.getPartitionId(),
- partitionOwner) != null) {
- throw new IllegalStateException(
- "analyzePartitionStats: Duplicate partition " +
- partitionOwner);
- }
- }
-
- Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
- VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
- for (PartitionStats partitionStats : allPartitionStats) {
- WorkerInfo workerInfo =
- idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
- VertexEdgeCount vertexEdgeCount =
- workerStatsMap.get(workerInfo);
- if (vertexEdgeCount == null) {
- workerStatsMap.put(
- workerInfo,
- new VertexEdgeCount(partitionStats.getVertexCount(),
- partitionStats.getEdgeCount()));
- } else {
- workerStatsMap.put(
- workerInfo,
- vertexEdgeCount.incrVertexEdgeCount(
- partitionStats.getVertexCount(),
- partitionStats.getEdgeCount()));
- }
- totalVertexEdgeCount =
- totalVertexEdgeCount.incrVertexEdgeCount(
- partitionStats.getVertexCount(),
- partitionStats.getEdgeCount());
- }
-
- List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
- Lists.newArrayList(workerStatsMap.entrySet());
-
- if (LOG.isInfoEnabled()) {
- Collections.sort(workerEntryList, new VertexCountComparator());
- LOG.info("analyzePartitionStats: Vertices - Mean: " +
- (totalVertexEdgeCount.getVertexCount() /
- workerStatsMap.size()) +
- ", Min: " +
- workerEntryList.get(0).getKey() + " - " +
- workerEntryList.get(0).getValue().getVertexCount() +
- ", Max: "+
- workerEntryList.get(workerEntryList.size() - 1).getKey() +
- " - " +
- workerEntryList.get(workerEntryList.size() - 1).
- getValue().getVertexCount());
- Collections.sort(workerEntryList, new EdgeCountComparator());
- LOG.info("analyzePartitionStats: Edges - Mean: " +
- (totalVertexEdgeCount.getEdgeCount() /
- workerStatsMap.size()) +
- ", Min: " +
- workerEntryList.get(0).getKey() + " - " +
- workerEntryList.get(0).getValue().getEdgeCount() +
- ", Max: "+
- workerEntryList.get(workerEntryList.size() - 1).getKey() +
- " - " +
- workerEntryList.get(workerEntryList.size() - 1).
- getValue().getEdgeCount());
- }
+ List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
+ Lists.newArrayList(workerStatsMap.entrySet());
+
+ if (LOG.isInfoEnabled() && !workerStatsMap.isEmpty()) {
+ Collections.sort(workerEntryList, new VertexCountComparator());
+ LOG.info("analyzePartitionStats: Vertices - Mean: " +
+ (totalVertexEdgeCount.getVertexCount() /
+ workerStatsMap.size()) +
+ ", Min: " +
+ workerEntryList.get(0).getKey() + " - " +
+ workerEntryList.get(0).getValue().getVertexCount() +
+ ", Max: " +
+ workerEntryList.get(workerEntryList.size() - 1).getKey() +
+ " - " +
+ workerEntryList.get(workerEntryList.size() - 1).
+ getValue().getVertexCount());
+ Collections.sort(workerEntryList, new EdgeCountComparator());
+ LOG.info("analyzePartitionStats: Edges - Mean: " +
+ (totalVertexEdgeCount.getEdgeCount() /
+ workerStatsMap.size()) +
+ ", Min: " +
+ workerEntryList.get(0).getKey() + " - " +
+ workerEntryList.get(0).getValue().getEdgeCount() +
+ ", Max: " +
+ workerEntryList.get(workerEntryList.size() - 1).getKey() +
+ " - " +
+ workerEntryList.get(workerEntryList.size() - 1).
+ getValue().getEdgeCount());
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java Thu Feb 16 22:12:31 2012
@@ -34,11 +34,13 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public abstract class RangeMasterPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- MasterGraphPartitioner<I, V, E, M> {
-
- @Override
- public PartitionStats createPartitionStats() {
- return new RangePartitionStats<I>();
- }
+ V extends Writable, E extends Writable, M extends Writable> implements
+ MasterGraphPartitioner<I, V, E, M> {
+ /**
+ * Creates a new, default-constructed {@link RangePartitionStats}.
+ */
+ @Override
+ public PartitionStats createPartitionStats() {
+ return new RangePartitionStats<I>();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java Thu Feb 16 22:12:31 2012
@@ -33,31 +33,43 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public class RangePartitionOwner<I extends WritableComparable>
- extends BasicPartitionOwner {
- /** Max index for this partition */
- private I maxIndex;
+ extends BasicPartitionOwner {
+ /** Max vertex index for this partition */
+ private I maxIndex;
- public RangePartitionOwner() {
- }
+ /**
+ * Default constructor. The max index stays null until readFields().
+ */
+ public RangePartitionOwner() { }
- public RangePartitionOwner(I maxIndex) {
- this.maxIndex = maxIndex;
- }
+ /**
+ * Constructor with the max index.
+ *
+ * @param maxIndex Max index of this partition.
+ */
+ public RangePartitionOwner(I maxIndex) {
+ this.maxIndex = maxIndex;
+ }
- public I getMaxIndex() {
- return maxIndex;
- }
+ /**
+ * Get the maximum index of this partition owner.
+ *
+ * @return Maximum index (null if never set or read).
+ */
+ public I getMaxIndex() {
+ return maxIndex;
+ }
- @Override
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
- maxIndex = BspUtils.<I>createVertexIndex(getConf());
- maxIndex.readFields(input);
- }
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ super.readFields(input);
+ maxIndex = BspUtils.<I>createVertexIndex(getConf());
+ maxIndex.readFields(input);
+ }
- @Override
- public void write(DataOutput output) throws IOException {
- super.write(output);
- maxIndex.write(output);
- }
+ @Override
+ public void write(DataOutput output) throws IOException {
+ super.write(output);
+ maxIndex.write(output);
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java Thu Feb 16 22:12:31 2012
@@ -32,37 +32,37 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public class RangePartitionStats<I extends WritableComparable>
- extends PartitionStats {
- /** Can be null if no hint, otherwise a splitting hint */
- private RangeSplitHint<I> hint;
+ extends PartitionStats {
+ /** Split hint, or null if none; serialized after a presence flag */
+ private RangeSplitHint<I> hint;
- /**
- * Get the range split hint (if any)
- *
- * @return Hint of how to split the range if desired, null otherwise
- */
- public RangeSplitHint<I> getRangeSplitHint() {
- return hint;
- }
+ /**
+ * Get the range split hint (if any).
+ *
+ * @return Hint of how to split the range if desired, null otherwise
+ */
+ public RangeSplitHint<I> getRangeSplitHint() {
+ return hint;
+ }
- @Override
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
- boolean hintExists = input.readBoolean();
- if (hintExists) {
- hint = new RangeSplitHint<I>();
- hint.readFields(input);
- } else {
- hint = null;
- }
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ super.readFields(input);
+ boolean hintExists = input.readBoolean();
+ if (hintExists) {
+ hint = new RangeSplitHint<I>();
+ hint.readFields(input);
+ } else {
+ hint = null;
}
+ }
- @Override
- public void write(DataOutput output) throws IOException {
- super.write(output);
- output.writeBoolean(hint != null);
- if (hint != null) {
- hint.write(output);
- }
+ @Override
+ public void write(DataOutput output) throws IOException {
+ super.write(output);
+ output.writeBoolean(hint != null);
+ if (hint != null) {
+ hint.write(output);
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java Thu Feb 16 22:12:31 2012
@@ -36,38 +36,38 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public class RangeSplitHint<I extends WritableComparable>
- implements Writable, Configurable {
- /** Hinted split index */
- private I splitIndex;
- /** Number of vertices in this range before the split */
- private long preSplitVertexCount;
- /** Number of vertices in this range after the split */
- private long postSplitVertexCount;
- /** Configuration */
- private Configuration conf;
+ implements Writable, Configurable {
+ /** Hinted split index */
+ private I splitIndex;
+ /** Number of vertices in this range before the split */
+ private long preSplitVertexCount;
+ /** Number of vertices in this range after the split */
+ private long postSplitVertexCount;
+ /** Configuration used by readFields() to create the split index */
+ private Configuration conf;
- @Override
- public void readFields(DataInput input) throws IOException {
- splitIndex = BspUtils.<I>createVertexIndex(conf);
- splitIndex.readFields(input);
- preSplitVertexCount = input.readLong();
- postSplitVertexCount = input.readLong();
- }
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ splitIndex = BspUtils.<I>createVertexIndex(conf);
+ splitIndex.readFields(input);
+ preSplitVertexCount = input.readLong();
+ postSplitVertexCount = input.readLong();
+ }
- @Override
- public void write(DataOutput output) throws IOException {
- splitIndex.write(output);
- output.writeLong(preSplitVertexCount);
- output.writeLong(postSplitVertexCount);
- }
+ @Override
+ public void write(DataOutput output) throws IOException {
+ splitIndex.write(output);
+ output.writeLong(preSplitVertexCount);
+ output.writeLong(postSplitVertexCount);
+ }
- @Override
- public Configuration getConf() {
- return conf;
- }
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java Thu Feb 16 22:12:31 2012
@@ -43,36 +43,40 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public abstract class RangeWorkerPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- WorkerGraphPartitioner<I, V, E, M> {
- /** Mapping of the vertex ids to the {@link PartitionOwner} */
- protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
- new TreeMap<I, RangePartitionOwner<I>>();
+ V extends Writable, E extends Writable, M extends Writable> implements
+ WorkerGraphPartitioner<I, V, E, M> {
+ /** Mapping of the vertex ids to the {@link PartitionOwner} */
+ protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
+ new TreeMap<I, RangePartitionOwner<I>>();
- @Override
- public PartitionOwner createPartitionOwner() {
- return new RangePartitionOwner<I>();
- }
+ @Override
+ public PartitionOwner createPartitionOwner() {
+ return new RangePartitionOwner<I>();
+ }
- @Override
- public PartitionOwner getPartitionOwner(I vertexId) {
- // Find the partition owner based on the maximum partition id.
- // If the vertex id exceeds any of the maximum partition ids, give
- // it to the last one
- if (vertexId == null) {
- throw new IllegalArgumentException(
- "getPartitionOwner: Illegal null vertex id");
- }
- I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
- if (maxVertexIndex == null) {
- return vertexRangeMap.lastEntry().getValue();
- } else {
- return vertexRangeMap.get(vertexId);
- }
+ @Override
+ public PartitionOwner getPartitionOwner(I vertexId) {
+ // Select the owner with the smallest key >= vertexId (keys are
+ // presumably each range's max vertex index). If vertexId exceeds
+ // every key, give it to the last owner.
+ if (vertexId == null) {
+ throw new IllegalArgumentException(
+ "getPartitionOwner: Illegal null vertex id");
}
-
- @Override
- public Collection<? extends PartitionOwner> getPartitionOwners() {
- return vertexRangeMap.values();
+ if (vertexRangeMap.isEmpty()) {
+ throw new IllegalStateException(
+ "getPartitionOwner: No partition owners for vertex " + vertexId);
+ }
+ I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
+ if (maxVertexIndex == null) {
+ return vertexRangeMap.lastEntry().getValue();
+ } else {
+ return vertexRangeMap.get(maxVertexIndex);
}
+ }
+
+ @Override
+ public Collection<? extends PartitionOwner> getPartitionOwners() {
+ return vertexRangeMap.values();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java Thu Feb 16 22:12:31 2012
@@ -29,62 +29,67 @@ import org.apache.hadoop.io.WritableComp
* Stores the {@link PartitionOwner} objects from the master and provides the
* mapping of vertex to {@link PartitionOwner}. Also generates the partition
* owner implementation.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface WorkerGraphPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Instantiate the {@link PartitionOwner} implementation used to read the
- * master assignments.
- *
- * @return Instantiated {@link PartitionOwner} object
- */
- PartitionOwner createPartitionOwner();
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Instantiate the {@link PartitionOwner} implementation used to read the
+ * master assignments.
+ *
+ * @return Instantiated {@link PartitionOwner} object
+ */
+ PartitionOwner createPartitionOwner();
- /**
- * Figure out the owner of a vertex
- *
- * @param vertexId Vertex id to get the partition for
- * @return Correct partition owner
- */
- PartitionOwner getPartitionOwner(I vertexId);
+ /**
+ * Figure out the owner of a vertex.
+ *
+ * @param vertexId Vertex id to get the partition for
+ * @return Correct partition owner
+ */
+ PartitionOwner getPartitionOwner(I vertexId);
- /**
- * At the end of a superstep, workers have {@link PartitionStats} generated
- * for each of their partitions. This method will allow the user to
- * modify or create their own {@link PartitionStats} interfaces to send to
- * the master.
- *
- * @param workerPartitionStats Stats generated by the infrastructure during
- * the superstep
- * @param partitionMap Map of all the partitions owned by this worker
- * (could be used to provide more useful stat information)
- * @return Final partition stats
- */
- Collection<PartitionStats> finalizePartitionStats(
- Collection<PartitionStats> workerPartitionStats,
- Map<Integer, Partition<I, V, E, M>> partitionMap);
+ /**
+ * At the end of a superstep, workers have {@link PartitionStats} generated
+ * for each of their partitions. This method will allow the user to
+ * modify or create their own {@link PartitionStats} objects to send to
+ * the master.
+ *
+ * @param workerPartitionStats Stats generated by the infrastructure during
+ * the superstep
+ * @param partitionMap Map of all the partitions owned by this worker
+ * (could be used to provide more useful stat information)
+ * @return Final partition stats
+ */
+ Collection<PartitionStats> finalizePartitionStats(
+ Collection<PartitionStats> workerPartitionStats,
+ Map<Integer, Partition<I, V, E, M>> partitionMap);
- /**
- * Get the partitions owners and update locally. Returns the partitions
- * to send to other workers and other dependencies.
- *
- * @param myWorkerInfo Worker info.
- * @param masterSetPartitionOwners Master set partition owners, received
- * prior to beginning the superstep
- * @param partitionMap Map of all the partitions owned by this worker
- * (can be used to fill the return map of partitions to send)
- * @return Information for the partition exchange.
- */
- PartitionExchange updatePartitionOwners(
- WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners,
- Map<Integer, Partition<I, V, E, M>> partitionMap);
+ /**
+ * Get the partition owners and update locally. Returns the partitions
+ * to send to other workers and other dependencies.
+ *
+ * @param myWorkerInfo Worker info.
+ * @param masterSetPartitionOwners Master set partition owners, received
+ * prior to beginning the superstep
+ * @param partitionMap Map of all the partitions owned by this worker
+ * (can be used to fill the return map of partitions to send)
+ * @return Information for the partition exchange.
+ */
+ PartitionExchange updatePartitionOwners(
+ WorkerInfo myWorkerInfo,
+ Collection<? extends PartitionOwner> masterSetPartitionOwners,
+ Map<Integer, Partition<I, V, E, M>> partitionMap);
- /**
- * Get a collection of the {@link PartitionOwner} objects.
- *
- * @return Collection of owners for every partition.
- */
- Collection<? extends PartitionOwner> getPartitionOwners();
+ /**
+ * Get a collection of the {@link PartitionOwner} objects.
+ *
+ * @return Collection of owners for every partition.
+ */
+ Collection<? extends PartitionOwner> getPartitionOwners();
}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.giraph.bsp;
-
/**
- * State of the BSP application
+ * Partitioning classes: partition owners, stats and graph partitioners.
*/
-public enum ApplicationState {
- UNKNOWN, ///< Shouldn't be seen, just an initial state
- START_SUPERSTEP, ///< Start from a desired superstep
- FAILED, ///< Unrecoverable
- FINISHED ///< Successful completion
-}
+package org.apache.giraph.graph.partition;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java Thu Feb 16 22:12:31 2012
@@ -23,18 +23,21 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.authorize.Service;
/**
- * {@link PolicyProvider} for Map-Reduce protocols.
- */
+ * {@link PolicyProvider} for the BSP communications protocol.
+ */
public class BspPolicyProvider extends PolicyProvider {
- private static final Service[] bspCommunicationsServices =
- new Service[] {
- new Service("security.bsp.communications.protocol.acl",
- CommunicationsInterface.class),
+ /**
+ * Services (ACL key and protocol interface) provided by this policy.
+ */
+ private static final Service[] BSP_COMMUNICATION_SERVICES =
+ new Service[] {
+ new Service("security.bsp.communications.protocol.acl",
+ CommunicationsInterface.class),
};
- @Override
- public Service[] getServices() {
- return bspCommunicationsServices;
- }
+ @Override
+ public Service[] getServices() {
+ return BSP_COMMUNICATION_SERVICES;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java Thu Feb 16 22:12:31 2012
@@ -27,25 +27,24 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenSelector;
/**
- * Look through tokens to find the first job token that matches the service
- * and return it.
- */
+ * Look through tokens and return the first job token (kind
+ * {@code mapreduce.job}); the service must be non-null but is not matched.
+ */
public class BspTokenSelector implements TokenSelector<JobTokenIdentifier> {
-
- @SuppressWarnings("unchecked")
- @Override
- public Token<JobTokenIdentifier> selectToken(Text service,
- Collection<Token<? extends TokenIdentifier>> tokens) {
- if (service == null) {
- return null;
- }
- Text KIND_NAME = new Text("mapreduce.job");
- for (Token<? extends TokenIdentifier> token : tokens) {
- if (KIND_NAME.equals(token.getKind())) {
- return (Token<JobTokenIdentifier>) token;
- }
- }
- return null;
+ @SuppressWarnings("unchecked")
+ @Override
+ public Token<JobTokenIdentifier> selectToken(Text service,
+ Collection<Token<? extends TokenIdentifier>> tokens) {
+ if (service == null) {
+ return null;
+ }
+ Text kindName = new Text("mapreduce.job");
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ if (kindName.equals(token.getKind())) {
+ return (Token<JobTokenIdentifier>) token;
+ }
}
+ return null;
+ }
}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.giraph.bsp;
-
/**
- * State of the BSP application
+ * Hadoop security integration: the BSP policy provider and token selector.
*/
-public enum ApplicationState {
- UNKNOWN, ///< Shouldn't be seen, just an initial state
- START_SUPERSTEP, ///< Start from a desired superstep
- FAILED, ///< Unrecoverable
- FINISHED ///< Successful completion
-}
+package org.apache.giraph.hadoop;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java Thu Feb 16 22:12:31 2012
@@ -38,84 +38,90 @@ import org.apache.log4j.Logger;
* Example graph partitioner that builds on {@link HashMasterPartitioner} to
* send the partitions to the worker that matches the superstep. It is for
* testing only and should never be used in practice.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public class SuperstepHashPartitionerFactory<
-    I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends HashPartitionerFactory<I, V, E, M> {
+public class SuperstepHashPartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends HashPartitionerFactory<I, V, E, M> {
+  /**
+   * Changes the {@link HashMasterPartitioner} to make ownership of the
+   * partitions based on a superstep. For testing only as it is totally
+   * unbalanced.
+   *
+   * @param <I> vertex id
+   * @param <V> vertex data
+   * @param <E> edge data
+   * @param <M> message data
+   */
+  private static class SuperstepMasterPartition<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      extends HashMasterPartitioner<I, V, E, M> {
+    /** Class logger */
+    private static Logger LOG =
+        Logger.getLogger(SuperstepMasterPartition.class);
    /**
-     * Changes the {@link HashMasterPartitioner} to make ownership of the
-     * partitions based on a superstep. For testing only as it is totally
-     * unbalanced.
+     * Constructor with configuration.
     *
-     * @param <I> vertex id
-     * @param <V> vertex data
-     * @param <E> edge data
-     * @param <M> message data
+     * @param conf Configuration to be stored.
     */
-    private static class SuperstepMasterPartition<
-        I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        extends HashMasterPartitioner<I, V, E, M> {
-        /** Class logger */
-        private static Logger LOG =
-            Logger.getLogger(SuperstepMasterPartition.class);
+    public SuperstepMasterPartition(Configuration conf) {
+      super(conf);
+    }
-        public SuperstepMasterPartition(Configuration conf) {
-            super(conf);
+    @Override
+    public Collection<PartitionOwner> generateChangedPartitionOwners(
+        Collection<PartitionStats> allPartitionStatsList,
+        Collection<WorkerInfo> availableWorkerInfos,
+        int maxWorkers,
+        long superstep) {
+      // Assign all the partitions to the worker at index
+      // superstep mod availableWorkerInfos.size()
+      // Guaranteed to be different if the workers (and their order)
+      // do not change
+      long workerIndex = superstep % availableWorkerInfos.size();
+      int i = 0;
+      WorkerInfo chosenWorkerInfo = null;
+      for (WorkerInfo workerInfo : availableWorkerInfos) {
+        if (workerIndex == i) {
+          chosenWorkerInfo = workerInfo;
        }
+        ++i;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("generateChangedPartitionOwners: Chosen worker " +
+                 "for superstep " + superstep + " is " +
+                 chosenWorkerInfo);
+      }
-        @Override
-        public Collection<PartitionOwner> generateChangedPartitionOwners(
-                Collection<PartitionStats> allPartitionStatsList,
-                Collection<WorkerInfo> availableWorkerInfos,
-                int maxWorkers,
-                long superstep) {
-            // Assign all the partitions to
-            // superstep mod availableWorkerInfos
-            // Guaranteed to be different if the workers (and their order)
-            // do not change
-            long workerIndex = superstep % availableWorkerInfos.size();
-            int i = 0;
-            WorkerInfo chosenWorkerInfo = null;
-            for (WorkerInfo workerInfo : availableWorkerInfos) {
-                if (workerIndex == i) {
-                    chosenWorkerInfo = workerInfo;
-                }
-                ++i;
-            }
-            if (LOG.isInfoEnabled()) {
-                LOG.info("generateChangedPartitionOwners: Chosen worker " +
-                         "for superstep " + superstep + " is " +
-                         chosenWorkerInfo);
-            }
-
-            List<PartitionOwner> partitionOwnerList =
-                new ArrayList<PartitionOwner>();
-            for (PartitionOwner partitionOwner :
-                    getCurrentPartitionOwners()) {
-                WorkerInfo prevWorkerinfo =
-                    partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ?
-                        null : partitionOwner.getWorkerInfo();
-                PartitionOwner tmpPartitionOwner =
-                    new BasicPartitionOwner(partitionOwner.getPartitionId(),
-                                            chosenWorkerInfo,
-                                            prevWorkerinfo,
-                                            null);
-                partitionOwnerList.add(tmpPartitionOwner);
-                LOG.info("partition owner was " + partitionOwner +
-                         ", new " + tmpPartitionOwner);
-            }
-            setPartitionOwnerList(partitionOwnerList);
-            return partitionOwnerList;
-        }
+      List<PartitionOwner> partitionOwnerList = new ArrayList<PartitionOwner>();
+      for (PartitionOwner partitionOwner :
+        getCurrentPartitionOwners()) {
+        WorkerInfo prevWorkerinfo =
+            partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ?
+                null : partitionOwner.getWorkerInfo();
+        PartitionOwner tmpPartitionOwner =
+            new BasicPartitionOwner(partitionOwner.getPartitionId(),
+                chosenWorkerInfo,
+                prevWorkerinfo,
+                null);
+        partitionOwnerList.add(tmpPartitionOwner);
+        LOG.info("partition owner was " + partitionOwner +
+            ", new " + tmpPartitionOwner);
+      }
+      setPartitionOwnerList(partitionOwnerList);
+      return partitionOwnerList;
    }
+  }
-    @Override
-    public MasterGraphPartitioner<I, V, E, M>
-            createMasterGraphPartitioner() {
-        return new SuperstepMasterPartition<I, V, E, M>(getConf());
-    }
+  @Override
+  public MasterGraphPartitioner<I, V, E, M>
+  createMasterGraphPartitioner() {
+    return new SuperstepMasterPartition<I, V, E, M>(getConf());
+  }
}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.giraph.bsp;
-
/**
- * State of the BSP application
+ * Package of helper objects used by integration tests.
*/
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.integration;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -38,26 +38,41 @@ import java.io.IOException;
* @param <E> Edge value
*/
@SuppressWarnings("rawtypes")
-public class AdjacencyListTextVertexOutputFormat <I extends WritableComparable,
-    V extends Writable, E extends Writable> extends TextVertexOutputFormat<I, V, E>{
-
+public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+
+  /**
+   * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   */
  static class AdjacencyListVertexWriter<I extends WritableComparable, V extends
      Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
+    /** Configuration key for the output delimiter */
    public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+    /** Default output delimiter (a tab) */
    public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-
+    /** Delimiter cached from the configuration on first write */
    private String delimiter;
-    public AdjacencyListVertexWriter(RecordWriter<Text,Text> recordWriter) {
+    /**
+     * Constructor with writer.
+     *
+     * @param recordWriter Record writer used for writing.
+     */
+    public AdjacencyListVertexWriter(RecordWriter<Text, Text> recordWriter) {
      super(recordWriter);
    }
    @Override
    public void writeVertex(BasicVertex<I, V, E, ?> vertex) throws IOException,
-    InterruptedException {
+        InterruptedException {
      if (delimiter == null) {
        delimiter = getContext().getConfiguration()
-         .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+            .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
      }
      StringBuffer sb = new StringBuffer(vertex.getVertexId().toString());
@@ -75,9 +90,8 @@ public class AdjacencyListTextVertexOutp
  @Override
  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException {
    return new AdjacencyListVertexWriter<I, V, E>
-        (textOutputFormat.getRecordWriter(context));
+    (textOutputFormat.getRecordWriter(context));
  }
-
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java Thu Feb 16 22:12:31 2012
@@ -21,6 +21,7 @@ import com.google.common.collect.Maps;
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Edge;
+import org.apache.giraph.lib.TextVertexInputFormat.TextVertexReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -42,15 +43,17 @@ import java.util.Map;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public abstract class AdjacencyListVertexReader<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable> extends
    TextVertexInputFormat.TextVertexReader<I, V, E, M> {
-
+  /** Configuration key for the split delimiter */
  public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
+  /** Default split delimiter (a tab) */
  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-
+  /** Cached delimiter used for split */
  private String splitValue = null;
  /**
@@ -59,17 +62,37 @@ public abstract class AdjacencyListVerte
  public interface LineSanitizer {
    /**
     * Clean string s before attempting to tokenize it.
+     *
+     * @param s String to be cleaned.
+     * @return Sanitized string.
     */
-    public String sanitize(String s);
+    String sanitize(String s);
  }
-  private LineSanitizer sanitizer = null;
+  /**
+   * Line sanitizer given to the constructor; null when none was given.
+   */
+  private final LineSanitizer sanitizer;
-  public AdjacencyListVertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+  /**
+   * Constructor with line record reader.
+   *
+   * @param lineRecordReader Reader from {@link TextVertexReader}.
+   */
+  public AdjacencyListVertexReader(
+      RecordReader<LongWritable, Text> lineRecordReader) {
    super(lineRecordReader);
+    sanitizer = null;
  }
-  public AdjacencyListVertexReader(RecordReader<LongWritable, Text> lineRecordReader,
+  /**
+   * Constructor with line record reader and sanitizer.
+   *
+   * @param lineRecordReader Reader from {@link TextVertexReader}.
+   * @param sanitizer Sanitizer to be used.
+   */
+  public AdjacencyListVertexReader(
+      RecordReader<LongWritable, Text> lineRecordReader,
      LineSanitizer sanitizer) {
    super(lineRecordReader);
    this.sanitizer = sanitizer;
@@ -77,17 +100,18 @@ public abstract class AdjacencyListVerte
  /**
   * Store the Id for this line in an instance of its correct type.
+   *
   * @param s Id of vertex from line
   * @param id Instance of Id's type, in which to store its value
   */
-  abstract public void decodeId(String s, I id);
+  public abstract void decodeId(String s, I id);
  /**
   * Store the value for this line in an instance of its correct type.
   * @param s Value from line
   * @param value Instance of value's type, in which to store its value
   */
-  abstract public void decodeValue(String s, V value);
+  public abstract void decodeValue(String s, V value);
  /**
   * Store an edge from the line into an instance of a correctly typed Edge
@@ -95,7 +119,7 @@ public abstract class AdjacencyListVerte
   * @param value The edge's value from the line
   * @param edge Instance of edge in which to store the id and value
   */
-  abstract public void decodeEdge(String id, String value, Edge<I, E> edge);
+  public abstract void decodeEdge(String id, String value, Edge<I, E> edge);
  @Override
@@ -104,7 +128,8 @@ public abstract class AdjacencyListVerte
  }
  @Override
-  public BasicVertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException {
+  public BasicVertex<I, V, E, M> getCurrentVertex()
+    throws IOException, InterruptedException {
    Configuration conf = getContext().getConfiguration();
    String line = getRecordReader().getCurrentValue().toString();
    BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
@@ -120,7 +145,8 @@ public abstract class AdjacencyListVerte
    String [] values = line.split(splitValue);
    if ((values.length < 2) || (values.length % 2 != 0)) {
-      throw new IllegalArgumentException("Line did not split correctly: " + line);
+      throw new IllegalArgumentException(
+        "Line did not split correctly: " + line);
    }
    I vertexId = BspUtils.<I>createVertexIndex(conf);
@@ -132,7 +158,7 @@ public abstract class AdjacencyListVerte
    int i = 2;
    Map<I, E> edges = Maps.newHashMap();
    Edge<I, E> edge = new Edge<I, E>();
-    while(i < values.length) {
+    while (i < values.length) {
      decodeEdge(values[i], values[i + 1], edge);
      edges.put(edge.getDestVertexId(), edge.getEdgeValue());
      i += 2;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -40,30 +40,45 @@ import java.io.IOException;
* @param <E> Edge value
*/
@SuppressWarnings("rawtypes")
-public class IdWithValueTextOutputFormat <I extends WritableComparable,
-    V extends Writable, E extends Writable> extends TextVertexOutputFormat<I, V, E>{
-
+public class IdWithValueTextOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+
+  /**
+   * Vertex writer used with {@link IdWithValueTextOutputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   */
  static class IdWithValueVertexWriter<I extends WritableComparable, V extends
      Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
-
+    /** Configuration key for the output delimiter */
    public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+    /** Default output delimiter (a tab) */
    public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-
+    /** Configuration key for whether to reverse the id and value order */
    public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
+    /** Default is to not reverse id and value order. */
    public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
-
+    /** Delimiter cached from the configuration on first write */
    private String delimiter;
+    /**
+     * Constructor with record writer.
+     *
+     * @param recordWriter Writer from LineRecordWriter.
+     */
    public IdWithValueVertexWriter(RecordWriter<Text, Text> recordWriter) {
      super(recordWriter);
    }
    @Override
    public void writeVertex(BasicVertex<I, V, E, ?> vertex) throws IOException,
-    InterruptedException {
+        InterruptedException {
      if (delimiter == null) {
        delimiter = getContext().getConfiguration()
-         .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+            .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
      }
      String first;
@@ -87,9 +102,8 @@ public class IdWithValueTextOutputFormat
  @Override
  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException {
    return new IdWithValueVertexWriter<I, V, E>
-        (textOutputFormat.getRecordWriter(context));
+    (textOutputFormat.getRecordWriter(context));
  }
-
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java Thu Feb 16 22:12:31 2012
@@ -21,11 +21,16 @@ package org.apache.giraph.lib;
/**
* Keeps the vertex keys for the input/output vertex format
*/
-public interface JsonBase64VertexFormat {
-    /** Vertex id key */
-    public static final String VERTEX_ID_KEY = "vertexId";
-    /** Vertex value key*/
-    public static final String VERTEX_VALUE_KEY = "vertexValue";
-    /** Edge value array key (all the edges are stored here) */
-    public static final String EDGE_ARRAY_KEY = "edgeArray";
+public class JsonBase64VertexFormat {
+  /** Vertex id key */
+  public static final String VERTEX_ID_KEY = "vertexId";
+  /** Vertex value key */
+  public static final String VERTEX_VALUE_KEY = "vertexValue";
+  /** Edge value array key (all the edges are stored here) */
+  public static final String EDGE_ARRAY_KEY = "edgeArray";
+
+  /**
+   * Do not construct; this class only holds constants.
+   */
+  private JsonBase64VertexFormat() { }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -51,110 +51,110 @@ import java.util.Map;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public class JsonBase64VertexInputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable,
-        M extends Writable>
-        extends TextVertexInputFormat<I, V, E, M> implements
-        JsonBase64VertexFormat {
+public class JsonBase64VertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends TextVertexInputFormat<I, V, E, M> {
+  /**
+   * Simple reader that supports {@link JsonBase64VertexInputFormat}.
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message data
+   */
+  private static class JsonBase64VertexReader<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      extends TextVertexReader<I, V, E, M> {
    /**
-     * Simple reader that supports {@link JsonBase64VertexInputFormat}
+     * Only constructor. Requires the LineRecordReader.
     *
-     * @param <I> Vertex index value
-     * @param <V> Vertex value
-     * @param <E> Edge value
+     * @param lineRecordReader Line record reader to read from
     */
-    private static class JsonBase64VertexReader<
-            I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable> extends TextVertexReader<I, V, E, M> {
-        /**
-         * Only constructor. Requires the LineRecordReader
-         *
-         * @param lineRecordReader Line record reader to read from
-         */
-        public JsonBase64VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
-            super(lineRecordReader);
-        }
+    public JsonBase64VertexReader(
+        RecordReader<LongWritable, Text> lineRecordReader) {
+      super(lineRecordReader);
+    }
-        @Override
-        public boolean nextVertex() throws IOException, InterruptedException {
-            return getRecordReader().nextKeyValue();
-        }
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
-        @Override
-        public BasicVertex<I, V, E, M> getCurrentVertex()
-                throws IOException, InterruptedException {
-            Configuration conf = getContext().getConfiguration();
-            BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+    @Override
+    public BasicVertex<I, V, E, M> getCurrentVertex()
+      throws IOException, InterruptedException {
+      Configuration conf = getContext().getConfiguration();
+      BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
-            Text line = getRecordReader().getCurrentValue();
-            JSONObject vertexObject;
-            try {
-                vertexObject = new JSONObject(line.toString());
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "next: Failed to get the vertex", e);
-            }
-            DataInput input = null;
-            byte[] decodedWritable = null;
-            I vertexId = null;
-            try {
-                decodedWritable = Base64.decode(
-                    vertexObject.getString(VERTEX_ID_KEY));
-                input = new DataInputStream(
-                    new ByteArrayInputStream(decodedWritable));
-                vertexId = BspUtils.<I>createVertexIndex(conf);
-                vertexId.readFields(input);
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "next: Failed to get vertex id", e);
-            }
-            V vertexValue = null;
-            try {
-                decodedWritable = Base64.decode(
-                    vertexObject.getString(VERTEX_VALUE_KEY));
-                input = new DataInputStream(
-                    new ByteArrayInputStream(decodedWritable));
-                vertexValue = BspUtils.<V>createVertexValue(conf);
-                vertexValue.readFields(input);
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "next: Failed to get vertex value", e);
-            }
-            JSONArray edgeArray = null;
-            try {
-                edgeArray = vertexObject.getJSONArray(EDGE_ARRAY_KEY);
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "next: Failed to get edge array", e);
-            }
-            Map<I, E> edgeMap = Maps.newHashMap();
-            for (int i = 0; i < edgeArray.length(); ++i) {
-                try {
-                    decodedWritable =
-                        Base64.decode(edgeArray.getString(i));
-                } catch (JSONException e) {
-                    throw new IllegalArgumentException(
-                        "next: Failed to get edge value", e);
-                }
-                input = new DataInputStream(
-                    new ByteArrayInputStream(decodedWritable));
-                Edge<I, E> edge = new Edge<I, E>();
-                edge.setConf(getContext().getConfiguration());
-                edge.readFields(input);
-                edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue());
-            }
-            vertex.initialize(vertexId, vertexValue, edgeMap, null);
-            return vertex;
+      Text line = getRecordReader().getCurrentValue();
+      JSONObject vertexObject;
+      try {
+        vertexObject = new JSONObject(line.toString());
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get the vertex", e);
+      }
+      DataInput input = null;
+      byte[] decodedWritable = null;
+      I vertexId = null;
+      try {
+        decodedWritable = Base64.decode(
+          vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
+        input = new DataInputStream(
+          new ByteArrayInputStream(decodedWritable));
+        vertexId = BspUtils.<I>createVertexIndex(conf);
+        vertexId.readFields(input);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex id", e);
+      }
+      V vertexValue = null;
+      try {
+        decodedWritable = Base64.decode(
+          vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
+        input = new DataInputStream(
+          new ByteArrayInputStream(decodedWritable));
+        vertexValue = BspUtils.<V>createVertexValue(conf);
+        vertexValue.readFields(input);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex value", e);
+      }
+      JSONArray edgeArray = null;
+      try {
+        edgeArray = vertexObject.getJSONArray(
+          JsonBase64VertexFormat.EDGE_ARRAY_KEY);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get edge array", e);
+      }
+      Map<I, E> edgeMap = Maps.newHashMap();
+      for (int i = 0; i < edgeArray.length(); ++i) {
+        try {
+          decodedWritable = Base64.decode(edgeArray.getString(i));
+        } catch (JSONException e) {
+          throw new IllegalArgumentException(
+            "next: Failed to get edge value", e);
        }
+        input = new DataInputStream(
+          new ByteArrayInputStream(decodedWritable));
+        Edge<I, E> edge = new Edge<I, E>();
+        edge.setConf(getContext().getConfiguration());
+        edge.readFields(input);
+        edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue());
+      }
+      vertex.initialize(vertexId, vertexValue, edgeMap, null);
+      return vertex;
    }
+  }
-    @Override
-    public VertexReader<I, V, E, M> createVertexReader(
-            InputSplit split,
-            TaskAttemptContext context) throws IOException {
-        return new JsonBase64VertexReader<I, V, E, M>(textInputFormat.createRecordReader(split,
-            context));
-    }
+  @Override
+  public VertexReader<I, V, E, M> createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new JsonBase64VertexReader<I, V, E, M>(
+        textInputFormat.createRecordReader(split, context));
+  }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -47,80 +47,80 @@ import java.io.IOException;
* @param <E> Edge value
*/
@SuppressWarnings("rawtypes")
-public class JsonBase64VertexOutputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable>
-        extends TextVertexOutputFormat<I, V, E>
-        implements JsonBase64VertexFormat {
+public class JsonBase64VertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    TextVertexOutputFormat<I, V, E> {
+  /**
+   * Simple writer that supports {@link JsonBase64VertexOutputFormat}.
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   */
+  private static class JsonBase64VertexWriter<I extends WritableComparable,
+      V extends Writable, E extends Writable> extends
+      TextVertexWriter<I, V, E> {
    /**
-     * Simple writer that supports {@link JsonBase64VertexOutputFormat}
+     * Only constructor. Requires the LineRecordWriter.
     *
-     * @param <I> Vertex index value
-     * @param <V> Vertex value
-     * @param <E> Edge value
+     * @param lineRecordWriter Line record writer to write to
     */
-    private static class JsonBase64VertexWriter<
-            I extends WritableComparable, V extends Writable,
-            E extends Writable> extends TextVertexWriter<I, V, E> {
-        /**
-         * Only constructor. Requires the LineRecordWriter
-         *
-         * @param lineRecordWriter Line record writer to write to
-         */
-        public JsonBase64VertexWriter(
-                RecordWriter<Text, Text> lineRecordWriter) {
-            super(lineRecordWriter);
-        }
-
-        @Override
-        public void writeVertex(BasicVertex<I, V, E, ?> vertex)
-                throws IOException, InterruptedException {
-            ByteArrayOutputStream outputStream =
-                new ByteArrayOutputStream();
-            DataOutput output = new DataOutputStream(outputStream);
-            JSONObject vertexObject = new JSONObject();
-            vertex.getVertexId().write(output);
-            try {
-                vertexObject.put(
-                    VERTEX_ID_KEY,
-                    Base64.encodeBytes(outputStream.toByteArray()));
-            } catch (JSONException e) {
-                throw new IllegalStateException(
-                    "writerVertex: Failed to insert vertex id", e);
-            }
-            outputStream.reset();
-            vertex.getVertexValue().write(output);
-            try {
-                vertexObject.put(
-                    VERTEX_VALUE_KEY,
-                    Base64.encodeBytes(outputStream.toByteArray()));
-            } catch (JSONException e) {
-                throw new IllegalStateException(
-                    "writerVertex: Failed to insert vertex value", e);
-            }
-            JSONArray edgeArray = new JSONArray();
-            for (I targetVertexId : vertex) {
-                Edge<I, E> edge = new Edge<I, E>(
-                    targetVertexId, vertex.getEdgeValue(targetVertexId));
-                edge.setConf(getContext().getConfiguration());
-                outputStream.reset();
-                edge.write(output);
-                edgeArray.put(Base64.encodeBytes(outputStream.toByteArray()));
-            }
-            try {
-                vertexObject.put(EDGE_ARRAY_KEY, edgeArray);
-            } catch (JSONException e) {
-                throw new IllegalStateException(
-                    "writerVertex: Failed to insert edge array", e);
-            }
-            getRecordWriter().write(new Text(vertexObject.toString()), null);
-        }
+    public JsonBase64VertexWriter(
+        RecordWriter<Text, Text> lineRecordWriter) {
+      super(lineRecordWriter);
    }
    @Override
-    public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        return new JsonBase64VertexWriter<I, V, E>(
-                textOutputFormat.getRecordWriter(context));
+    public void writeVertex(BasicVertex<I, V, E, ?> vertex)
+      throws IOException, InterruptedException {
+      ByteArrayOutputStream outputStream =
+          new ByteArrayOutputStream();
+      DataOutput output = new DataOutputStream(outputStream);
+      JSONObject vertexObject = new JSONObject();
+      vertex.getVertexId().write(output);
+      try {
+        vertexObject.put(
+          JsonBase64VertexFormat.VERTEX_ID_KEY,
+          Base64.encodeBytes(outputStream.toByteArray()));
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+          "writerVertex: Failed to insert vertex id", e);
+      }
+      outputStream.reset();
+      vertex.getVertexValue().write(output);
+      try {
+        vertexObject.put(
+          JsonBase64VertexFormat.VERTEX_VALUE_KEY,
+          Base64.encodeBytes(outputStream.toByteArray()));
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+          "writerVertex: Failed to insert vertex value", e);
+      }
+      JSONArray edgeArray = new JSONArray();
+      for (I targetVertexId : vertex) {
+        Edge<I, E> edge = new Edge<I, E>(
+            targetVertexId, vertex.getEdgeValue(targetVertexId));
+        edge.setConf(getContext().getConfiguration());
+        outputStream.reset();
+        edge.write(output);
+        edgeArray.put(Base64.encodeBytes(outputStream.toByteArray()));
+      }
+      try {
+        vertexObject.put(
+          JsonBase64VertexFormat.EDGE_ARRAY_KEY,
+          edgeArray);
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+          "writerVertex: Failed to insert edge array", e);
+      }
+      getRecordWriter().write(new Text(vertexObject.toString()), null);
    }
+  }
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new JsonBase64VertexWriter<I, V, E>(
+        textOutputFormat.getRecordWriter(context));
+  }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -35,19 +35,40 @@ import java.io.IOException;
* 22 0.1 45 0.3 99 0.44
* to repesent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99,
* with values of 0.3 and 0.44, respectively.
+ *
+ * @param <M> Message data
*/
-public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable> extends
- TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> {
-
+public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
+ extends TextVertexInputFormat<LongWritable, DoubleWritable,
+ DoubleWritable, M> {
+
+ /**
+ * VertexReader associated with
+ * {@link LongDoubleDoubleAdjacencyListVertexInputFormat}.
+ *
+ * @param <M> Message data.
+ */
static class VertexReader<M extends Writable> extends
- AdjacencyListVertexReader<LongWritable, DoubleWritable, DoubleWritable, M> {
+ AdjacencyListVertexReader<LongWritable, DoubleWritable,
+ DoubleWritable, M> {
+ /**
+     * Constructor with line record reader.
+ *
+ * @param lineRecordReader Reader to internally use.
+ */
VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
super(lineRecordReader);
}
+ /**
+     * Constructor with line record reader and sanitizer.
+ *
+ * @param lineRecordReader Reader to internally use.
+ * @param sanitizer Line sanitizer.
+ */
VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
- LineSanitizer sanitizer) {
+ LineSanitizer sanitizer) {
super(lineRecordReader, sanitizer);
}
@@ -62,8 +83,10 @@ public class LongDoubleDoubleAdjacencyLi
}
@Override
- public void decodeEdge(String s1, String s2, Edge<LongWritable, DoubleWritable>
- textIntWritableEdge) {
+ public void decodeEdge(
+ String s1,
+ String s2,
+ Edge<LongWritable, DoubleWritable> textIntWritableEdge) {
textIntWritableEdge.setDestVertexId(new LongWritable(Long.valueOf(s1)));
textIntWritableEdge.setEdgeValue(new DoubleWritable(Double.valueOf(s2)));
}
@@ -71,10 +94,10 @@ public class LongDoubleDoubleAdjacencyLi
@Override
public org.apache.giraph.graph.VertexReader<LongWritable,
- DoubleWritable, DoubleWritable, M> createVertexReader(
+ DoubleWritable, DoubleWritable, M> createVertexReader(
InputSplit split,
TaskAttemptContext context) throws IOException {
return new VertexReader<M>(textInputFormat.createRecordReader(
- split, context));
+ split, context));
}
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -31,49 +31,73 @@ import org.apache.hadoop.mapreduce.lib.i
import java.io.IOException;
import java.util.List;
+/**
+ * Sequence file vertex input format based on {@link SequenceFileInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param <X> Vertex type stored as the value of each sequence file record
+ */
public class SequenceFileVertexInputFormat<I extends WritableComparable<I>,
- V extends Writable,
- E extends Writable,
- M extends Writable,
- X extends BasicVertex<I, V, E, M>>
+ V extends Writable, E extends Writable, M extends Writable,
+ X extends BasicVertex<I, V, E, M>>
extends VertexInputFormat<I, V, E, M> {
- protected SequenceFileInputFormat<I, X> sequenceFileInputFormat
- = new SequenceFileInputFormat<I, X>();
+ /** Internal input format */
+ protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
+ new SequenceFileInputFormat<I, X>();
- @Override public List<InputSplit> getSplits(JobContext context, int numWorkers)
- throws IOException, InterruptedException {
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers)
+ throws IOException, InterruptedException {
return sequenceFileInputFormat.getSplits(context);
}
@Override
public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
- TaskAttemptContext context)
- throws IOException {
+ TaskAttemptContext context) throws IOException {
return new SequenceFileVertexReader<I, V, E, M, X>(
sequenceFileInputFormat.createRecordReader(split, context));
}
+ /**
+ * Vertex reader used with {@link SequenceFileVertexInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+   * @param <X> Vertex type stored as the value of each sequence file record
+ */
public static class SequenceFileVertexReader<I extends WritableComparable<I>,
V extends Writable, E extends Writable, M extends Writable,
X extends BasicVertex<I, V, E, M>>
implements VertexReader<I, V, E, M> {
+ /** Internal record reader from {@link SequenceFileInputFormat} */
private final RecordReader<I, X> recordReader;
+ /**
+ * Constructor with record reader.
+ *
+ * @param recordReader Reader from {@link SequenceFileInputFormat}.
+ */
public SequenceFileVertexReader(RecordReader<I, X> recordReader) {
this.recordReader = recordReader;
}
- @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
+ @Override public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException, InterruptedException {
recordReader.initialize(inputSplit, context);
}
- @Override public boolean nextVertex() throws IOException, InterruptedException {
+ @Override public boolean nextVertex() throws IOException,
+ InterruptedException {
return recordReader.nextKeyValue();
}
@Override public BasicVertex<I, V, E, M> getCurrentVertex()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
return recordReader.getCurrentValue();
}
@@ -82,7 +106,8 @@ public class SequenceFileVertexInputForm
recordReader.close();
}
- @Override public float getProgress() throws IOException, InterruptedException {
+ @Override public float getProgress() throws IOException,
+ InterruptedException {
return recordReader.getProgress();
}
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -32,19 +32,38 @@ import java.io.IOException;
* Class to read graphs stored as adjacency lists with ids represented by
* Strings and values as doubles. This is a good inputformat for reading
* graphs where the id types do not matter and can be stashed in a String.
+ *
+ * @param <M> Message type.
*/
public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
extends TextVertexInputFormat<Text, DoubleWritable, DoubleWritable, M> {
- static class VertexReader<M extends Writable> extends AdjacencyListVertexReader<Text,
- DoubleWritable, DoubleWritable, M> {
+ /**
+ * Vertex reader used with
+   * {@link TextDoubleDoubleAdjacencyListVertexInputFormat}.
+ *
+ * @param <M> Message type.
+ */
+ static class VertexReader<M extends Writable> extends
+ AdjacencyListVertexReader<Text, DoubleWritable, DoubleWritable, M> {
+ /**
+     * Constructor without sanitizer.
+ *
+ * @param lineRecordReader Internal reader.
+ */
VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
super(lineRecordReader);
}
+ /**
+     * Constructor with line record reader and line sanitizer.
+ *
+ * @param lineRecordReader Internal reader.
+ * @param sanitizer Sanitizer of the lines.
+ */
VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
- LineSanitizer sanitizer) {
+ LineSanitizer sanitizer) {
super(lineRecordReader, sanitizer);
}
@@ -59,8 +78,8 @@ public class TextDoubleDoubleAdjacencyLi
}
@Override
- public void decodeEdge(String s1, String s2, Edge<Text, DoubleWritable>
- textIntWritableEdge) {
+ public void decodeEdge(String s1, String s2,
+ Edge<Text, DoubleWritable> textIntWritableEdge) {
textIntWritableEdge.setDestVertexId(new Text(s1));
textIntWritableEdge.setEdgeValue(new DoubleWritable(Double.valueOf(s2)));
}
@@ -68,11 +87,9 @@ public class TextDoubleDoubleAdjacencyLi
@Override
public org.apache.giraph.graph.VertexReader<Text, DoubleWritable,
- DoubleWritable, M> createVertexReader(
- InputSplit split,
+ DoubleWritable, M> createVertexReader(InputSplit split,
TaskAttemptContext context) throws IOException {
return new VertexReader<M>(textInputFormat.createRecordReader(
- split, context));
+ split, context));
}
-
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -43,85 +43,80 @@ import java.util.List;
* @param <M> Message value
*/
@SuppressWarnings("rawtypes")
-public abstract class TextVertexInputFormat<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends VertexInputFormat<I, V, E, M> {
- /** Uses the TextInputFormat to do everything */
- protected TextInputFormat textInputFormat = new TextInputFormat();
+public abstract class TextVertexInputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
+ /** Uses the TextInputFormat to do everything */
+ protected TextInputFormat textInputFormat = new TextInputFormat();
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex input. Easiest to ignore the key value separator and only use
+ * key instead.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+ public abstract static class TextVertexReader<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements VertexReader<I, V, E, M> {
+ /** Internal line record reader */
+ private final RecordReader<LongWritable, Text> lineRecordReader;
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
/**
- * Abstract class to be implemented by the user based on their specific
- * vertex input. Easiest to ignore the key value separator and only use
- * key instead.
+ * Initialize with the LineRecordReader.
*
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
+ * @param lineRecordReader Line record reader from TextInputFormat
*/
- public static abstract class TextVertexReader<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements VertexReader<I, V, E, M> {
- /** Internal line record reader */
- private final RecordReader<LongWritable, Text> lineRecordReader;
- /** Context passed to initialize */
- private TaskAttemptContext context;
-
- /**
- * Initialize with the LineRecordReader.
- *
- * @param lineRecordReader Line record reader from TextInputFormat
- */
- public TextVertexReader(
- RecordReader<LongWritable, Text> lineRecordReader) {
- this.lineRecordReader = lineRecordReader;
- }
-
- @Override
- public void initialize(InputSplit inputSplit,
- TaskAttemptContext context)
- throws IOException, InterruptedException {
- lineRecordReader.initialize(inputSplit, context);
- this.context = context;
- }
-
- @Override
- public void close() throws IOException {
- lineRecordReader.close();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return lineRecordReader.getProgress();
- }
-
- /**
- * Get the line record reader.
- *
- * @return Record reader to be used for reading.
- */
- protected RecordReader<LongWritable, Text> getRecordReader() {
- return lineRecordReader;
- }
-
- /**
- * Get the context.
- *
- * @return Context passed to initialize.
- */
- protected TaskAttemptContext getContext() {
- return context;
- }
+ public TextVertexReader(
+ RecordReader<LongWritable, Text> lineRecordReader) {
+ this.lineRecordReader = lineRecordReader;
}
@Override
- public List<InputSplit> getSplits(
- JobContext context, int numWorkers)
- throws IOException, InterruptedException {
- // Ignore the hint of numWorkers here since we are using TextInputFormat
- // to do this for us
- return textInputFormat.getSplits(context);
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ lineRecordReader.initialize(inputSplit, context);
+ this.context = context;
}
+
+ @Override
+ public void close() throws IOException {
+ lineRecordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return lineRecordReader.getProgress();
+ }
+
+ /**
+ * Get the line record reader.
+ *
+ * @return Record reader to be used for reading.
+ */
+ protected RecordReader<LongWritable, Text> getRecordReader() {
+ return lineRecordReader;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ protected TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers)
+ throws IOException, InterruptedException {
+ // Ignore the hint of numWorkers here since we are using TextInputFormat
+ // to do this for us
+ return textInputFormat.getSplits(context);
+ }
}