You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2013/02/21 11:16:37 UTC
svn commit: r1448567 - in /hama/trunk: ./ core/
core/src/main/java/org/apache/hama/bsp/
graph/src/main/java/org/apache/hama/graph/
Author: tjungblut
Date: Thu Feb 21 10:16:36 2013
New Revision: 1448567
URL: http://svn.apache.org/r1448567
Log:
fixing build issues on Java6 with type inference, update to guava 13 and start refactoring of vertices and their storage
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
- copied, changed from r1448553, hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
Removed:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
Modified:
hama/trunk/core/pom.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
hama/trunk/pom.xml
Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Thu Feb 21 10:16:36 2013
@@ -39,7 +39,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>10.0.1</version>
+ <version>13.0.1</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Feb 21 10:16:36 2013
@@ -24,9 +24,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
@@ -43,7 +41,7 @@ public class BSPMessageBundle<M extends
public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
- private HashMap<String, LinkedList<M>> messages = new HashMap<String, LinkedList<M>>();
+ private HashMap<String, List<M>> messages = new HashMap<String, List<M>>();
private HashMap<String, Class<M>> classCache = new HashMap<String, Class<M>>();
public BSPMessageBundle() {
@@ -56,22 +54,20 @@ public class BSPMessageBundle<M extends
*/
public void addMessage(M message) {
String className = message.getClass().getName();
- if (!messages.containsKey(className)) {
- // use linked list because we're just iterating over them
- LinkedList<M> list = new LinkedList<M>();
- list.add(message);
+ List<M> list = messages.get(className);
+ if (list == null) {
+ list = new ArrayList<M>();
messages.put(className, list);
- list = null;
- } else {
- messages.get(className).add(message);
}
+
+ list.add(message);
}
public List<M> getMessages() {
// here we use an arraylist, because we know the size and outside may need
// random access
List<M> mergeList = new ArrayList<M>(messages.size());
- for (LinkedList<M> c : messages.values()) {
+ for (List<M> c : messages.values()) {
mergeList.addAll(c);
}
return mergeList;
@@ -88,9 +84,9 @@ public class BSPMessageBundle<M extends
int classNames = 0;
DataOutputStream dos = null;
- for (Map.Entry<String, LinkedList<M>> e : messages.entrySet()) {
+ for (Entry<String, List<M>> e : messages.entrySet()) {
classNames += e.getKey().length();
- LinkedList<M> c = e.getValue();
+ List<M> c = e.getValue();
if (messages.size() == 1 && c.size() < sample) {
dos = new DataOutputStream(new ByteArrayOutputStream());
@@ -118,9 +114,9 @@ public class BSPMessageBundle<M extends
// writes the k/v mapping size
out.writeInt(messages.size());
if (messages.size() > 0) {
- for (Entry<String, LinkedList<M>> entry : messages.entrySet()) {
+ for (Entry<String, List<M>> entry : messages.entrySet()) {
out.writeUTF(entry.getKey());
- LinkedList<M> messageList = entry.getValue();
+ List<M> messageList = entry.getValue();
out.writeInt(messageList.size());
for (M msg : messageList) {
msg.write(out);
@@ -133,14 +129,14 @@ public class BSPMessageBundle<M extends
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
if (messages == null) {
- messages = new HashMap<String, LinkedList<M>>();
+ messages = new HashMap<String, List<M>>();
}
int numMessages = in.readInt();
if (numMessages > 0) {
for (int entries = 0; entries < numMessages; entries++) {
String className = in.readUTF();
int size = in.readInt();
- LinkedList<M> msgList = new LinkedList<M>();
+ List<M> msgList = new ArrayList<M>();
messages.put(className, msgList);
Class<M> clazz = null;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Thu Feb 21 10:16:36 2013
@@ -97,10 +97,15 @@ public class PartitioningRunner extends
KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
- @SuppressWarnings("rawtypes")
- Partitioner partitioner, Configuration conf,
- @SuppressWarnings("rawtypes")
- BSPPeer peer, int numTasks);
+ @SuppressWarnings("rawtypes") Partitioner partitioner,
+ Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+ int numTasks);
+
+ /**
+ * @return a map implementation, so order can be changed in subclasses if
+ * needed.
+ */
+ public Map<Writable, Writable> newMap();
}
/**
@@ -117,10 +122,9 @@ public class PartitioningRunner extends
@SuppressWarnings("unchecked")
@Override
public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
- @SuppressWarnings("rawtypes")
- Partitioner partitioner, Configuration conf,
- @SuppressWarnings("rawtypes")
- BSPPeer peer, int numTasks) {
+ @SuppressWarnings("rawtypes") Partitioner partitioner,
+ Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+ int numTasks) {
return Math.abs(partitioner.getPartition(outputRecord.getKey(),
outputRecord.getValue(), numTasks));
}
@@ -129,6 +133,11 @@ public class PartitioningRunner extends
public void setup(Configuration conf) {
}
+
+ @Override
+ public Map<Writable, Writable> newMap() {
+ return new HashMap<Writable, Writable>();
+ }
}
@Override
@@ -164,10 +173,12 @@ public class PartitioningRunner extends
int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
desiredNum);
- if (!values.containsKey(index)) {
- values.put(index, new HashMap<Writable, Writable>());
+ Map<Writable, Writable> map = values.get(index);
+ if (map == null) {
+ map = converter.newMap();
+ values.put(index, map);
}
- values.get(index).put(outputPair.getKey(), outputPair.getValue());
+ map.put(outputPair.getKey(), outputPair.getValue());
}
// The reason of use of Memory is to reduce file opens
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Feb 21 10:16:36 2013
@@ -77,7 +77,7 @@ public final class GraphJobRunner<V exte
public static Class<? extends Writable> EDGE_VALUE_CLASS;
public static Class<Vertex<?, ?, ?>> vertexClass;
- private VerticesInfo<V, E, M> vertices;
+ private IVerticesInfo<V, E, M> vertices;
private boolean updated = true;
private int globalUpdateCounts = 0;
@@ -234,7 +234,7 @@ public final class GraphJobRunner<V exte
maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
-1);
- initClasses(conf);
+ GraphJobRunner.<V, E, M> initClasses(conf);
partitioner = (Partitioner<V, M>) org.apache.hadoop.util.ReflectionUtils
.newInstance(
@@ -253,11 +253,11 @@ public final class GraphJobRunner<V exte
aggregationRunner = new AggregationRunner<V, E, M>();
aggregationRunner.setupAggregators(peer);
- vertices = new VerticesInfo<V, E, M>();
+ vertices = new ListVerticesInfo<V, E, M>();
}
@SuppressWarnings("unchecked")
- public static <V extends WritableComparable<V>, E extends Writable, M extends Writable> void initClasses(
+ public static <V extends WritableComparable<? super V>, E extends Writable, M extends Writable> void initClasses(
Configuration conf) {
Class<V> vertexIdClass = (Class<V>) conf.getClass(
GraphJob.VERTEX_ID_CLASS_ATTR, Text.class, Writable.class);
@@ -289,14 +289,16 @@ public final class GraphJobRunner<V exte
LOG.debug("Vertex class: " + vertexClass);
KeyValuePair<Writable, Writable> next;
+ // our VertexInputReader ensures that the incoming vertices are sorted by
+ // ID.
while ((next = peer.readNext()) != null) {
Vertex<V, E, M> vertex = (Vertex<V, E, M>) next.getKey();
vertex.runner = this;
vertex.setup(conf);
- vertices.addVertex(vertex);
if (selfReference) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
+ vertices.addVertex(vertex);
}
LOG.info(vertices.size() + " vertices are loaded into "
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java?rev=1448567&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java Thu Feb 21 10:16:36 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * VerticesInfo interface encapsulates the storage of vertices in a BSP Task.
+ *
+ * @param <V> Vertex ID object type
+ * @param <E> Edge cost object type
+ * @param <M> Vertex value object type
+ */
+public interface IVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
+ extends Iterable<Vertex<V, E, M>> {
+
+ /**
+ * Add a vertex to the underlying structure.
+ */
+ public void addVertex(Vertex<V, E, M> vertex);
+
+ /**
+ * @return the number of vertices added to the underlying structure.
+ * Implementations should ensure that this is a constant-time operation.
+ */
+ public int size();
+
+ @Override
+ public Iterator<Vertex<V, E, M>> iterator();
+
+ // state recovery/saving hooks: to be implemented and documented soon
+ public void recoverState(DataInput in);
+
+ public void saveState(DataOutput out);
+
+}
Copied: hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (from r1448553, hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java)
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?p2=hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java&p1=hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java&r1=1448553&r2=1448567&rev=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java Thu Feb 21 10:16:36 2013
@@ -33,59 +33,58 @@ import org.apache.hadoop.io.WritableComp
* @param <E> Edge cost object type
* @param <M> Vertex value object type
*/
-public class VerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
- implements Iterable<Vertex<V, E, M>> {
+public final class ListVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
+ implements IVerticesInfo<V, E, M> {
private final List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>(
100);
+ /*
+ * (non-Javadoc)
+ * @see
+ * org.apache.hama.graph.IVerticesInfo#addVertex(org.apache.hama.graph.Vertex)
+ */
+ @Override
public void addVertex(Vertex<V, E, M> vertex) {
- int i = 0;
- for (Vertex<V, E, M> check : this) {
- if (check.getVertexID().equals(vertex.getVertexID())) {
- this.vertices.set(i, vertex);
- return;
- }
- ++i;
- }
vertices.add(vertex);
}
- public Vertex<V, E, M> getVertex(V vertexId) {
- for (Vertex<V, E, M> vertex : this) {
- if (vertex.getVertexID().equals(vertexId)) {
- return vertex;
- }
- }
- return null;
- }
-
- public boolean containsVertex(V vertexId) {
- for (Vertex<V, E, M> vertex : this) {
- if (vertex.getVertexID().equals(vertexId)) {
- return true;
- }
- }
- return false;
- }
-
public void clear() {
vertices.clear();
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.graph.IVerticesInfo#size()
+ */
+ @Override
public int size() {
return this.vertices.size();
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.graph.IVerticesInfo#iterator()
+ */
@Override
public Iterator<Vertex<V, E, M>> iterator() {
return vertices.iterator();
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.graph.IVerticesInfo#recoverState(java.io.DataInput)
+ */
+ @Override
public void recoverState(DataInput in) {
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.graph.IVerticesInfo#saveState(java.io.DataOutput)
+ */
+ @Override
public void saveState(DataOutput out) {
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Feb 21 10:16:36 2013
@@ -47,7 +47,7 @@ import org.apache.hama.bsp.Partitioner;
* @param <M> Vertex value object type
*/
public abstract class Vertex<V extends WritableComparable<? super V>, E extends Writable, M extends Writable>
- implements VertexInterface<V, E, M>, Writable {
+ implements VertexInterface<V, E, M> {
GraphJobRunner<?, ?, ?> runner;
@@ -310,6 +310,12 @@ public abstract class Vertex<V extends W
}
+ // compare across the vertex ID
+ @Override
+ public final int compareTo(VertexInterface<V, E, M> o) {
+ return getVertexID().compareTo(o.getVertexID());
+ }
+
/**
* Read the state of the vertex from the input stream. The framework would
* have already constructed and loaded the vertex-id, edges and voteToHalt
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Thu Feb 21 10:16:36 2013
@@ -17,6 +17,9 @@
*/
package org.apache.hama.graph;
+import java.util.Map;
+import java.util.TreeMap;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -46,7 +49,7 @@ public abstract class VertexInputReader<
@Override
public void setup(Configuration conf) {
// initialize the usual vertex structures for read/write methods
- GraphJobRunner.initClasses(conf);
+ GraphJobRunner.<V, E, M> initClasses(conf);
}
private final KeyValuePair<Writable, Writable> outputRecord = new KeyValuePair<Writable, Writable>();
@@ -66,7 +69,8 @@ public abstract class VertexInputReader<
Class<Vertex<V, E, M>> vertexClass = (Class<Vertex<V, E, M>>) conf
.getClass(GraphJob.VERTEX_CLASS_ATTR, Vertex.class);
boolean vertexCreation;
- Vertex<V, E, M> vertex = GraphJobRunner.newVertexInstance(vertexClass);
+ Vertex<V, E, M> vertex = GraphJobRunner
+ .<V, E, M> newVertexInstance(vertexClass);
try {
vertexCreation = parseVertex((KEYIN) inputRecord.getKey(),
(VALUEIN) inputRecord.getValue(), vertex);
@@ -93,4 +97,10 @@ public abstract class VertexInputReader<
vertex.getValue(), numTasks));
}
+ // final because we don't want vertices to change ordering
+ @Override
+ public final Map<Writable, Writable> newMap() {
+ return new TreeMap<Writable, Writable>();
+ }
+
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Feb 21 10:16:36 2013
@@ -34,7 +34,8 @@ import org.apache.hadoop.io.WritableComp
* edge.
* @param <M> the type used for messaging, usually the value of a vertex.
*/
-public interface VertexInterface<V extends WritableComparable<? super V>, E extends Writable, M extends Writable> {
+public interface VertexInterface<V extends WritableComparable<? super V>, E extends Writable, M extends Writable>
+ extends WritableComparable<VertexInterface<V, E, M>> {
/**
* Used to setup a vertex.
Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1448567&r1=1448566&r2=1448567&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Thu Feb 21 10:16:36 2013
@@ -111,7 +111,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>10.0.1</version>
+ <version>13.0.1</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>