You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2013/02/26 11:52:53 UTC

svn commit: r1450126 - in /hama/branches/hama-732/graph: pom.xml src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java

Author: tommaso
Date: Tue Feb 26 10:52:52 2013
New Revision: 1450126

URL: http://svn.apache.org/r1450126
Log:
HAMA-732 - re-added sketch impl of OffHeapVerticesInfo

Added:
    hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java   (with props)
    hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java   (with props)
Modified:
    hama/branches/hama-732/graph/pom.xml

Modified: hama/branches/hama-732/graph/pom.xml
URL: http://svn.apache.org/viewvc/hama/branches/hama-732/graph/pom.xml?rev=1450126&r1=1450125&r2=1450126&view=diff
==============================================================================
--- hama/branches/hama-732/graph/pom.xml (original)
+++ hama/branches/hama-732/graph/pom.xml Tue Feb 26 10:52:52 2013
@@ -43,6 +43,16 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.directmemory</groupId>
+      <artifactId>directmemory-cache</artifactId>
+      <version>0.2-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.directmemory</groupId>
+      <artifactId>directmemory-protostuff</artifactId>
+      <version>0.2-SNAPSHOT</version>
+    </dependency>
   </dependencies>
   <build>
     <finalName>hama-graph-${project.version}</finalName>

Added: hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1450126&view=auto
==============================================================================
--- hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (added)
+++ hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Tue Feb 26 10:52:52 2013
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.directmemory.DirectMemory;
+import org.apache.directmemory.cache.CacheService;
+import org.apache.directmemory.utils.CacheValuesIterable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * An off heap version of a {@link Vertex} storage.
+ */
+public class OffHeapVerticesInfo<V extends WritableComparable, E extends Writable, M extends Writable>
+        implements VerticesInfo<V, E, M> {
+
+    public static final String DM_STRICT_ITERATOR = "dm.iterator.strict";
+    public static final String DM_BUFFERS = "dm.buffers";
+    public static final String DM_SIZE = "dm.size";
+    public static final String DM_CAPACITY = "dm.capacity";
+    public static final String DM_CONCURRENCY = "dm.concurrency";
+    public static final String DM_DISPOSAL_TIME = "dm.disposal.time";
+    public static final String DM_SERIALIZER = "dm.serializer";
+
+    private CacheService<V, Vertex<V, E, M>> vertices;
+
+    private boolean strict;
+
+    @Override
+    public void init(GraphJobRunner<V, E, M> runner, Configuration conf, TaskAttemptID attempt) throws IOException {
+        this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true);
+        this.vertices = new DirectMemory<V, Vertex<V, E, M>>()
+                .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 10))
+                .setSize(conf.getInt(DM_SIZE, 102400))
+                .setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
+                .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10))
+                .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 360000))
+                .newCacheService();
+    }
+
+    @Override
+    public void cleanup(Configuration conf, TaskAttemptID attempt) throws IOException {
+    }
+
+    public void addVertex(Vertex<V, E, M> vertex) {
+        vertices.put(vertex.getVertexID(), vertex);
+    }
+
+    @Override
+    public void finishAdditions() {
+    }
+
+    @Override
+    public void startSuperstep() throws IOException {
+    }
+
+    @Override
+    public void finishSuperstep() throws IOException {
+    }
+
+    @Override
+    public void finishVertexComputation(Vertex<V, E, M> vertex) throws IOException {
+    }
+
+    @Override
+    public boolean isFinishedAdditions() {
+        return false;
+    }
+
+    public void clear() {
+        vertices.clear();
+    }
+
+    public int size() {
+        return (int) this.vertices.entries();
+    }
+
+    @Override
+    public IDSkippingIterator<V, E, M> skippingIterator() {
+        final Iterator<Vertex<V, E, M>> vertexIterator =
+                new CacheValuesIterable<V, Vertex<V, E, M>>(vertices, strict).iterator();
+        return new IDSkippingIterator<V, E, M>() {
+            @Override
+            public boolean hasNext(V e, Strategy strat) {
+                return vertexIterator.hasNext();
+            }
+
+            @Override
+            public Vertex<V, E, M> next() {
+                return vertexIterator.next();
+            }
+        };
+    }
+
+}

