You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/05/31 18:04:32 UTC
[2/2] git commit: updated refs/heads/trunk to c94dd9c
Cleanup the old out-of-core message mechanism
Summary: With the new out-of-core infrastructure, there is no need for the old version of message out-of-core. The old version of message out-of-core also interferes with the new mechanism. It seems that the old out-of-core message mechanism is not necessary anymore. This diff removes the old out-of-core messages and cleans up its implications on the rest of the code base.
Test Plan:
mvn clean verify
snapshot tests passes
Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov
Differential Revision: https://reviews.facebook.net/D58701
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c94dd9c7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c94dd9c7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c94dd9c7
Branch: refs/heads/trunk
Commit: c94dd9c74895d419f72974fcbe5456fbae84b7a9
Parents: 6256a76
Author: Hassan Eslami <he...@fb.com>
Authored: Tue May 31 10:37:00 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue May 31 10:37:00 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/giraph/comm/ServerData.java | 20 +-
.../messages/AbstractListPerVertexStore.java | 3 +-
.../ByteArrayMessagesPerVertexStore.java | 32 +-
.../messages/InMemoryMessageStoreFactory.java | 5 -
.../giraph/comm/messages/MessageStore.java | 17 +-
.../comm/messages/MessageStoreFactory.java | 10 -
.../comm/messages/OneMessagePerVertexStore.java | 7 +-
.../messages/PointerListPerVertexStore.java | 43 +-
.../comm/messages/SimpleMessageStore.java | 8 +-
.../out_of_core/DiskBackedMessageStore.java | 305 -------------
.../DiskBackedMessageStoreFactory.java | 97 ----
.../PartitionDiskBackedMessageStore.java | 370 ----------------
.../out_of_core/SequentialFileMessageStore.java | 437 -------------------
.../comm/messages/out_of_core/package-info.java | 21 -
.../primitives/IdByteArrayMessageStore.java | 33 +-
.../primitives/IdOneMessagePerVertexStore.java | 11 +-
.../primitives/IntByteArrayMessageStore.java | 32 +-
.../primitives/IntFloatMessageStore.java | 11 +-
.../primitives/LongDoubleMessageStore.java | 11 +-
.../long_id/LongAbstractMessageStore.java | 7 +-
.../long_id/LongByteArrayMessageStore.java | 25 +-
.../long_id/LongPointerListMessageStore.java | 45 +-
.../queue/AsyncMessageStoreWrapper.java | 12 +-
.../NettyWorkerClientRequestProcessor.java | 23 +-
.../SendPartitionCurrentMessagesRequest.java | 8 +-
.../requests/SendWorkerMessagesRequest.java | 12 +-
.../SendWorkerOneMessageToManyRequest.java | 102 +++--
.../org/apache/giraph/conf/GiraphConstants.java | 22 -
.../giraph/ooc/data/DiskBackedMessageStore.java | 17 +-
.../giraph/partition/SimplePartition.java | 33 +-
.../apache/giraph/comm/RequestFailureTest.java | 2 +-
.../org/apache/giraph/comm/RequestTest.java | 8 +-
.../apache/giraph/comm/TestMessageStores.java | 38 +-
.../TestIntFloatPrimitiveMessageStores.java | 12 +-
.../TestLongDoublePrimitiveMessageStores.java | 9 +-
.../queue/AsyncMessageStoreWrapperTest.java | 12 +-
.../apache/giraph/graph/TestVertexAndEdges.java | 2 -
.../giraph/jython/TestJythonComputation.java | 6 -
38 files changed, 248 insertions(+), 1620 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 69fbfee..4156d8c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -18,7 +18,6 @@
package org.apache.giraph.comm;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -234,10 +233,8 @@ public class ServerData<I extends WritableComparable,
/**
* Re-initialize message stores.
* Discards old values if any.
- *
- * @throws IOException
*/
- public void resetMessageStores() throws IOException {
+ public void resetMessageStores() {
if (currentMessageStore != null) {
currentMessageStore.clearAll();
currentMessageStore = null;
@@ -252,12 +249,7 @@ public class ServerData<I extends WritableComparable,
/** Prepare for next superstep */
public void prepareSuperstep() {
if (currentMessageStore != null) {
- try {
- currentMessageStore.clearAll();
- } catch (IOException e) {
- throw new IllegalStateException(
- "Failed to clear previous message store");
- }
+ currentMessageStore.clearAll();
}
MessageStore<I, Writable> nextCurrentMessageStore;
@@ -422,13 +414,7 @@ public class ServerData<I extends WritableComparable,
partition.putVertex(vertex);
} else if (originalVertex != null) {
partition.removeVertex(vertexId);
- try {
- getCurrentMessageStore().clearVertexMessages(vertexId);
- } catch (IOException e) {
- throw new IllegalStateException("resolvePartitionMutations: " +
- "Caught IOException while clearing messages for a deleted " +
- "vertex due to a mutation");
- }
+ getCurrentMessageStore().clearVertexMessages(vertexId);
}
context.progress();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
index 6840f86..c28dff5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
@@ -26,7 +26,6 @@ import org.apache.giraph.utils.VertexIdIterator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@@ -89,7 +88,7 @@ public abstract class AbstractListPerVertexStore<I extends WritableComparable,
}
@Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+ public Iterable<M> getVertexMessages(I vertexId) {
ConcurrentMap<I, L> partitionMap =
map.get(getPartitionId(vertexId));
if (partitionMap == null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 29a0888..efbe11b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -95,7 +95,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
@Override
public void addPartitionMessages(
- int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+ int partitionId, VertexIdMessages<I, M> messages) {
ConcurrentMap<I, DataInputOutput> partitionMap =
getOrCreatePartitionMap(partitionId);
VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
@@ -117,17 +117,22 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
}
}
} else {
- VertexIdMessageIterator<I, M> vertexIdMessageIterator =
- messages.getVertexIdMessageIterator();
- while (vertexIdMessageIterator.hasNext()) {
- vertexIdMessageIterator.next();
- DataInputOutput dataInputOutput =
- getDataInputOutput(partitionMap, vertexIdMessageIterator);
-
- synchronized (dataInputOutput) {
- VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
- vertexIdMessageIterator, dataInputOutput.getDataOutput());
+ try {
+ VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+ messages.getVertexIdMessageIterator();
+ while (vertexIdMessageIterator.hasNext()) {
+ vertexIdMessageIterator.next();
+ DataInputOutput dataInputOutput =
+ getDataInputOutput(partitionMap, vertexIdMessageIterator);
+
+ synchronized (dataInputOutput) {
+ VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
+ vertexIdMessageIterator, dataInputOutput.getDataOutput());
+ }
}
+ } catch (IOException e) {
+ throw new RuntimeException("addPartitionMessages: IOException while" +
+ " adding messages for a partition: " + e);
}
}
}
@@ -225,10 +230,5 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
this.service = service;
this.config = conf;
}
-
- @Override
- public boolean shouldTraverseMessagesInOrder() {
- return false;
- }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 27980a9..99a12c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -206,9 +206,4 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
this.service = service;
this.conf = conf;
}
-
- @Override
- public boolean shouldTraverseMessagesInOrder() {
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 6e85ea3..9c56d85 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -48,24 +48,20 @@ public interface MessageStore<I extends WritableComparable,
*
* @param vertexId Vertex id for which we want to get messages
* @return Iterable of messages for a vertex id
- * @throws java.io.IOException
*/
- Iterable<M> getVertexMessages(I vertexId) throws IOException;
+ Iterable<M> getVertexMessages(I vertexId);
/**
* Clears messages for a vertex.
*
* @param vertexId Vertex id for which we want to clear messages
- * @throws IOException
*/
- void clearVertexMessages(I vertexId) throws IOException;
+ void clearVertexMessages(I vertexId);
/**
* Clears all resources used by this store.
- *
- * @throws IOException
*/
- void clearAll() throws IOException;
+ void clearAll();
/**
* Check if we have messages for some vertex
@@ -88,11 +84,9 @@ public interface MessageStore<I extends WritableComparable,
*
* @param partitionId Id of partition
* @param messages Collection of vertex ids and messages we want to add
- * @throws IOException
*/
void addPartitionMessages(
- int partitionId, VertexIdMessages<I, M> messages)
- throws IOException;
+ int partitionId, VertexIdMessages<I, M> messages);
/**
* Called before start of computation in bspworker
@@ -113,9 +107,8 @@ public interface MessageStore<I extends WritableComparable,
* Clears messages for a partition.
*
* @param partitionId Partition id for which we want to clear messages
- * @throws IOException
*/
- void clearPartition(int partitionId) throws IOException;
+ void clearPartition(int partitionId);
/**
* Serialize messages for one partition.
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index 41076e3..6a18aa8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -50,14 +50,4 @@ public interface MessageStoreFactory<I extends WritableComparable,
*/
void initialize(CentralizedServiceWorker<I, ?, ?> service,
ImmutableClassesGiraphConfiguration<I, ?, ?> conf);
-
- /**
- * This method is more for the performance optimization. If the message
- * traversal would be done in order then data structure which is optimized
- * for such traversal can be used.
- *
- * @return true if the messages would be traversed in order
- * else return false
- */
- boolean shouldTraverseMessagesInOrder();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index ad0a5dc..1d67014 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -71,7 +71,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
@Override
public void addPartitionMessages(
int partitionId,
- VertexIdMessages<I, M> messages) throws IOException {
+ VertexIdMessages<I, M> messages) {
ConcurrentMap<I, M> partitionMap =
getOrCreatePartitionMap(partitionId);
VertexIdMessageIterator<I, M> vertexIdMessageIterator =
@@ -175,10 +175,5 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
this.service = service;
this.config = conf;
}
-
- @Override
- public boolean shouldTraverseMessagesInOrder() {
- return false;
- }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
index cce0439..4b32a17 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
@@ -77,26 +77,31 @@ public class PointerListPerVertexStore<I extends WritableComparable,
@Override
public void addPartitionMessages(
- int partitionId, VertexIdMessages<I, M> messages) throws IOException {
- VertexIdMessageIterator<I, M> vertexIdMessageIterator =
- messages.getVertexIdMessageIterator();
- long pointer = 0;
- LongArrayList list;
- while (vertexIdMessageIterator.hasNext()) {
- vertexIdMessageIterator.next();
- M msg = vertexIdMessageIterator.getCurrentMessage();
- list = getOrCreateList(vertexIdMessageIterator);
- if (vertexIdMessageIterator.isNewMessage()) {
- IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
- pointer = indexAndDataOut.getIndex();
- pointer <<= 32;
- ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
- pointer += dataOutput.getPos();
- msg.write(dataOutput);
- }
- synchronized (list) {
- list.add(pointer);
+ int partitionId, VertexIdMessages<I, M> messages) {
+ try {
+ VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+ messages.getVertexIdMessageIterator();
+ long pointer = 0;
+ LongArrayList list;
+ while (vertexIdMessageIterator.hasNext()) {
+ vertexIdMessageIterator.next();
+ M msg = vertexIdMessageIterator.getCurrentMessage();
+ list = getOrCreateList(vertexIdMessageIterator);
+ if (vertexIdMessageIterator.isNewMessage()) {
+ IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+ pointer = indexAndDataOut.getIndex();
+ pointer <<= 32;
+ ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+ pointer += dataOutput.getPos();
+ msg.write(dataOutput);
+ }
+ synchronized (list) {
+ list.add(pointer);
+ }
}
+ } catch (IOException e) {
+ throw new RuntimeException("addPartitionMessages: IOException while" +
+ " adding messages for a partition: " + e);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 054302d..9c3ef7f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -157,7 +157,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+ public Iterable<M> getVertexMessages(I vertexId) {
ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId));
if (partitionMap == null) {
return Collections.<M>emptyList();
@@ -197,7 +197,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public void clearVertexMessages(I vertexId) throws IOException {
+ public void clearVertexMessages(I vertexId) {
ConcurrentMap<I, ?> partitionMap =
map.get(getPartitionId(vertexId));
if (partitionMap != null) {
@@ -206,7 +206,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public void clearPartition(int partitionId) throws IOException {
+ public void clearPartition(int partitionId) {
map.remove(partitionId);
}
@@ -217,7 +217,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public void clearAll() throws IOException {
+ public void clearAll() {
map.clear();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
deleted file mode 100644
index 0d7009b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.out_of_core;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.conf.MessageClasses;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-/**
- * Message store which separates data by partitions,
- * and submits them to underlying message store.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class DiskBackedMessageStore<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- MessageStore<I, M> {
- /** Message value factory */
- private final MessageClasses<I, M> messageClasses;
- /** Service worker */
- private final CentralizedServiceWorker<I, V, E> service;
- /** Number of messages to keep in memory */
- private final int maxNumberOfMessagesInMemory;
- /** Factory for creating file stores when flushing */
- private final MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
- partitionStoreFactory;
- /** Map from partition id to its message store */
- private final ConcurrentMap<Integer, PartitionDiskBackedMessageStore<I, M>>
- partitionMessageStores;
-
- /**
- * Constructor
- *
- * @param messageClasses Message classes information
- * @param service Service worker
- * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
- * @param partitionStoreFactory Factory for creating stores for a
- * partition
- */
- public DiskBackedMessageStore(
- MessageClasses<I, M> messageClasses,
- CentralizedServiceWorker<I, V, E> service,
- int maxNumberOfMessagesInMemory,
- MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I,
- M>> partitionStoreFactory) {
- this.messageClasses = messageClasses;
- this.service = service;
- this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
- this.partitionStoreFactory = partitionStoreFactory;
- partitionMessageStores = Maps.newConcurrentMap();
- }
-
- @Override
- public boolean isPointerListEncoding() {
- return false;
- }
-
- @Override
- public void addPartitionMessages(
- int partitionId,
- VertexIdMessages<I, M> messages) throws IOException {
- PartitionDiskBackedMessageStore<I, M> partitionMessageStore =
- getMessageStore(partitionId);
- VertexIdMessageIterator<I, M>
- vertexIdMessageIterator =
- messages.getVertexIdMessageIterator();
- while (vertexIdMessageIterator.hasNext()) {
- vertexIdMessageIterator.next();
- boolean ownsVertexId =
- partitionMessageStore.addVertexMessages(
- vertexIdMessageIterator.getCurrentVertexId(),
- Collections.singleton(
- vertexIdMessageIterator.getCurrentMessage()));
- if (ownsVertexId) {
- vertexIdMessageIterator.releaseCurrentVertexId();
- }
- }
- checkMemory();
- }
-
- @Override
- public void finalizeStore() {
- }
-
- @Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
- if (hasMessagesForVertex(vertexId)) {
- return getMessageStore(vertexId).getVertexMessages(vertexId);
- } else {
- return EmptyIterable.get();
- }
- }
-
- @Override
- public boolean hasMessagesForVertex(I vertexId) {
- return getMessageStore(vertexId).hasMessagesForVertex(vertexId);
- }
-
- @Override
- public boolean hasMessagesForPartition(int partitionId) {
- PartitionDiskBackedMessageStore<I, M> partitionMessages =
- getMessageStore(partitionId);
- return partitionMessages != null && !Iterables
- .isEmpty(partitionMessages.getDestinationVertices());
- }
-
- @Override
- public Iterable<I> getPartitionDestinationVertices(int partitionId) {
- PartitionDiskBackedMessageStore<I, M> messageStore =
- partitionMessageStores.get(partitionId);
- if (messageStore == null) {
- return Collections.emptyList();
- } else {
- return messageStore.getDestinationVertices();
- }
- }
-
- @Override
- public void clearVertexMessages(I vertexId) throws IOException {
- if (hasMessagesForVertex(vertexId)) {
- getMessageStore(vertexId).clearVertexMessages(vertexId);
- }
- }
-
- @Override
- public void clearPartition(int partitionId) throws IOException {
- PartitionDiskBackedMessageStore<I, M> messageStore =
- partitionMessageStores.get(partitionId);
- if (messageStore != null) {
- messageStore.clearAll();
- }
- }
-
- @Override
- public void clearAll() throws IOException {
- for (PartitionDiskBackedMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- messageStore.clearAll();
- }
- partitionMessageStores.clear();
- }
-
- /**
- * Checks the memory status, flushes if necessary
- *
- * @throws IOException
- */
- private void checkMemory() throws IOException {
- while (memoryFull()) {
- flushOnePartition();
- }
- }
-
- /**
- * Check if memory is full
- *
- * @return True iff memory is full
- */
- private boolean memoryFull() {
- int totalMessages = 0;
- for (PartitionDiskBackedMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- totalMessages += messageStore.getNumberOfMessages();
- }
- return totalMessages > maxNumberOfMessagesInMemory;
- }
-
- /**
- * Finds biggest partition and flushes it to the disk
- *
- * @throws IOException
- */
- private void flushOnePartition() throws IOException {
- int maxMessages = 0;
- PartitionDiskBackedMessageStore<I, M> biggestStore = null;
- for (PartitionDiskBackedMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- int numMessages = messageStore.getNumberOfMessages();
- if (numMessages > maxMessages) {
- maxMessages = numMessages;
- biggestStore = messageStore;
- }
- }
- if (biggestStore != null) {
- biggestStore.flush();
- }
- }
-
- /**
- * Get message store for partition which holds vertex with required vertex
- * id
- *
- * @param vertexId Id of vertex for which we are asking for message store
- * @return Requested message store
- */
- private PartitionDiskBackedMessageStore<I, M> getMessageStore(I vertexId) {
- int partitionId =
- service.getVertexPartitionOwner(vertexId).getPartitionId();
- return getMessageStore(partitionId);
- }
-
- /**
- * Get message store for partition id. It it doesn't exist yet,
- * creates a new one.
- *
- * @param partitionId Id of partition for which we are asking for message
- * store
- * @return Requested message store
- */
- private PartitionDiskBackedMessageStore<I, M> getMessageStore(
- int partitionId) {
- PartitionDiskBackedMessageStore<I, M> messageStore =
- partitionMessageStores.get(partitionId);
- if (messageStore != null) {
- return messageStore;
- }
- messageStore = partitionStoreFactory.newStore(messageClasses);
- PartitionDiskBackedMessageStore<I, M> store =
- partitionMessageStores.putIfAbsent(partitionId, messageStore);
- return (store == null) ? messageStore : store;
- }
-
- @Override
- public void writePartition(DataOutput out,
- int partitionId) throws IOException {
- PartitionDiskBackedMessageStore<I, M> partitionStore =
- partitionMessageStores.get(partitionId);
- out.writeBoolean(partitionStore != null);
- if (partitionStore != null) {
- partitionStore.write(out);
- }
- }
-
- @Override
- public void readFieldsForPartition(DataInput in,
- int partitionId) throws IOException {
- if (in.readBoolean()) {
- PartitionDiskBackedMessageStore<I, M> messageStore =
- partitionStoreFactory.newStore(messageClasses);
- messageStore.readFields(in);
- partitionMessageStores.put(partitionId, messageStore);
- }
- }
-
-
- /**
- * Create new factory for this message store
- *
- * @param service Service worker
- * @param maxMessagesInMemory Number of messages to keep in memory
- * @param fileStoreFactory Factory for creating file stores when
- * flushing
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @return Factory
- */
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
- CentralizedServiceWorker<I, V, E> service,
- int maxMessagesInMemory,
- MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
- fileStoreFactory) {
- return new DiskBackedMessageStoreFactory<I, V, E, M>(service,
- maxMessagesInMemory,
- fileStoreFactory);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
deleted file mode 100644
index 728a2ed..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.out_of_core;
-
-import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.MessageClasses;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message store factory which persist the messages on the disk.
- *
- * @param <I> vertex id
- * @param <V> vertex data
- * @param <E> edge data
- * @param <M> message data
- */
-public class DiskBackedMessageStoreFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements MessageStoreFactory<I, M, MessageStore<I, M>> {
- /** Service worker */
- private CentralizedServiceWorker<I, V, E> service;
- /** Number of messages to keep in memory */
- private int maxMessagesInMemory;
- /** Factory for creating file stores when flushing */
- private MessageStoreFactory<I, M,
- PartitionDiskBackedMessageStore<I, M>> fileStoreFactory;
-
- /**
- * Default constructor class helps in class invocation via Reflection
- */
- public DiskBackedMessageStoreFactory() {
- }
-
- /**
- * @param service Service worker
- * @param maxMessagesInMemory Number of messages to keep in memory
- * @param fileStoreFactory Factory for creating file stores when flushing
- */
- public DiskBackedMessageStoreFactory(
- CentralizedServiceWorker<I, V, E> service,
- int maxMessagesInMemory,
- MessageStoreFactory<I, M,
- PartitionDiskBackedMessageStore<I, M>> fileStoreFactory) {
- this.service = service;
- this.maxMessagesInMemory = maxMessagesInMemory;
- this.fileStoreFactory = fileStoreFactory;
- }
-
- @Override
- public MessageStore<I, M>
- newStore(MessageClasses<I, M> messageClasses) {
- return new DiskBackedMessageStore<I, V, E, M>(messageClasses,
- service, maxMessagesInMemory, fileStoreFactory);
- }
-
- @Override
- public void initialize(CentralizedServiceWorker service,
- ImmutableClassesGiraphConfiguration conf) {
- this.maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
-
- MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>>
- fileMessageStoreFactory =
- SequentialFileMessageStore.newFactory(conf);
- this.fileStoreFactory =
- PartitionDiskBackedMessageStore.newFactory(conf,
- fileMessageStoreFactory);
-
- this.service = service;
- }
-
- @Override
- public boolean shouldTraverseMessagesInOrder() {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
deleted file mode 100644
index 698281f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.out_of_core;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.MessageClasses;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Message storage with in-memory map of messages and with support for
- * flushing all the messages to the disk. Holds messages for a single partition.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class PartitionDiskBackedMessageStore<I extends WritableComparable,
- M extends Writable> implements Writable {
- /** Message classes */
- private final MessageClasses<I, M> messageClasses;
- /** Message value factory */
- private final MessageValueFactory<M> messageValueFactory;
- /**
- * In-memory message map (must be sorted to insure that the ids are
- * ordered)
- */
- private volatile ConcurrentNavigableMap<I, DataInputOutput>
- inMemoryMessages;
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
- /** Counter for number of messages in-memory */
- private final AtomicInteger numberOfMessagesInMemory;
- /** To keep vertex ids which we have messages for */
- private final Set<I> destinationVertices;
- /** File stores in which we keep flushed messages */
- private final Collection<SequentialFileMessageStore<I, M>> fileStores;
- /** Factory for creating file stores when flushing */
- private final
- MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> fileStoreFactory;
- /** Lock for disk flushing */
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
-
- /**
- * Constructor.
- *
- * @param messageClasses Message classes information
- * @param config Hadoop configuration
- * @param fileStoreFactory Factory for creating file stores when flushing
- */
- public PartitionDiskBackedMessageStore(
- MessageClasses<I, M> messageClasses,
- ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
- fileStoreFactory) {
- inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
- this.messageClasses = messageClasses;
- this.messageValueFactory = messageClasses.createMessageValueFactory(config);
- this.config = config;
- numberOfMessagesInMemory = new AtomicInteger(0);
- destinationVertices =
- Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
- fileStores = Lists.newArrayList();
- this.fileStoreFactory = fileStoreFactory;
- }
-
- /**
- * Add vertex messages
- *
- * @param vertexId Vertex id to use
- * @param messages Messages to add (note that the lifetime of the messages)
- * is only until next() is called again)
- * @return True if the vertex id ownership is taken by this method,
- * false otherwise
- * @throws IOException
- */
- boolean addVertexMessages(I vertexId,
- Iterable<M> messages) throws IOException {
- boolean ownsVertexId = false;
- destinationVertices.add(vertexId);
- rwLock.readLock().lock();
- try {
- DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId);
- if (dataInputOutput == null) {
- DataInputOutput newDataInputOutput =
- config.createMessagesInputOutput();
- dataInputOutput =
- inMemoryMessages.putIfAbsent(vertexId, newDataInputOutput);
- if (dataInputOutput == null) {
- ownsVertexId = true;
- dataInputOutput = newDataInputOutput;
- }
- }
-
- synchronized (dataInputOutput) {
- for (M message : messages) {
- message.write(dataInputOutput.getDataOutput());
- numberOfMessagesInMemory.getAndIncrement();
- }
- }
- } finally {
- rwLock.readLock().unlock();
- }
-
- return ownsVertexId;
- }
-
- /**
- * Get the messages for a vertex.
- *
- * @param vertexId Vertex id for which we want to get messages
- * @return Iterable of messages for a vertex id
- */
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
- DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId);
- if (dataInputOutput == null) {
- dataInputOutput = config.createMessagesInputOutput();
- }
- Iterable<M> combinedIterable = new MessagesIterable<M>(
- dataInputOutput, messageValueFactory);
-
- for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
- combinedIterable = Iterables.concat(combinedIterable,
- fileStore.getVertexMessages(vertexId));
- }
- return combinedIterable;
- }
-
- /**
- * Get number of messages in memory
- *
- * @return Number of messages in memory
- */
- public int getNumberOfMessages() {
- return numberOfMessagesInMemory.get();
- }
-
- /**
- * Check if we have messages for some vertex
- *
- * @param vertexId Id of vertex which we want to check
- * @return True iff we have messages for vertex with required id
- */
- public boolean hasMessagesForVertex(I vertexId) {
- return destinationVertices.contains(vertexId);
- }
-
- /**
- * Gets vertex ids which we have messages for
- *
- * @return Iterable over vertex ids which we have messages for
- */
- public Iterable<I> getDestinationVertices() {
- return destinationVertices;
- }
-
- /**
- * Clears messages for a vertex.
- *
- * @param vertexId Vertex id for which we want to clear messages
- * @throws IOException
- */
- public void clearVertexMessages(I vertexId) throws IOException {
- inMemoryMessages.remove(vertexId);
- }
-
- /**
- * Clears all resources used by this store.
- *
- * @throws IOException
- */
- public void clearAll() throws IOException {
- inMemoryMessages.clear();
- destinationVertices.clear();
- for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
- fileStore.clearAll();
- }
- fileStores.clear();
- }
-
- /**
- * Flushes messages to the disk.
- *
- * @throws IOException
- */
- public void flush() throws IOException {
- ConcurrentNavigableMap<I, DataInputOutput> messagesToFlush = null;
- rwLock.writeLock().lock();
- try {
- messagesToFlush = inMemoryMessages;
- inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
- numberOfMessagesInMemory.set(0);
- } finally {
- rwLock.writeLock().unlock();
- }
- SequentialFileMessageStore<I, M> fileStore =
- fileStoreFactory.newStore(messageClasses);
- fileStore.addMessages(messagesToFlush);
-
- synchronized (fileStores) {
- fileStores.add(fileStore);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // write destination vertices
- out.writeInt(destinationVertices.size());
- for (I vertexId : destinationVertices) {
- vertexId.write(out);
- }
-
- // write of in-memory messages
- out.writeInt(numberOfMessagesInMemory.get());
-
- // write in-memory messages map
- out.writeInt(inMemoryMessages.size());
- for (Entry<I, DataInputOutput> entry : inMemoryMessages.entrySet()) {
- entry.getKey().write(out);
- entry.getValue().write(out);
- }
-
- // write file stores
- out.writeInt(fileStores.size());
- for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
- fileStore.write(out);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- // read destination vertices
- int numVertices = in.readInt();
- for (int v = 0; v < numVertices; v++) {
- I vertexId = config.createVertexId();
- vertexId.readFields(in);
- destinationVertices.add(vertexId);
- }
-
- // read in-memory messages
- numberOfMessagesInMemory.set(in.readInt());
-
- // read in-memory map
- int mapSize = in.readInt();
- for (int m = 0; m < mapSize; m++) {
- I vertexId = config.createVertexId();
- vertexId.readFields(in);
- DataInputOutput dataInputOutput = config.createMessagesInputOutput();
- dataInputOutput.readFields(in);
- inMemoryMessages.put(vertexId, dataInputOutput);
- }
-
- // read file stores
- int numFileStores = in.readInt();
- for (int s = 0; s < numFileStores; s++) {
- SequentialFileMessageStore<I, M> fileStore =
- fileStoreFactory.newStore(messageClasses);
- fileStore.readFields(in);
- fileStores.add(fileStore);
- }
- }
-
-
- /**
- * Create new factory for this message store
- *
- * @param config Hadoop configuration
- * @param fileStoreFactory Factory for creating message stores for
- * partitions
- * @param <I> Vertex id
- * @param <M> Message data
- * @return Factory
- */
- public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> newFactory(
- ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
- fileStoreFactory) {
- return new Factory<I, M>(config, fileStoreFactory);
- }
-
- /**
- * Factory for {@link PartitionDiskBackedMessageStore}
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
- private static class Factory<I extends WritableComparable,
- M extends Writable> implements MessageStoreFactory<I, M,
- PartitionDiskBackedMessageStore<I, M>> {
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
- /** Factory for creating message stores for partitions */
- private final MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
- fileStoreFactory;
-
- /**
- * @param config Hadoop configuration
- * @param fileStoreFactory Factory for creating message stores for
- * partitions
- */
- public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
- fileStoreFactory) {
- this.config = config;
- this.fileStoreFactory = fileStoreFactory;
- }
-
- @Override
- public PartitionDiskBackedMessageStore<I, M> newStore(
- MessageClasses<I, M> messageClasses) {
- return new PartitionDiskBackedMessageStore<I, M>(messageClasses,
- config, fileStoreFactory);
- }
-
- @Override
- public void initialize(CentralizedServiceWorker<I, ?, ?> service,
- ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
- /* Implementation of this method is required if the class is to
- * be exposed publicly and allow instantiating the class via the
- * configuration parameter MESSAGE_STORE_FACTORY_CLASS. As this is
- * a private class, hence the implementation of this method is skipped
- * as the caller knows the specific required constructor parameters
- * for instantiation.
- */
- }
-
- @Override
- public boolean shouldTraverseMessagesInOrder() {
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
deleted file mode 100644
index 8f589bc..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.out_of_core;
-
-import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.MessageClasses;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- * Used for writing and reading collection of messages to the disk.
- * {@link SequentialFileMessageStore#addMessages(NavigableMap)}
- * should be called only once with the messages we want to store.
- * <p/>
- * It's optimized for retrieving messages in the natural order of vertex ids
- * they are sent to.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class SequentialFileMessageStore<I extends WritableComparable,
- M extends Writable> implements Writable {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SequentialFileMessageStore.class);
- /** Message class */
- private final MessageValueFactory<M> messageValueFactory;
- /** File in which we store data */
- private final File file;
- /** Configuration which we need for reading data */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
- /** Buffer size to use when reading and writing files */
- private final int bufferSize;
- /** File input stream */
- private DataInputStream in;
- /** How many vertices do we have left to read in the file */
- private int verticesLeft;
- /** Id of currently read vertex */
- private I currentVertexId;
-
- /**
- * Stores message on the disk.
- *
- *
- * @param messageValueFactory Used to create message values
- * @param config Configuration used later for reading
- * @param bufferSize Buffer size to use when reading and writing
- * @param fileName File in which we want to store messages
- * @throws IOException
- */
- public SequentialFileMessageStore(
- MessageValueFactory<M> messageValueFactory,
- ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- int bufferSize,
- String fileName) {
- this.messageValueFactory = messageValueFactory;
- this.config = config;
- this.bufferSize = bufferSize;
- file = new File(fileName);
- }
-
- /**
- * Adds messages from one message store to another
- *
- * @param messageMap Add the messages from this map to this store
- * @throws java.io.IOException
- */
- public void addMessages(NavigableMap<I, DataInputOutput> messageMap)
- throws IOException {
- // Writes messages to its file
- if (file.exists()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: Deleting " + file);
- }
- if (!file.delete()) {
- throw new IOException("Failed to delete existing file " + file);
- }
- }
- if (!file.createNewFile()) {
- throw new IOException("Failed to create file " + file);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: Creating " + file);
- }
-
- DataOutputStream out = null;
-
- try {
- out = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file), bufferSize));
- int destinationVertexIdCount = messageMap.size();
- out.writeInt(destinationVertexIdCount);
-
- // Dump the vertices and their messages in a sorted order
- for (Map.Entry<I, DataInputOutput> entry : messageMap.entrySet()) {
- I destinationVertexId = entry.getKey();
- destinationVertexId.write(out);
- DataInputOutput dataInputOutput = entry.getValue();
- Iterable<M> messages = new MessagesIterable<M>(
- dataInputOutput, messageValueFactory);
- int messageCount = Iterables.size(messages);
- out.writeInt(messageCount);
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: For vertex id " + destinationVertexId +
- ", messages = " + messageCount + " to file " + file);
- }
- for (M message : messages) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: Wrote " + message + " to " + file);
- }
- message.write(out);
- }
- }
- } finally {
- if (out != null) {
- out.close();
- }
- }
- }
-
- /**
- * Reads messages for a vertex. It will find the messages only if all
- * previous reads used smaller vertex ids than this one - messages should
- * be retrieved in increasing order of vertex ids.
- *
- * @param vertexId Vertex id for which we want to get messages
- * @return Messages for the selected vertex, or empty list if not used
- * correctly
- * @throws IOException
- */
- public Iterable<M> getVertexMessages(I vertexId) throws
- IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
- " (currently " + currentVertexId + ") from " + file);
- }
- if (in == null) {
- startReading();
- }
-
- I nextVertexId = getCurrentVertexId();
- while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) {
- nextVertexId = getNextVertexId();
- }
-
- if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
- return EmptyIterable.get();
- }
-
- return readMessagesForCurrentVertex();
- }
-
- /**
- * Clears all resources used by this store.
- */
- public void clearAll() throws IOException {
- endReading();
- if (!file.delete()) {
- LOG.error("clearAll: Failed to delete file " + file);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(file.length());
- FileInputStream input = new FileInputStream(file);
- try {
- byte[] buffer = new byte[bufferSize];
- while (true) {
- int length = input.read(buffer);
- if (length < 0) {
- break;
- }
- out.write(buffer, 0, length);
- }
- } finally {
- input.close();
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- FileOutputStream output = new FileOutputStream(file);
- try {
- long fileLength = in.readLong();
- byte[] buffer = new byte[bufferSize];
- for (long position = 0; position < fileLength; position += bufferSize) {
- int bytes = (int) Math.min(bufferSize, fileLength - position);
- in.readFully(buffer, 0, bytes);
- output.write(buffer);
- }
- } finally {
- output.close();
- }
- }
-
- /**
- * Prepare for reading
- *
- * @throws IOException
- */
- private void startReading() throws IOException {
- currentVertexId = null;
- in = new DataInputStream(
- new BufferedInputStream(new FileInputStream(file), bufferSize));
- verticesLeft = in.readInt();
- if (LOG.isDebugEnabled()) {
- LOG.debug("startReading: File " + file + " with " +
- verticesLeft + " vertices left");
- }
- }
-
- /**
- * Gets current vertex id.
- * <p/>
- * If there is a vertex id whose messages haven't been read yet it
- * will return that vertex id, otherwise it will read and return the next
- * one.
- *
- * @return Current vertex id
- * @throws IOException
- */
- private I getCurrentVertexId() throws IOException {
- if (currentVertexId != null) {
- return currentVertexId;
- } else {
- return getNextVertexId();
- }
- }
-
- /**
- * Gets next vertex id.
- * <p/>
- * If there is a vertex whose messages haven't been read yet it
- * will read and skip over its messages to get to the next vertex.
- *
- * @return Next vertex id
- * @throws IOException
- */
- private I getNextVertexId() throws IOException {
- if (currentVertexId != null) {
- readMessagesForCurrentVertex();
- }
- if (verticesLeft == 0) {
- return null;
- }
- currentVertexId = config.createVertexId();
- currentVertexId.readFields(in);
- return currentVertexId;
- }
-
- /**
- * Reads messages for current vertex.
- *
- * @return Messages for current vertex
- * @throws IOException
- */
- private Collection<M> readMessagesForCurrentVertex() throws IOException {
- int messagesSize = in.readInt();
- List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
- for (int i = 0; i < messagesSize; i++) {
- M message = messageValueFactory.newInstance();
- try {
- message.readFields(in);
- } catch (IOException e) {
- throw new IllegalStateException("readMessagesForCurrentVertex: " +
- "Failed to read message from " + i + " of " +
- messagesSize + " for vertex id " + currentVertexId + " from " +
- file, e);
- }
- messages.add(message);
- }
- currentVertexDone();
- return messages;
- }
-
- /**
- * Release current vertex.
- *
- * @throws IOException
- */
- private void currentVertexDone() throws IOException {
- currentVertexId = null;
- verticesLeft--;
- if (verticesLeft == 0) {
- endReading();
- }
- }
-
- /**
- * Call when we are done reading, for closing files.
- *
- * @throws IOException
- */
- private void endReading() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("endReading: Stopped reading " + file);
- }
- if (in != null) {
- in.close();
- in = null;
- }
- }
-
- /**
- * Create new factory for this message store
- *
- * @param config Hadoop configuration
- * @param <I> Vertex id
- * @param <M> Message data
- * @return Factory
- */
- public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> newFactory(
- ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
- return new Factory<I, M>(config);
- }
-
- /**
- * Factory for {@link SequentialFileMessageStore}
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
- private static class Factory<I extends WritableComparable,
- M extends Writable>
- implements MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> {
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
- /** Directories in which we'll keep necessary files */
- private final String[] directories;
- /** Buffer size to use when reading and writing */
- private final int bufferSize;
- /** Counter for created message stores */
- private final AtomicInteger storeCounter;
-
- /**
- * Constructor.
- *
- * @param config Hadoop configuration
- */
- public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
- this.config = config;
- String jobId = config.get("mapred.job.id", "Unknown Job");
- int taskId = config.getTaskPartition();
- List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
- Collections.shuffle(userPaths);
- directories = new String[userPaths.size()];
- int i = 0;
- for (String path : userPaths) {
- String directory = path + File.separator + jobId + File.separator +
- taskId + File.separator;
- directories[i++] = directory;
- if (!new File(directory).mkdirs()) {
- LOG.error("SequentialFileMessageStore$Factory: Failed to create " +
- directory);
- }
- }
- this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
- storeCounter = new AtomicInteger();
- }
-
- @Override
- public SequentialFileMessageStore<I, M> newStore(
- MessageClasses<I, M> messageClasses) {
- int idx = Math.abs(storeCounter.getAndIncrement());
- String fileName =
- directories[idx % directories.length] + "messages-" + idx;
- return new SequentialFileMessageStore<I, M>(
- messageClasses.createMessageValueFactory(config), config,
- bufferSize, fileName);
- }
-
- @Override
- public void initialize(CentralizedServiceWorker<I, ?, ?> service,
- ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
- /* Implementation of this method is required if the class is to
- * be exposed publicly and allow instantiating the class via the
- * configuration parameter MESSAGE_STORE_FACTORY_CLASS. As this is
- * a private class, hence the implementation of this method is skipped
- * as the caller knows the specific required constructor parameters
- * for instantiation.
- */
- }
-
- @Override
- public boolean shouldTraverseMessagesInOrder() {
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
deleted file mode 100644
index 7039378..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of out-of-core messages related classes.
- */
-package org.apache.giraph.comm.messages.out_of_core;
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
index 2e39857..57f3ff6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
@@ -142,7 +142,7 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
@Override
public void addPartitionMessages(int partitionId,
- VertexIdMessages<I, M> messages) throws IOException {
+ VertexIdMessages<I, M> messages) {
Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
synchronized (partitionMap) {
VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
@@ -161,22 +161,27 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
dataInputOutput.getDataOutput());
}
} else {
- VertexIdMessageIterator<I, M> iterator =
- messages.getVertexIdMessageIterator();
- while (iterator.hasNext()) {
- iterator.next();
- DataInputOutput dataInputOutput =
- getDataInputOutput(partitionMap, iterator.getCurrentVertexId());
-
- VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
- dataInputOutput.getDataOutput());
+ try {
+ VertexIdMessageIterator<I, M> iterator =
+ messages.getVertexIdMessageIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ DataInputOutput dataInputOutput =
+ getDataInputOutput(partitionMap, iterator.getCurrentVertexId());
+
+ VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+ dataInputOutput.getDataOutput());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("addPartitionMessages: IOException while" +
+ " adding message for a partition: " + e);
}
}
}
}
@Override
- public void clearPartition(int partitionId) throws IOException {
+ public void clearPartition(int partitionId) {
map.get(partitionId).clear();
}
@@ -193,7 +198,7 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
}
@Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+ public Iterable<M> getVertexMessages(I vertexId) {
DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
if (dataInputOutput == null) {
return EmptyIterable.get();
@@ -203,12 +208,12 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
}
@Override
- public void clearVertexMessages(I vertexId) throws IOException {
+ public void clearVertexMessages(I vertexId) {
getPartitionMap(vertexId).remove(vertexId);
}
@Override
- public void clearAll() throws IOException {
+ public void clearAll() {
map.clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index 42fe992..4463ddb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -125,7 +125,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
@Override
public void addPartitionMessages(
int partitionId,
- VertexIdMessages<I, M> messages) throws IOException {
+ VertexIdMessages<I, M> messages) {
Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
synchronized (partitionMap) {
VertexIdMessageIterator<I, M>
@@ -152,7 +152,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
}
@Override
- public void clearPartition(int partitionId) throws IOException {
+ public void clearPartition(int partitionId) {
map.get(partitionId).clear();
}
@@ -168,8 +168,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
}
@Override
- public Iterable<M> getVertexMessages(
- I vertexId) throws IOException {
+ public Iterable<M> getVertexMessages(I vertexId) {
Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
if (!partitionMap.containsKey(vertexId)) {
return EmptyIterable.get();
@@ -179,12 +178,12 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
}
@Override
- public void clearVertexMessages(I vertexId) throws IOException {
+ public void clearVertexMessages(I vertexId) {
getPartitionMap(vertexId).remove(vertexId);
}
@Override
- public void clearAll() throws IOException {
+ public void clearAll() {
map.clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index 4c363f3..4ef9e76 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -127,8 +127,7 @@ public class IntByteArrayMessageStore<M extends Writable>
@Override
public void addPartitionMessages(int partitionId,
- VertexIdMessages<IntWritable, M> messages) throws
- IOException {
+ VertexIdMessages<IntWritable, M> messages) {
Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
map.get(partitionId);
synchronized (partitionMap) {
@@ -149,14 +148,19 @@ public class IntByteArrayMessageStore<M extends Writable>
dataInputOutput.getDataOutput());
}
} else {
- VertexIdMessageIterator<IntWritable, M>
- iterator = messages.getVertexIdMessageIterator();
- while (iterator.hasNext()) {
- iterator.next();
- DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
- iterator.getCurrentVertexId().get());
- VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
- dataInputOutput.getDataOutput());
+ try {
+ VertexIdMessageIterator<IntWritable, M>
+ iterator = messages.getVertexIdMessageIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+ iterator.getCurrentVertexId().get());
+ VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+ dataInputOutput.getDataOutput());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("addPartitionMessages: IOException while" +
+ " adding messages for a partition: " + e);
}
}
}
@@ -167,7 +171,7 @@ public class IntByteArrayMessageStore<M extends Writable>
}
@Override
- public void clearPartition(int partitionId) throws IOException {
+ public void clearPartition(int partitionId) {
map.get(partitionId).clear();
}
@@ -185,7 +189,7 @@ public class IntByteArrayMessageStore<M extends Writable>
@Override
public Iterable<M> getVertexMessages(
- IntWritable vertexId) throws IOException {
+ IntWritable vertexId) {
DataInputOutput dataInputOutput =
getPartitionMap(vertexId).get(vertexId.get());
if (dataInputOutput == null) {
@@ -196,12 +200,12 @@ public class IntByteArrayMessageStore<M extends Writable>
}
@Override
- public void clearVertexMessages(IntWritable vertexId) throws IOException {
+ public void clearVertexMessages(IntWritable vertexId) {
getPartitionMap(vertexId).remove(vertexId.get());
}
@Override
- public void clearAll() throws IOException {
+ public void clearAll() {
map.clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 280f5b9..715bf45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -96,8 +96,7 @@ public class IntFloatMessageStore
@Override
public void addPartitionMessages(int partitionId,
- VertexIdMessages<IntWritable, FloatWritable> messages) throws
- IOException {
+ VertexIdMessages<IntWritable, FloatWritable> messages) {
IntWritable reusableVertexId = new IntWritable();
FloatWritable reusableMessage = new FloatWritable();
FloatWritable reusableCurrentMessage = new FloatWritable();
@@ -128,7 +127,7 @@ public class IntFloatMessageStore
}
@Override
- public void clearPartition(int partitionId) throws IOException {
+ public void clearPartition(int partitionId) {
map.get(partitionId).clear();
}
@@ -145,7 +144,7 @@ public class IntFloatMessageStore
@Override
public Iterable<FloatWritable> getVertexMessages(
- IntWritable vertexId) throws IOException {
+ IntWritable vertexId) {
Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
if (!partitionMap.containsKey(vertexId.get())) {
return EmptyIterable.get();
@@ -156,12 +155,12 @@ public class IntFloatMessageStore
}
@Override
- public void clearVertexMessages(IntWritable vertexId) throws IOException {
+ public void clearVertexMessages(IntWritable vertexId) {
getPartitionMap(vertexId).remove(vertexId.get());
}
@Override
- public void clearAll() throws IOException {
+ public void clearAll() {
map.clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index d8a3fde..4fc4843 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -97,8 +97,7 @@ public class LongDoubleMessageStore
@Override
public void addPartitionMessages(int partitionId,
- VertexIdMessages<LongWritable, DoubleWritable> messages) throws
- IOException {
+ VertexIdMessages<LongWritable, DoubleWritable> messages) {
LongWritable reusableVertexId = new LongWritable();
DoubleWritable reusableMessage = new DoubleWritable();
DoubleWritable reusableCurrentMessage = new DoubleWritable();
@@ -129,7 +128,7 @@ public class LongDoubleMessageStore
}
@Override
- public void clearPartition(int partitionId) throws IOException {
+ public void clearPartition(int partitionId) {
map.get(partitionId).clear();
}
@@ -146,7 +145,7 @@ public class LongDoubleMessageStore
@Override
public Iterable<DoubleWritable> getVertexMessages(
- LongWritable vertexId) throws IOException {
+ LongWritable vertexId) {
Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
if (!partitionMap.containsKey(vertexId.get())) {
return EmptyIterable.get();
@@ -157,12 +156,12 @@ public class LongDoubleMessageStore
}
@Override
- public void clearVertexMessages(LongWritable vertexId) throws IOException {
+ public void clearVertexMessages(LongWritable vertexId) {
getPartitionMap(vertexId).remove(vertexId.get());
}
@Override
- public void clearAll() throws IOException {
+ public void clearAll() {
map.clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
index a0c977e..b3ed4b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
@@ -29,7 +29,6 @@ import org.apache.giraph.factories.MessageValueFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import java.io.IOException;
import java.util.List;
/**
@@ -91,7 +90,7 @@ public abstract class LongAbstractMessageStore<M extends Writable, T>
}
@Override
- public void clearPartition(int partitionId) throws IOException {
+ public void clearPartition(int partitionId) {
map.get(partitionId).clear();
}
@@ -107,13 +106,13 @@ public abstract class LongAbstractMessageStore<M extends Writable, T>
}
@Override
- public void clearVertexMessages(LongWritable vertexId) throws IOException {
+ public void clearVertexMessages(LongWritable vertexId) {
getPartitionMap(vertexId).remove(vertexId.get());
}
@Override
- public void clearAll() throws IOException {
+ public void clearAll() {
map.clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
index 092d963..bcdab98 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
@@ -89,7 +89,7 @@ public class LongByteArrayMessageStore<M extends Writable>
@Override
public void addPartitionMessages(int partitionId,
- VertexIdMessages<LongWritable, M> messages) throws IOException {
+ VertexIdMessages<LongWritable, M> messages) {
Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId);
synchronized (partitionMap) {
VertexIdMessageBytesIterator<LongWritable, M>
@@ -109,14 +109,19 @@ public class LongByteArrayMessageStore<M extends Writable>
dataInputOutput.getDataOutput());
}
} else {
- VertexIdMessageIterator<LongWritable, M>
- iterator = messages.getVertexIdMessageIterator();
- while (iterator.hasNext()) {
- iterator.next();
- DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
- iterator.getCurrentVertexId().get());
- VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
- dataInputOutput.getDataOutput());
+ try {
+ VertexIdMessageIterator<LongWritable, M>
+ iterator = messages.getVertexIdMessageIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+ iterator.getCurrentVertexId().get());
+ VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+ dataInputOutput.getDataOutput());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("addPartitionMessages: IOException while" +
+ " adding messages for a partition: " + e);
}
}
}
@@ -128,7 +133,7 @@ public class LongByteArrayMessageStore<M extends Writable>
@Override
public Iterable<M> getVertexMessages(
- LongWritable vertexId) throws IOException {
+ LongWritable vertexId) {
DataInputOutput dataInputOutput =
getPartitionMap(vertexId).get(vertexId.get());
if (dataInputOutput == null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
index 32296ad..eef75ba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
@@ -80,32 +80,37 @@ public class LongPointerListMessageStore<M extends Writable>
@Override
public void addPartitionMessages(int partitionId,
- VertexIdMessages<LongWritable, M> messages) throws IOException {
- VertexIdMessageIterator<LongWritable, M> iterator =
- messages.getVertexIdMessageIterator();
- long pointer = 0;
- LongArrayList list;
- while (iterator.hasNext()) {
- iterator.next();
- M msg = iterator.getCurrentMessage();
- list = getList(iterator);
- if (iterator.isNewMessage()) {
- IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
- pointer = indexAndDataOut.getIndex();
- pointer <<= 32;
- ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
- pointer += dataOutput.getPos();
- msg.write(dataOutput);
- }
- synchronized (list) { // TODO - any better way?
- list.add(pointer);
+ VertexIdMessages<LongWritable, M> messages) {
+ try {
+ VertexIdMessageIterator<LongWritable, M> iterator =
+ messages.getVertexIdMessageIterator();
+ long pointer = 0;
+ LongArrayList list;
+ while (iterator.hasNext()) {
+ iterator.next();
+ M msg = iterator.getCurrentMessage();
+ list = getList(iterator);
+ if (iterator.isNewMessage()) {
+ IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+ pointer = indexAndDataOut.getIndex();
+ pointer <<= 32;
+ ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+ pointer += dataOutput.getPos();
+ msg.write(dataOutput);
+ }
+ synchronized (list) { // TODO - any better way?
+ list.add(pointer);
+ }
}
+ } catch (IOException e) {
+ throw new RuntimeException("addPartitionMessages: IOException while" +
+ " adding messages for a partition: " + e);
}
}
@Override
public Iterable<M> getVertexMessages(
- LongWritable vertexId) throws IOException {
+ LongWritable vertexId) {
LongArrayList list = getPartitionMap(vertexId).get(
vertexId.get());
if (list == null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
index 04afba5..6273694 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -113,17 +113,17 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
}
@Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+ public Iterable<M> getVertexMessages(I vertexId) {
return store.getVertexMessages(vertexId);
}
@Override
- public void clearVertexMessages(I vertexId) throws IOException {
+ public void clearVertexMessages(I vertexId) {
store.clearVertexMessages(vertexId);
}
@Override
- public void clearAll() throws IOException {
+ public void clearAll() {
try {
for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
queue.put(SHUTDOWN_QUEUE_MESSAGE);
@@ -147,7 +147,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
@Override
public void addPartitionMessages(
- int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+ int partitionId, VertexIdMessages<I, M> messages) {
int hash = partition2Queue.get(partitionId);
try {
queues[hash].put(new PartitionMessage<>(partitionId, messages));
@@ -167,7 +167,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
}
@Override
- public void clearPartition(int partitionId) throws IOException {
+ public void clearPartition(int partitionId) {
store.clearPartition(partitionId);
}
@@ -232,7 +232,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
return;
}
}
- } catch (IOException | InterruptedException e) {
+ } catch (InterruptedException e) {
LOG.error("MessageStoreQueueWorker.run: " + message, e);
return;
}