You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2018/01/19 17:13:02 UTC
git commit: updated refs/heads/trunk to f944f5c
Repository: giraph
Updated Branches:
refs/heads/trunk 8e2df4f8c -> f944f5cc3
GIRAPH-1167
closes #56
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f944f5cc
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f944f5cc
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f944f5cc
Branch: refs/heads/trunk
Commit: f944f5cc3dd7874dac049e0c8c4d7212df781452
Parents: 8e2df4f
Author: Dionysios Logothetis <dl...@gmail.com>
Authored: Fri Jan 12 21:00:55 2018 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Jan 19 09:12:03 2018 -0800
----------------------------------------------------------------------
.../giraph/edge/LongByteHashMapEdges.java | 217 +++++++++++++++++++
.../giraph/edge/LongByteHashMapEdgesTest.java | 164 ++++++++++++++
2 files changed, 381 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/f944f5cc/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java
new file mode 100644
index 0000000..fd5f278
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongByteHashMapEdges.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import it.unimi.dsi.fastutil.longs.Long2ByteMap;
+import it.unimi.dsi.fastutil.longs.Long2ByteOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.utils.EdgeIterables;
+import org.apache.giraph.utils.Trimmable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link OutEdges} implementation with long ids and byte edge values,
+ * backed by a {@link Long2ByteOpenHashMap}.
+ * Parallel edges are not allowed.
+ * Note: this implementation is optimized for fast random access and mutations,
+ * and uses less space than a generic {@link HashMapEdges}.
+ */
+public class LongByteHashMapEdges
+ implements StrictRandomAccessOutEdges<LongWritable, ByteWritable>,
+ ReuseObjectsOutEdges<LongWritable, ByteWritable>,
+ MutableOutEdges<LongWritable, ByteWritable>, Trimmable {
+ /** Hash map from target vertex id to edge value. */
+ private Long2ByteOpenHashMap edgeMap;
+ /** Representative edge value object, used by getEdgeValue(). */
+ private ByteWritable representativeEdgeValue;
+
+ @Override
+ public void initialize(Iterable<Edge<LongWritable, ByteWritable>> edges) {
+ EdgeIterables.initialize(this, edges); // Hands this container and the edges to the shared helper.
+ }
+
+ @Override
+ public void initialize(int capacity) {
+ edgeMap = new Long2ByteOpenHashMap(capacity); // Replaces any previous map; capacity is passed to the map constructor.
+ }
+
+ @Override
+ public void initialize() {
+ edgeMap = new Long2ByteOpenHashMap(); // Replaces any previous map, using the map's no-argument constructor.
+ }
+
+ @Override
+ public void add(Edge<LongWritable, ByteWritable> edge) {
+ edgeMap.put(edge.getTargetVertexId().get(), edge.getValue().get()); // Keyed by target id: a later edge to the same target replaces the earlier one.
+ }
+
+ @Override
+ public void remove(LongWritable targetVertexId) {
+ edgeMap.remove(targetVertexId.get()); // Removes the map entry keyed by the target id.
+ }
+
+  @Override
+  public ByteWritable getEdgeValue(LongWritable targetVertexId) {
+    final long targetId = targetVertexId.get();
+    if (!edgeMap.containsKey(targetId)) {
+      return null; // No edge to this target.
+    } else if (representativeEdgeValue == null) {
+      representativeEdgeValue = new ByteWritable(); // Created on first hit.
+    }
+    representativeEdgeValue.set(edgeMap.get(targetId));
+    return representativeEdgeValue; // Shared object, overwritten by the next call.
+  }
+
+ @Override
+ public void setEdgeValue(LongWritable targetVertexId,
+ ByteWritable edgeValue) {
+ if (edgeMap.containsKey(targetVertexId.get())) { // Only an existing edge is updated; no edge is created here.
+ edgeMap.put(targetVertexId.get(), edgeValue.get());
+ }
+ }
+
+ @Override
+ public int size() {
+ return edgeMap.size(); // Number of entries in the backing map.
+ }
+
+  @Override
+  public Iterator<Edge<LongWritable, ByteWritable>> iterator() {
+    // One edge object is refilled and returned by every call to next().
+    return new UnmodifiableIterator<Edge<LongWritable, ByteWritable>>() {
+      /** Cursor over the backing map entries. */
+      private final ObjectIterator<Long2ByteMap.Entry> entries =
+          edgeMap.long2ByteEntrySet().fastIterator();
+      /** Edge instance handed out, with new contents, on each step. */
+      private final ReusableEdge<LongWritable, ByteWritable> sharedEdge =
+          EdgeFactory.createReusable(new LongWritable(), new ByteWritable());
+      // Both members above are created once per iterator() call.
+
+      @Override
+      public boolean hasNext() {
+        return entries.hasNext();
+      }
+
+      @Override
+      public Edge<LongWritable, ByteWritable> next() {
+        Long2ByteMap.Entry current = entries.next();
+        sharedEdge.getValue().set(current.getByteValue());
+        sharedEdge.getTargetVertexId().set(current.getLongKey());
+        return sharedEdge;
+      }
+    };
+  }
+
+ @Override
+ public void trim() {
+ edgeMap.trim(); // Forwards to the backing map's own trim().
+ }
+
+  /** Mutable edge whose value updates are written through to a map entry. */
+  private static class LongByteHashMapMutableEdge
+      extends DefaultEdge<LongWritable, ByteWritable> {
+    /** Map entry this edge currently mirrors. */
+    private Long2ByteMap.Entry backingEntry;
+
+    /** Creates the edge with freshly allocated id and value objects. */
+    public LongByteHashMapMutableEdge() {
+      super(new LongWritable(), new ByteWritable());
+    }
+
+    /**
+     * Rebinds this edge to a map entry and copies the entry's id and value.
+     *
+     * @param entry Map entry to mirror from now on
+     */
+    public void setEntry(Long2ByteMap.Entry entry) {
+      final long targetId = entry.getLongKey();
+      final byte edgeValue = entry.getByteValue();
+      getTargetVertexId().set(targetId);
+      getValue().set(edgeValue);
+      backingEntry = entry;
+    }
+
+    @Override
+    public void setValue(ByteWritable value) {
+      final byte newValue = value.get();
+      // Keep the edge's own value object and the map entry in step.
+      getValue().set(newValue);
+      backingEntry.setValue(newValue);
+    }
+  }
+
+  @Override
+  public Iterator<MutableEdge<LongWritable, ByteWritable>> mutableIterator() {
+    return new Iterator<MutableEdge<LongWritable, ByteWritable>>() {
+      /**
+       * Cursor over the backing map entries. Not the fast iterator, because
+       * the edge handed out below calls setValue() on the entry it holds.
+       */
+      private final ObjectIterator<Long2ByteMap.Entry> entries =
+          edgeMap.long2ByteEntrySet().iterator();
+      /** Single edge instance, re-pointed at each visited entry. */
+      private final LongByteHashMapMutableEdge sharedEdge =
+          new LongByteHashMapMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return entries.hasNext();
+      }
+
+      @Override
+      public MutableEdge<LongWritable, ByteWritable> next() {
+        Long2ByteMap.Entry current = entries.next();
+        sharedEdge.setEntry(current);
+        return sharedEdge;
+      }
+
+      @Override
+      public void remove() {
+        entries.remove();
+      }
+    };
+  }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(edgeMap.size()); // Entry count first; readFields() reads it to know how many pairs follow.
+ for (Long2ByteMap.Entry entry : edgeMap.long2ByteEntrySet()) { // Per edge: target id as long, then value as byte.
+ out.writeLong(entry.getLongKey());
+ out.writeByte(entry.getByteValue());
+ }
+ }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final int edgeCount = in.readInt(); // Written first by write().
+    initialize(edgeCount);
+    for (int remaining = edgeCount; remaining > 0; --remaining) {
+      edgeMap.put(in.readLong(), in.readByte());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f944f5cc/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java
new file mode 100644
index 0000000..d4de07d
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/LongByteHashMapEdgesTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+public class LongByteHashMapEdgesTest {
+ private static Edge<LongWritable, ByteWritable> createEdge(long id, byte value) {
+ return EdgeFactory.create(new LongWritable(id), new ByteWritable(value)); // Test helper: each edge gets its own new writables.
+ }
+
+  private static void assertEdges(LongByteHashMapEdges edges, long[] expectedIds,
+      byte[] expectedValues) {
+    assertEquals(expectedIds.length, edges.size());
+    for (int idx = 0; idx < expectedIds.length; ++idx) {
+      ByteWritable actual = edges.getEdgeValue(new LongWritable(expectedIds[idx]));
+      assertNotNull(actual); // null would mean the edge is missing
+      assertEquals(expectedValues[idx], actual.get());
+    }
+  }
+
+  @Test
+  public void testEdges() {
+    LongByteHashMapEdges outEdges = new LongByteHashMapEdges();
+    // Bulk-load three edges: targets 1, 2 and 4.
+    List<Edge<LongWritable, ByteWritable>> seedEdges = Lists.newArrayList(
+        createEdge(1, (byte) 99), createEdge(2, (byte) 77), createEdge(4, (byte) 66));
+    outEdges.initialize(seedEdges);
+    assertEdges(outEdges, new long[]{1, 2, 4}, new byte[]{99, 77, 66});
+
+    // A single add() makes target 3 visible next to the loaded edges.
+    outEdges.add(EdgeFactory.createReusable(new LongWritable(3), new ByteWritable((byte) 55)));
+    assertEdges(outEdges, new long[]{1, 2, 3, 4}, new byte[]{99, 77, 55, 66});
+    // remove() drops only the edge to target 2.
+    outEdges.remove(new LongWritable(2));
+    assertEdges(outEdges, new long[]{1, 3, 4}, new byte[]{99, 55, 66});
+  }
+
+  @Test
+  public void testMutateEdges() {
+    LongByteHashMapEdges outEdges = new LongByteHashMapEdges();
+    outEdges.initialize();
+
+    // Populate targets 0..9, each carrying its own id as the edge value.
+    for (int id = 0; id < 10; ++id) {
+      outEdges.add(createEdge(id, (byte) id));
+    }
+
+    // Walk the mutable iterator and delete every edge to an even target.
+    for (Iterator<MutableEdge<LongWritable, ByteWritable>> cursor =
+        outEdges.mutableIterator(); cursor.hasNext();) {
+      MutableEdge<LongWritable, ByteWritable> current = cursor.next();
+      boolean evenTarget = current.getTargetVertexId().get() % 2 == 0;
+      if (evenTarget) {
+        cursor.remove();
+      }
+    }
+
+    // Half of the ten edges must be gone ...
+    assertEquals(5, outEdges.size());
+    // ... and every survivor must have an odd target id and an odd value.
+    for (Edge<LongWritable, ByteWritable> survivor : outEdges) {
+      assertEquals(1, survivor.getTargetVertexId().get() % 2);
+      assertEquals(1, survivor.getValue().get() % 2);
+    }
+  }
+
+  @Test
+  public void testSerialization() throws IOException {
+    LongByteHashMapEdges source = new LongByteHashMapEdges();
+    source.initialize();
+
+    // Populate targets 0..9, each carrying its own id as the edge value.
+    for (int id = 0; id < 10; ++id) {
+      source.add(createEdge(id, (byte) id));
+    }
+
+    // Trim first, so the removals below run against the trimmed map.
+    source.trim();
+
+    // Delete every edge to an even target through the mutable iterator.
+    for (Iterator<MutableEdge<LongWritable, ByteWritable>> cursor =
+        source.mutableIterator(); cursor.hasNext();) {
+      MutableEdge<LongWritable, ByteWritable> current = cursor.next();
+      if (current.getTargetVertexId().get() % 2 == 0) {
+        cursor.remove();
+      }
+    }
+
+    // Exactly the five odd edges remain, with their values untouched.
+    assertEdges(source, new long[]{1, 3, 5, 7, 9}, new byte[]{1, 3, 5, 7, 9});
+
+    // Serialize into an in-memory buffer ...
+    ExtendedDataOutput buffer = new UnsafeByteArrayOutputStream();
+    source.write(buffer);
+
+    // ... and read the written bytes back into a brand-new instance.
+    DataInput input = new UnsafeByteArrayInputStream(
+        buffer.getByteArray(), 0, buffer.getPos());
+    LongByteHashMapEdges restored = new LongByteHashMapEdges();
+    restored.readFields(input);
+
+    // The copy must hold five edges, all with an odd id and an odd value.
+    assertEquals(5, restored.size());
+    for (Edge<LongWritable, ByteWritable> edge : restored) {
+      assertEquals(1, edge.getTargetVertexId().get() % 2);
+      assertEquals(1, edge.getValue().get() % 2);
+    }
+  }
+
+  /**
+   * Edges to the same target must collapse into a single edge.
+   */
+  @Test
+  public void testParallelEdges() {
+    LongByteHashMapEdges outEdges = new LongByteHashMapEdges();
+    // Three edges, all pointing at target 2.
+    List<Edge<LongWritable, ByteWritable>> duplicates = Lists.newArrayList(
+        createEdge(2, (byte) 1), createEdge(2, (byte) 2), createEdge(2, (byte) 3));
+    outEdges.initialize(duplicates);
+    assertEquals(1, outEdges.size());
+
+    // One remove() is enough to clear the target.
+    outEdges.remove(new LongWritable(2));
+    assertEquals(0, outEdges.size());
+
+    // Re-adding the target, then trimming, each leave exactly one edge.
+    outEdges.add(createEdge(2, (byte) 4));
+    assertEquals(1, outEdges.size());
+    outEdges.trim();
+    assertEquals(1, outEdges.size());
+  }
+}