You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/06/26 01:22:34 UTC
[2/2] git commit: updated refs/heads/trunk to 8944567
GIRAPH-697: Clean up message stores (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/89445670
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/89445670
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/89445670
Branch: refs/heads/trunk
Commit: 89445670eee21e38c0dadde4753310adae14b964
Parents: f8a3c77
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Tue Jun 25 16:05:44 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Tue Jun 25 16:05:44 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/comm/ServerData.java | 23 +-
.../giraph/comm/messages/BasicMessageStore.java | 67 ---
.../ByteArrayMessagesPerVertexStore.java | 77 +---
.../comm/messages/DiskBackedMessageStore.java | 418 -------------------
.../DiskBackedMessageStoreByPartition.java | 390 -----------------
.../comm/messages/FlushableMessageStore.java | 40 --
.../messages/InMemoryMessageStoreFactory.java | 4 +-
.../giraph/comm/messages/MessageStore.java | 75 +++-
.../comm/messages/MessageStoreByPartition.java | 82 ----
.../comm/messages/MessageStoreFactory.java | 6 +-
.../giraph/comm/messages/MessagesIterable.java | 56 +++
.../comm/messages/OneMessagePerVertexStore.java | 65 +--
.../messages/SequentialFileMessageStore.java | 411 ------------------
.../comm/messages/SimpleMessageStore.java | 44 +-
.../out_of_core/DiskBackedMessageStore.java | 322 ++++++++++++++
.../PartitionDiskBackedMessageStore.java | 350 ++++++++++++++++
.../out_of_core/SequentialFileMessageStore.java | 407 ++++++++++++++++++
.../comm/messages/out_of_core/package-info.java | 21 +
.../giraph/comm/messages/package-info.java | 2 +-
.../NettyWorkerClientRequestProcessor.java | 4 +-
.../giraph/comm/netty/NettyWorkerServer.java | 21 +-
.../apache/giraph/graph/ComputeCallable.java | 6 +-
.../apache/giraph/graph/GraphTaskManager.java | 6 +-
.../apache/giraph/comm/RequestFailureTest.java | 2 +-
.../org/apache/giraph/comm/RequestTest.java | 2 +-
.../apache/giraph/comm/TestMessageStores.java | 140 +++----
27 files changed, 1335 insertions(+), 1708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 0f2758e..2a605d8 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-697: Clean up message stores (majakabiljo)
+
GIRAPH-696: Should be able to spill giraph metrics to a specified
directory on HDFS (claudio)
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 788be53..affc260 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -21,7 +21,7 @@ package org.apache.giraph.comm;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -54,21 +54,18 @@ public class ServerData<I extends WritableComparable,
/** Edge store for this worker. */
private final EdgeStore<I, V, E> edgeStore;
/** Message store factory */
- private final
- MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+ private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
messageStoreFactory;
/**
* Message store for incoming messages (messages which will be consumed
* in the next super step)
*/
- private volatile MessageStoreByPartition<I, Writable>
- incomingMessageStore;
+ private volatile MessageStore<I, Writable> incomingMessageStore;
/**
* Message store for current messages (messages which we received in
* previous super step and which will be consumed in current super step)
*/
- private volatile MessageStoreByPartition<I, Writable>
- currentMessageStore;
+ private volatile MessageStore<I, Writable> currentMessageStore;
/**
* Map of partition ids to incoming vertex mutations from other workers.
* (Synchronized access to values)
@@ -95,7 +92,7 @@ public class ServerData<I extends WritableComparable,
public ServerData(
CentralizedServiceWorker<I, V, E> service,
ImmutableClassesGiraphConfiguration<I, V, E> conf,
- MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+ MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
messageStoreFactory,
Mapper<?, ?, ?, ?>.Context context) {
this.conf = conf;
@@ -136,9 +133,8 @@ public class ServerData<I extends WritableComparable,
* @param <M> Message data
* @return Incoming message store
*/
- public <M extends Writable> MessageStoreByPartition<I, M>
- getIncomingMessageStore() {
- return (MessageStoreByPartition<I, M>) incomingMessageStore;
+ public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
+ return (MessageStore<I, M>) incomingMessageStore;
}
/**
@@ -148,9 +144,8 @@ public class ServerData<I extends WritableComparable,
* @param <M> Message data
* @return Current message store
*/
- public <M extends Writable> MessageStoreByPartition<I, M>
- getCurrentMessageStore() {
- return (MessageStoreByPartition<I, M>) currentMessageStore;
+ public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
+ return (MessageStore<I, M>) currentMessageStore;
}
/** Prepare for next super step */
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
deleted file mode 100644
index dcb6223..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Most basic message store with just add, get and clear operations
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface BasicMessageStore<I extends WritableComparable,
- M extends Writable> extends Writable {
- /**
- * Adds messages from one message store to another
- *
- * @param messageStore Add the messages from this message store to this
- * object
- * @throws java.io.IOException
- */
- void addMessages(MessageStore<I, M> messageStore) throws IOException;
-
- /**
- * Gets messages for a vertex. The lifetime of every message is only
- * guaranteed until the iterator's next() method is called. Do not re-use
- * the messages.
- *
- * @param vertexId Vertex id for which we want to get messages
- * @return Iterable of messages for a vertex id
- * @throws IOException
- */
- Iterable<M> getVertexMessages(I vertexId) throws IOException;
-
- /**
- * Clears messages for a vertex.
- *
- * @param vertexId Vertex id for which we want to clear messages
- * @throws IOException
- */
- void clearVertexMessages(I vertexId) throws IOException;
-
- /**
- * Clears all resources used by this store.
- *
- * @throws IOException
- */
- void clearAll() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 97c8a35..fecd7ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -24,7 +24,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
import org.apache.giraph.utils.RepresentativeByteArrayIterator;
import org.apache.giraph.utils.VertexIdIterator;
import org.apache.hadoop.io.Writable;
@@ -33,7 +32,6 @@ import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/**
@@ -128,33 +126,11 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
}
}
- /**
- * Special iterable that recycles the message
- */
- private class MessagesIterable extends RepresentativeByteArrayIterable<M> {
- /**
- * Constructor
- *
- * @param buf Buffer
- * @param off Offset to start in the buffer
- * @param length Length of the buffer
- */
- private MessagesIterable(byte[] buf, int off, int length) {
- super(config, buf, off, length);
- }
-
- @Override
- protected M createWritable() {
- return ReflectionUtils.newInstance(messageClass);
- }
- }
-
@Override
protected Iterable<M> getMessagesAsIterable(
ExtendedDataOutput extendedDataOutput) {
-
- return new MessagesIterable(extendedDataOutput.getByteArray(), 0,
- extendedDataOutput.getPos());
+ return new MessagesIterable<M>(config, messageClass,
+ extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
}
/**
@@ -209,9 +185,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
int byteArraySize = in.readInt();
byte[] messages = new byte[byteArraySize];
in.readFully(messages);
- ExtendedDataOutput extendedDataOutput =
- config.createExtendedDataOutput(messages, 0);
- return extendedDataOutput;
+ return config.createExtendedDataOutput(messages, 0);
}
/**
@@ -224,48 +198,12 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
* @return Factory
*/
public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+ MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
CentralizedServiceWorker<I, ?, ?> service,
ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
return new Factory<I, M>(service, config);
}
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws IOException {
- if (messageStore instanceof ByteArrayMessagesPerVertexStore) {
- ByteArrayMessagesPerVertexStore<I, M>
- byteArrayMessagesPerVertexStore =
- (ByteArrayMessagesPerVertexStore<I, M>) messageStore;
- for (Map.Entry<Integer, ConcurrentMap<I, ExtendedDataOutput>>
- partitionEntry : byteArrayMessagesPerVertexStore.map.entrySet()) {
- for (Map.Entry<I, ExtendedDataOutput> vertexEntry :
- partitionEntry.getValue().entrySet()) {
- ConcurrentMap<I, ExtendedDataOutput> partitionMap =
- getOrCreatePartitionMap(partitionEntry.getKey());
- ExtendedDataOutput extendedDataOutput =
- partitionMap.get(vertexEntry.getKey());
- if (extendedDataOutput == null) {
- ExtendedDataOutput newExtendedDataOutput =
- config.createExtendedDataOutput();
- extendedDataOutput =
- partitionMap.putIfAbsent(vertexEntry.getKey(),
- newExtendedDataOutput);
- if (extendedDataOutput == null) {
- extendedDataOutput = newExtendedDataOutput;
- }
- }
-
- // Add the messages
- extendedDataOutput.write(vertexEntry.getValue().getByteArray(), 0,
- vertexEntry.getValue().getPos());
- }
- }
- } else {
- throw new IllegalArgumentException("addMessages: Illegal argument " +
- messageStore.getClass());
- }
- }
-
/**
* Factory for {@link ByteArrayMessagesPerVertexStore}
*
@@ -273,7 +211,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
* @param <M> Message data
*/
private static class Factory<I extends WritableComparable, M extends Writable>
- implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+ implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Service worker */
private final CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
@@ -290,8 +228,9 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
}
@Override
- public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
- return new ByteArrayMessagesPerVertexStore(messageClass, service, config);
+ public MessageStore<I, M> newStore(Class<M> messageClass) {
+ return new ByteArrayMessagesPerVertexStore<I, M>(
+ messageClass, service, config);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
deleted file mode 100644
index 2712edd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message storage with in-memory map of messages and with support for
- * flushing all the messages to the disk.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class DiskBackedMessageStore<I extends WritableComparable,
- M extends Writable> implements FlushableMessageStore<I, M> {
- /** Message class */
- private final Class<M> messageClass;
- /**
- * In-memory message map (must be sorted to insure that the ids are
- * ordered)
- */
- private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
- inMemoryMessages;
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
- /** Counter for number of messages in-memory */
- private final AtomicInteger numberOfMessagesInMemory;
- /** To keep vertex ids which we have messages for */
- private final Set<I> destinationVertices;
- /** File stores in which we keep flushed messages */
- private final Collection<BasicMessageStore<I, M>> fileStores;
- /** Factory for creating file stores when flushing */
- private final
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
- /** Lock for disk flushing */
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
-
- /**
- * Constructor.
- *
- * @param messageClass Message class held in the store
- * @param config Hadoop configuration
- * @param fileStoreFactory Factory for creating file stores when flushing
- */
- public DiskBackedMessageStore(
- Class<M> messageClass,
- ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
- inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
- this.messageClass = messageClass;
- this.config = config;
- numberOfMessagesInMemory = new AtomicInteger(0);
- destinationVertices =
- Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
- fileStores = Lists.newArrayList();
- this.fileStoreFactory = fileStoreFactory;
- }
-
- /**
- * Add vertex messages
- *
- * @param vertexId Vertex id to use
- * @param messages Messages to add (note that the lifetime of the messages)
- * is only until next() is called again)
- * @return True if the vertex id ownership is taken by this method,
- * false otherwise
- * @throws IOException
- */
- boolean addVertexMessages(I vertexId,
- Iterable<M> messages) throws IOException {
- boolean ownsVertexId = false;
- destinationVertices.add(vertexId);
- rwLock.readLock().lock();
- try {
- ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
- if (extendedDataOutput == null) {
- ExtendedDataOutput newExtendedDataOutput =
- config.createExtendedDataOutput();
- extendedDataOutput =
- inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
- if (extendedDataOutput == null) {
- ownsVertexId = true;
- extendedDataOutput = newExtendedDataOutput;
- }
- }
-
- synchronized (extendedDataOutput) {
- for (M message : messages) {
- message.write(extendedDataOutput);
- numberOfMessagesInMemory.getAndIncrement();
- }
- }
- } finally {
- rwLock.readLock().unlock();
- }
-
- return ownsVertexId;
- }
-
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws
- IOException {
- for (I destinationVertex : messageStore.getDestinationVertices()) {
- addVertexMessages(destinationVertex,
- messageStore.getVertexMessages(destinationVertex));
- }
- }
-
- /**
- * Special iterable that recycles the message
- */
- private class MessageIterable extends RepresentativeByteArrayIterable<M> {
- /**
- * Constructor
- *
- * @param buf Buffer
- * @param off Offset to start in the buffer
- * @param length Length of the buffer
- */
- public MessageIterable(
- byte[] buf, int off, int length) {
- super(config, buf, off, length);
- }
-
- @Override
- protected M createWritable() {
- return ReflectionUtils.newInstance(messageClass);
- }
- }
-
- @Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
- ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
- if (extendedDataOutput == null) {
- extendedDataOutput = config.createExtendedDataOutput();
- }
- Iterable<M> combinedIterable = new MessageIterable(
- extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
-
- for (BasicMessageStore<I, M> fileStore : fileStores) {
- combinedIterable = Iterables.concat(combinedIterable,
- fileStore.getVertexMessages(vertexId));
- }
- return combinedIterable;
- }
-
- @Override
- public int getNumberOfMessages() {
- return numberOfMessagesInMemory.get();
- }
-
- @Override
- public boolean hasMessagesForVertex(I vertexId) {
- return destinationVertices.contains(vertexId);
- }
-
- @Override
- public Iterable<I> getDestinationVertices() {
- return destinationVertices;
- }
-
- @Override
- public void clearVertexMessages(I vertexId) throws IOException {
- inMemoryMessages.remove(vertexId);
- }
-
- @Override
- public void clearAll() throws IOException {
- inMemoryMessages.clear();
- destinationVertices.clear();
- for (BasicMessageStore<I, M> fileStore : fileStores) {
- fileStore.clearAll();
- }
- fileStores.clear();
- }
-
- /**
- * Special temporary message store for passing along in-memory messages
- */
- private class TemporaryMessageStore implements MessageStore<I, M> {
- /**
- * In-memory message map (must be sorted to insure that the ids are
- * ordered)
- */
- private final ConcurrentNavigableMap<I, ExtendedDataOutput>
- temporaryMessages;
-
- /**
- * Constructor.
- *
- * @param temporaryMessages Messages to be owned by this object
- */
- private TemporaryMessageStore(
- ConcurrentNavigableMap<I, ExtendedDataOutput>
- temporaryMessages) {
- this.temporaryMessages = temporaryMessages;
- }
-
- @Override
- public int getNumberOfMessages() {
- throw new IllegalAccessError("getNumberOfMessages: Not supported");
- }
-
- @Override
- public boolean hasMessagesForVertex(I vertexId) {
- return temporaryMessages.containsKey(vertexId);
- }
-
- @Override
- public Iterable<I> getDestinationVertices() {
- return temporaryMessages.keySet();
- }
-
- @Override
- public void addMessages(MessageStore<I, M> messageStore)
- throws IOException {
- throw new IllegalAccessError("addMessages: Not supported");
- }
-
- @Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
- ExtendedDataOutput extendedDataOutput = temporaryMessages.get(vertexId);
- if (extendedDataOutput == null) {
- extendedDataOutput = config.createExtendedDataOutput();
- }
- return new MessageIterable(extendedDataOutput.getByteArray(), 0,
- extendedDataOutput.getPos());
- }
-
- @Override
- public void clearVertexMessages(I vertexId) throws IOException {
- temporaryMessages.remove(vertexId);
- }
-
- @Override
- public void clearAll() throws IOException {
- temporaryMessages.clear();
- }
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- throw new IllegalAccessError("write: Not supported");
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- throw new IllegalAccessError("readFields: Not supported");
- }
- }
-
- @Override
- public void flush() throws IOException {
- ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush = null;
- rwLock.writeLock().lock();
- try {
- messagesToFlush = inMemoryMessages;
- inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
- numberOfMessagesInMemory.set(0);
- } finally {
- rwLock.writeLock().unlock();
- }
- BasicMessageStore<I, M> fileStore =
- fileStoreFactory.newStore(messageClass);
- fileStore.addMessages(new TemporaryMessageStore(messagesToFlush));
-
- synchronized (fileStores) {
- fileStores.add(fileStore);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // write destination vertices
- out.writeInt(destinationVertices.size());
- for (I vertexId : destinationVertices) {
- vertexId.write(out);
- }
-
- // write of in-memory messages
- out.writeInt(numberOfMessagesInMemory.get());
-
- // write in-memory messages map
- out.writeInt(inMemoryMessages.size());
- for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
- entry.getKey().write(out);
- out.writeInt(entry.getValue().getPos());
- out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
- }
-
- // write file stores
- out.writeInt(fileStores.size());
- for (BasicMessageStore<I, M> fileStore : fileStores) {
- fileStore.write(out);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- // read destination vertices
- int numVertices = in.readInt();
- for (int v = 0; v < numVertices; v++) {
- I vertexId = (I) config.createVertexId();
- vertexId.readFields(in);
- destinationVertices.add(vertexId);
- }
-
- // read in-memory messages
- numberOfMessagesInMemory.set(in.readInt());
-
- // read in-memory map
- int mapSize = in.readInt();
- for (int m = 0; m < mapSize; m++) {
- I vertexId = config.createVertexId();
- vertexId.readFields(in);
- int messageBytes = in.readInt();
- byte[] buf = new byte[messageBytes];
- ExtendedDataOutput extendedDataOutput =
- config.createExtendedDataOutput(buf, messageBytes);
- inMemoryMessages.put(vertexId, extendedDataOutput);
- }
-
- // read file stores
- int numFileStores = in.readInt();
- for (int s = 0; s < numFileStores; s++) {
- BasicMessageStore<I, M> fileStore =
- fileStoreFactory.newStore(messageClass);
- fileStore.readFields(in);
- fileStores.add(fileStore);
- }
- }
-
-
- /**
- * Create new factory for this message store
- *
- * @param config Hadoop configuration
- * @param fileStoreFactory Factory for creating message stores for
- * partitions
- * @param <I> Vertex id
- * @param <M> Message data
- * @return Factory
- */
- public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(
- ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
- return new Factory<I, M>(config, fileStoreFactory);
- }
-
- /**
- * Factory for {@link DiskBackedMessageStore}
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
- private static class Factory<I extends WritableComparable,
- M extends Writable> implements MessageStoreFactory<I, M,
- FlushableMessageStore<I, M>> {
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration config;
- /** Factory for creating message stores for partitions */
- private final
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
-
- /**
- * @param config Hadoop configuration
- * @param fileStoreFactory Factory for creating message stores for
- * partitions
- */
- public Factory(ImmutableClassesGiraphConfiguration config,
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
- this.config = config;
- this.fileStoreFactory = fileStoreFactory;
- }
-
- @Override
- public FlushableMessageStore<I, M> newStore(Class<M> messageClass) {
- return new DiskBackedMessageStore<I, M>(messageClass, config,
- fileStoreFactory);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
deleted file mode 100644
index 4a28949..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Message store which separates data by partitions,
- * and submits them to underlying message store.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- MessageStoreByPartition<I, M> {
- /** Message class */
- private final Class<M> messageClass;
- /** Service worker */
- private final CentralizedServiceWorker<I, V, E> service;
- /** Number of messages to keep in memory */
- private final int maxNumberOfMessagesInMemory;
- /** Factory for creating file stores when flushing */
- private final
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
- /** Map from partition id to its message store */
- private final
- ConcurrentMap<Integer, FlushableMessageStore<I, M>> partitionMessageStores;
- /**
- * @param messageClass Message class held in the store
- * @param service Service worker
- * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
- * @param fileStoreFactory Factory for creating file stores
- * when flushing
- */
- public DiskBackedMessageStoreByPartition(
- Class<M> messageClass,
- CentralizedServiceWorker<I, V, E> service,
- int maxNumberOfMessagesInMemory,
- MessageStoreFactory<I, M, FlushableMessageStore<I,
- M>> fileStoreFactory) {
- this.messageClass = messageClass;
- this.service = service;
- this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
- this.fileStoreFactory = fileStoreFactory;
- partitionMessageStores = Maps.newConcurrentMap();
- }
-
- @Override
- public void addPartitionMessages(
- int partitionId,
- ByteArrayVertexIdMessages<I, M> messages) throws IOException {
- FlushableMessageStore<I, M> flushableMessageStore =
- getMessageStore(partitionId);
- if (flushableMessageStore instanceof DiskBackedMessageStore) {
- DiskBackedMessageStore<I, M> diskBackedMessageStore =
- (DiskBackedMessageStore<I, M>) flushableMessageStore;
- ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
- vertexIdMessageIterator =
- messages.getVertexIdMessageIterator();
- while (vertexIdMessageIterator.hasNext()) {
- vertexIdMessageIterator.next();
- boolean ownsVertexId =
- diskBackedMessageStore.addVertexMessages(
- vertexIdMessageIterator.getCurrentVertexId(),
- Collections.singleton(
- vertexIdMessageIterator.getCurrentMessage()));
- if (ownsVertexId) {
- vertexIdMessageIterator.releaseCurrentVertexId();
- }
- }
- } else {
- throw new IllegalStateException("addPartitionMessages: Doesn't support " +
- "class " + flushableMessageStore.getClass());
- }
- checkMemory();
- }
-
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws IOException {
- for (I destinationVertex : messageStore.getDestinationVertices()) {
- FlushableMessageStore<I, M> flushableMessageStore =
- getMessageStore(destinationVertex);
- if (flushableMessageStore instanceof DiskBackedMessageStore) {
- DiskBackedMessageStore<I, M> diskBackedMessageStore =
- (DiskBackedMessageStore<I, M>) flushableMessageStore;
- Iterable<M> messages =
- messageStore.getVertexMessages(destinationVertex);
- diskBackedMessageStore.addVertexMessages(destinationVertex, messages);
- } else {
- throw new IllegalStateException("addMessages: Doesn't support " +
- "class " + flushableMessageStore.getClass());
- }
- }
- checkMemory();
- }
-
- @Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
- if (hasMessagesForVertex(vertexId)) {
- return getMessageStore(vertexId).getVertexMessages(vertexId);
- } else {
- return EmptyIterable.get();
- }
- }
-
- @Override
- public int getNumberOfMessages() {
- int numOfMessages = 0;
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- numOfMessages += messageStore.getNumberOfMessages();
- }
- return numOfMessages;
- }
-
- @Override
- public boolean hasMessagesForVertex(I vertexId) {
- return getMessageStore(vertexId).hasMessagesForVertex(vertexId);
- }
-
- @Override
- public Iterable<I> getDestinationVertices() {
- List<I> vertices = Lists.newArrayList();
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- Iterables.addAll(vertices, messageStore.getDestinationVertices());
- }
- return vertices;
- }
-
- @Override
- public Iterable<I> getPartitionDestinationVertices(int partitionId) {
- FlushableMessageStore<I, M> messageStore =
- partitionMessageStores.get(partitionId);
- if (messageStore == null) {
- return Collections.emptyList();
- } else {
- return messageStore.getDestinationVertices();
- }
- }
-
- @Override
- public void clearVertexMessages(I vertexId) throws IOException {
- if (hasMessagesForVertex(vertexId)) {
- getMessageStore(vertexId).clearVertexMessages(vertexId);
- }
- }
-
- @Override
- public void clearPartition(int partitionId) throws IOException {
- FlushableMessageStore<I, M> messageStore =
- partitionMessageStores.get(partitionId);
- if (messageStore != null) {
- messageStore.clearAll();
- }
- }
-
- @Override
- public void clearAll() throws IOException {
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- messageStore.clearAll();
- }
- partitionMessageStores.clear();
- }
-
- /**
- * Checks the memory status, flushes if necessary
- *
- * @throws IOException
- */
- private void checkMemory() throws IOException {
- while (memoryFull()) {
- flushOnePartition();
- }
- }
-
- /**
- * Check if memory is full
- *
- * @return True iff memory is full
- */
- private boolean memoryFull() {
- int totalMessages = 0;
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- totalMessages += messageStore.getNumberOfMessages();
- }
- return totalMessages > maxNumberOfMessagesInMemory;
- }
-
- /**
- * Finds biggest partition and flushes it to the disk
- *
- * @throws IOException
- */
- private void flushOnePartition() throws IOException {
- int maxMessages = 0;
- FlushableMessageStore<I, M> biggestStore = null;
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- int numMessages = messageStore.getNumberOfMessages();
- if (numMessages > maxMessages) {
- maxMessages = numMessages;
- biggestStore = messageStore;
- }
- }
- if (biggestStore != null) {
- biggestStore.flush();
- }
- }
-
- /**
- * Get message store for partition which holds vertex with required vertex
- * id
- *
- * @param vertexId Id of vertex for which we are asking for message store
- * @return Requested message store
- */
- private FlushableMessageStore<I, M> getMessageStore(I vertexId) {
- int partitionId =
- service.getVertexPartitionOwner(vertexId).getPartitionId();
- return getMessageStore(partitionId);
- }
-
- /**
- * Get message store for partition id. It it doesn't exist yet,
- * creates a new one.
- *
- * @param partitionId Id of partition for which we are asking for message
- * store
- * @return Requested message store
- */
- private FlushableMessageStore<I, M> getMessageStore(int partitionId) {
- FlushableMessageStore<I, M> messageStore =
- partitionMessageStores.get(partitionId);
- if (messageStore != null) {
- return messageStore;
- }
- messageStore = fileStoreFactory.newStore(messageClass);
- FlushableMessageStore<I, M> store =
- partitionMessageStores.putIfAbsent(partitionId, messageStore);
- return (store == null) ? messageStore : store;
- }
-
- @Override
- public void writePartition(DataOutput out,
- int partitionId) throws IOException {
- FlushableMessageStore<I, M> partitionStore =
- partitionMessageStores.get(partitionId);
- out.writeBoolean(partitionStore != null);
- if (partitionStore != null) {
- partitionStore.write(out);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(partitionMessageStores.size());
- for (Entry<Integer, FlushableMessageStore<I, M>> entry :
- partitionMessageStores.entrySet()) {
- out.writeInt(entry.getKey());
- entry.getValue().write(out);
- }
- }
-
- @Override
- public void readFieldsForPartition(DataInput in,
- int partitionId) throws IOException {
- if (in.readBoolean()) {
- FlushableMessageStore<I, M> messageStore =
- fileStoreFactory.newStore(messageClass);
- messageStore.readFields(in);
- partitionMessageStores.put(partitionId, messageStore);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int numStores = in.readInt();
- for (int s = 0; s < numStores; s++) {
- int partitionId = in.readInt();
- FlushableMessageStore<I, M> messageStore =
- fileStoreFactory.newStore(messageClass);
- messageStore.readFields(in);
- partitionMessageStores.put(partitionId, messageStore);
- }
- }
-
-
- /**
- * Create new factory for this message store
- *
- * @param service Service worker
- * @param maxMessagesInMemory Number of messages to keep in memory
- * @param fileStoreFactory Factory for creating file stores when
- * flushing
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @return Factory
- */
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
- CentralizedServiceWorker<I, V, E> service,
- int maxMessagesInMemory,
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
- fileStoreFactory) {
- return new Factory<I, V, E, M>(service, maxMessagesInMemory,
- fileStoreFactory);
- }
-
- /**
- * Factory for {@link DiskBackedMessageStoreByPartition}
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
- private static class Factory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
- /** Service worker */
- private final CentralizedServiceWorker<I, V, E> service;
- /** Number of messages to keep in memory */
- private final int maxMessagesInMemory;
- /** Factory for creating file stores when flushing */
- private final
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
-
- /**
- * @param service Service worker
- * @param maxMessagesInMemory Number of messages to keep in memory
- * @param fileStoreFactory Factory for creating file stores when
- * flushing
- */
- public Factory(CentralizedServiceWorker<I, V, E> service,
- int maxMessagesInMemory,
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
- fileStoreFactory) {
- this.service = service;
- this.maxMessagesInMemory = maxMessagesInMemory;
- this.fileStoreFactory = fileStoreFactory;
- }
-
- @Override
- public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
- return new DiskBackedMessageStoreByPartition<I, V, E, M>(messageClass,
- service, maxMessagesInMemory, fileStoreFactory);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
deleted file mode 100644
index 6e7fe55..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-
-/**
- * Message stores which has flush operation
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface FlushableMessageStore<I extends WritableComparable,
- M extends Writable> extends MessageStore<I, M> {
- /**
- * Flushes messages to the disk.
- *
- * @throws IOException
- */
- void flush() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 9086d78..ba8a005 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
*/
public class InMemoryMessageStoreFactory<I extends WritableComparable,
M extends Writable>
- implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+ implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(InMemoryMessageStoreFactory.class);
@@ -56,7 +56,7 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
}
@Override
- public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+ public MessageStore<I, M> newStore(Class<M> messageClass) {
if (conf.useCombiner()) {
if (LOG.isInfoEnabled()) {
LOG.info("newStore: " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index a6f174d..2af7642 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -18,6 +18,10 @@
package org.apache.giraph.comm.messages;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -28,13 +32,32 @@ import org.apache.hadoop.io.WritableComparable;
* @param <M> Message data
*/
public interface MessageStore<I extends WritableComparable,
- M extends Writable> extends BasicMessageStore<I, M> {
+ M extends Writable> {
/**
- * Get number of messages in memory
+ * Gets messages for a vertex. The lifetime of every message is only
+ * guaranteed until the iterator's next() method is called. Do not hold
+ * references to objects returned by this iterator.
*
- * @return Number of messages in memory
+ * @param vertexId Vertex id for which we want to get messages
+ * @return Iterable of messages for a vertex id
+ * @throws java.io.IOException
*/
- int getNumberOfMessages();
+ Iterable<M> getVertexMessages(I vertexId) throws IOException;
+
+ /**
+ * Clears messages for a vertex.
+ *
+ * @param vertexId Vertex id for which we want to clear messages
+ * @throws IOException
+ */
+ void clearVertexMessages(I vertexId) throws IOException;
+
+ /**
+ * Clears all resources used by this store.
+ *
+ * @throws IOException
+ */
+ void clearAll() throws IOException;
/**
* Check if we have messages for some vertex
@@ -45,9 +68,49 @@ public interface MessageStore<I extends WritableComparable,
boolean hasMessagesForVertex(I vertexId);
/**
- * Gets vertex ids which we have messages for
+ * Adds messages for partition
+ *
+ * @param partitionId Id of partition
+ * @param messages Collection of vertex ids and messages we want to add
+ * @throws IOException
+ */
+ void addPartitionMessages(
+ int partitionId, ByteArrayVertexIdMessages<I, M> messages)
+ throws IOException;
+
+ /**
+ * Gets vertex ids from selected partition which we have messages for
*
+ * @param partitionId Id of partition
* @return Iterable over vertex ids which we have messages for
*/
- Iterable<I> getDestinationVertices();
+ Iterable<I> getPartitionDestinationVertices(int partitionId);
+
+ /**
+ * Clears messages for a partition.
+ *
+ * @param partitionId Partition id for which we want to clear messages
+ * @throws IOException
+ */
+ void clearPartition(int partitionId) throws IOException;
+
+ /**
+ * Serialize messages for one partition.
+ *
+ * @param out {@link DataOutput} to serialize this object into
+ * @param partitionId Id of partition
+ * @throws IOException
+ */
+ void writePartition(DataOutput out, int partitionId) throws IOException;
+
+ /**
+ * Deserialize messages for one partition
+ *
+ * @param in {@link DataInput} to deserialize this object
+ * from.
+ * @param partitionId Id of partition
+ * @throws IOException
+ */
+ void readFieldsForPartition(DataInput in,
+ int partitionId) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
deleted file mode 100644
index d2143c6..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message store which stores data by partition
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface MessageStoreByPartition<I extends WritableComparable,
- M extends Writable> extends MessageStore<I, M> {
- /**
- * Adds messages for partition
- *
- * @param partitionId Id of partition
- * @param messages Collection of vertex ids and messages we want to add
- * @throws IOException
- */
- void addPartitionMessages(
- int partitionId, ByteArrayVertexIdMessages<I, M> messages)
- throws IOException;
-
- /**
- * Gets vertex ids from selected partition which we have messages for
- *
- * @param partitionId Id of partition
- * @return Iterable over vertex ids which we have messages for
- */
- Iterable<I> getPartitionDestinationVertices(int partitionId);
-
- /**
- * Clears messages for a partition.
- *
- * @param partitionId Partition id for which we want to clear messages
- * @throws IOException
- */
- void clearPartition(int partitionId) throws IOException;
-
- /**
- * Serialize messages for one partition.
- *
- * @param out {@link DataOutput} to serialize this object into
- * @param partitionId Id of partition
- * @throws IOException
- */
- void writePartition(DataOutput out, int partitionId) throws IOException;
-
- /**
- * Deserialize messages for one partition
- *
- * @param in {@link DataInput} to deserialize this object
- * from.
- * @param partitionId Id of partition
- * @throws IOException
- */
- void readFieldsForPartition(DataInput in,
- int partitionId) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index dec9a92..aa6a63e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.io.WritableComparable;
*
* @param <I> Vertex id
* @param <M> Message data
- * @param <S> Message store
+ * @param <MS> Message store
*/
public interface MessageStoreFactory<I extends WritableComparable,
- M extends Writable, S extends BasicMessageStore<I, M>> {
+ M extends Writable, MS> {
/**
* Creates new message store.
*
@@ -39,5 +39,5 @@ public interface MessageStoreFactory<I extends WritableComparable,
* @param messageClass Message class held in the store
* @return New message store
*/
- S newStore(Class<M> messageClass);
+ MS newStore(Class<M> messageClass);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
new file mode 100644
index 0000000..970d76d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Iterable over serialized messages that recycles one message object.
+ *
+ * @param <M> Message data
+ */
+public class MessagesIterable<M extends Writable>
+ extends RepresentativeByteArrayIterable<M> {
+ /** Message class, instantiated reflectively in createWritable() */
+ private final Class<M> messageClass;
+
+ /**
+ * Creates an iterable over the messages serialized in buf.
+ *
+ * @param conf Configuration
+ * @param messageClass Message class
+ * @param buf Buffer
+ * @param off Offset to start in the buffer
+ * @param length Length of the buffer
+ */
+ public MessagesIterable(
+ ImmutableClassesGiraphConfiguration conf, Class<M> messageClass,
+ byte[] buf, int off, int length) {
+ super(conf, buf, off, length);
+ this.messageClass = messageClass;
+ }
+
+ @Override
+ protected M createWritable() {
+ return ReflectionUtils.newInstance(messageClass);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 8710dac..f18af5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -22,7 +22,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
-import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -60,44 +59,6 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
this.combiner = combiner;
}
- /**
- * If there is already a message related to the vertex id in the
- * partition map return that message, otherwise create a new one,
- * put it in the map and return it
- *
- * @param vertexId Id of vertex
- * @param partitionMap Partition map
- * @return Message for this vertex
- */
- private M getOrCreateCurrentMessage(I vertexId,
- ConcurrentMap<I, M> partitionMap) {
- M currentMessage = partitionMap.get(vertexId);
- if (currentMessage == null) {
- M newMessage = combiner.createInitialMessage();
- currentMessage = partitionMap.putIfAbsent(vertexId, newMessage);
- if (currentMessage == null) {
- currentMessage = newMessage;
- }
- }
- return currentMessage;
- }
-
- /**
- * Add a single message for vertex to a partition map
- *
- * @param vertexId Id of vertex which received message
- * @param message Message to add
- * @param partitionMap Partition map to add the message to
- * @throws IOException
- */
- private void addVertexMessageToPartition(I vertexId, M message,
- ConcurrentMap<I, M> partitionMap) throws IOException {
- M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
- synchronized (currentMessage) {
- combiner.combine(vertexId, currentMessage, message);
- }
- }
-
@Override
public void addPartitionMessages(
int partitionId,
@@ -149,26 +110,6 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
return message;
}
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws IOException {
- if (messageStore instanceof OneMessagePerVertexStore) {
- OneMessagePerVertexStore<I, M> oneMessagePerVertexStore =
- (OneMessagePerVertexStore<I, M>) messageStore;
- for (Map.Entry<Integer, ConcurrentMap<I, M>>
- partitionEntry : oneMessagePerVertexStore.map.entrySet()) {
- ConcurrentMap<I, M> partitionMap =
- getOrCreatePartitionMap(partitionEntry.getKey());
- for (Map.Entry<I, M> vertexEntry :
- partitionEntry.getValue().entrySet()) {
- addVertexMessageToPartition(vertexEntry.getKey(),
- vertexEntry.getValue(), partitionMap);
- }
- }
- } else {
- throw new IllegalArgumentException("addMessages: Illegal argument " +
- messageStore.getClass());
- }
- }
/**
* Create new factory for this message store
@@ -180,7 +121,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
* @return Factory
*/
public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+ MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
CentralizedServiceWorker<I, ?, ?> service,
ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
return new Factory<I, M>(service, config);
@@ -194,7 +135,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
*/
private static class Factory<I extends WritableComparable,
M extends Writable>
- implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+ implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Service worker */
private final CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
@@ -211,7 +152,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
}
@Override
- public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+ public MessageStore<I, M> newStore(Class<M> messageClass) {
return new OneMessagePerVertexStore<I, M>(messageClass, service,
config.<M>createCombiner(), config);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
deleted file mode 100644
index 23bbbc5..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
-
-/**
- * Used for writing and reading collection of messages to the disk.
- * {@link SequentialFileMessageStore#addMessages(MessageStore)}
- * should be called only once with the messages we want to store.
- * <p/>
- * It's optimized for retrieving messages in the natural order of vertex ids
- * they are sent to.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class SequentialFileMessageStore<I extends WritableComparable,
- M extends Writable> implements BasicMessageStore<I, M> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SequentialFileMessageStore.class);
- /** Message class */
- private final Class<M> messageClass;
- /** File in which we store data */
- private final File file;
- /** Configuration which we need for reading data */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
- /** Buffer size to use when reading and writing files */
- private final int bufferSize;
- /** File input stream */
- private DataInputStream in;
- /** How many vertices do we have left to read in the file */
- private int verticesLeft;
- /** Id of currently read vertex */
- private I currentVertexId;
-
- /**
- * Stores message on the disk.
- *
- * @param messageClass Message class held in the store
- * @param config Configuration used later for reading
- * @param bufferSize Buffer size to use when reading and writing
- * @param fileName File in which we want to store messages
- * @throws IOException
- */
- public SequentialFileMessageStore(
- Class<M> messageClass,
- ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- int bufferSize,
- String fileName) {
- this.messageClass = messageClass;
- this.config = config;
- this.bufferSize = bufferSize;
- file = new File(fileName);
- }
-
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws IOException {
- // Writes messages to its file
- if (file.exists()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: Deleting " + file);
- }
- file.delete();
- }
- file.createNewFile();
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: Creating " + file);
- }
-
- DataOutputStream out = null;
-
- try {
- out = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file), bufferSize));
- int destinationVertexIdCount =
- Iterables.size(messageStore.getDestinationVertices());
- out.writeInt(destinationVertexIdCount);
-
- // Since the message store messages might not be sorted, sort them if
- // necessary
- SortedSet<I> sortedSet;
- if (messageStore.getDestinationVertices() instanceof SortedSet) {
- sortedSet = (SortedSet<I>) messageStore.getDestinationVertices();
- } else {
- sortedSet =
- Sets.newTreeSet(messageStore.getDestinationVertices());
- for (I destinationVertexId : messageStore.getDestinationVertices()) {
- sortedSet.add(destinationVertexId);
- }
- }
-
- // Dump the vertices and their messages in a sorted order
- for (I destinationVertexId : sortedSet) {
- destinationVertexId.write(out);
- Iterable<M> messages =
- messageStore.getVertexMessages(destinationVertexId);
- int messageCount = Iterables.size(messages);
- out.writeInt(messageCount);
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: For vertex id " + destinationVertexId +
- ", messages = " + messageCount + " to file " + file);
- }
- for (M message : messages) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: Wrote " + message + " to " + file);
- }
- message.write(out);
- }
- }
- } finally {
- if (out != null) {
- out.close();
- }
- }
- }
-
- /**
- * Reads messages for a vertex. It will find the messages only if all
- * previous reads used smaller vertex ids than this one - messages should
- * be retrieved in increasing order of vertex ids.
- *
- * @param vertexId Vertex id for which we want to get messages
- * @return Messages for the selected vertex, or empty list if not used
- * correctly
- * @throws IOException
- */
- @Override
- public Iterable<M> getVertexMessages(I vertexId) throws
- IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
- " (currently " + currentVertexId + ") from " + file);
- }
- if (in == null) {
- startReading();
- }
-
- I nextVertexId = getCurrentVertexId();
- while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) {
- nextVertexId = getNextVertexId();
- }
-
- if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
- return EmptyIterable.get();
- }
-
- return readMessagesForCurrentVertex();
- }
-
- @Override
- public void clearVertexMessages(I vertexId) throws IOException { }
-
- @Override
- public void clearAll() throws IOException {
- endReading();
- file.delete();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(file.length());
- FileInputStream input = new FileInputStream(file);
- try {
- byte[] buffer = new byte[bufferSize];
- while (true) {
- int length = input.read(buffer);
- if (length < 0) {
- break;
- }
- out.write(buffer, 0, length);
- }
- } finally {
- input.close();
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- FileOutputStream output = new FileOutputStream(file);
- try {
- long fileLength = in.readLong();
- byte[] buffer = new byte[bufferSize];
- for (long position = 0; position < fileLength; position += bufferSize) {
- int bytes = (int) Math.min(bufferSize, fileLength - position);
- in.readFully(buffer, 0, bytes);
- output.write(buffer);
- }
- } finally {
- output.close();
- }
- }
-
- /**
- * Prepare for reading
- *
- * @throws IOException
- */
- private void startReading() throws IOException {
- currentVertexId = null;
- in = new DataInputStream(
- new BufferedInputStream(new FileInputStream(file), bufferSize));
- verticesLeft = in.readInt();
- if (LOG.isDebugEnabled()) {
- LOG.debug("startReading: File " + file + " with " +
- verticesLeft + " vertices left");
- }
- }
-
- /**
- * Gets current vertex id.
- * <p/>
- * If there is a vertex id whose messages haven't been read yet it
- * will return that vertex id, otherwise it will read and return the next
- * one.
- *
- * @return Current vertex id
- * @throws IOException
- */
- private I getCurrentVertexId() throws IOException {
- if (currentVertexId != null) {
- return currentVertexId;
- } else {
- return getNextVertexId();
- }
- }
-
- /**
- * Gets next vertex id.
- * <p/>
- * If there is a vertex whose messages haven't been read yet it
- * will read and skip over its messages to get to the next vertex.
- *
- * @return Next vertex id
- * @throws IOException
- */
- private I getNextVertexId() throws IOException {
- if (currentVertexId != null) {
- readMessagesForCurrentVertex();
- }
- if (verticesLeft == 0) {
- return null;
- }
- currentVertexId = config.createVertexId();
- currentVertexId.readFields(in);
- return currentVertexId;
- }
-
- /**
- * Reads messages for current vertex.
- *
- * @return Messages for current vertex
- * @throws IOException
- */
- private Collection<M> readMessagesForCurrentVertex() throws IOException {
- int messagesSize = in.readInt();
- List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
- for (int i = 0; i < messagesSize; i++) {
- M message = ReflectionUtils.newInstance(messageClass);
- try {
- message.readFields(in);
- } catch (IOException e) {
- throw new IllegalStateException("readMessagesForCurrentVertex: " +
- "Failed to read message from " + i + " of " +
- messagesSize + " for vertex id " + currentVertexId + " from " +
- file, e);
- }
- messages.add(message);
- }
- currentVertexDone();
- return messages;
- }
-
- /**
- * Release current vertex.
- *
- * @throws IOException
- */
- private void currentVertexDone() throws IOException {
- currentVertexId = null;
- verticesLeft--;
- if (verticesLeft == 0) {
- endReading();
- }
- }
-
- /**
- * Call when we are done reading, for closing files.
- *
- * @throws IOException
- */
- private void endReading() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("endReading: Stopped reading " + file);
- }
- if (in != null) {
- in.close();
- in = null;
- }
- }
-
- /**
- * Create new factory for this message store
- *
- * @param config Hadoop configuration
- * @param <I> Vertex id
- * @param <M> Message data
- * @return Factory
- */
- public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> newFactory(
- ImmutableClassesGiraphConfiguration config) {
- return new Factory<I, M>(config);
- }
-
- /**
- * Factory for {@link SequentialFileMessageStore}
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
- private static class Factory<I extends WritableComparable,
- M extends Writable>
- implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration config;
- /** Directories in which we'll keep necessary files */
- private final String[] directories;
- /** Buffer size to use when reading and writing */
- private final int bufferSize;
- /** Counter for created message stores */
- private final AtomicInteger storeCounter;
-
- /**
- * Constructor.
- *
- * @param config Hadoop configuration
- */
- public Factory(ImmutableClassesGiraphConfiguration config) {
- this.config = config;
- String jobId = config.get("mapred.job.id", "Unknown Job");
- int taskId = config.getTaskPartition();
- List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
- Collections.shuffle(userPaths);
- directories = new String[userPaths.size()];
- int i = 0;
- for (String path : userPaths) {
- String directory = path + File.separator + jobId + File.separator +
- taskId + File.separator;
- directories[i++] = directory;
- new File(directory).mkdirs();
- }
- this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
- storeCounter = new AtomicInteger();
- }
-
- @Override
- public BasicMessageStore<I, M> newStore(Class<M> messageClass) {
- int idx = Math.abs(storeCounter.getAndIncrement());
- String fileName =
- directories[idx % directories.length] + "messages-" + idx;
- return new SequentialFileMessageStore<I, M>(messageClass, config,
- bufferSize, fileName);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 1a91dfb..c9d869c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -18,14 +18,12 @@
package org.apache.giraph.comm.messages;
-import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -34,7 +32,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
- * Abstract class for {@link MessageStoreByPartition} which allows any kind
+ * Abstract class for {@link MessageStore} which allows any kind
* of object to hold messages for one vertex.
* Simple in memory message store implemented with a two level concurrent
* hash map.
@@ -44,7 +42,7 @@ import org.apache.hadoop.io.WritableComparable;
* @param <T> Type of object which holds messages for one vertex
*/
public abstract class SimpleMessageStore<I extends WritableComparable,
- M extends Writable, T> implements MessageStoreByPartition<I, M> {
+ M extends Writable, T> implements MessageStore<I, M> {
/** Message class */
protected final Class<M> messageClass;
/** Service worker */
@@ -150,16 +148,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
public boolean hasMessagesForVertex(I vertexId) {
ConcurrentMap<I, ?> partitionMap =
map.get(getPartitionId(vertexId));
- return (partitionMap == null) ? false : partitionMap.containsKey(vertexId);
- }
-
- @Override
- public Iterable<I> getDestinationVertices() {
- List<I> vertices = Lists.newArrayList();
- for (ConcurrentMap<I, ?> partitionMap : map.values()) {
- vertices.addAll(partitionMap.keySet());
- }
- return vertices;
+ return partitionMap != null && partitionMap.containsKey(vertexId);
}
@Override
@@ -174,15 +163,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public int getNumberOfMessages() {
- int numberOfMessages = 0;
- for (ConcurrentMap<I, T> partitionMap : map.values()) {
- numberOfMessages += getNumberOfMessagesIn(partitionMap);
- }
- return numberOfMessages;
- }
-
- @Override
public void writePartition(DataOutput out,
int partitionId) throws IOException {
ConcurrentMap<I, T> partitionMap = map.get(partitionId);
@@ -197,15 +177,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(map.size());
- for (int partitionId : map.keySet()) {
- out.writeInt(partitionId);
- writePartition(out, partitionId);
- }
- }
-
- @Override
public void readFieldsForPartition(DataInput in,
int partitionId) throws IOException {
if (in.readBoolean()) {
@@ -221,15 +192,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public void readFields(DataInput in) throws IOException {
- int numPartitions = in.readInt();
- for (int p = 0; p < numPartitions; p++) {
- int partitionId = in.readInt();
- readFieldsForPartition(in, partitionId);
- }
- }
-
- @Override
public void clearVertexMessages(I vertexId) throws IOException {
ConcurrentMap<I, ?> partitionMap =
map.get(getPartitionId(vertexId));