You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/02/16 01:34:17 UTC
[2/2] git commit: GIRAPH-515: More efficient and flexible
edge-based input
Updated Branches:
refs/heads/trunk ae3d29faa -> 212326153
GIRAPH-515: More efficient and flexible edge-based input
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/21232615
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/21232615
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/21232615
Branch: refs/heads/trunk
Commit: 2123261537c3ccb041d067f5079533490db2d1ef
Parents: ae3d29f
Author: Alessandro Presta <al...@fb.com>
Authored: Fri Feb 15 16:20:43 2013 -0800
Committer: Alessandro Presta <al...@fb.com>
Committed: Fri Feb 15 16:33:00 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../ByteArrayVertexPageRankBenchmark.java | 39 ++
...MultiGraphByteArrayVertexPageRankBenchmark.java | 39 ++
.../apache/giraph/benchmark/PageRankBenchmark.java | 16 +-
.../java/org/apache/giraph/comm/SendCache.java | 179 +++++++++
.../java/org/apache/giraph/comm/SendEdgeCache.java | 97 +++++
.../org/apache/giraph/comm/SendMessageCache.java | 138 +------
.../java/org/apache/giraph/comm/ServerData.java | 11 +
.../giraph/comm/WorkerClientRequestProcessor.java | 13 +
.../java/org/apache/giraph/comm/WorkerServer.java | 7 -
.../messages/ByteArrayMessagesPerVertexStore.java | 16 +-
.../netty/NettyWorkerClientRequestProcessor.java | 71 ++++-
.../giraph/comm/netty/NettyWorkerServer.java | 11 +-
.../apache/giraph/comm/requests/RequestType.java | 2 +
.../comm/requests/SendWorkerDataRequest.java | 111 ++++++
.../comm/requests/SendWorkerEdgesRequest.java | 76 ++++
.../comm/requests/SendWorkerMessagesRequest.java | 73 +---
.../apache/giraph/conf/GiraphConfiguration.java | 18 +-
.../org/apache/giraph/conf/GiraphConstants.java | 47 +++-
.../conf/ImmutableClassesGiraphConfiguration.java | 46 ++-
.../java/org/apache/giraph/graph/EdgeStore.java | 176 +++++++++
.../giraph/partition/ByteArrayPartition.java | 18 +-
.../giraph/partition/DiskBackedPartitionStore.java | 29 +-
.../org/apache/giraph/utils/ByteArrayEdges.java | 290 +++++++++++++++
.../apache/giraph/utils/ByteArrayVertexIdData.java | 226 +++++++++++
.../giraph/utils/ByteArrayVertexIdEdges.java | 84 +++++
.../giraph/utils/ByteArrayVertexIdMessages.java | 240 +++----------
.../org/apache/giraph/utils/VertexIdIterator.java | 85 +++++
.../org/apache/giraph/utils/WritableUtils.java | 35 ++
.../org/apache/giraph/vertex/ByteArrayVertex.java | 63 ++++
.../apache/giraph/vertex/ByteArrayVertexBase.java | 126 +++++++
.../org/apache/giraph/vertex/EdgeListVertex.java | 2 +-
.../giraph/vertex/MultiGraphByteArrayVertex.java | 49 +++
.../org/apache/giraph/worker/BspServiceWorker.java | 6 +-
.../giraph/worker/EdgeInputSplitsCallable.java | 2 +-
.../org/apache/giraph/comm/RequestFailureTest.java | 6 +-
.../java/org/apache/giraph/comm/RequestTest.java | 6 +-
.../java/org/apache/giraph/utils/MockUtils.java | 1 +
.../apache/giraph/vertex/TestMultiGraphVertex.java | 8 +-
.../apache/giraph/vertex/TestMutableVertex.java | 6 +-
40 files changed, 2002 insertions(+), 468 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 1db3492..ee31207 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-515: More efficient and flexible edge-based input (apresta)
+
GIRAPH-516: out-of-core messages dies for ArrayIndexOutOfBoundsException when
running out-of-core messages in UnsafeByteArrayOutputStream (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/benchmark/ByteArrayVertexPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ByteArrayVertexPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ByteArrayVertexPageRankBenchmark.java
new file mode 100644
index 0000000..7e51c26
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ByteArrayVertexPageRankBenchmark.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import java.io.IOException;
+import org.apache.giraph.vertex.ByteArrayVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Vertex for the PageRank benchmark based on
+ * {@link org.apache.giraph.vertex.ByteArrayVertex}. The benchmark logic
+ * is the one used by {@link PageRankBenchmark}: all per-vertex work is
+ * delegated to {@link PageRankComputation}.
+ */
+public class ByteArrayVertexPageRankBenchmark
+    extends ByteArrayVertex<LongWritable, DoubleWritable,
+    DoubleWritable, DoubleWritable> {
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws IOException {
+    PageRankComputation.computePageRank(this, messages);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/benchmark/MultiGraphByteArrayVertexPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/MultiGraphByteArrayVertexPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/MultiGraphByteArrayVertexPageRankBenchmark.java
new file mode 100644
index 0000000..9144641
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/MultiGraphByteArrayVertexPageRankBenchmark.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.vertex.MultiGraphByteArrayVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Vertex for the PageRank benchmark based on
+ * {@link org.apache.giraph.vertex.MultiGraphByteArrayVertex}. All
+ * per-vertex work is delegated to {@link PageRankComputation}.
+ */
+public class MultiGraphByteArrayVertexPageRankBenchmark
+    extends MultiGraphByteArrayVertex<LongWritable, DoubleWritable,
+    DoubleWritable, DoubleWritable> {
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws IOException {
+    PageRankComputation.computePageRank(this, messages);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 19b08bd..8341dce 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -82,13 +82,13 @@ public class PageRankBenchmark implements Tool {
"vertexClass",
true,
"Vertex class (0 for HashMapVertex, 1 for EdgeListVertex, " +
- "2 for RepresentativeVertex, " +
- "3 for RepresentativeVertex with unsafe, " +
+ "2 for ByteArrayVertex, " +
+ "3 for ByteArrayVertex with unsafe, " +
"4 for HashMapVertex (using EdgeInputFormat), " +
"5 for MultiGraphEdgeListVertex (using EdgeInputFormat), " +
- "6 for MultiGraphRepresentativeVertex (using " +
+ "6 for MultiGraphByteArrayVertex (using " +
"EdgeInputFormat), " +
- "7 for MultiGraphRepresentativeVertex with unsafe (using " +
+ "7 for MultiGraphByteArrayVertex with unsafe (using " +
"EdgeInputFormat))");
options.addOption("N",
"name",
@@ -176,22 +176,22 @@ public class PageRankBenchmark implements Tool {
HashMapVertexPageRankBenchmark.class);
} else if (vertexClassOption == 2) {
configuration.setVertexClass(
- RepresentativeVertexPageRankBenchmark.class);
+ ByteArrayVertexPageRankBenchmark.class);
configuration.useUnsafeSerialization(false);
} else if (vertexClassOption == 3) {
configuration.setVertexClass(
- RepresentativeVertexPageRankBenchmark.class);
+ ByteArrayVertexPageRankBenchmark.class);
configuration.useUnsafeSerialization(true);
} else if (vertexClassOption == 5) {
configuration.setVertexClass(
MultiGraphEdgeListVertexPageRankBenchmark.class);
} else if (vertexClassOption == 6) {
configuration.setVertexClass(
- MultiGraphRepresentativeVertexPageRankBenchmark.class);
+ MultiGraphByteArrayVertexPageRankBenchmark.class);
configuration.useUnsafeSerialization(false);
} else if (vertexClassOption == 7) {
configuration.setVertexClass(
- MultiGraphRepresentativeVertexPageRankBenchmark.class);
+ MultiGraphByteArrayVertexPageRankBenchmark.class);
configuration.useUnsafeSerialization(true);
}
LOG.info("Using vertex class " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
new file mode 100644
index 0000000..1e8bdf9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract structure for caching data indexed by vertex id,
+ * to be sent to workers in bulk. Not thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <T> Data
+ * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
+ */
+@SuppressWarnings("unchecked")
+public abstract class SendCache<I extends WritableComparable, T,
+    B extends ByteArrayVertexIdData<I, T>> {
+  /** Internal cache, indexed by partition id */
+  private final ByteArrayVertexIdData<I, T>[] dataCache;
+  /** Size of cached data (in bytes) for each worker, indexed by task id */
+  private final int[] dataSizes;
+  /** How big to initially make output streams for each worker's partitions */
+  private final int[] initialBufferSizes;
+  /** Partition ids belonging to each worker (empty if it owns none) */
+  private final Map<WorkerInfo, List<Integer>> workerPartitions =
+      Maps.newHashMap();
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
+   * @param maxRequestSize Maximum request size (in bytes)
+   * @param additionalRequestSize Additional request size (expressed as a
+   *                              ratio of the average request size)
+   */
+  public SendCache(ImmutableClassesGiraphConfiguration conf,
+      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker, int maxRequestSize,
+      float additionalRequestSize) {
+    this.conf = conf;
+
+    int maxPartition = 0;
+    for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
+      WorkerInfo owner = partitionOwner.getWorkerInfo();
+      if (!workerPartitions.containsKey(owner)) {
+        workerPartitions.put(owner, Lists.<Integer>newArrayList());
+      }
+      workerPartitions.get(owner).add(partitionOwner.getPartitionId());
+      maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
+    }
+    dataCache = new ByteArrayVertexIdData[maxPartition + 1];
+
+    int maxWorker = 0;
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
+      // Workers that own no partitions get an empty list (never null)
+      if (!workerPartitions.containsKey(workerInfo)) {
+        workerPartitions.put(workerInfo, Lists.<Integer>newArrayList());
+      }
+    }
+    dataSizes = new int[maxWorker + 1];
+
+    int initialRequestSize =
+        (int) (maxRequestSize * (1 + additionalRequestSize));
+    initialBufferSizes = new int[maxWorker + 1];
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      initialBufferSizes[workerInfo.getTaskId()] = initialRequestSize /
+          Math.max(1, workerPartitions.get(workerInfo).size());
+    }
+  }
+
+  /**
+   * Create a new {@link ByteArrayVertexIdData} specialized for the use case.
+   *
+   * @return A new instance of {@link ByteArrayVertexIdData}
+   */
+  public abstract B createByteArrayVertexIdData();
+
+  /**
+   * Add data to the cache.
+   *
+   * @param workerInfo the remote worker destination
+   * @param partitionId the remote Partition this data belongs to
+   * @param destVertexId vertex id that is ultimate destination
+   * @param data Data to send to remote worker
+   * @return Total size (in bytes) of data cached for the worker
+   */
+  public int addData(WorkerInfo workerInfo,
+                     int partitionId, I destVertexId, T data) {
+    // Get the data collection
+    ByteArrayVertexIdData<I, T> partitionData = dataCache[partitionId];
+    int originalSize = 0;
+    if (partitionData == null) {
+      partitionData = createByteArrayVertexIdData();
+      partitionData.setConf(conf);
+      partitionData.initialize(initialBufferSizes[workerInfo.getTaskId()]);
+      dataCache[partitionId] = partitionData;
+    } else {
+      originalSize = partitionData.getSize();
+    }
+    partitionData.add(destVertexId, data);
+
+    // Update the size of cached, outgoing data per worker
+    dataSizes[workerInfo.getTaskId()] +=
+        partitionData.getSize() - originalSize;
+    return dataSizes[workerInfo.getTaskId()];
+  }
+
+  /**
+   * Gets the data for a worker and removes it from the cache.
+   *
+   * @param workerInfo the address of the worker who owns the data
+   *                   partitions that are receiving the data
+   * @return List of pairs (partitionId, ByteArrayVertexIdData),
+   *         where all partition ids belong to workerInfo
+   */
+  public PairList<Integer, B>
+  removeWorkerData(WorkerInfo workerInfo) {
+    PairList<Integer, B> workerData = new PairList<Integer, B>();
+    List<Integer> partitions = workerPartitions.get(workerInfo);
+    workerData.initialize(partitions.size());
+    for (Integer partitionId : partitions) {
+      if (dataCache[partitionId] != null) {
+        workerData.add(partitionId, (B) dataCache[partitionId]);
+        dataCache[partitionId] = null;
+      }
+    }
+    dataSizes[workerInfo.getTaskId()] = 0;
+    return workerData;
+  }
+
+  /**
+   * Gets all the data and removes it from the cache.
+   *
+   * @return All data for all vertices for all partitions
+   */
+  public PairList<WorkerInfo, PairList<Integer, B>> removeAllData() {
+    PairList<WorkerInfo, PairList<Integer, B>> allData =
+        new PairList<WorkerInfo, PairList<Integer, B>>();
+    allData.initialize(dataSizes.length);
+    for (WorkerInfo workerInfo : workerPartitions.keySet()) {
+      // removeWorkerData() already resets this worker's cached size
+      PairList<Integer, B> workerData = removeWorkerData(workerInfo);
+      if (!workerData.isEmpty()) {
+        allData.add(workerInfo, workerData);
+      }
+    }
+    return allData;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
new file mode 100644
index 0000000..f239c1a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Buffers edges bound for other workers so that they can be sent in bulk.
+ * Not thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class SendEdgeCache<I extends WritableComparable, E extends Writable>
+    extends SendCache<I, Edge<I, E>, ByteArrayVertexIdEdges<I, E>> {
+  /**
+   * Creates an edge cache sized by the edge request settings.
+   *
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
+   */
+  public SendEdgeCache(ImmutableClassesGiraphConfiguration conf,
+                       CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
+    // Sizing comes from the edge-specific request settings
+    super(conf, serviceWorker,
+        conf.getInt(GiraphConstants.MAX_EDGE_REQUEST_SIZE,
+            GiraphConstants.MAX_EDGE_REQUEST_SIZE_DEFAULT),
+        conf.getFloat(GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE,
+            GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE_DEFAULT));
+  }
+
+  /**
+   * Caches an edge bound for a remote worker.
+   *
+   * @param workerInfo the remote worker destination
+   * @param partitionId the remote Partition this edge belongs to
+   * @param destVertexId id of the vertex the edge is sent to
+   * @param edge Edge to send to remote worker
+   * @return Total size (in bytes) of edges cached for the worker
+   */
+  public int addEdge(WorkerInfo workerInfo, int partitionId,
+                     I destVertexId, Edge<I, E> edge) {
+    return addData(workerInfo, partitionId, destVertexId, edge);
+  }
+
+  /**
+   * Removes and returns the edges cached for one worker.
+   *
+   * @param workerInfo the address of the worker who owns the data
+   *                   partitions that are receiving the edges
+   * @return List of pairs (partitionId, ByteArrayVertexIdEdges),
+   *         where all partition ids belong to workerInfo
+   */
+  public PairList<Integer, ByteArrayVertexIdEdges<I, E>>
+  removeWorkerEdges(WorkerInfo workerInfo) {
+    return removeWorkerData(workerInfo);
+  }
+
+  /**
+   * Removes and returns every cached edge, grouped by worker and partition.
+   *
+   * @return All vertex edges for all partitions
+   */
+  public PairList<WorkerInfo, PairList<Integer, ByteArrayVertexIdEdges<I, E>>>
+  removeAllEdges() {
+    return removeAllData();
+  }
+
+  @Override
+  public ByteArrayVertexIdEdges<I, E> createByteArrayVertexIdData() {
+    return new ByteArrayVertexIdEdges<I, E>();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 3cbf0eb..07dc380 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -21,56 +21,21 @@ package org.apache.giraph.comm;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
+import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
/**
- * Aggregates the messages to be send to workers so they can be sent
+ * Aggregates the messages to be sent to workers so they can be sent
* in bulk. Not thread-safe.
*
* @param <I> Vertex id
* @param <M> Message data
*/
-@SuppressWarnings("rawtypes")
-public class SendMessageCache<I extends WritableComparable,
- M extends Writable> {
- /**
- * How much bigger than the average per partition size to make initial per
- * partition buffers.
- * If this value is A, message request size is M,
- * and a worker has P partitions, than its initial partition buffer size
- * will be (M / P) * (1 + A).
- */
- public static final String ADDITIONAL_MSG_REQUEST_SIZE =
- "giraph.additionalMsgRequestSize";
- /**
- * Default factor for how bigger should initial per partition buffers be
- * of 20%.
- */
- public static final float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
-
- /** Internal cache */
- private final ByteArrayVertexIdMessages<I, M>[] messageCache;
- /** Size of messages (in bytes) for each worker */
- private final int[] messageSizes;
- /** How big to initially make output streams for each worker's partitions */
- private final int[] initialBufferSizes;
- /** List of partition ids belonging to a worker */
- private final Map<WorkerInfo, List<Integer>> workerPartitions =
- Maps.newHashMap();
- /** Giraph configuration */
- private final ImmutableClassesGiraphConfiguration conf;
-
+public class SendMessageCache<I extends WritableComparable, M extends Writable>
+ extends SendCache<I, M, ByteArrayVertexIdMessages<I, M>> {
/**
* Constructor
*
@@ -79,39 +44,17 @@ public class SendMessageCache<I extends WritableComparable,
*/
public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
- this.conf = conf;
-
- int maxPartition = 0;
- for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
- List<Integer> workerPartitionIds =
- workerPartitions.get(partitionOwner.getWorkerInfo());
- if (workerPartitionIds == null) {
- workerPartitionIds = Lists.newArrayList();
- workerPartitions.put(partitionOwner.getWorkerInfo(),
- workerPartitionIds);
- }
- workerPartitionIds.add(partitionOwner.getPartitionId());
- maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
- }
- messageCache = new ByteArrayVertexIdMessages[maxPartition + 1];
-
- int maxWorker = 0;
- for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
- maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
- }
- messageSizes = new int[maxWorker + 1];
+ super(conf,
+ serviceWorker,
+ conf.getInt(GiraphConstants.MAX_MSG_REQUEST_SIZE,
+ GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT),
+ conf.getFloat(GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE,
+ GiraphConstants.ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT));
+ }
- float additionalRequestSize =
- conf.getFloat(ADDITIONAL_MSG_REQUEST_SIZE,
- ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT);
- int requestSize = conf.getInt(GiraphConstants.MAX_MSG_REQUEST_SIZE,
- GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT);
- int initialRequestSize = (int) (requestSize * (1 + additionalRequestSize));
- initialBufferSizes = new int[maxWorker + 1];
- for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
- initialBufferSizes[workerInfo.getTaskId()] =
- initialRequestSize / workerPartitions.get(workerInfo).size();
- }
+ @Override
+ public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
+ return new ByteArrayVertexIdMessages<I, M>();
}
/**
@@ -120,30 +63,12 @@ public class SendMessageCache<I extends WritableComparable,
* @param workerInfo the remote worker destination
* @param partitionId the remote Partition this message belongs to
* @param destVertexId vertex id that is ultimate destination
- * @param message Message to be send to remote
- * <b>host => partition => vertex</b>
+ * @param message Message to send to remote worker
* @return Size of messages for the worker.
*/
public int addMessage(WorkerInfo workerInfo,
- final int partitionId, I destVertexId, M message) {
- // Get the message collection
- ByteArrayVertexIdMessages<I, M> partitionMessages =
- messageCache[partitionId];
- int originalSize = 0;
- if (partitionMessages == null) {
- partitionMessages = new ByteArrayVertexIdMessages<I, M>();
- partitionMessages.setConf(conf);
- partitionMessages.initialize(initialBufferSizes[workerInfo.getTaskId()]);
- messageCache[partitionId] = partitionMessages;
- } else {
- originalSize = partitionMessages.getSize();
- }
- partitionMessages.add(destVertexId, message);
-
- // Update the size of cached, outgoing messages per worker
- messageSizes[workerInfo.getTaskId()] +=
- partitionMessages.getSize() - originalSize;
- return messageSizes[workerInfo.getTaskId()];
+ int partitionId, I destVertexId, M message) {
+ return addData(workerInfo, partitionId, destVertexId, message);
}
/**
@@ -156,18 +81,7 @@ public class SendMessageCache<I extends WritableComparable,
*/
public PairList<Integer, ByteArrayVertexIdMessages<I, M>>
removeWorkerMessages(WorkerInfo workerInfo) {
- PairList<Integer, ByteArrayVertexIdMessages<I, M>> workerMessages =
- new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
- List<Integer> partitions = workerPartitions.get(workerInfo);
- workerMessages.initialize(partitions.size());
- for (Integer partitionId : partitions) {
- if (messageCache[partitionId] != null) {
- workerMessages.add(partitionId, messageCache[partitionId]);
- messageCache[partitionId] = null;
- }
- }
- messageSizes[workerInfo.getTaskId()] = 0;
- return workerMessages;
+ return removeWorkerData(workerInfo);
}
/**
@@ -177,20 +91,6 @@ public class SendMessageCache<I extends WritableComparable,
*/
public PairList<WorkerInfo, PairList<
Integer, ByteArrayVertexIdMessages<I, M>>> removeAllMessages() {
- PairList<WorkerInfo, PairList<Integer,
- ByteArrayVertexIdMessages<I, M>>>
- allMessages = new PairList<WorkerInfo,
- PairList<Integer, ByteArrayVertexIdMessages<I, M>>>();
- allMessages.initialize(messageSizes.length);
- for (WorkerInfo workerInfo : workerPartitions.keySet()) {
- PairList<Integer, ByteArrayVertexIdMessages<I,
- M>> workerMessages =
- removeWorkerMessages(workerInfo);
- if (!workerMessages.isEmpty()) {
- allMessages.add(workerInfo, workerMessages);
- }
- messageSizes[workerInfo.getTaskId()] = 0;
- }
- return allMessages;
+ return removeAllData();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 3655d79..7b4baa1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -18,12 +18,14 @@
package org.apache.giraph.comm;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.EdgeStore;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.partition.DiskBackedPartitionStore;
import org.apache.giraph.partition.PartitionStore;
@@ -48,6 +50,8 @@ public class ServerData<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
/** Partition store for this worker. */
private volatile PartitionStore<I, V, E, M> partitionStore;
+ /** Edge store for this worker. */
+ private final EdgeStore<I, V, E, M> edgeStore;
/** Message store factory */
private final
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> messageStoreFactory;
@@ -79,11 +83,13 @@ public class ServerData<I extends WritableComparable,
/**
* Constructor.
*
+ * @param service Service worker
* @param configuration Configuration
* @param messageStoreFactory Factory for message stores
* @param context Mapper context
*/
public ServerData(
+ CentralizedServiceWorker<I, V, E, M> service,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
messageStoreFactory,
@@ -100,10 +106,20 @@ public class ServerData<I extends WritableComparable,
partitionStore =
new SimplePartitionStore<I, V, E, M>(configuration, context);
}
+ edgeStore = new EdgeStore<I, V, E, M>(service, configuration, context);
ownerAggregatorData = new OwnerAggregatorServerData(context);
allAggregatorData = new AllAggregatorServerData(context);
}
+ /**
+ * Return the edge store for this worker.
+ *
+ * @return The edge store
+ */
+ public EdgeStore<I, V, E, M> getEdgeStore() {
+ return edgeStore;
+ }
+
/**
* Return the partition store for this worker.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
index 0c043e2..43311f4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
@@ -74,6 +74,19 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException;
/**
+ * Sends a request to the source vertex owner to add an edge.
+ * Note: this request follows an optimized code path used by edge-based
+ * input, and doesn't coordinate with mutations.
+ *
+ * @param sourceVertexId Source vertex id.
+ * @param edge Edge to be added.
+ * @return Returns true iff any network I/O occurred.
+ * @throws IOException
+ */
+ boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+ throws IOException;
+
+ /**
* Sends a request to the appropriate vertex range owner to remove all edges
* pointing to a given vertex.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
index e60db55..e373b2c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
@@ -52,13 +52,6 @@ public interface WorkerServer<I extends WritableComparable,
void prepareSuperstep(GraphState<I, V, E, M> graphState);
/**
- * Only resolve mutations requests (used for edge-oriented input).
- *
- * @param graphState Current graph state
- */
- void resolveMutations(GraphState<I, V, E, M> graphState);
-
- /**
* Get server data
*
* @return Server data
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 65caa5d..3cd1175 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -19,20 +19,22 @@
package org.apache.giraph.comm.messages;
import com.google.common.collect.Iterators;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.RepresentativeByteArrayIterable;
import org.apache.giraph.utils.RepresentativeByteArrayIterator;
+import org.apache.giraph.utils.VertexIdIterator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
/**
* Implementation of {@link SimpleMessageStore} where multiple messages are
* stored per vertex as byte arrays. Used when there is no combiner provided.
@@ -65,7 +67,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
*/
private ExtendedDataOutput getExtendedDataOutput(
ConcurrentMap<I, ExtendedDataOutput> partitionMap,
- ByteArrayVertexIdMessages<I, M>.VertexIdIterator iterator) {
+ VertexIdIterator<I> iterator) {
ExtendedDataOutput extendedDataOutput =
partitionMap.get(iterator.getCurrentVertexId());
if (extendedDataOutput == null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index d4e919e..0fc1858 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -17,8 +17,12 @@
*/
package org.apache.giraph.comm.netty;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.PercentGauge;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.SendEdgeCache;
import org.apache.giraph.comm.SendMessageCache;
import org.apache.giraph.comm.SendMutationsCache;
import org.apache.giraph.comm.SendPartitionCache;
@@ -29,6 +33,7 @@ import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
import org.apache.giraph.comm.requests.SendVertexRequest;
+import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
@@ -41,6 +46,7 @@ import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.vertex.Vertex;
@@ -50,10 +56,6 @@ import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.util.PercentGauge;
-
import java.io.IOException;
import java.util.Map;
@@ -67,6 +69,7 @@ import java.util.Map;
* @param <E> Edge data
* @param <M> Message data
*/
+@SuppressWarnings("unchecked")
public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> implements
WorkerClientRequestProcessor<I, V, E, M> {
@@ -77,6 +80,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
private final SendPartitionCache<I, V, E, M> sendPartitionCache;
/** Cached map of partitions to vertex indices to messages */
private final SendMessageCache<I, M> sendMessageCache;
+ /** Cache of edges to be sent. */
+ private final SendEdgeCache<I, E> sendEdgeCache;
/** Cached map of partitions to vertex indices to mutations */
private final SendMutationsCache<I, V, E, M> sendMutationsCache =
new SendMutationsCache<I, V, E, M>();
@@ -86,6 +91,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
private long totalMsgsSentInSuperstep = 0;
/** Maximum size of messages per remote worker to cache before sending */
private final int maxMessagesSizePerWorker;
+ /** Maximum size of edges per remote worker to cache before sending. */
+ private final int maxEdgesSizePerWorker;
/** Maximum number of mutations per partition before sending */
private final int maxMutationsPerPartition;
/** Giraph configuration */
@@ -119,9 +126,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
configuration);
sendMessageCache =
new SendMessageCache<I, M>(configuration, serviceWorker);
+ sendEdgeCache = new SendEdgeCache<I, E>(configuration, serviceWorker);
maxMessagesSizePerWorker = configuration.getInt(
GiraphConstants.MAX_MSG_REQUEST_SIZE,
GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT);
+ maxEdgesSizePerWorker = configuration.getInt(
+ GiraphConstants.MAX_EDGE_REQUEST_SIZE,
+ GiraphConstants.MAX_EDGE_REQUEST_SIZE_DEFAULT);
maxMutationsPerPartition = configuration.getInt(
GiraphConstants.MAX_MUTATIONS_PER_REQUEST,
GiraphConstants.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
@@ -135,7 +146,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS);
final Gauge<Long> totalRequests = smr.getGauge(MetricNames.TOTAL_REQUESTS,
new Gauge<Long>() {
- @Override public Long value() {
+ @Override
+ public Long value() {
return localRequests.count() + remoteRequests.count();
}
}
@@ -174,7 +186,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
workerMessages =
sendMessageCache.removeWorkerMessages(workerInfo);
WritableRequest writableRequest =
- new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
+ new SendWorkerMessagesRequest<I, M>(workerMessages);
doRequest(workerInfo, writableRequest);
return true;
}
@@ -278,6 +290,36 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
partitionId, partitionOwner, partitionMutationCount);
}
+ @Override
+ public boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+ throws IOException {
+ PartitionOwner owner =
+ serviceWorker.getVertexPartitionOwner(sourceVertexId);
+ WorkerInfo workerInfo = owner.getWorkerInfo();
+ final int partitionId = owner.getPartitionId();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("sendEdgeRequest: Send bytes (" + edge.toString() +
+ ") to " + sourceVertexId + " on worker " + workerInfo);
+ }
+
+ // Buffer the edge in the cache for the worker owning the source vertex
+ int workerEdgesSize = sendEdgeCache.addEdge(
+ workerInfo, partitionId, sourceVertexId, edge);
+
+ // Flush a request once the edges cached for this worker reach the
+ // configured size limit (maxEdgesSizePerWorker)
+ if (workerEdgesSize >= maxEdgesSizePerWorker) {
+ PairList<Integer, ByteArrayVertexIdEdges<I, E>> workerEdges =
+ sendEdgeCache.removeWorkerEdges(workerInfo);
+ WritableRequest writableRequest =
+ new SendWorkerEdgesRequest<I, E>(workerEdges);
+ doRequest(workerInfo, writableRequest);
+ return true;
+ }
+
+ return false;
+ }
+
/**
* Send a mutations request if the maximum number of mutations per partition
* was met.
@@ -376,11 +418,26 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
while (iterator.hasNext()) {
iterator.next();
WritableRequest writableRequest =
- new SendWorkerMessagesRequest<I, V, E, M>(
+ new SendWorkerMessagesRequest<I, M>(
iterator.getCurrentSecond());
doRequest(iterator.getCurrentFirst(), writableRequest);
}
+ // Execute the remaining edge sends (if any)
+ PairList<WorkerInfo, PairList<Integer,
+ ByteArrayVertexIdEdges<I, E>>>
+ remainingEdgeCache = sendEdgeCache.removeAllEdges();
+ PairList<WorkerInfo,
+ PairList<Integer, ByteArrayVertexIdEdges<I, E>>>.Iterator
+ edgeIterator = remainingEdgeCache.getIterator();
+ while (edgeIterator.hasNext()) {
+ edgeIterator.next();
+ WritableRequest writableRequest =
+ new SendWorkerEdgesRequest<I, E>(
+ edgeIterator.getCurrentSecond());
+ doRequest(edgeIterator.getCurrentFirst(), writableRequest);
+ }
+
// Execute the remaining sends mutations (if any)
Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
sendMutationsCache.removeAllPartitionMutations();
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 1b7cc54..697b6ce 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -90,7 +90,8 @@ public class NettyWorkerServer<I extends WritableComparable,
this.service = service;
serverData =
- new ServerData<I, V, E, M>(conf, createMessageStoreFactory(), context);
+ new ServerData<I, V, E, M>(service, conf, createMessageStoreFactory(),
+ context);
nettyServer = new NettyServer(conf,
new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData),
@@ -153,8 +154,12 @@ public class NettyWorkerServer<I extends WritableComparable,
resolveMutations(graphState);
}
- @Override
- public void resolveMutations(GraphState<I, V, E, M> graphState) {
+ /**
+ * Resolve mutation requests.
+ *
+ * @param graphState Graph state
+ */
+ private void resolveMutations(GraphState<I, V, E, M> graphState) {
Multimap<Integer, I> resolveVertexIndices = HashMultimap.create(
service.getPartitionStore().getNumPartitions(), 100);
// Add any mutated vertex indices to be resolved
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index aac0028..4129fb8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -42,6 +42,8 @@ public enum RequestType {
*/
SEND_PARTITION_CURRENT_MESSAGES_REQUEST
(SendPartitionCurrentMessagesRequest.class),
+ /** Send a collection of edges to a worker, grouped by partition */
+ SEND_WORKER_EDGES_REQUEST(SendWorkerEdgesRequest.class),
/** Send a partition of mutations */
SEND_PARTITION_MUTATIONS_REQUEST(SendPartitionMutationsRequest.class),
/** Send aggregated values from one worker's vertices */
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
new file mode 100644
index 0000000..4f80224
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.utils.PairList;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Abstract request to send a collection of data, indexed by vertex id
+ * and grouped by partition, to a single worker.
+ *
+ * @param <I> Vertex id
+ * @param <T> Data
+ * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
+ */
+public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
+ B extends ByteArrayVertexIdData<I, T>>
+ extends WritableRequest implements WorkerRequest {
+ /** Class logger (not referenced anywhere in this class) */
+ private static final Logger LOG =
+ Logger.getLogger(SendWorkerDataRequest.class);
+ /**
+ * All data for a group of vertices, organized by partition, which
+ * are owned by a single (destination) worker. This data is all
+ * destined for this worker.
+ */
+ protected PairList<Integer, B> partitionVertexData;
+
+ /**
+ * Constructor used for reflection only
+ */
+ public SendWorkerDataRequest() { }
+
+ /**
+ * Constructor used to send request.
+ *
+ * @param partVertData Map of remote partitions =>
+ * ByteArrayVertexIdData
+ */
+ public SendWorkerDataRequest(
+ PairList<Integer, B> partVertData) {
+ this.partitionVertexData = partVertData;
+ }
+
+ /**
+ * Create an empty {@link ByteArrayVertexIdData} of the concrete type B;
+ * readFieldsRequest() calls this once per partition it deserializes.
+ * @return A new instance of {@link ByteArrayVertexIdData}
+ */
+ public abstract B createByteArrayVertexIdData();
+
+ @Override
+ public void readFieldsRequest(DataInput input) throws IOException {
+ int numPartitions = input.readInt();
+ partitionVertexData = new PairList<Integer, B>();
+ partitionVertexData.initialize(numPartitions);
+ while (numPartitions-- > 0) {
+ final int partitionId = input.readInt();
+ B vertexIdData = createByteArrayVertexIdData();
+ vertexIdData.setConf(getConf());
+ vertexIdData.readFields(input);
+ partitionVertexData.add(partitionId, vertexIdData);
+ }
+ }
+
+ @Override
+ public void writeRequest(DataOutput output) throws IOException {
+ output.writeInt(partitionVertexData.getSize());
+ PairList<Integer, B>.Iterator
+ iterator = partitionVertexData.getIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ output.writeInt(iterator.getCurrentFirst());
+ iterator.getCurrentSecond().write(output);
+ }
+ }
+
+ @Override
+ public int getSerializedSize() {
+ int size = super.getSerializedSize() + 4; // partition count (int)
+ PairList<Integer, B>.Iterator iterator = partitionVertexData.getIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ size += 4 + iterator.getCurrentSecond().getSerializedSize(); // id + data
+ }
+ return size;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
new file mode 100644
index 0000000..f301bbf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.PairList;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Send a collection of edges, grouped by partition, to a single worker.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("unchecked")
+public class SendWorkerEdgesRequest<I extends WritableComparable,
+ E extends Writable>
+ extends SendWorkerDataRequest<I, Edge<I, E>,
+ ByteArrayVertexIdEdges<I, E>> {
+ /**
+ * Constructor used for reflection only
+ */
+ public SendWorkerEdgesRequest() { }
+
+ /**
+ * Constructor used to send request.
+ *
+ * @param partVertEdges Map of remote partitions =>
+ * ByteArrayVertexIdEdges
+ */
+ public SendWorkerEdgesRequest(
+ PairList<Integer, ByteArrayVertexIdEdges<I, E>> partVertEdges) {
+ this.partitionVertexData = partVertEdges;
+ }
+
+ @Override
+ public ByteArrayVertexIdEdges<I, E> createByteArrayVertexIdData() {
+ return new ByteArrayVertexIdEdges<I, E>();
+ }
+
+ @Override
+ public RequestType getType() {
+ return RequestType.SEND_WORKER_EDGES_REQUEST;
+ }
+
+ @Override
+ public void doRequest(ServerData serverData) {
+ PairList<Integer, ByteArrayVertexIdEdges<I, E>>.Iterator
+ iterator = partitionVertexData.getIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ serverData.getEdgeStore().
+ addPartitionEdges(iterator.getCurrentFirst(),
+ iterator.getCurrentSecond());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index 641c795..04b633b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -18,39 +18,24 @@
package org.apache.giraph.comm.requests;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
+
+import java.io.IOException;
/**
* Send a collection of vertex messages for a partition.
*
* @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
* @param <M> Message data
*/
-@SuppressWarnings("rawtypes")
+@SuppressWarnings("unchecked")
public class SendWorkerMessagesRequest<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> extends
- WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SendWorkerMessagesRequest.class);
- /**
- * All messages for a group of vertices, organized by partition, which
- * are owned by a single (destination) worker. These messages are all
- * destined for this worker.
- * */
- private PairList<Integer, ByteArrayVertexIdMessages<I, M>>
- partitionVertexMessages;
-
+ M extends Writable>
+ extends SendWorkerDataRequest<I, M, ByteArrayVertexIdMessages<I, M>> {
/**
* Constructor used for reflection only
*/
@@ -63,37 +48,13 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
* ByteArrayVertexIdMessages
*/
public SendWorkerMessagesRequest(
- PairList<Integer, ByteArrayVertexIdMessages<I, M>> partVertMsgs) {
- super();
- this.partitionVertexMessages = partVertMsgs;
+ PairList<Integer, ByteArrayVertexIdMessages<I, M>> partVertMsgs) {
+ this.partitionVertexData = partVertMsgs;
}
@Override
- public void readFieldsRequest(DataInput input) throws IOException {
- int numPartitions = input.readInt();
- partitionVertexMessages =
- new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
- partitionVertexMessages.initialize(numPartitions);
- while (numPartitions-- > 0) {
- final int partitionId = input.readInt();
- ByteArrayVertexIdMessages<I, M> vertexIdMessages =
- new ByteArrayVertexIdMessages<I, M>();
- vertexIdMessages.setConf(getConf());
- vertexIdMessages.readFields(input);
- partitionVertexMessages.add(partitionId, vertexIdMessages);
- }
- }
-
- @Override
- public void writeRequest(DataOutput output) throws IOException {
- output.writeInt(partitionVertexMessages.getSize());
- PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
- iterator = partitionVertexMessages.getIterator();
- while (iterator.hasNext()) {
- iterator.next();
- output.writeInt(iterator.getCurrentFirst());
- iterator.getCurrentSecond().write(output);
- }
+ public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
+ return new ByteArrayVertexIdMessages<I, M>();
}
@Override
@@ -102,9 +63,9 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
}
@Override
- public void doRequest(ServerData<I, V, E, M> serverData) {
+ public void doRequest(ServerData serverData) {
PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
- iterator = partitionVertexMessages.getIterator();
+ iterator = partitionVertexData.getIterator();
while (iterator.hasNext()) {
iterator.next();
try {
@@ -116,16 +77,4 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
}
}
}
-
- @Override
- public int getSerializedSize() {
- int size = super.getSerializedSize() + 4;
- PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
- iterator = partitionVertexMessages.getIterator();
- while (iterator.hasNext()) {
- iterator.next();
- size += 4 + iterator.getCurrentSecond().getSerializedSize();
- }
- return size;
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 9e129ef..96fada4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -18,16 +18,14 @@
package org.apache.giraph.conf;
-import java.net.UnknownHostException;
-
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.job.DefaultJobObserver;
-import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.job.DefaultJobObserver;
+import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
@@ -39,6 +37,8 @@ import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.DNS;
+import java.net.UnknownHostException;
+
/**
* Adds user methods specific to Giraph. This will be put into an
* ImmutableClassesGiraphConfiguration that provides the configuration plus
@@ -701,6 +701,16 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Check if we can reuse incoming edge objects.
+ *
+ * @return True iff we can reuse incoming edge objects.
+ */
+ public boolean reuseIncomingEdgeObjects() {
+ return getBoolean(GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS,
+ GiraphConstants.REUSE_INCOMING_EDGE_OBJECTS_DEFAULT);
+ }
+
+ /**
* Get the local hostname on the given interface.
*
* @return The local hostname
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 44d09c9..20d6df7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -337,10 +337,36 @@ public interface GiraphConstants {
/** Default maximum size of messages per peer before flush of 0.5MB */
int MAX_MSG_REQUEST_SIZE_DEFAULT = 512 * 1024;
- /** Maximum number of messages per peer before flush */
- String MSG_SIZE = "giraph.msgSize";
- /** Default maximum number of messages per peer before flush */
- int MSG_SIZE_DEFAULT = 2000;
+ /**
+ * How much bigger than the average per partition size to make initial per
+ * partition buffers.
+ * If this value is A, message request size is M,
+ * and a worker has P partitions, then its initial partition buffer size
+ * will be (M / P) * (1 + A).
+ */
+ String ADDITIONAL_MSG_REQUEST_SIZE =
+ "giraph.additionalMsgRequestSize";
+ /**
+ * Default factor (20%) by which initial per-partition buffers exceed
+ * the average per-partition size.
+ */
+ float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
+
+ /** Maximum size of edges (in bytes) per peer before flush */
+ String MAX_EDGE_REQUEST_SIZE = "giraph.edgeRequestSize";
+ /** Default maximum size of edges per peer before flush of 0.5MB */
+ int MAX_EDGE_REQUEST_SIZE_DEFAULT = 512 * 1024;
+
+ /**
+ * Additional size (expressed as a ratio) of each per-partition buffer on
+ * top of the average size.
+ */
+ String ADDITIONAL_EDGE_REQUEST_SIZE =
+ "giraph.additionalEdgeRequestSize";
+ /**
+ * Default additional per-partition buffer size.
+ */
+ float ADDITIONAL_EDGE_REQUEST_SIZE_DEFAULT = 0.2f;
/** Maximum number of mutations per partition before flush */
String MAX_MUTATIONS_PER_REQUEST = "giraph.maxMutationsPerRequest";
@@ -348,6 +374,19 @@ public interface GiraphConstants {
int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
/**
+ * Whether we should reuse the same Edge object when adding edges from
+ * requests.
+ * This works with edge storage implementations that don't keep references
+ * to the input Edge objects (e.g., ByteArrayVertex).
+ */
+ String REUSE_INCOMING_EDGE_OBJECTS = "giraph.reuseIncomingEdgeObjects";
+ /**
+ * Default is to not reuse edge objects (since it's not compatible with
+ * all storage implementations).
+ */
+ boolean REUSE_INCOMING_EDGE_OBJECTS_DEFAULT = false;
+
+ /**
* Use message size encoding (typically better for complex objects,
* not meant for primitive wrapped messages)
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 3e158af..18fd9ef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,21 +20,23 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.job.GiraphJobObserver;
-import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeNoValue;
import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.master.MasterCompute;
-import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.MutableEdge;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.graph.VertexResolver;
-import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.job.GiraphJobObserver;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.MasterGraphPartitioner;
import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.partition.PartitionStats;
-import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.utils.ExtendedByteArrayDataInput;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.giraph.utils.ExtendedDataInput;
@@ -42,6 +44,8 @@ import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
@@ -499,6 +503,32 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Create a user edge.
+ *
+ * @return Instantiated user edge.
+ */
+  public Edge<I, E> createEdge() {
+    if (!isEdgeValueNullWritable()) {
+      return new DefaultEdge<I, E>(createVertexId(), createEdgeValue());
+    }
+    // Edge value type is NullWritable: use the value-less EdgeNoValue.
+    return (Edge<I, E>) new EdgeNoValue<I>(createVertexId());
+  }
+
+ /**
+ * Create a mutable user edge.
+ *
+ * @return Instantiated mutable user edge.
+ */
+  public MutableEdge<I, E> createMutableEdge() {
+    if (!isEdgeValueNullWritable()) {
+      return new DefaultEdge<I, E>(createVertexId(), createEdgeValue());
+    }
+    // Edge value type is NullWritable: use the value-less EdgeNoValue.
+    return (MutableEdge<I, E>) new EdgeNoValue<I>(createVertexId());
+  }
+
+ /**
* Get the user's subclassed vertex message value class.
*
* @return User's vertex message value class
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
new file mode 100644
index 0000000..6210367
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.MapMaker;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ByteArrayEdges;
+import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Collects incoming edges for vertices owned by this worker.
+ * Note: the current implementation is simply a bridge between
+ * incoming requests and vertices. In the future, EdgeStore may become an
+ * interface allowing for alternative, pluggable implementations of edge
+ * storage without having to extend Vertex.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(EdgeStore.class);
+  /** Service worker. */
+  private final CentralizedServiceWorker<I, V, E, M> service;
+  /** Giraph configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** Progressable to report progress. */
+  private final Progressable progressable;
+  /**
+   * Map used to temporarily store incoming edges, keyed by partition id and
+   * then by source vertex id.
+   */
+  private final ConcurrentMap<Integer,
+      ConcurrentMap<I, ByteArrayEdges<I, E>>> transientEdges;
+  /**
+   * Whether we should reuse edge objects (cached to avoid expensive calls
+   * to the configuration).
+   */
+  private final boolean reuseIncomingEdgeObjects;
+
+  /**
+   * Constructor.
+   *
+   * @param service Service worker
+   * @param configuration Configuration
+   * @param progressable Progressable
+   */
+  public EdgeStore(
+      CentralizedServiceWorker<I, V, E, M> service,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      Progressable progressable) {
+    this.service = service;
+    this.configuration = configuration;
+    this.progressable = progressable;
+    reuseIncomingEdgeObjects = configuration.reuseIncomingEdgeObjects();
+    transientEdges = new MapMaker().concurrencyLevel(
+        configuration.getNettyServerExecutionConcurrency()).makeMap();
+  }
+
+  /**
+   * Add edges belonging to a given partition on this worker.
+   * Note: This method is thread-safe.
+   *
+   * @param partitionId Partition id for the incoming edges.
+   * @param edges Incoming edges
+   */
+  public void addPartitionEdges(
+      int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
+    ConcurrentMap<I, ByteArrayEdges<I, E>> partitionEdges =
+        getOrCreatePartitionEdges(partitionId);
+    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
+        edges.getVertexIdEdgeIterator();
+    while (vertexIdEdgeIterator.hasNext()) {
+      vertexIdEdgeIterator.next();
+      I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
+      Edge<I, E> edge = vertexIdEdgeIterator.getCurrentEdge();
+      ByteArrayEdges<I, E> vertexEdges = partitionEdges.get(vertexId);
+      if (vertexEdges == null) {
+        ByteArrayEdges<I, E> newVertexEdges =
+            new ByteArrayEdges<I, E>(configuration);
+        vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges);
+        if (vertexEdges == null) {
+          vertexEdges = newVertexEdges;
+          // Since we had to use the vertex id as a new key in the map,
+          // we need to release the object.
+          vertexIdEdgeIterator.releaseCurrentVertexId();
+        }
+      }
+      // Other threads may append to the same vertex concurrently, so
+      // appends are serialized on the per-vertex edge container.
+      synchronized (vertexEdges) {
+        vertexEdges.appendEdge(edge);
+      }
+    }
+  }
+
+  /**
+   * Get the temporary edge map for a partition, creating it if absent.
+   * Safe to call concurrently: if another thread wins the race to create
+   * the map, that thread's map is returned.
+   *
+   * @param partitionId Partition id
+   * @return Map from vertex id to the edges received for that vertex
+   */
+  private ConcurrentMap<I, ByteArrayEdges<I, E>> getOrCreatePartitionEdges(
+      int partitionId) {
+    ConcurrentMap<I, ByteArrayEdges<I, E>> partitionEdges =
+        transientEdges.get(partitionId);
+    if (partitionEdges == null) {
+      ConcurrentMap<I, ByteArrayEdges<I, E>> newPartitionEdges =
+          new MapMaker().concurrencyLevel(
+              configuration.getNettyServerExecutionConcurrency()).makeMap();
+      partitionEdges = transientEdges.putIfAbsent(partitionId,
+          newPartitionEdges);
+      if (partitionEdges == null) {
+        partitionEdges = newPartitionEdges;
+      }
+    }
+    return partitionEdges;
+  }
+
+  /**
+   * Move all edges from temporary storage to their source vertices.
+   * Source vertices that do not exist yet are created with a fresh value
+   * from the configuration.
+   * Note: this method is not thread-safe.
+   */
+  public void moveEdgesToVertices() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
+    }
+    for (Map.Entry<Integer, ConcurrentMap<I, ByteArrayEdges<I, E>>>
+        partitionEntry : transientEdges.entrySet()) {
+      Partition<I, V, E, M> partition =
+          service.getPartitionStore().getPartition(partitionEntry.getKey());
+      ConcurrentMap<I, ByteArrayEdges<I, E>> partitionEdges =
+          partitionEntry.getValue();
+      for (I vertexId : partitionEdges.keySet()) {
+        ByteArrayEdges<I, E> vertexEdges = partitionEdges.remove(vertexId);
+        // Depending on whether the vertex implementation keeps references to
+        // the Edge objects or not, we may be able to reuse objects when
+        // iterating.
+        Iterable<Edge<I, E>> edgesIterable = reuseIncomingEdgeObjects ?
+            vertexEdges : vertexEdges.copyEdgeIterable();
+        Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
+        if (vertex == null) {
+          // The source vertex doesn't exist yet: create it with these edges.
+          vertex = configuration.createVertex();
+          vertex.initialize(vertexId, configuration.createVertexValue(),
+              edgesIterable);
+          partition.putVertex(vertex);
+        } else {
+          vertex.setEdges(edgesIterable);
+          // Some Partition implementations (e.g. ByteArrayPartition) require
+          // us to put back the vertex after modifying it.
+          partition.saveVertex(vertex);
+        }
+        progressable.progress();
+      }
+      // Some PartitionStore implementations (e.g. DiskBackedPartitionStore)
+      // require us to put back the partition after modifying it.
+      service.getPartitionStore().putPartition(partition);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
+          "vertices.");
+    }
+    transientEdges.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index 1298918..2260837 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -18,8 +18,14 @@
package org.apache.giraph.partition;
import com.google.common.collect.MapMaker;
-
import com.google.common.primitives.Ints;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -27,14 +33,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.utils.UnsafeByteArrayInputStream;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
/**
* Byte array based partition. Should reduce the amount of memory used since
* the entire graph is compressed into byte arrays. Must guarantee, however,
@@ -48,8 +46,6 @@ import org.apache.log4j.Logger;
public class ByteArrayPartition<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends BasicPartition<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
/**
* Vertex map for this range (keyed by index). Note that the byte[] is a
* serialized vertex with the first four bytes as the length of the vertex
http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 725de39..585ab85 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -18,6 +18,20 @@
package org.apache.giraph.partition;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.log4j.Logger;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -41,21 +55,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
/**
* Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
* Thread-safe, but expects the caller to synchronized between deletes, adds,