You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [6/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Thu Feb 16 22:12:31 2012
@@ -18,7 +18,8 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.graph.*;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerContext;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -36,153 +37,174 @@ import java.util.Iterator;
  * appropriate location and superstep.
  */
 public class VerifyMessage {
-    public static class VerifiableMessage implements Writable {
-        /** Superstep sent on */
-        public long superstep;
-        /** Source vertex id */
-        public long sourceVertexId;
-        /** Value */
-        public float value;
-
-        public VerifiableMessage() {}
-
-        public VerifiableMessage(
-                long superstep, long sourceVertexId, float value) {
-            this.superstep = superstep;
-            this.sourceVertexId = sourceVertexId;
-            this.value = value;
-        }
+  /**
+   * Message that will be sent in {@link VerifyMessageVertex}.
+   */
+  public static class VerifiableMessage implements Writable {
+    /** Superstep sent on */
+    private long superstep;
+    /** Source vertex id */
+    private long sourceVertexId;
+    /** Value */
+    private float value;
+
+    /**
+     * Default constructor used with reflection.
+     */
+    public VerifiableMessage() { }
+
+    /**
+     * Constructor with verifiable arguments.
+     * @param superstep Superstep this message was created on.
+     * @param sourceVertexId Id of the vertex that sent this message.
+     * @param value A value associated with this message.
+     */
+    public VerifiableMessage(
+        long superstep, long sourceVertexId, float value) {
+      this.superstep = superstep;
+      this.sourceVertexId = sourceVertexId;
+      this.value = value;
+    }
 
-        @Override
-        public void readFields(DataInput input) throws IOException {
-            superstep = input.readLong();
-            sourceVertexId = input.readLong();
-            value = input.readFloat();
-        }
+    @Override
+    public void readFields(DataInput input) throws IOException {
+      superstep = input.readLong();
+      sourceVertexId = input.readLong();
+      value = input.readFloat();
+    }
 
-        @Override
-        public void write(DataOutput output) throws IOException {
-            output.writeLong(superstep);
-            output.writeLong(sourceVertexId);
-            output.writeFloat(value);
-        }
+    @Override
+    public void write(DataOutput output) throws IOException {
+      output.writeLong(superstep);
+      output.writeLong(sourceVertexId);
+      output.writeFloat(value);
+    }
 
-        @Override
-        public String toString() {
-            return "(superstep=" + superstep + ",sourceVertexId=" +
-                sourceVertexId + ",value=" + value + ")";
-        }
+    @Override
+    public String toString() {
+      return "(superstep=" + superstep + ",sourceVertexId=" +
+          sourceVertexId + ",value=" + value + ")";
     }
+  }
 
-    public static class VerifyMessageVertex extends
-            EdgeListVertex<LongWritable, IntWritable, FloatWritable,
-            VerifiableMessage> {
-        /** User can access this after the application finishes if local */
-        public static long finalSum;
-        /** Number of supersteps to run (6 by default) */
-        private static int supersteps = 6;
-        /** Class logger */
-        private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class);
-
-        /** Dynamically set number of supersteps */
-        public static final String SUPERSTEP_COUNT =
-            "verifyMessageVertex.superstepCount";
-
-        public static class VerifyMessageVertexWorkerContext extends
-                WorkerContext {
-            @Override
-            public void preApplication() throws InstantiationException,
-                    IllegalAccessException {
-                registerAggregator(LongSumAggregator.class.getName(),
-                    LongSumAggregator.class);
-                LongSumAggregator sumAggregator = (LongSumAggregator)
-                    getAggregator(LongSumAggregator.class.getName());
-                sumAggregator.setAggregatedValue(new LongWritable(0));
-                supersteps = getContext().getConfiguration().getInt(
-                    SUPERSTEP_COUNT, supersteps);
-            }
-
-            @Override
-            public void postApplication() {
-                LongSumAggregator sumAggregator = (LongSumAggregator)
-                    getAggregator(LongSumAggregator.class.getName());
-                finalSum = sumAggregator.getAggregatedValue().get();
-            }
-
-            @Override
-            public void preSuperstep() {
-                useAggregator(LongSumAggregator.class.getName());
-            }
+  /**
+   * Send and verify messages.
+   */
+  public static class VerifyMessageVertex extends
+      EdgeListVertex<LongWritable, IntWritable, FloatWritable,
+      VerifiableMessage> {
+    /** Configuration key for the number of supersteps to run */
+    public static final String SUPERSTEP_COUNT =
+        "verifyMessageVertex.superstepCount";
+    /** User can access this after the application finishes if local */
+    private static long FINAL_SUM;
+    /** Number of supersteps to run (6 by default) */
+    private static int SUPERSTEPS = 6;
+    /** Class logger */
+    private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class);
 
-            @Override
-            public void postSuperstep() {}
-        }
+    public static long getFinalSum() {
+      return FINAL_SUM;
+    }
 
-        @Override
-        public void compute(Iterator<VerifiableMessage> msgIterator) {
-            LongSumAggregator sumAggregator = (LongSumAggregator)
-                getAggregator(LongSumAggregator.class.getName());
-            if (getSuperstep() > supersteps) {
-                voteToHalt();
-                return;
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("compute: " + sumAggregator);
-            }
-            sumAggregator.aggregate(getVertexId().get());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("compute: sum = " +
-                          sumAggregator.getAggregatedValue().get() +
-                          " for vertex " + getVertexId());
-            }
-            float msgValue = 0.0f;
-            while (msgIterator.hasNext()) {
-                VerifiableMessage msg = msgIterator.next();
-                msgValue += msg.value;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("compute: got msg = " + msg +
-                              " for vertex id " + getVertexId() +
-                              ", vertex value " + getVertexValue() +
-                              " on superstep " + getSuperstep());
-                }
-                if (msg.superstep != getSuperstep() - 1) {
-                    throw new IllegalStateException(
-                        "compute: Impossible to not get a messsage from " +
-                        "the previous superstep, current superstep = " +
-                        getSuperstep());
-                }
-                if ((msg.sourceVertexId != getVertexId().get() - 1) &&
-                        (getVertexId().get() != 0)) {
-                    throw new IllegalStateException(
-                        "compute: Impossible that this message didn't come " +
-                        "from the previous vertex and came from " +
-                        msg.sourceVertexId);
-                }
-            }
-            int vertexValue = getVertexValue().get();
-            setVertexValue(new IntWritable(vertexValue + (int) msgValue));
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("compute: vertex " + getVertexId() +
-                          " has value " + getVertexValue() +
-                          " on superstep " + getSuperstep());
-            }
-            for (LongWritable targetVertexId : this) {
-                FloatWritable edgeValue = getEdgeValue(targetVertexId);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("compute: vertex " + getVertexId() +
-                              " sending edgeValue " + edgeValue +
-                              " vertexValue " + vertexValue +
-                              " total " +
-                              (edgeValue.get() + (float) vertexValue) +
-                              " to vertex " + targetVertexId +
-                              " on superstep " + getSuperstep());
-                }
-                edgeValue.set(edgeValue.get() + (float) vertexValue);
-                addEdge(targetVertexId, edgeValue);
-                sendMsg(targetVertexId,
-                    new VerifiableMessage(
-                        getSuperstep(), getVertexId().get(), edgeValue.get()));
-            }
-        }
+    /**
+     * Worker context used with {@link VerifyMessageVertex}.
+     */
+    public static class VerifyMessageVertexWorkerContext extends
+        WorkerContext {
+      @Override
+      public void preApplication() throws InstantiationException,
+      IllegalAccessException {
+        registerAggregator(LongSumAggregator.class.getName(),
+            LongSumAggregator.class);
+        LongSumAggregator sumAggregator = (LongSumAggregator)
+            getAggregator(LongSumAggregator.class.getName());
+        sumAggregator.setAggregatedValue(new LongWritable(0));
+        SUPERSTEPS = getContext().getConfiguration().getInt(
+            SUPERSTEP_COUNT, SUPERSTEPS);
+      }
+
+      @Override
+      public void postApplication() {
+        LongSumAggregator sumAggregator = (LongSumAggregator)
+            getAggregator(LongSumAggregator.class.getName());
+        FINAL_SUM = sumAggregator.getAggregatedValue().get();
+      }
+
+      @Override
+      public void preSuperstep() {
+        useAggregator(LongSumAggregator.class.getName());
+      }
+
+      @Override
+      public void postSuperstep() { }
+    }
+
+    @Override
+    public void compute(Iterator<VerifiableMessage> msgIterator) {
+      LongSumAggregator sumAggregator = (LongSumAggregator)
+          getAggregator(LongSumAggregator.class.getName());
+      if (getSuperstep() > SUPERSTEPS) {
+        voteToHalt();
+        return;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("compute: " + sumAggregator);
+      }
+      sumAggregator.aggregate(getVertexId().get());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("compute: sum = " +
+            sumAggregator.getAggregatedValue().get() +
+            " for vertex " + getVertexId());
+      }
+      float msgValue = 0.0f;
+      while (msgIterator.hasNext()) {
+        VerifiableMessage msg = msgIterator.next();
+        msgValue += msg.value;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("compute: got msg = " + msg +
+              " for vertex id " + getVertexId() +
+              ", vertex value " + getVertexValue() +
+              " on superstep " + getSuperstep());
+        }
+        if (msg.superstep != getSuperstep() - 1) {
+          throw new IllegalStateException(
+              "compute: Impossible to not get a messsage from " +
+                  "the previous superstep, current superstep = " +
+                  getSuperstep());
+        }
+        if ((msg.sourceVertexId != getVertexId().get() - 1) &&
+            (getVertexId().get() != 0)) {
+          throw new IllegalStateException(
+              "compute: Impossible that this message didn't come " +
+                  "from the previous vertex and came from " +
+                  msg.sourceVertexId);
+        }
+      }
+      int vertexValue = getVertexValue().get();
+      setVertexValue(new IntWritable(vertexValue + (int) msgValue));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("compute: vertex " + getVertexId() +
+            " has value " + getVertexValue() +
+            " on superstep " + getSuperstep());
+      }
+      for (LongWritable targetVertexId : this) {
+        FloatWritable edgeValue = getEdgeValue(targetVertexId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("compute: vertex " + getVertexId() +
+              " sending edgeValue " + edgeValue +
+              " vertexValue " + vertexValue +
+              " total " +
+              (edgeValue.get() + (float) vertexValue) +
+              " to vertex " + targetVertexId +
+              " on superstep " + getSuperstep());
+        }
+        edgeValue.set(edgeValue.get() + (float) vertexValue);
+        addEdge(targetVertexId, edgeValue);
+        sendMsg(targetVertexId,
+            new VerifiableMessage(
+                getSuperstep(), getVertexId().get(), edgeValue.get()));
+      }
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -1,20 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.giraph.examples;
 
