You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2013/02/26 11:52:53 UTC
svn commit: r1450126 - in /hama/branches/hama-732/graph: pom.xml
src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
Author: tommaso
Date: Tue Feb 26 10:52:52 2013
New Revision: 1450126
URL: http://svn.apache.org/r1450126
Log:
HAMA-732 - readded sketch impl of OffHeapVerticesInfo
Added:
hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (with props)
hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java (with props)
Modified:
hama/branches/hama-732/graph/pom.xml
Modified: hama/branches/hama-732/graph/pom.xml
URL: http://svn.apache.org/viewvc/hama/branches/hama-732/graph/pom.xml?rev=1450126&r1=1450125&r2=1450126&view=diff
==============================================================================
--- hama/branches/hama-732/graph/pom.xml (original)
+++ hama/branches/hama-732/graph/pom.xml Tue Feb 26 10:52:52 2013
@@ -43,6 +43,16 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.directmemory</groupId>
+ <artifactId>directmemory-cache</artifactId>
+ <version>0.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.directmemory</groupId>
+ <artifactId>directmemory-protostuff</artifactId>
+ <version>0.2-SNAPSHOT</version>
+ </dependency>
</dependencies>
<build>
<finalName>hama-graph-${project.version}</finalName>
Added: hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1450126&view=auto
==============================================================================
--- hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (added)
+++ hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Tue Feb 26 10:52:52 2013
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.directmemory.DirectMemory;
+import org.apache.directmemory.cache.CacheService;
+import org.apache.directmemory.utils.CacheValuesIterable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * An off heap version of a {@link Vertex} storage.
+ * <p>
+ * Vertices are held in a DirectMemory {@link CacheService} keyed by vertex ID.
+ * The cache is built in {@code init} from the {@code dm.*} configuration keys
+ * declared on this class. The superstep callbacks, {@code finishAdditions} and
+ * {@code cleanup} are empty in this implementation.
+ * <p>
+ * NOTE(review): {@code cleanup} never releases the cache and
+ * {@code isFinishedAdditions} always returns {@code false} - confirm whether
+ * that is intended.
+ */
+public class OffHeapVerticesInfo<V extends WritableComparable, E extends Writable, M extends Writable>
+ implements VerticesInfo<V, E, M> {
+
+ /** Boolean key, default {@code true}; its value is handed to {@link CacheValuesIterable}. */
+ public static final String DM_STRICT_ITERATOR = "dm.iterator.strict";
+ /** Integer key, default 10; passed to {@code setNumberOfBuffers}. */
+ public static final String DM_BUFFERS = "dm.buffers";
+ /** Integer key, default 102400; passed to {@code setSize} (unit not visible here - TODO confirm). */
+ public static final String DM_SIZE = "dm.size";
+ /** Integer key, default 1000; passed to {@code setInitialCapacity}. */
+ public static final String DM_CAPACITY = "dm.capacity";
+ /** Integer key, default 10; passed to {@code setConcurrencyLevel}. */
+ public static final String DM_CONCURRENCY = "dm.concurrency";
+ /** Integer key, default 360000; passed to {@code setDisposalTime} (unit not visible here - TODO confirm). */
+ public static final String DM_DISPOSAL_TIME = "dm.disposal.time";
+ /** Declared but not read anywhere in this class. */
+ public static final String DM_SERIALIZER = "dm.serializer";
+
+ /** Cache service holding every vertex under its vertex ID; assigned in {@code init}. */
+ private CacheService<V, Vertex<V, E, M>> vertices;
+
+ /** Value of {@link #DM_STRICT_ITERATOR}, read in {@code init} and used by {@code skippingIterator}. */
+ private boolean strict;
+
+ /**
+ * Reads the strict-iterator flag and creates the cache service from the
+ * {@code dm.*} settings, falling back to the defaults coded below.
+ *
+ * @param runner not used by this implementation
+ * @param conf configuration the {@code dm.*} keys are read from
+ * @param attempt not used by this implementation
+ * @throws IOException declared in the signature; nothing in this body throws it explicitly
+ */
+ @Override
+ public void init(GraphJobRunner<V, E, M> runner, Configuration conf, TaskAttemptID attempt) throws IOException {
+ this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true);
+ this.vertices = new DirectMemory<V, Vertex<V, E, M>>()
+ .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 10))
+ .setSize(conf.getInt(DM_SIZE, 102400))
+ .setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
+ .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10))
+ .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 360000))
+ .newCacheService();
+ }
+
+ /**
+ * No-op. Both parameters are ignored.
+ * NOTE(review): the cache created in {@code init} is not cleared or closed
+ * here - confirm whether it should be released.
+ */
+ @Override
+ public void cleanup(Configuration conf, TaskAttemptID attempt) throws IOException {
+ }
+
+ /**
+ * Puts the vertex into the cache under {@code vertex.getVertexID()}.
+ *
+ * @param vertex the vertex to store
+ */
+ public void addVertex(Vertex<V, E, M> vertex) {
+ vertices.put(vertex.getVertexID(), vertex);
+ }
+
+ /** No-op; no state is recorded when additions finish. */
+ @Override
+ public void finishAdditions() {
+ }
+
+ /** No-op. */
+ @Override
+ public void startSuperstep() throws IOException {
+ }
+
+ /** No-op. */
+ @Override
+ public void finishSuperstep() throws IOException {
+ }
+
+ /**
+ * No-op; the passed vertex is not written back to the cache here.
+ *
+ * @param vertex ignored
+ */
+ @Override
+ public void finishVertexComputation(Vertex<V, E, M> vertex) throws IOException {
+ }
+
+ /**
+ * @return always {@code false}, regardless of whether {@code finishAdditions}
+ * has been called
+ */
+ @Override
+ public boolean isFinishedAdditions() {
+ return false;
+ }
+
+ /** Clears the underlying cache service. */
+ public void clear() {
+ vertices.clear();
+ }
+
+ /**
+ * @return the cache's {@code entries()} count narrowed to {@code int}.
+ * NOTE(review): the cast suggests {@code entries()} returns a wider
+ * type; very large counts would be truncated - confirm.
+ */
+ public int size() {
+ return (int) this.vertices.entries();
+ }
+
+ /**
+ * Wraps the iterator of a {@link CacheValuesIterable} built over the cache
+ * (using the {@code strict} flag) in an {@link IDSkippingIterator}.
+ *
+ * @return an iterator whose {@code hasNext} and {@code next} delegate
+ * directly to the cache values iterator
+ */
+ @Override
+ public IDSkippingIterator<V, E, M> skippingIterator() {
+ final Iterator<Vertex<V, E, M>> vertexIterator =
+ new CacheValuesIterable<V, Vertex<V, E, M>>(vertices, strict).iterator();
+ return new IDSkippingIterator<V, E, M>() {
+ // Both arguments are ignored, so no vertex is ever skipped by ID.
+ @Override
+ public boolean hasNext(V e, Strategy strat) {
+ return vertexIterator.hasNext();
+ }
+
+ @Override
+ public Vertex<V, E, M> next() {
+ return vertexIterator.next();
+ }
+ };
+ }
+
+}
Propchange: hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java?rev=1450126&view=auto
==============================================================================
--- hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java (added)
+++ hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java Tue Feb 26 10:52:52 2013
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.graph.example.PageRank.PageRankVertex;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link OffHeapVerticesInfo}: a full add / iterate / update /
+ * re-read cycle with PageRank vertices, and a single addition using an empty
+ * {@link Configuration}.
+ */
+public class TestOffHeapVerticesInfo {
+
+ /**
+ * Adds ten {@link PageRankVertex} instances (IDs "0".."9", even ones with a
+ * value of {@code i * 2}, each with one edge to {@code 10 - i}), then
+ * iterates three times: once to verify the stored data, once to overwrite
+ * every value with 2 and halt vertex "3", and once to verify those changes.
+ * <p>
+ * NOTE(review): the checks compare against {@code list.get(index)}, i.e. they
+ * assume the iterator returns vertices in insertion order - confirm the
+ * cache guarantees that.
+ * NOTE(review): several {@code assertEquals} calls pass the actual value
+ * first; JUnit's signature is (expected, actual).
+ */
+ @Test
+ public void testOffHeapVerticesInfoLifeCycle() throws Exception {
+ OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> info = new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
+ Configuration conf = new Configuration();
+ conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName());
+ conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR,
+ NullWritable.class.getName());
+ conf.set(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class.getName());
+ conf.set(GraphJob.VERTEX_VALUE_CLASS_ATTR, DoubleWritable.class.getName());
+ GraphJobRunner.<Text, NullWritable, DoubleWritable> initClasses(conf);
+ TaskAttemptID attempt = new TaskAttemptID("omg", 1, 1, 0);
+ try {
+ ArrayList<PageRankVertex> list = new ArrayList<PageRankVertex>();
+
+ // build the fixture: only even-indexed vertices get a value
+ for (int i = 0; i < 10; i++) {
+ PageRankVertex v = new PageRankVertex();
+ v.setVertexID(new Text(i + ""));
+ if (i % 2 == 0) {
+ v.setValue(new DoubleWritable(i * 2));
+ }
+ v.addEdge(new Edge<Text, NullWritable>(new Text((10 - i) + ""), null));
+
+ list.add(v);
+ }
+
+ // the runner argument is null; OffHeapVerticesInfo.init does not use it
+ info.init(null, conf, attempt);
+ for (PageRankVertex v : list) {
+ info.addVertex(v);
+ }
+
+ info.finishAdditions();
+
+ assertEquals(10, info.size());
+ // now we want to iterate and check if the result can properly be obtained
+
+ int index = 0;
+ IDSkippingIterator<Text, NullWritable, DoubleWritable> iterator = info
+ .skippingIterator();
+ while (iterator.hasNext()) {
+ Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+ PageRankVertex pageRankVertex = list.get(index);
+ assertEquals(pageRankVertex.getVertexID().toString(), next
+ .getVertexID().toString());
+ if (index % 2 == 0) {
+ assertEquals((int) next.getValue().get(), index * 2);
+ } else {
+ assertNull(next.getValue());
+ }
+ assertEquals(next.isHalted(), false);
+ // check edges
+ List<Edge<Text, NullWritable>> edges = next.getEdges();
+ assertEquals(1, edges.size());
+ Edge<Text, NullWritable> edge = edges.get(0);
+ assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
+ .toString(), edge.getDestinationVertexID().toString());
+ assertNull(edge.getValue());
+
+ index++;
+ }
+ assertEquals(index, list.size());
+ info.finishSuperstep();
+ // iterate again and compute so vertices change internally
+ iterator = info.skippingIterator();
+ info.startSuperstep();
+ while (iterator.hasNext()) {
+ Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+ // override everything with constant 2
+ next.setValue(new DoubleWritable(2));
+ if (Integer.parseInt(next.getVertexID().toString()) == 3) {
+ next.voteToHalt();
+ }
+ info.finishVertexComputation(next);
+ }
+ info.finishSuperstep();
+
+ index = 0;
+ // now reread
+ info.startSuperstep();
+ iterator = info.skippingIterator();
+ while (iterator.hasNext()) {
+ Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+ PageRankVertex pageRankVertex = list.get(index);
+ assertEquals(pageRankVertex.getVertexID().toString(), next
+ .getVertexID().toString());
+ assertEquals((int) next.getValue().get(), 2);
+ // check edges
+ List<Edge<Text, NullWritable>> edges = next.getEdges();
+ assertEquals(1, edges.size());
+ Edge<Text, NullWritable> edge = edges.get(0);
+ assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
+ .toString(), edge.getDestinationVertexID().toString());
+ assertNull(edge.getValue());
+ // the vertex halted in the previous pass must still be halted
+ if (index == 3) {
+ assertEquals(true, next.isHalted());
+ }
+
+ index++;
+ }
+ assertEquals(index, list.size());
+
+ } finally {
+ info.cleanup(conf, attempt);
+ }
+
+ }
+
+ /**
+ * Initializes the storage with a fresh {@link Configuration} on which the
+ * test sets no {@code dm.*} keys (null runner and attempt), adds one vertex
+ * and checks that the skipping iterator reports an element.
+ */
+ @Test
+ public void testAdditionWithDefaults() throws Exception {
+ OffHeapVerticesInfo<DoubleWritable, DoubleWritable, DoubleWritable> verticesInfo =
+ new OffHeapVerticesInfo<DoubleWritable, DoubleWritable, DoubleWritable>();
+ Configuration conf = new Configuration();
+ verticesInfo.init(null, conf, null);
+ Vertex<DoubleWritable, DoubleWritable, DoubleWritable> vertex = creteDoubleVertex(1d);
+ assertNotNull(vertex.getVertexID());
+ verticesInfo.addVertex(vertex);
+ assertTrue("added vertex could not be found in the cache", verticesInfo.skippingIterator().hasNext());
+ }
+
+ /**
+ * Builds an anonymous {@link Vertex} whose {@code getVertexID} returns a new
+ * {@link DoubleWritable} wrapping {@code id} on every call and whose
+ * {@code compute}, {@code readState} and {@code writeState} do nothing.
+ * NOTE(review): the method name looks like a typo for "createDoubleVertex".
+ *
+ * @param id value used for the vertex ID
+ * @return the stub vertex
+ */
+ private Vertex<DoubleWritable, DoubleWritable, DoubleWritable> creteDoubleVertex(final Double id) {
+ return new Vertex<DoubleWritable, DoubleWritable, DoubleWritable>() {
+
+ @Override
+ public DoubleWritable getVertexID() {
+ return new DoubleWritable(id);
+ }
+
+ @Override
+ public void compute(Iterable<DoubleWritable> messages) throws IOException {
+ }
+
+ @Override
+ public void readState(DataInput in) throws IOException {
+ }
+
+ @Override
+ public void writeState(DataOutput out) throws IOException {
+ }
+ };
+ }
+}
Propchange: hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
------------------------------------------------------------------------------
svn:eol-style = native