You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/11/27 21:01:45 UTC
svn commit: r1414361 [1/2] - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/messages/
giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/m...
Author: aching
Date: Tue Nov 27 20:01:38 2012
New Revision: 1414361
URL: http://svn.apache.org/viewvc?rev=1414361&view=rev
Log:
GIRAPH-435: Serialize server messages for memory and less GC. (aching)
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
Removed:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/CollectionOfMessagesPerVertexStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Nov 27 20:01:38 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-435: Serialize server messages for memory and less GC. (aching)
+
GIRAPH-420: build formats in profiles where it works. (nitay)
GIRAPH-421: Aggregate metrics up to master. (nitay)
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Tue Nov 27 20:01:38 2012
@@ -31,6 +31,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.master.MasterObserver;
+
import org.apache.hadoop.conf.Configuration;
/**
@@ -380,6 +381,17 @@ public class GiraphConfiguration extends
/** Default number of messages that can be bulk sent during a flush */
public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
+ /**
+ * Use message size encoding (typically better for complex objects,
+ * not meant for primitive wrapped messages)
+ */
+ public static final String USE_MESSAGE_SIZE_ENCODING =
+ "giraph.useMessageSizeEncoding";
+ /**
+ * By default, do not use message size encoding as it is experimental.
+ */
+ public static final boolean USE_MESSAGE_SIZE_ENCODING_DEFAULT = false;
+
/** Number of channels used per server */
public static final String CHANNELS_PER_SERVER =
"giraph.channelsPerServer";
@@ -1102,4 +1114,15 @@ public class GiraphConfiguration extends
public void useUnsafeSerialization(boolean useUnsafeSerialization) {
setBoolean(USE_UNSAFE_SERIALIZATION, useUnsafeSerialization);
}
+
+ /**
+ * Use message size encoding? This feature may help with complex message
+ * objects.
+ *
+ * @return Whether to use message size encoding
+ */
+ public boolean useMessageSizeEncoding() {
+ return getBoolean(
+ USE_MESSAGE_SIZE_ENCODING, USE_MESSAGE_SIZE_ENCODING_DEFAULT);
+ }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java Tue Nov 27 20:01:38 2012
@@ -18,6 +18,9 @@
package org.apache.giraph;
+
+import java.util.List;
+
import org.apache.giraph.graph.AggregatorWriter;
import org.apache.giraph.graph.Combiner;
import org.apache.giraph.graph.DefaultMasterCompute;
@@ -51,8 +54,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
-import java.util.List;
-
/**
* The classes set here are immutable, the remaining configuration is mutable.
* Classes are immutable and final to provide the best performance for
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Tue Nov 27 20:01:38 2012
@@ -26,7 +26,7 @@ import org.apache.giraph.ImmutableClasse
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -42,7 +42,7 @@ import org.apache.hadoop.io.WritableComp
public class SendMessageCache<I extends WritableComparable,
M extends Writable> {
/** Internal cache */
- private final ByteArrayVertexIdMessageCollection<I, M>[] messageCache;
+ private final ByteArrayVertexIdMessages<I, M>[] messageCache;
/** Number of messages in each partition */
private final int[] messageCounts;
/** List of partition ids belonging to a worker */
@@ -73,7 +73,7 @@ public class SendMessageCache<I extends
workerPartitionIds.add(partitionOwner.getPartitionId());
maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
}
- messageCache = new ByteArrayVertexIdMessageCollection[maxPartition + 1];
+ messageCache = new ByteArrayVertexIdMessages[maxPartition + 1];
int maxWorker = 0;
for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
@@ -95,10 +95,10 @@ public class SendMessageCache<I extends
public int addMessage(WorkerInfo workerInfo,
final int partitionId, I destVertexId, M message) {
// Get the message collection
- ByteArrayVertexIdMessageCollection<I, M> partitionMessages =
+ ByteArrayVertexIdMessages<I, M> partitionMessages =
messageCache[partitionId];
if (partitionMessages == null) {
- partitionMessages = new ByteArrayVertexIdMessageCollection<I, M>();
+ partitionMessages = new ByteArrayVertexIdMessages<I, M>();
partitionMessages.setConf(conf);
partitionMessages.initialize();
messageCache[partitionId] = partitionMessages;
@@ -115,13 +115,13 @@ public class SendMessageCache<I extends
*
* @param workerInfo the address of the worker who owns the data
* partitions that are receiving the messages
- * @return List of pairs (partitionId, ByteArrayVertexIdMessageCollection),
+ * @return List of pairs (partitionId, ByteArrayVertexIdMessages),
* where all partition ids belong to workerInfo
*/
- public PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
+ public PairList<Integer, ByteArrayVertexIdMessages<I, M>>
removeWorkerMessages(WorkerInfo workerInfo) {
- PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>> workerMessages =
- new PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>();
+ PairList<Integer, ByteArrayVertexIdMessages<I, M>> workerMessages =
+ new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
workerMessages.initialize();
for (Integer partitionId : workerPartitions.get(workerInfo)) {
if (messageCache[partitionId] != null) {
@@ -139,15 +139,15 @@ public class SendMessageCache<I extends
* @return All vertex messages for all partitions
*/
public PairList<WorkerInfo, PairList<
- Integer, ByteArrayVertexIdMessageCollection<I, M>>> removeAllMessages() {
+ Integer, ByteArrayVertexIdMessages<I, M>>> removeAllMessages() {
PairList<WorkerInfo, PairList<Integer,
- ByteArrayVertexIdMessageCollection<I, M>>>
+ ByteArrayVertexIdMessages<I, M>>>
allMessages = new PairList<WorkerInfo,
- PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>>();
+ PairList<Integer, ByteArrayVertexIdMessages<I, M>>>();
allMessages.initialize();
for (WorkerInfo workerInfo : workerPartitions.keySet()) {
- PairList<Integer, ByteArrayVertexIdMessageCollection<I,
- M>> workerMessages =
+ PairList<Integer, ByteArrayVertexIdMessages<I,
+ M>> workerMessages =
removeWorkerMessages(workerInfo);
if (!workerMessages.isEmpty()) {
allMessages.add(workerInfo, workerMessages);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java Tue Nov 27 20:01:38 2012
@@ -18,13 +18,10 @@
package org.apache.giraph.comm.messages;
+import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
/**
* Most basic message store with just add, get and clear operations
*
@@ -34,21 +31,24 @@ import java.util.Map;
public interface BasicMessageStore<I extends WritableComparable,
M extends Writable> extends Writable {
/**
- * Adds messages
+ * Adds messages from one message store to another
*
- * @param messages Map of messages we want to add
- * @throws IOException
+ * @param messageStore Message store whose messages are added to this
+ * object
+ * @throws java.io.IOException
*/
- void addMessages(Map<I, Collection<M>> messages) throws IOException;
+ void addMessages(MessageStore<I, M> messageStore) throws IOException;
/**
- * Gets messages for a vertex.
+ * Gets messages for a vertex. The lifetime of every message is only
+ * guaranteed until the iterator's next() method is called. Do not re-use
+ * the messages.
*
* @param vertexId Vertex id for which we want to get messages
- * @return Messages for vertex with required id
+ * @return Iterable of messages for a vertex id
* @throws IOException
*/
- Collection<M> getVertexMessages(I vertexId) throws IOException;
+ Iterable<M> getVertexMessages(I vertexId) throws IOException;
/**
* Clears messages for a vertex.
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java?rev=1414361&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java Tue Nov 27 20:01:38 2012
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import com.google.common.collect.Iterators;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.giraph.utils.RepresentativeByteArrayIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Implementation of {@link SimpleMessageStore} where multiple messages are
+ * stored per vertex as byte arrays. Used when there is no combiner provided.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
+ M extends Writable> extends SimpleMessageStore<I, M, ExtendedDataOutput> {
+ /**
+ * Constructor
+ *
+ * @param service Service worker
+ * @param config Hadoop configuration
+ */
+ public ByteArrayMessagesPerVertexStore(
+ CentralizedServiceWorker<I, ?, ?, M> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+ super(service, config);
+ }
+
+ /**
+ * Get the extended data output for a vertex id from the iterator, creating
+ * if necessary. This method will take ownership of the vertex id from the
+ * iterator if necessary (if used in the partition map entry).
+ *
+ * @param partitionMap Partition map to look in
+ * @param iterator Special iterator that can release ownership of vertex ids
+ * @return Extended data output for this vertex id (created if necessary)
+ */
+ private ExtendedDataOutput getExtendedDataOutput(
+ ConcurrentMap<I, ExtendedDataOutput> partitionMap,
+ ByteArrayVertexIdMessages<I, M>.VertexIdIterator iterator) {
+ ExtendedDataOutput extendedDataOutput =
+ partitionMap.get(iterator.getCurrentVertexId());
+ if (extendedDataOutput == null) {
+ ExtendedDataOutput newExtendedDataOutput =
+ config.createExtendedDataOutput();
+ extendedDataOutput =
+ partitionMap.putIfAbsent(
+ iterator.releaseCurrentVertexId(),
+ newExtendedDataOutput);
+ if (extendedDataOutput == null) {
+ extendedDataOutput = newExtendedDataOutput;
+ }
+ }
+ return extendedDataOutput;
+ }
+
+ @Override
+ public void addPartitionMessages(
+ int partitionId,
+ ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+ ConcurrentMap<I, ExtendedDataOutput> partitionMap =
+ getOrCreatePartitionMap(partitionId);
+ ByteArrayVertexIdMessages<I, M>.VertexIdMessageBytesIterator
+ vertexIdMessageBytesIterator =
+ messages.getVertexIdMessageBytesIterator();
+ // Try to copy the message buffer over rather than
+ // doing a deserialization of a message just to know its size. This
+ // should be more efficient for complex objects where serialization is
+ // expensive. If this type of iterator is not available, fall back to
+ // deserializing/serializing the messages
+ if (vertexIdMessageBytesIterator != null) {
+ while (vertexIdMessageBytesIterator.hasNext()) {
+ vertexIdMessageBytesIterator.next();
+ ExtendedDataOutput extendedDataOutput =
+ getExtendedDataOutput(partitionMap, vertexIdMessageBytesIterator);
+
+ synchronized (extendedDataOutput) {
+ vertexIdMessageBytesIterator.writeCurrentMessageBytes(
+ extendedDataOutput);
+ }
+ }
+ } else {
+ ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+ vertexIdMessageIterator = messages.getVertexIdMessageIterator();
+ while (vertexIdMessageIterator.hasNext()) {
+ vertexIdMessageIterator.next();
+ ExtendedDataOutput extendedDataOutput =
+ getExtendedDataOutput(partitionMap, vertexIdMessageIterator);
+
+ synchronized (extendedDataOutput) {
+ vertexIdMessageIterator.getCurrentMessage().write(
+ extendedDataOutput);
+ }
+ }
+ }
+ }
+
+ /**
+ * Special iterable that recycles the message
+ */
+ private class MessagesIterable extends RepresentativeByteArrayIterable<M> {
+ /**
+ * Constructor
+ *
+ * @param buf Buffer
+ * @param off Offset to start in the buffer
+ * @param length Length of the buffer
+ */
+ private MessagesIterable(byte[] buf, int off, int length) {
+ super(config, buf, off, length);
+ }
+
+ @Override
+ protected M createWritable() {
+ return config.createMessageValue();
+ }
+ }
+
+ @Override
+ protected Iterable<M> getMessagesAsIterable(
+ ExtendedDataOutput extendedDataOutput) {
+
+ return new MessagesIterable(extendedDataOutput.getByteArray(), 0,
+ extendedDataOutput.getPos());
+ }
+
+ /**
+ * Special iterator only for counting messages
+ */
+ private class RepresentativeMessageIterator extends
+ RepresentativeByteArrayIterator<M> {
+ /**
+ * Constructor
+ *
+ * @param configuration Configuration
+ * @param buf buffer to read from
+ * @param off Offset into the buffer to start from
+ * @param length Length of the buffer
+ */
+ public RepresentativeMessageIterator(
+ ImmutableClassesGiraphConfiguration configuration,
+ byte[] buf, int off, int length) {
+ super(configuration, buf, off, length);
+ }
+
+ @Override
+ protected M createWritable() {
+ return config.createMessageValue();
+ }
+ }
+
+ @Override
+ protected int getNumberOfMessagesIn(
+ ConcurrentMap<I, ExtendedDataOutput> partitionMap) {
+ int numberOfMessages = 0;
+ for (ExtendedDataOutput extendedDataOutput : partitionMap.values()) {
+ numberOfMessages += Iterators.size(
+ new RepresentativeMessageIterator(config,
+ extendedDataOutput.getByteArray(), 0,
+ extendedDataOutput.getPos()));
+ }
+ return numberOfMessages;
+ }
+
+ @Override
+ protected void writeMessages(ExtendedDataOutput extendedDataOutput,
+ DataOutput out) throws IOException {
+ out.writeInt(extendedDataOutput.getPos());
+ out.write(
+ extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+ }
+
+ @Override
+ protected ExtendedDataOutput readFieldsForMessages(DataInput in) throws
+ IOException {
+ int byteArraySize = in.readInt();
+ byte[] messages = new byte[byteArraySize];
+ in.readFully(messages);
+ ExtendedDataOutput extendedDataOutput =
+ config.createExtendedDataOutput(messages, 0);
+ return extendedDataOutput;
+ }
+
+ /**
+ * Create new factory for this message store
+ *
+ * @param service Worker service
+ * @param config Hadoop configuration
+ * @param <I> Vertex id
+ * @param <M> Message data
+ * @return Factory
+ */
+ public static <I extends WritableComparable, M extends Writable>
+ MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+ CentralizedServiceWorker<I, ?, ?, M> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+ return new Factory<I, M>(service, config);
+ }
+
+ @Override
+ public void addMessages(MessageStore<I, M> messageStore) throws IOException {
+ if (messageStore instanceof ByteArrayMessagesPerVertexStore) {
+ ByteArrayMessagesPerVertexStore<I, M>
+ byteArrayMessagesPerVertexStore =
+ (ByteArrayMessagesPerVertexStore<I, M>) messageStore;
+ for (Map.Entry<Integer, ConcurrentMap<I, ExtendedDataOutput>>
+ partitionEntry : byteArrayMessagesPerVertexStore.map.entrySet()) {
+ for (Map.Entry<I, ExtendedDataOutput> vertexEntry :
+ partitionEntry.getValue().entrySet()) {
+ ConcurrentMap<I, ExtendedDataOutput> partitionMap =
+ getOrCreatePartitionMap(partitionEntry.getKey());
+ ExtendedDataOutput extendedDataOutput =
+ partitionMap.get(vertexEntry.getKey());
+ if (extendedDataOutput == null) {
+ ExtendedDataOutput newExtendedDataOutput =
+ config.createExtendedDataOutput();
+ extendedDataOutput =
+ partitionMap.putIfAbsent(vertexEntry.getKey(),
+ newExtendedDataOutput);
+ if (extendedDataOutput == null) {
+ extendedDataOutput = newExtendedDataOutput;
+ }
+ }
+
+ // Add the messages
+ extendedDataOutput.write(vertexEntry.getValue().getByteArray(), 0,
+ vertexEntry.getValue().getPos());
+ }
+ }
+ } else {
+ throw new IllegalArgumentException("addMessages: Illegal argument " +
+ messageStore.getClass());
+ }
+ }
+
+ /**
+ * Factory for {@link ByteArrayMessagesPerVertexStore}
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+ private static class Factory<I extends WritableComparable, M extends Writable>
+ implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+ /** Service worker */
+ private final CentralizedServiceWorker<I, ?, ?, M> service;
+ /** Hadoop configuration */
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+
+ /**
+ * @param service Worker service
+ * @param config Hadoop configuration
+ */
+ public Factory(CentralizedServiceWorker<I, ?, ?, M> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+ this.service = service;
+ this.config = config;
+ }
+
+ @Override
+ public MessageStoreByPartition<I, M> newStore() {
+ return new ByteArrayMessagesPerVertexStore(service, config);
+ }
+ }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java Tue Nov 27 20:01:38 2012
@@ -18,21 +18,14 @@
package org.apache.giraph.comm.messages;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.CollectionUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -40,9 +33,14 @@ import java.util.concurrent.ConcurrentSk
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
/**
- * Message storage with in memory map of messages and with support for
+ * Message storage with in-memory map of messages and with support for
* flushing all the messages to the disk.
*
* @param <I> Vertex id
@@ -50,11 +48,15 @@ import java.util.concurrent.locks.Reentr
*/
public class DiskBackedMessageStore<I extends WritableComparable,
M extends Writable> implements FlushableMessageStore<I, M> {
- /** In memory message map */
- private volatile ConcurrentNavigableMap<I, Collection<M>> inMemoryMessages;
+ /**
+ * In-memory message map (must be sorted to ensure that the ids are
+ * ordered)
+ */
+ private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
+ inMemoryMessages;
/** Hadoop configuration */
private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
- /** Counter for number of messages in memory */
+ /** Counter for number of messages in-memory */
private final AtomicInteger numberOfMessagesInMemory;
/** To keep vertex ids which we have messages for */
private final Set<I> destinationVertices;
@@ -67,13 +69,15 @@ public class DiskBackedMessageStore<I ex
private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
/**
+ * Constructor.
+ *
* @param config Hadoop configuration
* @param fileStoreFactory Factory for creating file stores when flushing
*/
public DiskBackedMessageStore(
ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
- inMemoryMessages = new ConcurrentSkipListMap<I, Collection<M>>();
+ inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
this.config = config;
numberOfMessagesInMemory = new AtomicInteger(0);
destinationVertices =
@@ -82,37 +86,90 @@ public class DiskBackedMessageStore<I ex
this.fileStoreFactory = fileStoreFactory;
}
- @Override
- public void addVertexMessages(I vertexId,
- Collection<M> messages) throws IOException {
+ /**
+ * Add vertex messages
+ *
+ * @param vertexId Vertex id to use
+ * @param messages Messages to add (note that the lifetime of the messages
+ * is only until next() is called again)
+ * @return True if the vertex id ownership is taken by this method,
+ * false otherwise
+ * @throws IOException
+ */
+ boolean addVertexMessages(I vertexId,
+ Iterable<M> messages) throws IOException {
+ boolean ownsVertexId = false;
destinationVertices.add(vertexId);
-
rwLock.readLock().lock();
try {
- CollectionUtils.addConcurrent(vertexId, messages, inMemoryMessages);
- numberOfMessagesInMemory.addAndGet(messages.size());
+ ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+ if (extendedDataOutput == null) {
+ ExtendedDataOutput newExtendedDataOutput =
+ config.createExtendedDataOutput();
+ extendedDataOutput =
+ inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
+ if (extendedDataOutput == null) {
+ ownsVertexId = true;
+ extendedDataOutput = newExtendedDataOutput;
+ }
+ }
+
+ for (M message : messages) {
+ message.write(extendedDataOutput);
+ numberOfMessagesInMemory.getAndIncrement();
+ }
} finally {
rwLock.readLock().unlock();
}
+
+ return ownsVertexId;
}
@Override
- public void addMessages(Map<I, Collection<M>> messages) throws IOException {
- for (Entry<I, Collection<M>> entry : messages.entrySet()) {
- addVertexMessages(entry.getKey(), entry.getValue());
+ public void addMessages(MessageStore<I, M> messageStore) throws
+ IOException {
+ for (I destinationVertex : messageStore.getDestinationVertices()) {
+ addVertexMessages(destinationVertex,
+ messageStore.getVertexMessages(destinationVertex));
+ }
+ }
+
+ /**
+ * Special iterable that recycles the message
+ */
+ private class MessageIterable extends RepresentativeByteArrayIterable<M> {
+ /**
+ * Constructor
+ *
+ * @param buf Buffer
+ * @param off Offset to start in the buffer
+ * @param length Length of the buffer
+ */
+ public MessageIterable(
+ byte[] buf, int off, int length) {
+ super(config, buf, off, length);
+ }
+
+ @Override
+ protected M createWritable() {
+ return config.createMessageValue();
}
}
@Override
- public Collection<M> getVertexMessages(I vertexId) throws IOException {
- Collection<M> messages = inMemoryMessages.get(vertexId);
- if (messages == null) {
- messages = Lists.newArrayList();
+ public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+ ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+ if (extendedDataOutput == null) {
+ extendedDataOutput = config.createExtendedDataOutput();
}
+ Iterable<M> combinedIterable = new MessageIterable(
+ extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+
for (BasicMessageStore<I, M> fileStore : fileStores) {
- messages.addAll(fileStore.getVertexMessages(vertexId));
+ combinedIterable = Iterables.concat(combinedIterable,
+ fileStore.getVertexMessages(vertexId));
}
- return messages;
+ return combinedIterable;
}
@Override
@@ -145,19 +202,94 @@ public class DiskBackedMessageStore<I ex
fileStores.clear();
}
+ /**
+ * Special temporary message store for passing along in-memory messages
+ */
+ private class TemporaryMessageStore implements MessageStore<I, M> {
+ /**
+ * In-memory message map (must be sorted to ensure that the ids are
+ * ordered)
+ */
+ private final ConcurrentNavigableMap<I, ExtendedDataOutput>
+ temporaryMessages;
+
+ /**
+ * Constructor.
+ *
+ * @param temporaryMessages Messages to be owned by this object
+ */
+ private TemporaryMessageStore(
+ ConcurrentNavigableMap<I, ExtendedDataOutput>
+ temporaryMessages) {
+ this.temporaryMessages = temporaryMessages;
+ }
+
+ @Override
+ public int getNumberOfMessages() {
+ throw new IllegalAccessError("getNumberOfMessages: Not supported");
+ }
+
+ @Override
+ public boolean hasMessagesForVertex(I vertexId) {
+ return temporaryMessages.containsKey(vertexId);
+ }
+
+ @Override
+ public Iterable<I> getDestinationVertices() {
+ return temporaryMessages.keySet();
+ }
+
+ @Override
+ public void addMessages(MessageStore<I, M> messageStore)
+ throws IOException {
+ throw new IllegalAccessError("addMessages: Not supported");
+ }
+
+ @Override
+ public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+ ExtendedDataOutput extendedDataOutput = temporaryMessages.get(vertexId);
+ if (extendedDataOutput == null) {
+ extendedDataOutput = config.createExtendedDataOutput();
+ }
+ return new MessageIterable(extendedDataOutput.getByteArray(), 0,
+ extendedDataOutput.getPos());
+ }
+
+ @Override
+ public void clearVertexMessages(I vertexId) throws IOException {
+ temporaryMessages.remove(vertexId);
+ }
+
+ @Override
+ public void clearAll() throws IOException {
+ temporaryMessages.clear();
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ throw new IllegalAccessError("write: Not supported");
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ throw new IllegalAccessError("readFields: Not supported");
+ }
+ }
+
@Override
public void flush() throws IOException {
- ConcurrentNavigableMap<I, Collection<M>> messagesToFlush = null;
+ ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush = null;
rwLock.writeLock().lock();
try {
messagesToFlush = inMemoryMessages;
- inMemoryMessages = new ConcurrentSkipListMap<I, Collection<M>>();
+ inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
numberOfMessagesInMemory.set(0);
} finally {
rwLock.writeLock().unlock();
}
BasicMessageStore<I, M> fileStore = fileStoreFactory.newStore();
- fileStore.addMessages(messagesToFlush);
+ fileStore.addMessages(new TemporaryMessageStore(messagesToFlush));
+
synchronized (fileStores) {
fileStores.add(fileStore);
}
@@ -171,14 +303,15 @@ public class DiskBackedMessageStore<I ex
vertexId.write(out);
}
- // write in memory messages map
+ // write number of in-memory messages
+ out.writeInt(numberOfMessagesInMemory.get());
+
+ // write in-memory messages map
out.writeInt(inMemoryMessages.size());
- for (Entry<I, Collection<M>> entry : inMemoryMessages.entrySet()) {
+ for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
entry.getKey().write(out);
- out.writeInt(entry.getValue().size());
- for (M message : entry.getValue()) {
- message.write(out);
- }
+ out.writeInt(entry.getValue().getPos());
+ out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
}
// write file stores
@@ -198,20 +331,19 @@ public class DiskBackedMessageStore<I ex
destinationVertices.add(vertexId);
}
- // read in memory map
+ // read number of in-memory messages
+ numberOfMessagesInMemory.set(in.readInt());
+
+ // read in-memory map
int mapSize = in.readInt();
for (int m = 0; m < mapSize; m++) {
I vertexId = config.createVertexId();
vertexId.readFields(in);
- int numMessages = in.readInt();
- numberOfMessagesInMemory.addAndGet(numMessages);
- List<M> messages = Lists.newArrayList();
- for (int i = 0; i < numMessages; i++) {
- M message = config.createMessageValue();
- message.readFields(in);
- messages.add(message);
- }
- inMemoryMessages.put(vertexId, messages);
+ int messageBytes = in.readInt();
+ byte[] buf = new byte[messageBytes];
+ ExtendedDataOutput extendedDataOutput =
+ config.createExtendedDataOutput(buf, messageBytes);
+ inMemoryMessages.put(vertexId, extendedDataOutput);
}
// read file stores
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java Tue Nov 27 20:01:38 2012
@@ -24,14 +24,13 @@ import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.EmptyIterable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -76,57 +75,60 @@ public class DiskBackedMessageStoreByPar
}
@Override
- public void addVertexMessages(I vertexId,
- Collection<M> messages) throws IOException {
- getMessageStore(vertexId).addVertexMessages(vertexId, messages);
- checkMemory();
- }
-
- @Override
- public void addMessages(Map<I, Collection<M>> messages) throws IOException {
- for (Entry<I, Collection<M>> entry : messages.entrySet()) {
- getMessageStore(entry.getKey()).addVertexMessages(
- entry.getKey(), entry.getValue());
+ public void addPartitionMessages(
+ int partitionId,
+ ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+ FlushableMessageStore<I, M> flushableMessageStore =
+ getMessageStore(partitionId);
+ if (flushableMessageStore instanceof DiskBackedMessageStore) {
+ DiskBackedMessageStore<I, M> diskBackedMessageStore =
+ (DiskBackedMessageStore<I, M>) flushableMessageStore;
+ ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+ vertexIdMessageIterator =
+ messages.getVertexIdMessageIterator();
+ while (vertexIdMessageIterator.hasNext()) {
+ vertexIdMessageIterator.next();
+ boolean ownsVertexId =
+ diskBackedMessageStore.addVertexMessages(
+ vertexIdMessageIterator.getCurrentVertexId(),
+ Collections.singleton(
+ vertexIdMessageIterator.getCurrentMessage()));
+ if (ownsVertexId) {
+ vertexIdMessageIterator.releaseCurrentVertexId();
+ }
+ }
+ } else {
+ throw new IllegalStateException("addPartitionMessages: Doesn't support " +
+ "class " + flushableMessageStore.getClass());
}
checkMemory();
}
@Override
- public void addPartitionMessages(Map<I, Collection<M>> messages,
- int partitionId) throws IOException {
- getMessageStore(partitionId).addMessages(messages);
- checkMemory();
- }
-
- @Override
- public void addPartitionMessages(
- ByteArrayVertexIdMessageCollection<I, M> messages,
- int partitionId) throws IOException {
- Map<I, Collection<M>> map = Maps.newHashMap();
- ByteArrayVertexIdMessageCollection<I, M>.Iterator iterator =
- messages.getIterator();
- while (iterator.hasNext()) {
- iterator.next();
- I vertexId = iterator.getCurrentVertexId();
- M message = iterator.getCurrentMessage();
- Collection<M> currentMessages = map.get(vertexId);
- if (currentMessages == null) {
- currentMessages = Lists.newArrayList(message);
- map.put(vertexId, currentMessages);
+ public void addMessages(MessageStore<I, M> messageStore) throws IOException {
+ for (I destinationVertex : messageStore.getDestinationVertices()) {
+ FlushableMessageStore<I, M> flushableMessageStore =
+ getMessageStore(destinationVertex);
+ if (flushableMessageStore instanceof DiskBackedMessageStore) {
+ DiskBackedMessageStore<I, M> diskBackedMessageStore =
+ (DiskBackedMessageStore<I, M>) flushableMessageStore;
+ Iterable<M> messages =
+ messageStore.getVertexMessages(destinationVertex);
+ diskBackedMessageStore.addVertexMessages(destinationVertex, messages);
} else {
- currentMessages.add(message);
+ throw new IllegalStateException("addMessages: Doesn't support " +
+ "class " + flushableMessageStore.getClass());
}
}
- getMessageStore(partitionId).addMessages(map);
checkMemory();
}
@Override
- public Collection<M> getVertexMessages(I vertexId) throws IOException {
+ public Iterable<M> getVertexMessages(I vertexId) throws IOException {
if (hasMessagesForVertex(vertexId)) {
return getMessageStore(vertexId).getVertexMessages(vertexId);
} else {
- return Collections.emptyList();
+ return EmptyIterable.<M>emptyIterable();
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStore.java Tue Nov 27 20:01:38 2012
@@ -21,9 +21,6 @@ package org.apache.giraph.comm.messages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import java.io.IOException;
-import java.util.Collection;
-
/**
* Message store
*
@@ -33,16 +30,6 @@ import java.util.Collection;
public interface MessageStore<I extends WritableComparable,
M extends Writable> extends BasicMessageStore<I, M> {
/**
- * Adds messages
- *
- * @param vertexId Vertex id for which the messages are
- * @param messages Messages for the vertex
- * @throws IOException
- */
- void addVertexMessages(I vertexId,
- Collection<M> messages) throws IOException;
-
- /**
* Get number of messages in memory
*
* @return Number of messages in memory
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java Tue Nov 27 20:01:38 2012
@@ -21,9 +21,7 @@ package org.apache.giraph.comm.messages;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -38,22 +36,13 @@ public interface MessageStoreByPartition
/**
* Adds messages for partition
*
- * @param messages Map of messages we want to add
* @param partitionId Id of partition
- * @throws IOException
- */
- void addPartitionMessages(Map<I, Collection<M>> messages,
- int partitionId) throws IOException;
-
- /**
- * Adds messages for partition
- *
* @param messages Collection of vertex ids and messages we want to add
- * @param partitionId Id of partition
* @throws IOException
*/
- void addPartitionMessages(ByteArrayVertexIdMessageCollection<I, M> messages,
- int partitionId) throws IOException;
+ void addPartitionMessages(
+ int partitionId, ByteArrayVertexIdMessages<I, M> messages)
+ throws IOException;
/**
* Gets vertex ids from selected partition which we have messages for
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java Tue Nov 27 20:01:38 2012
@@ -18,18 +18,18 @@
package org.apache.giraph.comm.messages;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.graph.Combiner;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.graph.Combiner;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
/**
* Implementation of {@link SimpleMessageStore} where we have a single
@@ -78,29 +78,53 @@ public class OneMessagePerVertexStore<I
return currentMessage;
}
- @Override
- protected void addVertexMessagesToPartition(I vertexId,
- Collection<M> messages,
+ /**
+ * Add a single message for vertex to a partition map
+ *
+ * @param vertexId Id of vertex which received message
+ * @param message Message to add
+ * @param partitionMap Partition map to add the message to
+ * @throws IOException
+ */
+ private void addVertexMessageToPartition(I vertexId, M message,
ConcurrentMap<I, M> partitionMap) throws IOException {
M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
synchronized (currentMessage) {
- for (M message : messages) {
- combiner.combine(vertexId, currentMessage, message);
- }
+ combiner.combine(vertexId, currentMessage, message);
}
}
@Override
- protected void addVertexMessageToPartition(I vertexId, M message,
- ConcurrentMap<I, M> partitionMap) throws IOException {
- M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
- synchronized (currentMessage) {
- combiner.combine(vertexId, currentMessage, message);
+ public void addPartitionMessages(
+ int partitionId,
+ ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+ ConcurrentMap<I, M> partitionMap =
+ getOrCreatePartitionMap(partitionId);
+ ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+ vertexIdMessageIterator = messages.getVertexIdMessageIterator();
+ // This loop is a little complicated as it is optimized to create as few
+ // vertex id and message objects as possible.
+ while (vertexIdMessageIterator.hasNext()) {
+ vertexIdMessageIterator.next();
+ I vertexId = vertexIdMessageIterator.getCurrentVertexId();
+ M currentMessage =
+ partitionMap.get(vertexIdMessageIterator.getCurrentVertexId());
+ if (currentMessage == null) {
+ M newMessage = combiner.createInitialMessage();
+ currentMessage = partitionMap.putIfAbsent(
+ vertexIdMessageIterator.releaseCurrentVertexId(), newMessage);
+ if (currentMessage == null) {
+ currentMessage = newMessage;
+ }
+ }
+ synchronized (currentMessage) {
+ combiner.combine(vertexId, currentMessage,
+ vertexIdMessageIterator.getCurrentMessage());
+ }
}
}
-
@Override
- protected Collection<M> getMessagesAsCollection(M message) {
+ protected Iterable<M> getMessagesAsIterable(M message) {
return Collections.singleton(message);
}
@@ -121,6 +145,27 @@ public class OneMessagePerVertexStore<I
return message;
}
+ @Override
+ public void addMessages(MessageStore<I, M> messageStore) throws IOException {
+ if (messageStore instanceof OneMessagePerVertexStore) {
+ OneMessagePerVertexStore<I, M> oneMessagePerVertexStore =
+ (OneMessagePerVertexStore<I, M>) messageStore;
+ for (Map.Entry<Integer, ConcurrentMap<I, M>>
+ partitionEntry : oneMessagePerVertexStore.map.entrySet()) {
+ ConcurrentMap<I, M> partitionMap =
+ getOrCreatePartitionMap(partitionEntry.getKey());
+ for (Map.Entry<I, M> vertexEntry :
+ partitionEntry.getValue().entrySet()) {
+ addVertexMessageToPartition(vertexEntry.getKey(),
+ vertexEntry.getValue(), partitionMap);
+ }
+ }
+ } else {
+ throw new IllegalArgumentException("addMessages: Illegal argument " +
+ messageStore.getClass());
+ }
+ }
+
/**
* Create new factory for this message store
*
@@ -138,7 +183,7 @@ public class OneMessagePerVertexStore<I
}
/**
- * Factory for {@link CollectionOfMessagesPerVertexStore}
+ * Factory for {@link OneMessagePerVertexStore}
*
* @param <I> Vertex id
* @param <M> Message data
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java Tue Nov 27 20:01:38 2012
@@ -18,14 +18,9 @@
package org.apache.giraph.comm.messages;
-import org.apache.giraph.GiraphConfiguration;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
+import com.google.common.collect.Sets;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInput;
@@ -36,18 +31,21 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.SortedMap;
+import java.util.List;
+import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
/**
* Used for writing and reading collection of messages to the disk. {@link
- * #addMessages(java.util.Map)} should be called only once with the messages
- * we want to store.
+ * #addMessages(MessageStore)} should be called only once with
+ * the messages we want to store.
* <p/>
* It's optimized for retrieving messages in the natural order of vertex ids
* they are sent to.
@@ -57,6 +55,9 @@ import java.util.concurrent.atomic.Atomi
*/
public class SequentialFileMessageStore<I extends WritableComparable,
M extends Writable> implements BasicMessageStore<I, M> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SequentialFileMessageStore.class);
/** File in which we store data */
private final File file;
/** Configuration which we need for reading data */
@@ -88,39 +89,56 @@ public class SequentialFileMessageStore<
}
@Override
- public void addMessages(Map<I, Collection<M>> messages) throws IOException {
- SortedMap<I, Collection<M>> map;
- if (!(messages instanceof SortedMap)) {
- map = Maps.newTreeMap();
- map.putAll(messages);
- } else {
- map = (SortedMap) messages;
- }
- writeToFile(map);
- }
-
- /**
- * Writes messages to its file.
- *
- * @param messages Messages to write
- * @throws IOException
- */
- private void writeToFile(SortedMap<I, Collection<M>> messages) throws
- IOException {
+ public void addMessages(MessageStore<I, M> messageStore) throws IOException {
+ // Writes messages to its file
if (file.exists()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addMessages: Deleting " + file);
+ }
file.delete();
}
file.createNewFile();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addMessages: Creating " + file);
+ }
+
DataOutputStream out = null;
try {
out = new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(file), bufferSize));
- out.writeInt(messages.size());
- for (Entry<I, Collection<M>> entry : messages.entrySet()) {
- entry.getKey().write(out);
- out.writeInt(entry.getValue().size());
- for (M message : entry.getValue()) {
+ int destinationVertexIdCount =
+ Iterables.size(messageStore.getDestinationVertices());
+ out.writeInt(destinationVertexIdCount);
+
+ // Since the message store messages might not be sorted, sort them if
+ // necessary
+ SortedSet<I> sortedSet;
+ if (messageStore.getDestinationVertices() instanceof SortedSet) {
+ sortedSet = (SortedSet<I>) messageStore.getDestinationVertices();
+ } else {
+ sortedSet =
+ Sets.newTreeSet(messageStore.getDestinationVertices());
+ for (I destinationVertexId : messageStore.getDestinationVertices()) {
+ sortedSet.add(destinationVertexId);
+ }
+ }
+
+ // Dump the vertices and their messages in a sorted order
+ for (I destinationVertexId : sortedSet) {
+ destinationVertexId.write(out);
+ Iterable<M> messages =
+ messageStore.getVertexMessages(destinationVertexId);
+ int messageCount = Iterables.size(messages);
+ out.writeInt(messageCount);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addMessages: For vertex id " + destinationVertexId +
+ ", messages = " + messageCount + " to file " + file);
+ }
+ for (M message : messages) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addMessages: Wrote " + message + " to " + file);
+ }
message.write(out);
}
}
@@ -142,8 +160,12 @@ public class SequentialFileMessageStore<
* @throws IOException
*/
@Override
- public Collection<M> getVertexMessages(I vertexId) throws
+ public Iterable<M> getVertexMessages(I vertexId) throws
IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
+ " (currently " + currentVertexId + ") from " + file);
+ }
if (in == null) {
startReading();
}
@@ -154,14 +176,14 @@ public class SequentialFileMessageStore<
}
if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
- return Collections.emptyList();
+ return EmptyIterable.<M>emptyIterable();
}
+
return readMessagesForCurrentVertex();
}
@Override
- public void clearVertexMessages(I vertexId) throws IOException {
- }
+ public void clearVertexMessages(I vertexId) throws IOException { }
@Override
public void clearAll() throws IOException {
@@ -213,6 +235,10 @@ public class SequentialFileMessageStore<
in = new DataInputStream(
new BufferedInputStream(new FileInputStream(file), bufferSize));
verticesLeft = in.readInt();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("startReading: File " + file + " with " +
+ verticesLeft + " vertices left");
+ }
}
/**
@@ -262,10 +288,17 @@ public class SequentialFileMessageStore<
*/
private Collection<M> readMessagesForCurrentVertex() throws IOException {
int messagesSize = in.readInt();
- ArrayList<M> messages = Lists.newArrayList();
+ List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
for (int i = 0; i < messagesSize; i++) {
M message = config.createMessageValue();
- message.readFields(in);
+ try {
+ message.readFields(in);
+ } catch (IOException e) {
+ throw new IllegalStateException("readMessagesForCurrentVertex: " +
+ "Failed to read message from " + i + " of " +
+ messagesSize + " for vertex id " + currentVertexId + " from " +
+ file, e);
+ }
messages.add(message);
}
currentVertexDone();
@@ -291,6 +324,9 @@ public class SequentialFileMessageStore<
* @throws IOException
*/
private void endReading() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("endReading: Stopped reading " + file);
+ }
if (in != null) {
in.close();
in = null;
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Tue Nov 27 20:01:38 2012
@@ -18,24 +18,20 @@
package org.apache.giraph.comm.messages;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
/**
* Abstract class for {@link MessageStoreByPartition} which allows any kind
@@ -72,35 +68,12 @@ public abstract class SimpleMessageStore
}
/**
- * Add collection of messages for vertex to a partition map
- *
- * @param vertexId Id of vertex which received messages
- * @param messages Messages to add
- * @param partitionMap Map which to add to
- * @throws IOException
- */
- protected abstract void addVertexMessagesToPartition(I vertexId,
- Collection<M> messages, ConcurrentMap<I, T> partitionMap) throws
- IOException;
-
- /**
- * Add a single message for vertex to a partition map
- *
- * @param vertexId Id of vertex which received message
- * @param message Message to add
- * @param partitionMap Map which to add to
- * @throws IOException
- */
- protected abstract void addVertexMessageToPartition(I vertexId,
- M message, ConcurrentMap<I, T> partitionMap) throws IOException;
-
- /**
- * Get messages as collection from message storage
+ * Get messages as an iterable from message storage
*
* @param messages Message storage
- * @return Messages as collection
+ * @return Messages as an iterable
*/
- protected abstract Collection<M> getMessagesAsCollection(T messages);
+ protected abstract Iterable<M> getMessagesAsIterable(T messages);
/**
* Get number of messages in partition map
@@ -162,50 +135,6 @@ public abstract class SimpleMessageStore
}
@Override
- public void addMessages(Map<I, Collection<M>> messages) throws IOException {
- for (Map.Entry<I, Collection<M>> entry : messages.entrySet()) {
- addVertexMessages(entry.getKey(), entry.getValue());
- }
- }
-
- @Override
- public void addVertexMessages(I vertexId,
- Collection<M> messages) throws IOException {
- int partitionId = getPartitionId(vertexId);
- ConcurrentMap<I, T> partitionMap = getOrCreatePartitionMap(partitionId);
- addVertexMessagesToPartition(vertexId, messages, partitionMap);
- }
-
- @Override
- public void addPartitionMessages(Map<I, Collection<M>> messages,
- int partitionId) throws IOException {
- ConcurrentMap<I, T> partitionMap =
- getOrCreatePartitionMap(partitionId);
-
- for (Map.Entry<I, Collection<M>> entry : messages.entrySet()) {
- addVertexMessagesToPartition(entry.getKey(), entry.getValue(),
- partitionMap);
- }
- }
-
- @Override
- public void addPartitionMessages(ByteArrayVertexIdMessageCollection<I,
- M> messages,
- int partitionId) throws IOException {
- ConcurrentMap<I, T> partitionMap =
- getOrCreatePartitionMap(partitionId);
-
- ByteArrayVertexIdMessageCollection<I, M>.Iterator iterator =
- messages.getIterator();
- while (iterator.hasNext()) {
- iterator.next();
- I vertexId = iterator.getCurrentVertexId();
- M message = iterator.getCurrentMessage();
- addVertexMessageToPartition(vertexId, message, partitionMap);
- }
- }
-
- @Override
public Iterable<I> getPartitionDestinationVertices(int partitionId) {
ConcurrentMap<I, ?> partitionMap = map.get(partitionId);
return (partitionMap == null) ? Collections.<I>emptyList() :
@@ -229,14 +158,14 @@ public abstract class SimpleMessageStore
}
@Override
- public Collection<M> getVertexMessages(I vertexId) throws IOException {
+ public Iterable<M> getVertexMessages(I vertexId) throws IOException {
ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId));
if (partitionMap == null) {
return Collections.<M>emptyList();
}
T messages = partitionMap.get(vertexId);
return (messages == null) ? Collections.<M>emptyList() :
- getMessagesAsCollection(messages);
+ getMessagesAsIterable(messages);
}
@Override
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Tue Nov 27 20:01:38 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.comm.netty;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
@@ -32,15 +31,14 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
import org.apache.giraph.comm.netty.handler.ClientRequestId;
-import org.apache.giraph.comm.netty.handler.RequestServerHandler;
-import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
import org.apache.giraph.comm.netty.handler.RequestEncoder;
import org.apache.giraph.comm.netty.handler.RequestInfo;
+import org.apache.giraph.comm.netty.handler.RequestServerHandler;
+import org.apache.giraph.comm.netty.handler.ResponseClientHandler;
/*if[HADOOP_NON_SECURE]
else[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.SaslClientHandler;
@@ -65,8 +63,11 @@ import org.jboss.netty.handler.codec.fra
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
+
import static org.jboss.netty.channel.Channels.pipeline;
+
+
/**
* Netty client for sending requests. Thread-safe.
*/
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Tue Nov 27 20:01:38 2012
@@ -17,9 +17,7 @@
*/
package org.apache.giraph.comm.netty;
-import com.google.common.collect.Maps;
import java.io.IOException;
-import java.util.Collection;
import java.util.Map;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
@@ -45,7 +43,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.ValueGauge;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -146,7 +144,7 @@ public class NettyWorkerClientRequestPro
// Send a request if the cache of outgoing message to
// the remote worker 'workerInfo' is full enough to be flushed
if (workerMessageCount >= maxMessagesPerWorker) {
- PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
+ PairList<Integer, ByteArrayVertexIdMessages<I, M>>
workerMessages =
sendMessageCache.removeWorkerMessages(workerInfo);
WritableRequest writableRequest =
@@ -174,29 +172,41 @@ public class NettyWorkerClientRequestPro
// Messages are stored separately
MessageStoreByPartition<I, M> messageStore =
serverData.getCurrentMessageStore();
- Map<I, Collection<M>> map = Maps.newHashMap();
+ ByteArrayVertexIdMessages<I, M> vertexIdMessages =
+ new ByteArrayVertexIdMessages<I, M>();
+ vertexIdMessages.setConf(configuration);
+ vertexIdMessages.initialize();
int messagesInMap = 0;
for (I vertexId :
messageStore.getPartitionDestinationVertices(partitionId)) {
try {
- Collection<M> messages = messageStore.getVertexMessages(vertexId);
- map.put(vertexId, messages);
- messagesInMap += messages.size();
+ // Messages cannot be re-used from this iterable, but add()
+ // serializes the message, making this safe
+ Iterable<M> messages = messageStore.getVertexMessages(vertexId);
+ for (M message : messages) {
+ vertexIdMessages.add(vertexId, message);
+ ++messagesInMap;
+ }
} catch (IOException e) {
throw new IllegalStateException(
"sendVertexRequest: Got IOException ", e);
}
if (messagesInMap > maxMessagesPerWorker) {
WritableRequest messagesRequest = new
- SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
+ SendPartitionCurrentMessagesRequest<I, V, E, M>(
+ partitionId, vertexIdMessages);
doRequest(workerInfo, messagesRequest);
- map.clear();
+ vertexIdMessages =
+ new ByteArrayVertexIdMessages<I, M>();
+ vertexIdMessages.setConf(configuration);
+ vertexIdMessages.initialize();
messagesInMap = 0;
}
}
- if (!map.isEmpty()) {
+ if (vertexIdMessages != null) {
WritableRequest messagesRequest = new
- SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
+ SendPartitionCurrentMessagesRequest<I, V, E, M>(
+ partitionId, vertexIdMessages);
doRequest(workerInfo, messagesRequest);
}
}
@@ -322,10 +332,10 @@ public class NettyWorkerClientRequestPro
// Execute the remaining sends messages (if any)
PairList<WorkerInfo, PairList<Integer,
- ByteArrayVertexIdMessageCollection<I, M>>>
+ ByteArrayVertexIdMessages<I, M>>>
remainingMessageCache = sendMessageCache.removeAllMessages();
PairList<WorkerInfo,
- PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>>.Iterator
+ PairList<Integer, ByteArrayVertexIdMessages<I, M>>>.Iterator
iterator = remainingMessageCache.getIterator();
while (iterator.hasNext()) {
iterator.next();
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Tue Nov 27 20:01:38 2012
@@ -22,7 +22,7 @@ import org.apache.giraph.GiraphConfigura
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.messages.CollectionOfMessagesPerVertexStore;
+import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
import org.apache.giraph.comm.messages.OneMessagePerVertexStore;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.comm.WorkerServer;
@@ -116,10 +116,10 @@ public class NettyWorkerServer<I extends
} else {
if (LOG.isInfoEnabled()) {
LOG.info("createMessageStoreFactory: " +
- "Using CollectionOfMessagesPerVertexStore " +
+ "Using ByteArrayMessagesPerVertexStore " +
"since there is no combiner");
}
- return CollectionOfMessagesPerVertexStore.newFactory(service, conf);
+ return ByteArrayMessagesPerVertexStore.newFactory(service, conf);
}
} else {
int maxMessagesInMemory = conf.getInt(
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java Tue Nov 27 20:01:38 2012
@@ -18,19 +18,13 @@
package org.apache.giraph.comm.requests;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.giraph.comm.ServerData;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
/**
* Send a collection of vertex messages for a partition. It adds messages to
@@ -44,10 +38,10 @@ import java.util.Map.Entry;
public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> extends
WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
- /** the destination partition for these vertices' messages*/
+ /** Destination partition for these vertices' messages */
private int partitionId;
- /** map of destination vertex ID's to message lists */
- private Map<I, Collection<M>> vertexMessageMap;
+ /** Destination vertex ids and their messages, held in serialized form */
+ private ByteArrayVertexIdMessages<I, M> vertexIdMessageMap;
/** Constructor used for reflection only */
public SendPartitionCurrentMessagesRequest() { }
@@ -59,10 +53,10 @@ public class SendPartitionCurrentMessage
* @param vertexIdMessages Map of messages to send
*/
public SendPartitionCurrentMessagesRequest(int partitionId,
- Map<I, Collection<M>> vertexIdMessages) {
+ ByteArrayVertexIdMessages<I, M> vertexIdMessages) {
super();
this.partitionId = partitionId;
- this.vertexMessageMap = vertexIdMessages;
+ this.vertexIdMessageMap = vertexIdMessages;
}
@Override
@@ -73,42 +67,23 @@ public class SendPartitionCurrentMessage
@Override
public void readFieldsRequest(DataInput input) throws IOException {
partitionId = input.readInt();
- final int numVertices = input.readInt();
- vertexMessageMap =
- Maps.<I, Collection<M>>newHashMapWithExpectedSize(numVertices);
- for (int i = 0; i < numVertices; ++i) {
- I nextVertex = getConf().createVertexId();
- nextVertex.readFields(input);
- final int numMessages = input.readInt();
- Collection<M> messagesForVertex =
- Lists.<M>newArrayListWithExpectedSize(numMessages);
- vertexMessageMap.put(nextVertex, messagesForVertex);
- for (int j = 0; j < numMessages; ++j) {
- M nextMessage = getConf().createMessageValue();
- nextMessage.readFields(input);
- messagesForVertex.add(nextMessage);
- }
- }
+ vertexIdMessageMap = new ByteArrayVertexIdMessages<I, M>();
+ vertexIdMessageMap.setConf(getConf());
+ vertexIdMessageMap.initialize();
+ vertexIdMessageMap.readFields(input);
}
@Override
public void writeRequest(DataOutput output) throws IOException {
output.writeInt(partitionId);
- output.writeInt(vertexMessageMap.size());
- for (Entry<I, Collection<M>> entry : vertexMessageMap.entrySet()) {
- entry.getKey().write(output);
- output.writeInt(entry.getValue().size());
- for (M message : entry.getValue()) {
- message.write(output);
- }
- }
+ vertexIdMessageMap.write(output);
}
@Override
public void doRequest(ServerData<I, V, E, M> serverData) {
try {
- serverData.getCurrentMessageStore().addPartitionMessages(
- vertexMessageMap, partitionId);
+ serverData.getCurrentMessageStore().addPartitionMessages(partitionId,
+ vertexIdMessageMap);
} catch (IOException e) {
throw new RuntimeException("doRequest: Got IOException ", e);
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Tue Nov 27 20:01:38 2012
@@ -22,7 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -48,7 +48,7 @@ public class SendWorkerMessagesRequest<I
* are owned by a single (destination) worker. These messages are all
* destined for this worker.
* */
- private PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
+ private PairList<Integer, ByteArrayVertexIdMessages<I, M>>
partitionVertexMessages;
/**
@@ -60,10 +60,10 @@ public class SendWorkerMessagesRequest<I
* Constructor used to send request.
*
* @param partVertMsgs Map of remote partitions =>
- * ByteArrayVertexIdMessageCollection
+ * ByteArrayVertexIdMessages
*/
public SendWorkerMessagesRequest(
- PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>> partVertMsgs) {
+ PairList<Integer, ByteArrayVertexIdMessages<I, M>> partVertMsgs) {
super();
this.partitionVertexMessages = partVertMsgs;
}
@@ -72,12 +72,12 @@ public class SendWorkerMessagesRequest<I
public void readFieldsRequest(DataInput input) throws IOException {
int numPartitions = input.readInt();
partitionVertexMessages =
- new PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>();
+ new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
partitionVertexMessages.initialize(numPartitions);
while (numPartitions-- > 0) {
final int partitionId = input.readInt();
- ByteArrayVertexIdMessageCollection<I, M> vertexIdMessages =
- new ByteArrayVertexIdMessageCollection<I, M>();
+ ByteArrayVertexIdMessages<I, M> vertexIdMessages =
+ new ByteArrayVertexIdMessages<I, M>();
vertexIdMessages.setConf(getConf());
vertexIdMessages.readFields(input);
partitionVertexMessages.add(partitionId, vertexIdMessages);
@@ -87,7 +87,7 @@ public class SendWorkerMessagesRequest<I
@Override
public void writeRequest(DataOutput output) throws IOException {
output.writeInt(partitionVertexMessages.getSize());
- PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>.Iterator
+ PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
iterator = partitionVertexMessages.getIterator();
while (iterator.hasNext()) {
iterator.next();
@@ -103,14 +103,14 @@ public class SendWorkerMessagesRequest<I
@Override
public void doRequest(ServerData<I, V, E, M> serverData) {
- PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>.Iterator
+ PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
iterator = partitionVertexMessages.getIterator();
while (iterator.hasNext()) {
iterator.next();
try {
- serverData.getIncomingMessageStore()
- .addPartitionMessages(iterator.getCurrentSecond(),
- iterator.getCurrentFirst());
+ serverData.getIncomingMessageStore().
+ addPartitionMessages(iterator.getCurrentFirst(),
+ iterator.getCurrentSecond());
} catch (IOException e) {
throw new RuntimeException("doRequest: Got IOException ", e);
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java Tue Nov 27 20:01:38 2012
@@ -46,7 +46,7 @@ public class SimpleMsgVertex extends
for (IntWritable message : messages) {
sum += message.get();
}
- LOG.info("TestMsgVertex: Received a sum of " + sum +
+ LOG.info("compute: Received a sum of " + sum +
" (will stop on 306)");
if (sum == 306) {
@@ -54,7 +54,7 @@ public class SimpleMsgVertex extends
}
}
if (getSuperstep() > 3) {
- System.err.println("TestMsgVertex: Vertex 1 failed to receive " +
+ System.err.println("compute: Vertex 1 failed to receive " +
"messages in time");
voteToHalt();
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java Tue Nov 27 20:01:38 2012
@@ -17,9 +17,15 @@
*/
package org.apache.giraph.graph;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
@@ -28,22 +34,17 @@ import org.apache.giraph.comm.netty.Nett
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.metrics.GiraphMetrics;
+
+import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.SystemTime;
import org.apache.giraph.utils.Time;
-import org.apache.giraph.utils.Times;
-import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.TimedLogger;
+import org.apache.giraph.utils.Times;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-
/**
* Compute as many vertex partitions as possible. Every thread will have its
* own instance of WorkerClientRequestProcessor to send requests. Note that
@@ -189,10 +190,9 @@ public class ComputeCallable<I extends W
// Make sure every vertex has this thread's
// graphState before computing
vertex.setGraphState(graphState);
- Collection<M> messages =
+ Iterable<M> messages =
messageStore.getVertexMessages(vertex.getId());
- messageStore.clearVertexMessages(vertex.getId());
- if (vertex.isHalted() && !messages.isEmpty()) {
+ if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
vertex.wakeUp();
}
if (!vertex.isHalted()) {
@@ -209,6 +209,10 @@ public class ComputeCallable<I extends W
if (vertex.isHalted()) {
partitionStats.incrFinishedVertexCount();
}
+ // Remove the messages now that the vertex has finished computation
+ messageStore.clearVertexMessages(vertex.getId());
+
+ // Add statistics for this vertex
partitionStats.incrVertexCount();
partitionStats.addEdgeCount(vertex.getNumEdges());
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java Tue Nov 27 20:01:38 2012
@@ -98,7 +98,8 @@ public abstract class Vertex<I extends W
* Must be defined by user to do computation on a single Vertex.
*
* @param messages Messages that were sent to this vertex in the previous
- * superstep
+ * superstep. Each message is only guaranteed to remain
+ * valid until next() is called again on the iterator.
* @throws IOException
*/
public abstract void compute(Iterable<M> messages) throws IOException;