You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2018/01/19 17:13:02 UTC

git commit: updated refs/heads/trunk to f944f5c

Repository: giraph
Updated Branches:
  refs/heads/trunk 8e2df4f8c -> f944f5cc3


GIRAPH-1167

closes #56


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f944f5cc
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f944f5cc
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f944f5cc

Branch: refs/heads/trunk
Commit: f944f5cc3dd7874dac049e0c8c4d7212df781452
Parents: 8e2df4f
Author: Dionysios Logothetis <dl...@gmail.com>
Authored: Fri Jan 12 21:00:55 2018 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Jan 19 09:12:03 2018 -0800

----------------------------------------------------------------------
 .../giraph/edge/LongByteHashMapEdges.java       | 217 +++++++++++++++++++
 .../giraph/edge/LongByteHashMapEdgesTest.java   | 164 ++++++++++++++
 2 files changed, 381 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f944f5cc/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java
new file mode 100644
index 0000000..fd5f278
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import it.unimi.dsi.fastutil.longs.Long2ByteMap;
+import it.unimi.dsi.fastutil.longs.Long2ByteOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.utils.EdgeIterables;
+import org.apache.giraph.utils.Trimmable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link OutEdges} implementation with long ids and byte edge values,
+ * backed by a {@link Long2ByteOpenHashMap}.
+ * Parallel edges are not allowed: re-adding a target id replaces its value.
+ * Note: this implementation is optimized for fast random access and mutations,
+ * and uses less space than a generic {@link HashMapEdges}.
+ */
+public class LongByteHashMapEdges
+    implements StrictRandomAccessOutEdges<LongWritable, ByteWritable>,
+    ReuseObjectsOutEdges<LongWritable, ByteWritable>,
+    MutableOutEdges<LongWritable, ByteWritable>, Trimmable {
+  /** Hash map from target vertex id to edge value; set by initialize(). */
+  private Long2ByteOpenHashMap edgeMap;
+  /** Reusable value object returned by getEdgeValue(); lazily created. */
+  private ByteWritable representativeEdgeValue;
+
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, ByteWritable>> edges) {
+    EdgeIterables.initialize(this, edges);
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    edgeMap = new Long2ByteOpenHashMap(capacity);
+  }
+
+  @Override
+  public void initialize() {
+    edgeMap = new Long2ByteOpenHashMap();
+  }
+
+  @Override
+  public void add(Edge<LongWritable, ByteWritable> edge) {
+    edgeMap.put(edge.getTargetVertexId().get(), edge.getValue().get());
+  }
+
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    edgeMap.remove(targetVertexId.get());
+  }
+
+  @Override
+  public ByteWritable getEdgeValue(LongWritable targetVertexId) {
+    if (!edgeMap.containsKey(targetVertexId.get())) {
+      return null;
+    }
+    if (representativeEdgeValue == null) {
+      representativeEdgeValue = new ByteWritable();
+    }
+    representativeEdgeValue.set(edgeMap.get(targetVertexId.get()));
+    return representativeEdgeValue;
+  }
+
+  @Override
+  public void setEdgeValue(LongWritable targetVertexId,
+                           ByteWritable edgeValue) {
+    if (edgeMap.containsKey(targetVertexId.get())) {
+      edgeMap.put(targetVertexId.get(), edgeValue.get());
+    }
+  }
+
+  @Override
+  public int size() {
+    return edgeMap.size();
+  }
+
+  @Override
+  public Iterator<Edge<LongWritable, ByteWritable>> iterator() {
+    // Returns one shared Edge object that is refilled on every next() call.
+    return new UnmodifiableIterator<Edge<LongWritable, ByteWritable>>() {
+      /** Wrapped map iterator. */
+      private final ObjectIterator<Long2ByteMap.Entry> mapIterator =
+          edgeMap.long2ByteEntrySet().fastIterator();
+      /** Single edge object handed out by every call to next(). */
+      private final ReusableEdge<LongWritable, ByteWritable>
+      representativeEdge =
+          EdgeFactory.createReusable(new LongWritable(), new ByteWritable());
+
+      @Override
+      public boolean hasNext() {
+        return mapIterator.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, ByteWritable> next() {
+        Long2ByteMap.Entry nextEntry = mapIterator.next();
+        representativeEdge.getTargetVertexId().set(nextEntry.getLongKey());
+        representativeEdge.getValue().set(nextEntry.getByteValue());
+        return representativeEdge;
+      }
+    };
+  }
+
+  @Override
+  public void trim() {
+    edgeMap.trim();
+  }
+
+  /** Mutable edge whose setValue() also writes to the backing map entry. */
+  private static class LongByteHashMapMutableEdge
+      extends DefaultEdge<LongWritable, ByteWritable> {
+    /** Backing map entry; stays null until setEntry() has been called. */
+    private Long2ByteMap.Entry entry;
+
+    /** Creates an edge with newly allocated id and value objects. */
+    public LongByteHashMapMutableEdge() {
+      super(new LongWritable(), new ByteWritable());
+    }
+
+    /**
+     * Point the edge at the given map entry and copy its key and value.
+     *
+     * @param entry Backing entry whose key and value are copied
+     */
+    public void setEntry(Long2ByteMap.Entry entry) {
+      // Copy the entry's key and value into the superclass id/value objects.
+      getTargetVertexId().set(entry.getLongKey());
+      getValue().set(entry.getByteValue());
+      // Keep the entry so that setValue() can write through to the map.
+      this.entry = entry;
+    }
+
+    @Override
+    public void setValue(ByteWritable value) {
+      // Copy into the existing value object from the superclass.
+      getValue().set(value.get());
+      // Update the value stored in the backing map.
+      entry.setValue(value.get());
+    }
+  }
+
+  @Override
+  public Iterator<MutableEdge<LongWritable, ByteWritable>> mutableIterator() {
+    return new Iterator<MutableEdge<LongWritable, ByteWritable>>() {
+      /**
+       * Wrapped map iterator.
+       * Note: we cannot use the fast iterator in this case,
+       * because we need to call setValue() on an entry.
+       */
+      private final ObjectIterator<Long2ByteMap.Entry> mapIterator =
+          edgeMap.long2ByteEntrySet().iterator();
+      /** Single mutable edge, re-pointed at each entry by next(). */
+      private final LongByteHashMapMutableEdge representativeEdge =
+          new LongByteHashMapMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return mapIterator.hasNext();
+      }
+
+      @Override
+      public MutableEdge<LongWritable, ByteWritable> next() {
+        representativeEdge.setEntry(mapIterator.next());
+        return representativeEdge;
+      }
+
+      @Override
+      public void remove() {
+        mapIterator.remove();
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(edgeMap.size());
+    for (Long2ByteMap.Entry entry : edgeMap.long2ByteEntrySet()) {
+      out.writeLong(entry.getLongKey());
+      out.writeByte(entry.getByteValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEdges = in.readInt();
+    initialize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      edgeMap.put(in.readLong(), in.readByte());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f944f5cc/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java
new file mode 100644
index 0000000..d4de07d
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+public class LongByteHashMapEdgesTest {
+  private static Edge<LongWritable, ByteWritable> createEdge(long id, byte value) {
+    return EdgeFactory.create(new LongWritable(id), new ByteWritable(value));
+  }
+
+  private static void assertEdges(LongByteHashMapEdges edges, long[] expectedIds,
+                                  byte[] expectedValues) {
+    Assert.assertEquals(expectedIds.length, edges.size());
+    for (int i = 0; i< expectedIds.length; i++) {
+       ByteWritable value = edges.getEdgeValue(new LongWritable(expectedIds[i]));
+       assertNotNull(value);
+      assertEquals(expectedValues[i], value.get());
+    }
+  }
+
+  @Test
+  public void testEdges() {
+    LongByteHashMapEdges edges = new LongByteHashMapEdges();
+
+    List<Edge<LongWritable, ByteWritable>> initialEdges = Lists.newArrayList(
+      createEdge(1, (byte) 99), createEdge(2, (byte) 77), createEdge(4, (byte) 66));
+
+    edges.initialize(initialEdges);
+    assertEdges(edges, new long[]{1, 2, 4}, new byte[]{99, 77, 66});
+
+    edges.add(EdgeFactory.createReusable(new LongWritable(3), new ByteWritable((byte) 55)));
+    assertEdges(edges, new long[]{1, 2, 3, 4}, new byte[]{99, 77, 55, 66});
+
+    edges.remove(new LongWritable(2));
+    assertEdges(edges, new long[]{1, 3, 4}, new byte[]{99, 55, 66});
+  }
+
+  @Test
+  public void testMutateEdges() {
+    LongByteHashMapEdges edges = new LongByteHashMapEdges();
+
+    edges.initialize();
+
+    // Add 10 edges with id and value set to i, for i = 0..9
+    for (int i = 0; i < 10; ++i) {
+      edges.add(createEdge(i, (byte) i));
+    }
+
+    // Use the mutable iterator to remove edges with even id
+    Iterator<MutableEdge<LongWritable, ByteWritable>> edgeIt =
+        edges.mutableIterator();
+    while (edgeIt.hasNext()) {
+      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+        edgeIt.remove();
+      }
+    }
+
+    // We should now have 5 edges
+    assertEquals(5, edges.size());
+    // The remaining edge ids and values should all be odd
+    for (Edge<LongWritable, ByteWritable> edge : edges) {
+      assertEquals(1, edge.getTargetVertexId().get() % 2);
+      assertEquals(1, edge.getValue().get() % 2);
+    }
+  }
+
+  @Test
+  public void testSerialization() throws IOException {
+    LongByteHashMapEdges edges = new LongByteHashMapEdges();
+
+    edges.initialize();
+
+    // Add 10 edges with id and value set to i, for i = 0..9
+    for (int i = 0; i < 10; ++i) {
+      edges.add(createEdge(i, (byte) i));
+    }
+
+    edges.trim();
+
+    // Use the mutable iterator to remove edges with even id
+    Iterator<MutableEdge<LongWritable, ByteWritable>> edgeIt =
+        edges.mutableIterator();
+    while (edgeIt.hasNext()) {
+      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+        edgeIt.remove();
+      }
+    }
+
+    // Only the 5 odd-id edges should remain, each with its original value
+    assertEdges(edges, new long[]{1, 3, 5, 7, 9}, new byte[]{1, 3, 5, 7, 9});
+
+    ExtendedDataOutput tempBuffer = new UnsafeByteArrayOutputStream();
+
+    edges.write(tempBuffer);
+
+    DataInput input = new UnsafeByteArrayInputStream(
+      tempBuffer.getByteArray(), 0, tempBuffer.getPos());
+
+    edges = new LongByteHashMapEdges();
+    edges.readFields(input);
+
+    assertEquals(5, edges.size());
+
+    for (Edge<LongWritable, ByteWritable> edge : edges) {
+      assertEquals(1, edge.getTargetVertexId().get() % 2);
+      assertEquals(1, edge.getValue().get() % 2);
+    }
+  }
+
+  /**
+   * Edges added to the same target id must collapse into a single edge.
+   */
+  @Test
+  public void testParallelEdges() {
+    LongByteHashMapEdges edges = new LongByteHashMapEdges();
+
+    List<Edge<LongWritable, ByteWritable>> initialEdges = Lists.newArrayList(
+      createEdge(2, (byte) 1), createEdge(2, (byte) 2), createEdge(2, (byte) 3));
+
+    edges.initialize(initialEdges);
+    assertEquals(1, edges.size());
+
+    edges.remove(new LongWritable(2));
+    assertEquals(0, edges.size());
+
+    edges.add(EdgeFactory.create(new LongWritable(2), new ByteWritable((byte) 4)));
+    assertEquals(1, edges.size());
+
+    edges.trim();
+    assertEquals(1, edges.size());
+  }
+}