You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink labelled "here", which is not preserved in this plain-text rendering.)
Posted to commits@hama.apache.org by tj...@apache.org on 2013/02/21 11:16:37 UTC

svn commit: r1448567 - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ graph/src/main/java/org/apache/hama/graph/

Author: tjungblut
Date: Thu Feb 21 10:16:36 2013
New Revision: 1448567

URL: http://svn.apache.org/r1448567
Log:
Fix build issues caused by type inference on Java 6, update to Guava 13, and start refactoring vertices and their storage.

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
      - copied, changed from r1448553, hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
Removed:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
Modified:
    hama/trunk/core/pom.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    hama/trunk/pom.xml

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Thu Feb 21 10:16:36 2013
@@ -39,7 +39,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>10.0.1</version>
+      <version>13.0.1</version>
     </dependency>
     <dependency>
       <groupId>commons-logging</groupId>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Feb 21 10:16:36 2013
@@ -24,9 +24,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -43,7 +41,7 @@ public class BSPMessageBundle<M extends 
 
   public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
 
-  private HashMap<String, LinkedList<M>> messages = new HashMap<String, LinkedList<M>>();
+  private HashMap<String, List<M>> messages = new HashMap<String, List<M>>();
   private HashMap<String, Class<M>> classCache = new HashMap<String, Class<M>>();
 
   public BSPMessageBundle() {
@@ -56,22 +54,20 @@ public class BSPMessageBundle<M extends 
    */
   public void addMessage(M message) {
     String className = message.getClass().getName();
-    if (!messages.containsKey(className)) {
-      // use linked list because we're just iterating over them
-      LinkedList<M> list = new LinkedList<M>();
-      list.add(message);
+    List<M> list = messages.get(className);
+    if (list == null) {
+      list = new ArrayList<M>();
       messages.put(className, list);
-      list = null;
-    } else {
-      messages.get(className).add(message);
     }
+
+    list.add(message);
   }
 
   public List<M> getMessages() {
     // here we use an arraylist, because we know the size and outside may need
     // random access
     List<M> mergeList = new ArrayList<M>(messages.size());
-    for (LinkedList<M> c : messages.values()) {
+    for (List<M> c : messages.values()) {
       mergeList.addAll(c);
     }
     return mergeList;
@@ -88,9 +84,9 @@ public class BSPMessageBundle<M extends 
     int classNames = 0;
     DataOutputStream dos = null;
 
-    for (Map.Entry<String, LinkedList<M>> e : messages.entrySet()) {
+    for (Entry<String, List<M>> e : messages.entrySet()) {
       classNames += e.getKey().length();
-      LinkedList<M> c = e.getValue();
+      List<M> c = e.getValue();
 
       if (messages.size() == 1 && c.size() < sample) {
         dos = new DataOutputStream(new ByteArrayOutputStream());
@@ -118,9 +114,9 @@ public class BSPMessageBundle<M extends 
     // writes the k/v mapping size
     out.writeInt(messages.size());
     if (messages.size() > 0) {
-      for (Entry<String, LinkedList<M>> entry : messages.entrySet()) {
+      for (Entry<String, List<M>> entry : messages.entrySet()) {
         out.writeUTF(entry.getKey());
-        LinkedList<M> messageList = entry.getValue();
+        List<M> messageList = entry.getValue();
         out.writeInt(messageList.size());
         for (M msg : messageList) {
           msg.write(out);
@@ -133,14 +129,14 @@ public class BSPMessageBundle<M extends 
   @SuppressWarnings("unchecked")
   public void readFields(DataInput in) throws IOException {
     if (messages == null) {
-      messages = new HashMap<String, LinkedList<M>>();
+      messages = new HashMap<String, List<M>>();
     }
     int numMessages = in.readInt();
     if (numMessages > 0) {
       for (int entries = 0; entries < numMessages; entries++) {
         String className = in.readUTF();
         int size = in.readInt();
-        LinkedList<M> msgList = new LinkedList<M>();
+        List<M> msgList = new ArrayList<M>();
         messages.put(className, msgList);
 
         Class<M> clazz = null;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Thu Feb 21 10:16:36 2013
@@ -97,10 +97,15 @@ public class PartitioningRunner extends
         KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
 
     public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
-        @SuppressWarnings("rawtypes")
-        Partitioner partitioner, Configuration conf,
-        @SuppressWarnings("rawtypes")
-        BSPPeer peer, int numTasks);
+        @SuppressWarnings("rawtypes") Partitioner partitioner,
+        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+        int numTasks);
+
+    /**
+     * @return a map implementation, so order can be changed in subclasses if
+     *         needed.
+     */
+    public Map<Writable, Writable> newMap();
   }
 
   /**
@@ -117,10 +122,9 @@ public class PartitioningRunner extends
     @SuppressWarnings("unchecked")
     @Override
     public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
-        @SuppressWarnings("rawtypes")
-        Partitioner partitioner, Configuration conf,
-        @SuppressWarnings("rawtypes")
-        BSPPeer peer, int numTasks) {
+        @SuppressWarnings("rawtypes") Partitioner partitioner,
+        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+        int numTasks) {
       return Math.abs(partitioner.getPartition(outputRecord.getKey(),
           outputRecord.getValue(), numTasks));
     }
@@ -129,6 +133,11 @@ public class PartitioningRunner extends
     public void setup(Configuration conf) {
 
     }
+
+    @Override
+    public Map<Writable, Writable> newMap() {
+      return new HashMap<Writable, Writable>();
+    }
   }
 
   @Override
@@ -164,10 +173,12 @@ public class PartitioningRunner extends
       int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
           desiredNum);
 
-      if (!values.containsKey(index)) {
-        values.put(index, new HashMap<Writable, Writable>());
+      Map<Writable, Writable> map = values.get(index);
+      if (map == null) {
+        map = converter.newMap();
+        values.put(index, map);
       }
-      values.get(index).put(outputPair.getKey(), outputPair.getValue());
+      map.put(outputPair.getKey(), outputPair.getValue());
     }
 
     // The reason of use of Memory is to reduce file opens

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Feb 21 10:16:36 2013
@@ -77,7 +77,7 @@ public final class GraphJobRunner<V exte
   public static Class<? extends Writable> EDGE_VALUE_CLASS;
   public static Class<Vertex<?, ?, ?>> vertexClass;
 
-  private VerticesInfo<V, E, M> vertices;
+  private IVerticesInfo<V, E, M> vertices;
   private boolean updated = true;
   private int globalUpdateCounts = 0;
 
@@ -234,7 +234,7 @@ public final class GraphJobRunner<V exte
     maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
         -1);
 
-    initClasses(conf);
+    GraphJobRunner.<V, E, M> initClasses(conf);
 
     partitioner = (Partitioner<V, M>) org.apache.hadoop.util.ReflectionUtils
         .newInstance(
@@ -253,11 +253,11 @@ public final class GraphJobRunner<V exte
     aggregationRunner = new AggregationRunner<V, E, M>();
     aggregationRunner.setupAggregators(peer);
 
-    vertices = new VerticesInfo<V, E, M>();
+    vertices = new ListVerticesInfo<V, E, M>();
   }
 
   @SuppressWarnings("unchecked")
-  public static <V extends WritableComparable<V>, E extends Writable, M extends Writable> void initClasses(
+  public static <V extends WritableComparable<? super V>, E extends Writable, M extends Writable> void initClasses(
       Configuration conf) {
     Class<V> vertexIdClass = (Class<V>) conf.getClass(
         GraphJob.VERTEX_ID_CLASS_ATTR, Text.class, Writable.class);
@@ -289,14 +289,16 @@ public final class GraphJobRunner<V exte
       LOG.debug("Vertex class: " + vertexClass);
 
     KeyValuePair<Writable, Writable> next;
+    // our VertexInputReader ensures that the incoming vertices are sorted by
+    // ID.
     while ((next = peer.readNext()) != null) {
       Vertex<V, E, M> vertex = (Vertex<V, E, M>) next.getKey();
       vertex.runner = this;
       vertex.setup(conf);
-      vertices.addVertex(vertex);
       if (selfReference) {
         vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
       }
+      vertices.addVertex(vertex);
     }
 
     LOG.info(vertices.size() + " vertices are loaded into "

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java?rev=1448567&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java Thu Feb 21 10:16:36 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * IVerticesInfo encapsulates the storage of the vertices held by a BSP task.
+ * 
+ * @param <V> Vertex ID object type; NOTE(review): this bound is stricter than Vertex's WritableComparable<? super V>
+ * @param <E> Edge cost object type
+ * @param <M> Message object type, usually the vertex value type
+ */
+public interface IVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
+    extends Iterable<Vertex<V, E, M>> {
+
+  /**
+   * Adds a vertex to the underlying structure (duplicate-ID handling is not specified by this interface).
+   */
+  public void addVertex(Vertex<V, E, M> vertex);
+
+  /**
+   * @return the number of vertices added to the underlying structure.
+   *         Implementations should ensure this is a constant-time operation.
+   */
+  public int size();
+
+  @Override
+  public Iterator<Vertex<V, E, M>> iterator();
+
+  // TODO: state save/restore hooks; their contract is not yet specified or documented.
+  public void recoverState(DataInput in);
+
+  public void saveState(DataOutput out);
+
+}

Copied: hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (from r1448553, hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java)
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?p2=hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java&p1=hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java&r1=1448553&r2=1448567&rev=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java Thu Feb 21 10:16:36 2013
@@ -33,59 +33,58 @@ import org.apache.hadoop.io.WritableComp
  * @param <E> Edge cost object type
  * @param <M> Vertex value object type
  */
-public class VerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
-    implements Iterable<Vertex<V, E, M>> {
+public final class ListVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
+    implements IVerticesInfo<V, E, M> {
 
   private final List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>(
       100);
 
+  /*
+   * (non-Javadoc)
+   * @see
+   * org.apache.hama.graph.IVerticesInfo#addVertex(org.apache.hama.graph.Vertex)
+   */
+  @Override
   public void addVertex(Vertex<V, E, M> vertex) {
-    int i = 0;
-    for (Vertex<V, E, M> check : this) {
-      if (check.getVertexID().equals(vertex.getVertexID())) {
-        this.vertices.set(i, vertex);
-        return;
-      }
-      ++i;
-    }
     vertices.add(vertex);
   }
 
-  public Vertex<V, E, M> getVertex(V vertexId) {
-    for (Vertex<V, E, M> vertex : this) {
-      if (vertex.getVertexID().equals(vertexId)) {
-        return vertex;
-      }
-    }
-    return null;
-  }
-
-  public boolean containsVertex(V vertexId) {
-    for (Vertex<V, E, M> vertex : this) {
-      if (vertex.getVertexID().equals(vertexId)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   public void clear() {
     vertices.clear();
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.graph.IVerticesInfo#size()
+   */
+  @Override
   public int size() {
     return this.vertices.size();
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.graph.IVerticesInfo#iterator()
+   */
   @Override
   public Iterator<Vertex<V, E, M>> iterator() {
     return vertices.iterator();
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.graph.IVerticesInfo#recoverState(java.io.DataInput)
+   */
+  @Override
   public void recoverState(DataInput in) {
 
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.graph.IVerticesInfo#saveState(java.io.DataOutput)
+   */
+  @Override
   public void saveState(DataOutput out) {
 
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Feb 21 10:16:36 2013
@@ -47,7 +47,7 @@ import org.apache.hama.bsp.Partitioner;
  * @param <M> Vertex value object type
  */
 public abstract class Vertex<V extends WritableComparable<? super V>, E extends Writable, M extends Writable>
-    implements VertexInterface<V, E, M>, Writable {
+    implements VertexInterface<V, E, M> {
 
   GraphJobRunner<?, ?, ?> runner;
 
@@ -310,6 +310,12 @@ public abstract class Vertex<V extends W
 
   }
 
+  // compare across the vertex ID
+  @Override
+  public final int compareTo(VertexInterface<V, E, M> o) {
+    return getVertexID().compareTo(o.getVertexID());
+  }
+
   /**
    * Read the state of the vertex from the input stream. The framework would
    * have already constructed and loaded the vertex-id, edges and voteToHalt

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Thu Feb 21 10:16:36 2013
@@ -17,6 +17,9 @@
  */
 package org.apache.hama.graph;
 
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -46,7 +49,7 @@ public abstract class VertexInputReader<
   @Override
   public void setup(Configuration conf) {
     // initialize the usual vertex structures for read/write methods
-    GraphJobRunner.initClasses(conf);
+    GraphJobRunner.<V, E, M> initClasses(conf);
   }
 
   private final KeyValuePair<Writable, Writable> outputRecord = new KeyValuePair<Writable, Writable>();
@@ -66,7 +69,8 @@ public abstract class VertexInputReader<
     Class<Vertex<V, E, M>> vertexClass = (Class<Vertex<V, E, M>>) conf
         .getClass(GraphJob.VERTEX_CLASS_ATTR, Vertex.class);
     boolean vertexCreation;
-    Vertex<V, E, M> vertex = GraphJobRunner.newVertexInstance(vertexClass);
+    Vertex<V, E, M> vertex = GraphJobRunner
+        .<V, E, M> newVertexInstance(vertexClass);
     try {
       vertexCreation = parseVertex((KEYIN) inputRecord.getKey(),
           (VALUEIN) inputRecord.getValue(), vertex);
@@ -93,4 +97,10 @@ public abstract class VertexInputReader<
         vertex.getValue(), numTasks));
   }
 
+  // final because we don't want vertices to change ordering
+  @Override
+  public final Map<Writable, Writable> newMap() {
+    return new TreeMap<Writable, Writable>();
+  }
+
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Feb 21 10:16:36 2013
@@ -34,7 +34,8 @@ import org.apache.hadoop.io.WritableComp
  *          edge.
  * @param <M> the type used for messaging, usually the value of a vertex.
  */
-public interface VertexInterface<V extends WritableComparable<? super V>, E extends Writable, M extends Writable> {
+public interface VertexInterface<V extends WritableComparable<? super V>, E extends Writable, M extends Writable>
+    extends WritableComparable<VertexInterface<V, E, M>> {
 
   /**
    * Used to setup a vertex.

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Thu Feb 21 10:16:36 2013
@@ -111,7 +111,7 @@
       <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
-        <version>10.0.1</version> 
+        <version>13.0.1</version> 
       </dependency>    
       <dependency>
         <groupId>commons-logging</groupId>