You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2016/09/29 23:54:29 UTC
git commit: updated refs/heads/trunk to 8bf08f5
Repository: giraph
Updated Branches:
refs/heads/trunk 06de6c48a -> 8bf08f545
[GIRAPH-1117] Provide a flexible way to decide whether to create vertex when it is not present in the input
Test Plan: run hello pagerank with this feature on and off
Reviewers: majakabiljo, maja.kabiljo, dionysis.logothetis
Reviewed By: dionysis.logothetis
Differential Revision: https://reviews.facebook.net/D64485
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8bf08f54
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8bf08f54
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8bf08f54
Branch: refs/heads/trunk
Commit: 8bf08f545d15b39e43874a2b1ac5b0586a917a4f
Parents: 06de6c4
Author: Sergey Edunov <ed...@fb.com>
Authored: Thu Sep 29 16:54:18 2016 -0700
Committer: Sergey Edunov <ed...@fb.com>
Committed: Thu Sep 29 16:54:18 2016 -0700
----------------------------------------------------------------------
.../org/apache/giraph/conf/GiraphConstants.java | 14 +++++
.../apache/giraph/edge/AbstractEdgeStore.java | 13 +++-
.../giraph/edge/CreateSourceVertexCallback.java | 42 +++++++++++++
.../edge/DefaultCreateSourceVertexCallback.java | 50 +++++++++++++++
.../java/org/apache/giraph/utils/TestGraph.java | 33 +++++-----
.../giraph/io/TestCreateSourceVertex.java | 65 ++++++++++++++++++++
6 files changed, 199 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index b384261..437d08a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -27,6 +27,8 @@ import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.DefaultCreateSourceVertexCallback;
+import org.apache.giraph.edge.CreateSourceVertexCallback;
import org.apache.giraph.edge.EdgeStoreFactory;
import org.apache.giraph.edge.InMemoryEdgeStoreFactory;
import org.apache.giraph.edge.OutEdges;
@@ -1118,6 +1120,18 @@ public interface GiraphConstants {
"necessarily in vertex input");
/**
+ * Defines a callback that can be used to decide at runtime
+ * whether a missing edge source vertex should be created.
+ */
+ ClassConfOption<CreateSourceVertexCallback>
+ CREATE_EDGE_SOURCE_VERTICES_CALLBACK =
+ ClassConfOption.create("giraph.createEdgeSourceVerticesCallback",
+ DefaultCreateSourceVertexCallback.class,
+ CreateSourceVertexCallback.class,
+ "Decide whether we should create a source vertex when id is " +
+ "present in the edge input but not in vertex input");
+
+ /**
* This counter group will contain one counter whose name is the ZooKeeper
* server:port which this job is using
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index 104cae2..d2e7e8d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -21,6 +21,7 @@ package org.apache.giraph.edge;
import com.google.common.collect.MapMaker;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.ooc.OutOfCoreEngine;
@@ -81,6 +82,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
protected boolean useInputOutEdges;
/** Whether we spilled edges on disk */
private boolean hasEdgesOnDisk = false;
+ /** Callback that decides whether to create missing source vertices */
+ private CreateSourceVertexCallback<I> createSourceVertexCallback;
/**
* Constructor.
@@ -100,6 +103,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
configuration.getNettyServerExecutionConcurrency()).makeMap();
reuseEdgeObjects = configuration.reuseEdgeObjects();
useInputOutEdges = configuration.useInputOutEdges();
+ createSourceVertexCallback =
+ GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
+ .newInstance(configuration);
}
/**
@@ -247,7 +253,6 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
@Override
public void moveEdgesToVertices() {
- final boolean createSourceVertex = configuration.getCreateSourceVertex();
if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
if (LOG.isInfoEnabled()) {
LOG.info("moveEdgesToVertices: No edges to move");
@@ -256,7 +261,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
}
if (LOG.isInfoEnabled()) {
- LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
+ LOG.info("moveEdgesToVertices: Moving incoming edges to " +
+ "vertices. Using " + createSourceVertexCallback);
}
service.getPartitionStore().startIteration();
@@ -307,7 +313,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
// If the source vertex doesn't exist, create it. Otherwise,
// just set the edges.
if (vertex == null) {
- if (createSourceVertex) {
+ if (createSourceVertexCallback
+ .shouldCreateSourceVertex(vertexId)) {
// createVertex only if it is allowed by configuration
vertex = configuration.createVertex();
vertex.initialize(createVertexId(entry),
http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java b/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java
new file mode 100644
index 0000000..9d6ed1b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/CreateSourceVertexCallback.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Implementations of this interface can decide whether
+ * we should create a vertex when it is not present in vertex input
+ * but exists in edge input.
+ *
+ * @param <I> vertex id
+ */
+public interface CreateSourceVertexCallback<I extends Writable>
+ extends GiraphConfigurationSettable {
+
+ /**
+ * Should we create a vertex that doesn't exist in vertex input
+ * but only exists in edge input
+ * @param vertexId the id of vertex to be created
+ * @return true if we should create a vertex
+ */
+ boolean shouldCreateSourceVertex(I vertexId);
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java
new file mode 100644
index 0000000..19ed598
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultCreateSourceVertexCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Default implementation of vertex creation decision maker.
+ * By default you can either create all vertices or not create
+ * implicit vertices at all.
+ *
+ * @param <I> Vertex id
+ */
+public class DefaultCreateSourceVertexCallback<I extends Writable>
+ implements CreateSourceVertexCallback<I> {
+ /**
+ * True if giraph has to create even vertices that only exist
+ * in edge input
+ */
+ private boolean shouldCreateVertices;
+
+ @Override
+ public boolean shouldCreateSourceVertex(I vertexId) {
+ return shouldCreateVertices;
+ }
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+ shouldCreateVertices =
+ GiraphConstants.CREATE_EDGE_SOURCE_VERTICES.get(configuration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
index 46f7b48..363d4c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
@@ -24,7 +24,9 @@ import java.util.List;
import java.util.Map.Entry;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.CreateSourceVertexCallback;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
@@ -54,6 +56,8 @@ public class TestGraph<I extends WritableComparable,
protected Basic2ObjectMap<I, Vertex<I, V, E>> vertices;
/** The configuration */
protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
+ /** Callback that makes a decision on whether vertex should be created */
+ private CreateSourceVertexCallback<I> createSourceVertexCallback;
/**
* Constructor requiring classes
@@ -62,6 +66,9 @@ public class TestGraph<I extends WritableComparable,
*/
public TestGraph(GiraphConfiguration conf) {
this.conf = new ImmutableClassesGiraphConfiguration<>(conf);
+ createSourceVertexCallback =
+ GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
+ .newInstance(this.conf);
vertexValueCombiner = this.conf.createVertexValueCombiner();
vertices = BasicCollectionsUtils.create2ObjectMap(
this.conf.getVertexIdClass()
@@ -147,21 +154,13 @@ public class TestGraph<I extends WritableComparable,
/**
* Add an edge to an existing vertex
- *
+ *
* @param vertexId Edge origin
* @param edgePair The edge
* @return this
*/
public TestGraph<I, V, E> addEdge(I vertexId, Entry<I, E> edgePair) {
- if (!vertices.containsKey(vertexId)) {
- Vertex<I, V, E> v = conf.createVertex();
- v.initialize(vertexId, conf.createVertexValue());
- vertices.put(vertexId, v);
- }
- vertices.get(vertexId)
- .addEdge(EdgeFactory.create(edgePair.getKey(),
- edgePair.getValue()));
- return this;
+ return addEdge(vertexId, edgePair.getKey(), edgePair.getValue());
}
/**
@@ -174,12 +173,16 @@ public class TestGraph<I extends WritableComparable,
*/
public TestGraph<I, V, E> addEdge(I vertexId, I toVertex, E edgeValue) {
if (!vertices.containsKey(vertexId)) {
- Vertex<I, V, E> v = conf.createVertex();
- v.initialize(vertexId, conf.createVertexValue());
- vertices.put(vertexId, v);
+ if (createSourceVertexCallback.shouldCreateSourceVertex(vertexId)) {
+ Vertex<I, V, E> v = conf.createVertex();
+ v.initialize(vertexId, conf.createVertexValue());
+ vertices.put(vertexId, v);
+ }
+ }
+ Vertex<I, V, E> v = vertices.get(vertexId);
+ if (v != null) {
+ v.addEdge(EdgeFactory.create(toVertex, edgeValue));
}
- vertices.get(vertexId)
- .addEdge(EdgeFactory.create(toVertex, edgeValue));
return this;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8bf08f54/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java b/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java
index 039e975..35c4390 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestCreateSourceVertex.java
@@ -20,13 +20,16 @@ package org.apache.giraph.io;
import com.google.common.collect.Maps;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.DefaultCreateSourceVertexCallback;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
import org.apache.giraph.utils.ComputationCountEdges;
import org.apache.giraph.utils.IntIntNullNoOpComputation;
import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.IntWritable;
import org.junit.Test;
import java.util.Map;
@@ -133,6 +136,68 @@ public class TestCreateSourceVertex {
assertEquals(1, (int) values.get(7));
}
+ @Test
+ public void testCustomCreateSourceVertex() throws Exception {
+ String [] vertices = new String[] {
+ "1 0",
+ "2 0",
+ "3 0",
+ "4 0",
+ };
+ String [] edges = new String[] {
+ "1 2",
+ "1 5",
+ "2 4",
+ "2 1",
+ "3 4",
+ "4 1",
+ "4 5",
+ "6 2",
+ "7 8",
+ "4 8",
+ };
+
+ GiraphConfiguration conf = getConf();
+ GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK.set(conf,
+ CreateEvenSourceVerticesCallback.class);
+
+ Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
+ Map<Integer, Integer> values = parseResults(results);
+
+ // Check that the output graph contains the vertex input plus the
+ // even-id source vertex (6) that appears only in the edge input
+ assertEquals(5, values.size());
+ // Check the exact ids: vertex input ids plus 6; odd id 7 is not created
+ assertTrue(values.containsKey(1));
+ assertTrue(values.containsKey(2));
+ assertTrue(values.containsKey(3));
+ assertTrue(values.containsKey(4));
+ assertTrue(values.containsKey(6));
+
+ conf.setComputationClass(ComputationCountEdges.class);
+ results = InternalVertexRunner.run(conf, vertices, edges);
+ values = parseResults(results);
+
+ // Check the number of edges of each vertex
+ assertEquals(2, (int) values.get(1));
+ assertEquals(2, (int) values.get(2));
+ assertEquals(1, (int) values.get(3));
+ assertEquals(3, (int) values.get(4));
+ assertEquals(1, (int) values.get(6));
+ }
+
+ /**
+ * Only allows creation of source vertices with even ids.
+ */
+ public static class CreateEvenSourceVerticesCallback extends
+ DefaultCreateSourceVertexCallback<IntWritable> {
+
+ @Override
+ public boolean shouldCreateSourceVertex(IntWritable vertexId) {
+ return vertexId.get() % 2 == 0;
+ }
+ }
+
+
private GiraphConfiguration getConf() {
GiraphConfiguration conf = new GiraphConfiguration();
conf.setComputationClass(IntIntNullNoOpComputation.class);