@@ -33,39 +33,44 @@ import java.io.IOException;
  * Text-based {@link org.apache.giraph.graph.VertexOutputFormat} for usage with
  * {@link ConnectedComponentsVertex}
  *
- * Each line consists of a vertex and its associated component (represented by the smallest
- * vertex id in the component)
+ * Each line consists of a vertex and its associated component (represented
+ * by the smallest vertex id in the component)
  */
 public class VertexWithComponentTextOutputFormat extends
-        TextVertexOutputFormat<IntWritable, IntWritable, NullWritable> {
-
-    @Override
-    public VertexWriter<IntWritable, IntWritable, NullWritable>
-            createVertexWriter(TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        RecordWriter<Text, Text> recordWriter =
-                textOutputFormat.getRecordWriter(context);
-        return new VertexWithComponentWriter(recordWriter);
+    TextVertexOutputFormat<IntWritable, IntWritable, NullWritable> {
+  @Override
+  public VertexWriter<IntWritable, IntWritable, NullWritable>
+  createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    RecordWriter<Text, Text> recordWriter =
+        textOutputFormat.getRecordWriter(context);
+    return new VertexWithComponentWriter(recordWriter);
+  }
+
+  /**
+   * Vertex writer used with {@link VertexWithComponentTextOutputFormat}.
+   */
+  public static class VertexWithComponentWriter extends
+      TextVertexOutputFormat.TextVertexWriter<IntWritable, IntWritable,
+      NullWritable> {
+    /**
+     * Constructor with record writer.
+     *
+     * @param writer Where the vertices will finally be written.
+     */
+    public VertexWithComponentWriter(RecordWriter<Text, Text> writer) {
+      super(writer);
     }
 
-    public static class VertexWithComponentWriter extends
-            TextVertexOutputFormat.TextVertexWriter<IntWritable, IntWritable,
-            NullWritable> {
-
-        public VertexWithComponentWriter(RecordWriter<Text, Text> writer) {
-            super(writer);
-        }
-
-        @Override
-        public void writeVertex(BasicVertex<IntWritable, IntWritable,
-                NullWritable,?> vertex) throws IOException,
-                InterruptedException {
-            StringBuilder output = new StringBuilder();
-            output.append(vertex.getVertexId().get());
-            output.append('\t');
-            output.append(vertex.getVertexValue().get());
-            getRecordWriter().write(new Text(output.toString()), null);
-        }
-
+    @Override
+    public void writeVertex(BasicVertex<IntWritable, IntWritable,
+        NullWritable, ?> vertex) throws IOException,
+        InterruptedException {
+      StringBuilder output = new StringBuilder();
+      output.append(vertex.getVertexId().get());
+      output.append('\t');
+      output.append(vertex.getVertexValue().get());
+      getRecordWriter().write(new Text(output.toString()), null);
     }
-}
\ No newline at end of file
+  }
+}

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of Giraph examples.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.examples;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java Thu Feb 16 22:12:31 2012
@@ -24,39 +24,39 @@ import org.apache.hadoop.io.Writable;
  * Interface for Aggregator.  Allows aggregate operations for all vertices
  * in a given superstep.
  *
- * @param <A extends Writable> Aggregated value
+ * @param <A> Aggregated value
  */
 public interface Aggregator<A extends Writable> {
-    /**
-     * Add a new value.
-     * Needs to be commutative and associative
-     *
-     * @param value
-     */
-    void aggregate(A value);
+  /**
+   * Add a new value.
+   * Needs to be commutative and associative
+   *
+   * @param value Value to be aggregated.
+   */
+  void aggregate(A value);
 
-    /**
-     * Set aggregated value.
-     * Can be used for initialization or reset.
-     *
-     * @param value
-     */
-    void setAggregatedValue(A value);
+  /**
+   * Set aggregated value.
+   * Can be used for initialization or reset.
+   *
+   * @param value Value to be set.
+   */
+  void setAggregatedValue(A value);
 
-    /**
-     * Return current aggregated value.
-     * Needs to be initialized if aggregate or setAggregatedValue
-     * have not been called before.
-     *
-     * @return A
-     */
-    A getAggregatedValue();
+  /**
+   * Return current aggregated value.
+   * Needs to be initialized if aggregate or setAggregatedValue
+   * have not been called before.
+   *
+   * @return Current aggregated value
+   */
+  A getAggregatedValue();
 
-    /**
-     * Return new aggregated value.
-     * Must be changeable without affecting internals of Aggregator
-     *
-     * @return Writable
-     */
-    A createAggregatedValue();
+  /**
+   * Return new aggregated value.
+   * Must be changeable without affecting internals of Aggregator
+   *
+   * @return New aggregated value
+   */
+  A createAggregatedValue();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java Thu Feb 16 22:12:31 2012
@@ -24,34 +24,35 @@ import org.apache.hadoop.io.Writable;
  * Vertex classes can use this interface to register and use aggregators
  */
 public interface AggregatorUsage {
-    /**
-     * Register an aggregator in preSuperstep() and/or preApplication().
-     *
-     * @param name of aggregator
-     * @param aggregatorClass Class type of the aggregator
-     * @return created Aggregator or null when already registered
-     */
-    public <A extends Writable> Aggregator<A> registerAggregator(
-        String name,
-        Class<? extends Aggregator<A>> aggregatorClass)
-        throws InstantiationException, IllegalAccessException;
+  /**
+   * Register an aggregator in preSuperstep() and/or preApplication().
+   *
+   * @param <A> Aggregator type
+   * @param name of aggregator
+   * @param aggregatorClass Class type of the aggregator
+   * @return created Aggregator or null when already registered
+   */
+  <A extends Writable> Aggregator<A> registerAggregator(
+    String name,
+    Class<? extends Aggregator<A>> aggregatorClass)
+    throws InstantiationException, IllegalAccessException;
 
-    /**
-     * Get a registered aggregator.
-     *
-     * @param name Name of aggregator
-     * @return Aggregator<A> (null when not registered)
-     */
-    public Aggregator<? extends Writable> getAggregator(String name);
+  /**
+   * Get a registered aggregator.
+   *
+   * @param name Name of aggregator
+   * @return Aggregator (null when not registered)
+   */
+  Aggregator<? extends Writable> getAggregator(String name);
 
-    /**
-     * Use a registered aggregator in current superstep.
-     * Even when the same aggregator should be used in the next
-     * superstep, useAggregator needs to be called at the beginning
-     * of that superstep in preSuperstep().
-     *
-     * @param name Name of aggregator
-     * @return boolean (false when not registered)
-     */
-    public boolean useAggregator(String name);
+  /**
+   * Use a registered aggregator in current superstep.
+   * Even when the same aggregator should be used in the next
+   * superstep, useAggregator needs to be called at the beginning
+   * of that superstep in preSuperstep().
+   *
+   * @param name Name of aggregator
+   * @return boolean (false when not registered)
+   */
+  boolean useAggregator(String name);
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java Thu Feb 16 22:12:31 2012
@@ -25,49 +25,49 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 
 /**
- *  An AggregatorWriter is used to export Aggregators during or at the end of 
+ *  An AggregatorWriter is used to export Aggregators during or at the end of
  *  each computation. It runs on the master and it's called at the end of each
- *  superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is 
- *  passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the 
+ *  superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is
+ *  passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the
  *  superstep value to signal the end of computation.
  */
 public interface AggregatorWriter {
-    /** Signal for last superstep */
-    public static final int LAST_SUPERSTEP = -1;
+  /** Signal for last superstep */
+  int LAST_SUPERSTEP = -1;
 
-    /**
-     * The method is called at the initialization of the AggregatorWriter.
-     * More precisely, the aggregatorWriter is initialized each time a new
-     * master is elected.
-     * 
-     * @param context Mapper Context where the master is running on
-     * @param applicationAttempt ID of the applicationAttempt, used to
-     *        disambiguate aggregator writes for different attempts
-     * @throws IOException
-     */
-    @SuppressWarnings("rawtypes")
-    void initialize(Context context, long applicationAttempt) throws IOException;
+  /**
+   * The method is called at the initialization of the AggregatorWriter.
+   * More precisely, the aggregatorWriter is initialized each time a new
+   * master is elected.
+   *
+   * @param context Mapper Context where the master is running on
+   * @param applicationAttempt ID of the applicationAttempt, used to
+   *        disambiguate aggregator writes for different attempts
+   * @throws IOException
+   */
+  @SuppressWarnings("rawtypes")
+  void initialize(Context context, long applicationAttempt) throws IOException;
 
-    /**
-     * The method is called at the end of each superstep. The user might decide
-     * whether to write the aggregators values for the current superstep. For 
-     * the last superstep, {@link AggregatorWriter#LAST_SUPERSTEP} is passed.
-     * 
-     * @param aggregatorMap Map of aggregators to write
-     * @param superstep Current superstep
-     * @throws IOException
-     */
-    void writeAggregator(
-            Map<String, Aggregator<Writable>> aggregatorMap, 
-            long superstep) throws IOException;
+  /**
+   * The method is called at the end of each superstep. The user might decide
+   * whether to write the aggregators values for the current superstep. For
+   * the last superstep, {@link AggregatorWriter#LAST_SUPERSTEP} is passed.
+   *
+   * @param aggregatorMap Map of aggregators to write
+   * @param superstep Current superstep
+   * @throws IOException
+   */
+  void writeAggregator(
+      Map<String, Aggregator<Writable>> aggregatorMap,
+      long superstep) throws IOException;
 
-    /**
-     * The method is called at the end of a successful computation. The method
-     * is not called when the job fails and a new master is elected. For this
-     * reason it's advised to flush data at the end of 
-     * {@link AggregatorWriter#writeAggregator(Map, long)}.
-     * 
-     * @throws IOException
-     */
-    void close() throws IOException;
+  /**
+   * The method is called at the end of a successful computation. The method
+   * is not called when the job fails and a new master is elected. For this
+   * reason it's advised to flush data at the end of
+   * {@link AggregatorWriter#writeAggregator(Map, long)}.
+   *
+   * @throws IOException
+   */
+  void close() throws IOException;
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Thu Feb 16 22:12:31 2012
@@ -1,4 +1,4 @@
- /*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,249 +28,257 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
- /**
+/**
  * Basic interface for writing a BSP application for computation.
  *
- * @param <I> vertex id
- * @param <V> vertex data
- * @param <E> edge data
- * @param <M> message data
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public abstract class BasicVertex<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        implements AggregatorUsage, Iterable<I>, Writable, Configurable {
-    /** Global graph state **/
-    private GraphState<I,V,E,M> graphState;
-    /** Configuration */
-    private Configuration conf;
-    /** If true, do not do anymore computation on this vertex. */
-    boolean halt = false;
-
-    /**
-     * This method must be called after instantiation of a vertex with BspUtils
-     * unless deserialization from readFields() is called.
-     *
-     * @param vertexId Will be the vertex id
-     * @param vertexValue Will be the vertex value
-     * @param edges A map of destination edge ids to edge values (can be null)
-     * @param messages Initial messages for this vertex (can be null)
-     */
-    public abstract void initialize(
-        I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages);
-
-    /**
-     * Must be defined by user to do computation on a single Vertex.
-     *
-     * @param msgIterator Iterator to the messages that were sent to this
-     *        vertex in the previous superstep
-     * @throws IOException
-     */
-    public abstract void compute(Iterator<M> msgIterator) throws IOException;
-
-    /**
-     * Retrieves the current superstep.
-     *
-     * @return Current superstep
-     */
-    public long getSuperstep() {
-        return getGraphState().getSuperstep();
-    }
-
-    /**
-     * Get the vertex id
-     */
-    public abstract I getVertexId();
-
-    /**
-     * Get the vertex value (data stored with vertex)
-     *
-     * @return Vertex value
-     */
-    public abstract V getVertexValue();
-
-    /**
-     * Set the vertex data (immediately visible in the computation)
-     *
-     * @param vertexValue Vertex data to be set
-     */
-    public abstract void setVertexValue(V vertexValue);
-
-    /**
-     * Get the total (all workers) number of vertices that
-     * existed in the previous superstep.
-     *
-     * @return Total number of vertices (-1 if first superstep)
-     */
-    public long getNumVertices() {
-        return getGraphState().getNumVertices();
-    }
-
-    /**
-     * Get the total (all workers) number of edges that
-     * existed in the previous superstep.
-     *
-     * @return Total number of edges (-1 if first superstep)
-     */
-    public long getNumEdges() {
-        return getGraphState().getNumEdges();
-    }
-
-    /**
-     * Get a read-only view of the out-edges of this vertex.
-     *
-     * @return the out edges (sort order determined by subclass implementation).
-     */
-    @Override
-    public abstract Iterator<I> iterator();
-
-    /**
-     * Get the edge value associated with a target vertex id.
-     *
-     * @param targetVertexId Target vertex id to check
-     *
-     * @return the value of the edge to targetVertexId (or null if there
-     *         is no edge to it)
-     */
-    public abstract E getEdgeValue(I targetVertexId);
-
-    /**
-     * Does an edge with the target vertex id exist?
-     *
-     * @param targetVertexId Target vertex id to check
-     * @return true if there is an edge to the target
-     */
-    public abstract boolean hasEdge(I targetVertexId);
-
-    /**
-     * Get the number of outgoing edges on this vertex.
-     *
-     * @return the total number of outbound edges from this vertex
-     */
-    public abstract int getNumOutEdges();
-
-    /**
-     * Send a message to a vertex id.  The message should not be mutated after
-     * this method returns or else undefined results could occur.
-     *
-     * @param id Vertex id to send the message to
-     * @param msg Message data to send.  Note that after the message is sent,
-     *        the user should not modify the object.
-     */
-    public void sendMsg(I id, M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException(
-                "sendMsg: Cannot send null message to " + id);
-        }
-        getGraphState().getWorkerCommunications().
-            sendMessageReq(id, msg);
-    }
-
-    /**
-     * Send a message to all edges.
-     */
-    public abstract void sendMsgToAllEdges(M msg);
-
-    /**
-     * After this is called, the compute() code will no longer be called for
-     * this vertex unless a message is sent to it.  Then the compute() code
-     * will be called once again until this function is called.  The
-     * application finishes only when all vertices vote to halt.
-     */
-    public void voteToHalt() {
-        halt = true;
-    }
-
-    /**
-     * Is this vertex done?
-     */
-    public boolean isHalted() {
-        return halt;
-    }
-
-    /**
-     *  Get the list of incoming messages from the previous superstep.  Same as
-     *  the message iterator passed to compute().
-     */
-    public abstract Iterable<M> getMessages();
-
-    /**
-     * Copy the messages this vertex should process in the current superstep
-     *
-     * @param messages the messages sent to this vertex in the previous superstep
-     */
-    abstract void putMessages(Iterable<M> messages);
-
-    /**
-     * Release unnecessary resources (will be called after vertex returns from
-     * {@link #compute()})
-     */
-    abstract void releaseResources();
-
-    /**
-     * Get the graph state for all workers.
-     *
-     * @return Graph state for all workers
-     */
-    GraphState<I, V, E, M> getGraphState() {
-        return graphState;
-    }
-
-    /**
-     * Set the graph state for all workers
-     *
-     * @param graphState Graph state for all workers
-     */
-    void setGraphState(GraphState<I, V, E, M> graphState) {
-        this.graphState = graphState;
-    }
-
-    /**
-     * Get the mapper context
-     *
-     * @return Mapper context
-     */
-     public Mapper.Context getContext() {
-         return getGraphState().getContext();
-     }
-
-    /**
-     * Get the worker context
-     *
-     * @return WorkerContext context
-     */
-    public WorkerContext getWorkerContext() {
-        return getGraphState().getGraphMapper().getWorkerContext();
-    }
-
-    @Override
-    public final <A extends Writable> Aggregator<A> registerAggregator(
-            String name,
-            Class<? extends Aggregator<A>> aggregatorClass)
-            throws InstantiationException, IllegalAccessException {
-        return getGraphState().getGraphMapper().getAggregatorUsage().
-            registerAggregator(name, aggregatorClass);
-    }
-
-    @Override
-    public final Aggregator<? extends Writable> getAggregator(String name) {
-        return getGraphState().getGraphMapper().getAggregatorUsage().
-            getAggregator(name);
-    }
-
-    @Override
-    public final boolean useAggregator(String name) {
-        return getGraphState().getGraphMapper().getAggregatorUsage().
-            useAggregator(name);
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
+    V extends Writable, E extends Writable, M extends Writable>
+    implements AggregatorUsage, Iterable<I>, Writable, Configurable {
+  /** If true, do not do any more computation on this vertex. */
+  protected boolean halt = false;
+  /** Global graph state **/
+  private GraphState<I, V, E, M> graphState;
+  /** Configuration */
+  private Configuration conf;
+
+
+  /**
+   * This method must be called after instantiation of a vertex with BspUtils
+   * unless deserialization from readFields() is called.
+   *
+   * @param vertexId Will be the vertex id
+   * @param vertexValue Will be the vertex value
+   * @param edges A map of destination edge ids to edge values (can be null)
+   * @param messages Initial messages for this vertex (can be null)
+   */
+  public abstract void initialize(
+      I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages);
+
+  /**
+   * Must be defined by user to do computation on a single Vertex.
+   *
+   * @param msgIterator Iterator to the messages that were sent to this
+   *        vertex in the previous superstep
+   * @throws IOException
+   */
+  public abstract void compute(Iterator<M> msgIterator) throws IOException;
+
+  /**
+   * Retrieves the current superstep.
+   *
+   * @return Current superstep
+   */
+  public long getSuperstep() {
+    return getGraphState().getSuperstep();
+  }
+
+  /**
+   * Get the vertex id.
+   *
+   * @return My vertex id.
+   */
+  public abstract I getVertexId();
+
+  /**
+   * Get the vertex value (data stored with vertex)
+   *
+   * @return Vertex value
+   */
+  public abstract V getVertexValue();
+
+  /**
+   * Set the vertex data (immediately visible in the computation)
+   *
+   * @param vertexValue Vertex data to be set
+   */
+  public abstract void setVertexValue(V vertexValue);
+
+  /**
+   * Get the total (all workers) number of vertices that
+   * existed in the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  public long getNumVertices() {
+    return getGraphState().getNumVertices();
+  }
+
+  /**
+   * Get the total (all workers) number of edges that
+   * existed in the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  public long getNumEdges() {
+    return getGraphState().getNumEdges();
+  }
+
+  /**
+   * Get a read-only view of the out-edges of this vertex.
+   *
+   * @return the out edges (sort order determined by subclass implementation).
+   */
+  @Override
+  public abstract Iterator<I> iterator();
+
+  /**
+   * Get the edge value associated with a target vertex id.
+   *
+   * @param targetVertexId Target vertex id to check
+   *
+   * @return the value of the edge to targetVertexId (or null if there
+   *         is no edge to it)
+   */
+  public abstract E getEdgeValue(I targetVertexId);
+
+  /**
+   * Does an edge with the target vertex id exist?
+   *
+   * @param targetVertexId Target vertex id to check
+   * @return true if there is an edge to the target
+   */
+  public abstract boolean hasEdge(I targetVertexId);
+
+  /**
+   * Get the number of outgoing edges on this vertex.
+   *
+   * @return the total number of outbound edges from this vertex
+   */
+  public abstract int getNumOutEdges();
+
+  /**
+   * Send a message to a vertex id.  The message should not be mutated after
+   * this method returns or else undefined results could occur.
+   *
+   * @param id Vertex id to send the message to
+   * @param msg Message data to send.  Note that after the message is sent,
+   *        the user should not modify the object.
+   */
+  public void sendMsg(I id, M msg) {
+    if (msg == null) {
+      throw new IllegalArgumentException(
+          "sendMsg: Cannot send null message to " + id);
+    }
+    getGraphState().getWorkerCommunications().
+    sendMessageReq(id, msg);
+  }
+
+  /**
+   * Send a message to all edges.
+   *
+   * @param msg Message sent to all edges.
+   */
+  public abstract void sendMsgToAllEdges(M msg);
+
+  /**
+   * After this is called, the compute() code will no longer be called for
+   * this vertex unless a message is sent to it.  Then the compute() code
+   * will be called once again until this function is called.  The
+   * application finishes only when all vertices vote to halt.
+   */
+  public void voteToHalt() {
+    halt = true;
+  }
+
+  /**
+   * Is this vertex done?
+   *
+   * @return True if halted, false otherwise.
+   */
+  public boolean isHalted() {
+    return halt;
+  }
+
+  /**
+   *  Get the list of incoming messages from the previous superstep.  Same as
+   *  the message iterator passed to compute().
+   *
+   *  @return Iterable of messages.
+   */
+  public abstract Iterable<M> getMessages();
+
+  /**
+   * Copy the messages this vertex should process in the current superstep
+   *
+   * @param messages the messages sent to this vertex in the previous superstep
+   */
+  abstract void putMessages(Iterable<M> messages);
+
+  /**
+   * Release unnecessary resources (will be called after vertex returns from
+   * {@link #compute(Iterator)})
+   */
+  abstract void releaseResources();
+
+  /**
+   * Get the graph state for all workers.
+   *
+   * @return Graph state for all workers
+   */
+  GraphState<I, V, E, M> getGraphState() {
+    return graphState;
+  }
+
+  /**
+   * Set the graph state for all workers
+   *
+   * @param graphState Graph state for all workers
+   */
+  void setGraphState(GraphState<I, V, E, M> graphState) {
+    this.graphState = graphState;
+  }
+
+  /**
+   * Get the mapper context
+   *
+   * @return Mapper context
+   */
+  public Mapper.Context getContext() {
+    return getGraphState().getContext();
+  }
+
+  /**
+   * Get the worker context
+   *
+   * @return WorkerContext context
+   */
+  public WorkerContext getWorkerContext() {
+    return getGraphState().getGraphMapper().getWorkerContext();
+  }
+
+  @Override
+  public final <A extends Writable> Aggregator<A> registerAggregator(
+    String name, Class<? extends Aggregator<A>> aggregatorClass)
+    throws InstantiationException, IllegalAccessException {
+    return getGraphState().getGraphMapper().getAggregatorUsage().
+        registerAggregator(name, aggregatorClass);
+  }
+
+  @Override
+  public final Aggregator<? extends Writable> getAggregator(String name) {
+    return getGraphState().getGraphMapper().getAggregatorUsage().
+      getAggregator(name);
+  }
+
+  @Override
+  public final boolean useAggregator(String name) {
+    return getGraphState().getGraphMapper().getAggregatorUsage().
+      useAggregator(name);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java Thu Feb 16 22:12:31 2012
@@ -24,36 +24,38 @@ import org.apache.hadoop.io.WritableComp
 /**
  * Handles all the situations that can arise upon creation/removal of
  * vertices and edges.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public interface BasicVertexResolver<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable> {
-    /**
-     * A vertex may have been removed, created zero or more times and had
-     * zero or more messages sent to it.  This method will handle all situations
-     * excluding the normal case (a vertex already exists and has zero or more
-     * messages sent it to).
-     *
-     * @param vertexId Vertex id (can be used for {@link BasicVertex}'s
-     *        initialize())
-     * @param vertex Original vertex or null if none
-     * @param vertexChanges Changes that happened to this vertex or null if none
-     * @param messages messages received in the last superstep or null if none
-     * @return Vertex to be returned, if null, and a vertex currently exists
-     *         it will be removed
-     */
-    BasicVertex<I, V, E, M> resolve(I vertexId,
-                                    BasicVertex<I, V, E, M> vertex,
-                                    VertexChanges<I, V, E, M> vertexChanges,
-                                    Iterable<M> messages);
+public interface BasicVertexResolver<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * A vertex may have been removed, created zero or more times and had
+   * zero or more messages sent to it.  This method will handle all situations
+   * excluding the normal case (a vertex already exists and has zero or more
+   * messages sent to it).
+   *
+   * @param vertexId Vertex id (can be used for {@link BasicVertex}'s
+   *        initialize())
+   * @param vertex Original vertex or null if none
+   * @param vertexChanges Changes that happened to this vertex or null if none
+   * @param messages messages received in the last superstep or null if none
+   * @return Vertex to be returned, if null, and a vertex currently exists
+   *         it will be removed
+   */
+  BasicVertex<I, V, E, M> resolve(I vertexId,
+      BasicVertex<I, V, E, M> vertex,
+      VertexChanges<I, V, E, M> vertexChanges,
+      Iterable<M> messages);
 
-    /**
-     * Create a default vertex that can be used to return from resolve().
-     *
-     * @return Newly instantiated vertex.
-     */
-    BasicVertex<I, V, E, M> instantiateVertex();
+  /**
+   * Create a default vertex that can be used to return from resolve().
+   *
+   * @return Newly instantiated vertex.
+   */
+  BasicVertex<I, V, E, M> instantiateVertex();
 }