Propchange: hama/branches/hama-732/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java?rev=1450126&view=auto
==============================================================================
--- hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java (added)
+++ hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java Tue Feb 26 10:52:52 2013
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.graph.example.PageRank.PageRankVertex;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestOffHeapVerticesInfo {
+
+  @Test
+  public void testOffHeapVerticesInfoLifeCycle() throws Exception {
+    OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> info = new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
+    Configuration conf = new Configuration();
+    conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName());
+    conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR,
+        NullWritable.class.getName());
+    conf.set(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class.getName());
+    conf.set(GraphJob.VERTEX_VALUE_CLASS_ATTR, DoubleWritable.class.getName());
+    GraphJobRunner.<Text, NullWritable, DoubleWritable> initClasses(conf);
+    TaskAttemptID attempt = new TaskAttemptID("omg", 1, 1, 0);
+    try {
+      ArrayList<PageRankVertex> list = new ArrayList<PageRankVertex>();
+
+      for (int i = 0; i < 10; i++) {
+        PageRankVertex v = new PageRankVertex();
+        v.setVertexID(new Text(i + ""));
+        if (i % 2 == 0) {
+          v.setValue(new DoubleWritable(i * 2));
+        }
+        v.addEdge(new Edge<Text, NullWritable>(new Text((10 - i) + ""), null));
+
+        list.add(v);
+      }
+
+      info.init(null, conf, attempt);
+      for (PageRankVertex v : list) {
+        info.addVertex(v);
+      }
+
+      info.finishAdditions();
+
+      assertEquals(10, info.size());
+      // no we want to iterate and check if the result can properly be obtained
+
+      int index = 0;
+      IDSkippingIterator<Text, NullWritable, DoubleWritable> iterator = info
+          .skippingIterator();
+      while (iterator.hasNext()) {
+        Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+        PageRankVertex pageRankVertex = list.get(index);
+        assertEquals(pageRankVertex.getVertexID().toString(), next
+            .getVertexID().toString());
+        if (index % 2 == 0) {
+          assertEquals((int) next.getValue().get(), index * 2);
+        } else {
+          assertNull(next.getValue());
+        }
+        assertEquals(next.isHalted(), false);
+        // check edges
+        List<Edge<Text, NullWritable>> edges = next.getEdges();
+        assertEquals(1, edges.size());
+        Edge<Text, NullWritable> edge = edges.get(0);
+        assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
+            .toString(), edge.getDestinationVertexID().toString());
+        assertNull(edge.getValue());
+
+        index++;
+      }
+      assertEquals(index, list.size());
+      info.finishSuperstep();
+      // iterate again and compute so vertices change internally
+      iterator = info.skippingIterator();
+      info.startSuperstep();
+      while (iterator.hasNext()) {
+        Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+        // override everything with constant 2
+        next.setValue(new DoubleWritable(2));
+        if (Integer.parseInt(next.getVertexID().toString()) == 3) {
+          next.voteToHalt();
+        }
+        info.finishVertexComputation(next);
+      }
+      info.finishSuperstep();
+
+      index = 0;
+      // now reread
+      info.startSuperstep();
+      iterator = info.skippingIterator();
+      while (iterator.hasNext()) {
+        Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+        PageRankVertex pageRankVertex = list.get(index);
+        assertEquals(pageRankVertex.getVertexID().toString(), next
+            .getVertexID().toString());
+        assertEquals((int) next.getValue().get(), 2);
+        // check edges
+        List<Edge<Text, NullWritable>> edges = next.getEdges();
+        assertEquals(1, edges.size());
+        Edge<Text, NullWritable> edge = edges.get(0);
+        assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
+            .toString(), edge.getDestinationVertexID().toString());
+        assertNull(edge.getValue());
+        if (index == 3) {
+          assertEquals(true, next.isHalted());
+        }
+
+        index++;
+      }
+      assertEquals(index, list.size());
+
+    } finally {
+      info.cleanup(conf, attempt);
+    }
+
+  }
+
+  @Test
+  public void testAdditionWithDefaults() throws Exception {
+    OffHeapVerticesInfo<DoubleWritable, DoubleWritable, DoubleWritable> verticesInfo =
+            new OffHeapVerticesInfo<DoubleWritable, DoubleWritable, DoubleWritable>();
+    Configuration conf = new Configuration();
+    verticesInfo.init(null, conf, null);
+    Vertex<DoubleWritable, DoubleWritable, DoubleWritable> vertex = creteDoubleVertex(1d);
+    assertNotNull(vertex.getVertexID());
+    verticesInfo.addVertex(vertex);
+    assertTrue("added vertex could not be found in the cache", verticesInfo.skippingIterator().hasNext());
+  }
+
+  private Vertex<DoubleWritable, DoubleWritable, DoubleWritable> creteDoubleVertex(final Double id) {
+    return new Vertex<DoubleWritable, DoubleWritable, DoubleWritable>() {
+
+      @Override
+      public DoubleWritable getVertexID() {
+        return new DoubleWritable(id);
+      }
+
+      @Override
+      public void compute(Iterable<DoubleWritable> messages) throws IOException {
+      }
+
+      @Override
+      public void readState(DataInput in) throws IOException {
+      }
+
+      @Override
+      public void writeState(DataOutput out) throws IOException {
+      }
+    };
+  }
+}

Propchange: hama/branches/hama-732/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native