You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/05/02 23:51:34 UTC
git commit: updated refs/heads/trunk to dd47ce6
Repository: giraph
Updated Branches:
refs/heads/trunk f732f300f -> dd47ce6ab
GIRAPH-891: Make MessageStoreFactory configurable (rohankarwa via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/dd47ce6a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/dd47ce6a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/dd47ce6a
Branch: refs/heads/trunk
Commit: dd47ce6abdf01adc3584d6e7e0d41c6749d02d30
Parents: f732f30
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri May 2 14:50:22 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri May 2 14:50:22 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../ByteArrayMessagesPerVertexStore.java | 28 ++++--
.../messages/InMemoryMessageStoreFactory.java | 26 ++++--
.../comm/messages/MessageStoreFactory.java | 22 +++++
.../comm/messages/OneMessagePerVertexStore.java | 28 ++++--
.../out_of_core/DiskBackedMessageStore.java | 51 +---------
.../DiskBackedMessageStoreFactory.java | 97 ++++++++++++++++++++
.../PartitionDiskBackedMessageStore.java | 27 +++++-
.../out_of_core/SequentialFileMessageStore.java | 46 +++++++---
.../giraph/comm/netty/NettyWorkerServer.java | 34 ++-----
.../org/apache/giraph/conf/GiraphConstants.java | 13 ++-
.../giraph/partition/SimplePartition.java | 42 ++++++---
.../apache/giraph/graph/TestVertexAndEdges.java | 2 +
.../giraph/jython/TestJythonComputation.java | 19 +++-
14 files changed, 308 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index a3412bb..2b4db33 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-891: Make MessageStoreFactory configurable (rohankarwa via majakabiljo)
+
GIRAPH-825: Fix DiskBackedPartitionStore to work with current trunk (armax00 via claudio)
GIRAPH-864: 'mvn clean test' fails for rexster (armax00 via claudio)
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 2381078..e8b3b30 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -18,25 +18,25 @@
package org.apache.giraph.comm.messages;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.RepresentativeByteArrayIterator;
-import org.apache.giraph.utils.VertexIdIterator;
import org.apache.giraph.utils.VerboseByteArrayMessageWrite;
+import org.apache.giraph.utils.VertexIdIterator;
import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import com.google.common.collect.Iterators;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-
/**
* Implementation of {@link SimpleMessageStore} where multiple messages are
* stored per vertex as byte arrays. Used when there is no combiner provided.
@@ -202,9 +202,9 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
private static class Factory<I extends WritableComparable, M extends Writable>
implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Service worker */
- private final CentralizedServiceWorker<I, ?, ?> service;
+ private CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+ private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
/**
* @param service Worker service
@@ -222,5 +222,17 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
return new ByteArrayMessagesPerVertexStore<I, M>(messageValueFactory,
service, config);
}
+
+ /**
+ * Stores the worker service and configuration, replacing any values given
+ * to the constructor; both are read by {@link #newStore}.
+ *
+ * @param service Worker service
+ * @param conf Configuration
+ */
+ @Override
+ public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+ this.service = service;
+ this.config = conf;
+ }
+
+ /**
+ * @return false: this in-memory store does not require messages to be
+ * traversed in vertex-id order
+ */
+ @Override
+ public boolean shouldTraverseMessagesInOrder() {
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 22a41cd..f691d3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -51,18 +51,14 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
Logger.getLogger(InMemoryMessageStoreFactory.class);
/** Service worker */
- private final CentralizedServiceWorker<I, ?, ?> service;
+ private CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
+ private ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
/**
- * @param service Worker service
- * @param conf Configuration
+ * Default constructor allowing class invocation via Reflection.
*/
- public InMemoryMessageStoreFactory(CentralizedServiceWorker<I, ?, ?> service,
- ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
- this.service = service;
- this.conf = conf;
+ public InMemoryMessageStoreFactory() {
}
@Override
@@ -113,6 +109,18 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
(conf.useMessageCombiner() ? " message combiner " +
conf.getMessageCombinerClass() : " no combiner"));
}
- return (MessageStore<I, M>) messageStore;
+ return messageStore;
+ }
+
+ /**
+ * Stores the worker service and configuration. The no-argument
+ * constructor leaves both fields unset, so this must be called before the
+ * factory creates any store.
+ *
+ * @param service Worker service
+ * @param conf Configuration
+ */
+ @Override
+ public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+ this.service = service;
+ this.conf = conf;
+ }
+
+ /**
+ * @return false: in-memory stores do not require messages to be
+ * traversed in vertex-id order
+ */
+ @Override
+ public boolean shouldTraverseMessagesInOrder() {
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index f582ea2..6149a9c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -18,6 +18,8 @@
package org.apache.giraph.comm.messages;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -42,4 +44,24 @@ public interface MessageStoreFactory<I extends WritableComparable,
* @return New message store
*/
MS newStore(MessageValueFactory<M> messageValueFactory);
+
+ /**
+ * Initializes the factory's internal state. Factories selected through
+ * {@code MESSAGE_STORE_FACTORY_CLASS} are built with their no-argument
+ * constructor, so implementations should capture the worker service and
+ * configuration they need here.
+ *
+ * @param service Worker service (e.g. to get partition mappings)
+ * @param conf Configuration
+ */
+ void initialize(CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf);
+
+ /**
+ * Performance hint telling whether the stores created by this factory
+ * traverse messages in vertex-id order. When true, callers can choose a
+ * data structure optimized for ordered traversal (for example a sorted
+ * vertex map in the partition).
+ *
+ * @return true if messages will be traversed in order, false otherwise
+ */
+ boolean shouldTraverseMessagesInOrder();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index acf68ea..bb581c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -18,6 +18,12 @@
package org.apache.giraph.comm.messages;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -26,12 +32,6 @@ import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentMap;
-
/**
* Implementation of {@link SimpleMessageStore} where we have a single
* message per vertex.
@@ -140,9 +140,9 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
M extends Writable>
implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Service worker */
- private final CentralizedServiceWorker<I, ?, ?> service;
+ private CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+ private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
/**
* @param service Worker service
@@ -160,5 +160,17 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
return new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
config.<M>createMessageCombiner(), config);
}
+
+ /**
+ * Stores the worker service and configuration, replacing any values given
+ * to the constructor; both are read by {@link #newStore}.
+ *
+ * @param service Worker service
+ * @param conf Configuration
+ */
+ @Override
+ public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+ this.service = service;
+ this.config = conf;
+ }
+
+ /**
+ * @return false: this in-memory store does not require messages to be
+ * traversed in vertex-id order
+ */
+ @Override
+ public boolean shouldTraverseMessagesInOrder() {
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 346e3b3..1a76306 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -19,6 +19,7 @@
package org.apache.giraph.comm.messages.out_of_core;
import com.google.common.collect.Maps;
+
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
@@ -27,7 +28,6 @@ import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.EmptyIterable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -273,54 +273,13 @@ public class DiskBackedMessageStore<I extends WritableComparable,
public static <I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable>
MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
- CentralizedServiceWorker<I, V, E> service,
+ CentralizedServiceWorker<I, V, E> service,
int maxMessagesInMemory,
MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
fileStoreFactory) {
- return new Factory<I, V, E, M>(service, maxMessagesInMemory,
+ return new DiskBackedMessageStoreFactory<I, V, E, M>(service,
+ maxMessagesInMemory,
fileStoreFactory);
}
-
- /**
- * Factory for {@link DiskBackedMessageStore}
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
- private static class Factory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements MessageStoreFactory<I, M, MessageStore<I, M>> {
- /** Service worker */
- private final CentralizedServiceWorker<I, V, E> service;
- /** Number of messages to keep in memory */
- private final int maxMessagesInMemory;
- /** Factory for creating file stores when flushing */
- private final
- MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
- fileStoreFactory;
-
- /**
- * @param service Service worker
- * @param maxMessagesInMemory Number of messages to keep in memory
- * @param fileStoreFactory Factory for creating file stores when
- * flushing
- */
- public Factory(CentralizedServiceWorker<I, V, E> service,
- int maxMessagesInMemory,
- MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
- fileStoreFactory) {
- this.service = service;
- this.maxMessagesInMemory = maxMessagesInMemory;
- this.fileStoreFactory = fileStoreFactory;
- }
-
- @Override
- public MessageStore<I, M> newStore(
- MessageValueFactory<M> messageValueFactory) {
- return new DiskBackedMessageStore<I, V, E, M>(messageValueFactory,
- service, maxMessagesInMemory, fileStoreFactory);
- }
- }
}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
new file mode 100644
index 0000000..f2b31c0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory for {@link DiskBackedMessageStore}, the out-of-core message store
+ * that keeps up to {@code maxMessagesInMemory} messages in memory and uses
+ * per-partition file stores when flushing.
+ *
+ * <p>The factory can be built directly with the argument constructor, or
+ * via reflection (no-argument constructor, e.g. when selected through
+ * {@code MESSAGE_STORE_FACTORY_CLASS}) followed by
+ * {@link #initialize(CentralizedServiceWorker,
+ * ImmutableClassesGiraphConfiguration)}. That method reads
+ * {@code MAX_MESSAGES_IN_MEMORY} and builds the file store factories from
+ * the configuration.
+ *
+ * @param <I> vertex id
+ * @param <V> vertex data
+ * @param <E> edge data
+ * @param <M> message data
+ */
+public class DiskBackedMessageStoreFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements MessageStoreFactory<I, M, MessageStore<I, M>> {
+  /** Service worker */
+  private CentralizedServiceWorker<I, V, E> service;
+  /** Number of messages to keep in memory */
+  private int maxMessagesInMemory;
+  /** Factory for creating file stores when flushing */
+  private MessageStoreFactory<I, M,
+      PartitionDiskBackedMessageStore<I, M>> fileStoreFactory;
+
+  /**
+   * No-argument constructor so the factory can be instantiated via
+   * reflection. {@link #initialize} must be called before
+   * {@link #newStore}, because all fields are left unset here.
+   */
+  public DiskBackedMessageStoreFactory() {
+  }
+
+  /**
+   * @param service Service worker
+   * @param maxMessagesInMemory Number of messages to keep in memory
+   * @param fileStoreFactory Factory for creating file stores when flushing
+   */
+  public DiskBackedMessageStoreFactory(
+      CentralizedServiceWorker<I, V, E> service,
+      int maxMessagesInMemory,
+      MessageStoreFactory<I, M,
+          PartitionDiskBackedMessageStore<I, M>> fileStoreFactory) {
+    this.service = service;
+    this.maxMessagesInMemory = maxMessagesInMemory;
+    this.fileStoreFactory = fileStoreFactory;
+  }
+
+  /**
+   * Creates a new disk-backed store sharing this factory's service, memory
+   * limit and file store factory.
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @return New message store
+   */
+  @Override
+  public MessageStore<I, M> newStore(
+      MessageValueFactory<M> messageValueFactory) {
+    return new DiskBackedMessageStore<I, V, E, M>(messageValueFactory,
+        service, maxMessagesInMemory, fileStoreFactory);
+  }
+
+  /**
+   * Reads {@code MAX_MESSAGES_IN_MEMORY} from the configuration and builds
+   * the sequential-file and per-partition store factories used when
+   * flushing messages to disk.
+   *
+   * @param service Worker service; assumed to be the worker for this job's
+   *                vertex/edge types {@code V} and {@code E}
+   * @param conf Configuration
+   */
+  @Override
+  public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+    this.maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
+
+    // Type the file store factory with M rather than Writable so it agrees
+    // with the PartitionDiskBackedMessageStore<I, M> factory it feeds.
+    MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+        fileMessageStoreFactory = SequentialFileMessageStore.newFactory(conf);
+    this.fileStoreFactory =
+        PartitionDiskBackedMessageStore.newFactory(conf,
+            fileMessageStoreFactory);
+
+    this.service = castService(service);
+  }
+
+  /**
+   * Narrows the service to this factory's vertex/edge types. The interface
+   * only fixes the vertex id type, so this cast cannot be checked at
+   * runtime (generics are erased).
+   *
+   * @param service Worker service passed to {@link #initialize}
+   * @return The same service, typed with {@code V} and {@code E}
+   */
+  @SuppressWarnings("unchecked")
+  private CentralizedServiceWorker<I, V, E> castService(
+      CentralizedServiceWorker<I, ?, ?> service) {
+    return (CentralizedServiceWorker<I, V, E>) service;
+  }
+
+  /**
+   * Out-of-core stores created by this factory require messages to be
+   * traversed in vertex-id order.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean shouldTraverseMessagesInOrder() {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
index 7d46d30..bece774 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -18,9 +18,6 @@
package org.apache.giraph.comm.messages.out_of_core;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -34,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.MessagesIterable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -42,6 +40,10 @@ import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
/**
* Message storage with in-memory map of messages and with support for
* flushing all the messages to the disk. Holds messages for a single partition.
@@ -263,7 +265,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
// read destination vertices
int numVertices = in.readInt();
for (int v = 0; v < numVertices; v++) {
- I vertexId = (I) config.createVertexId();
+ I vertexId = config.createVertexId();
vertexId.readFields(in);
destinationVertices.add(vertexId);
}
@@ -343,5 +345,22 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
return new PartitionDiskBackedMessageStore<I, M>(messageValueFactory,
config, fileStoreFactory);
}
+
+ /**
+ * Intentionally a no-op.
+ *
+ * @param service Worker service (unused)
+ * @param conf Configuration (unused)
+ */
+ @Override
+ public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+ /* This method only needs a real implementation if the class is exposed
+ * publicly and instantiated via the MESSAGE_STORE_FACTORY_CLASS
+ * configuration parameter. This factory is private and its caller
+ * passes all required state through the constructor, so there is
+ * nothing to initialize here.
+ */
+ }
+
+ /**
+ * @return true: disk-backed partition stores require messages to be
+ * traversed in vertex-id order
+ */
+ @Override
+ public boolean shouldTraverseMessagesInOrder() {
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
index 51c05da..5988459 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -18,19 +18,7 @@
package org.apache.giraph.comm.messages.out_of_core;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -49,7 +37,20 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.io.DataInputOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
/**
* Used for writing and reading collection of messages to the disk.
@@ -413,5 +414,22 @@ public class SequentialFileMessageStore<I extends WritableComparable,
return new SequentialFileMessageStore<I, M>(messageValueFactory, config,
bufferSize, fileName);
}
+
+ /**
+ * Intentionally a no-op.
+ *
+ * @param service Worker service (unused)
+ * @param conf Configuration (unused)
+ */
+ @Override
+ public void initialize(CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+ /* This method only needs a real implementation if the class is exposed
+ * publicly and instantiated via the MESSAGE_STORE_FACTORY_CLASS
+ * configuration parameter. This factory is private and its caller
+ * passes all required state through the constructor, so there is
+ * nothing to initialize here.
+ */
+ }
+
+ /**
+ * @return true: sequential file stores require messages to be traversed
+ * in vertex-id order
+ */
+ @Override
+ public boolean shouldTraverseMessagesInOrder() {
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 4f6c17b..adb96cb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -21,18 +21,15 @@ package org.apache.giraph.comm.netty;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
-import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
-import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.partition.Partition;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -46,8 +43,7 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map.Entry;
-import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY;
-import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
/**
* Netty worker server that implement {@link WorkerServer} and contains
@@ -107,24 +103,14 @@ public class NettyWorkerServer<I extends WritableComparable,
*/
private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
createMessageStoreFactory() {
- boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf);
- if (!useOutOfCoreMessaging) {
- return new InMemoryMessageStoreFactory<I, Writable>(service, conf);
- } else {
- int maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
- if (LOG.isInfoEnabled()) {
- LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
- "maxMessagesInMemory = " + maxMessagesInMemory);
- }
- MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>>
- fileStoreFactory = SequentialFileMessageStore.newFactory(conf);
- MessageStoreFactory<I, Writable,
- PartitionDiskBackedMessageStore<I, Writable>>
- partitionStoreFactory =
- PartitionDiskBackedMessageStore.newFactory(conf, fileStoreFactory);
- return DiskBackedMessageStore.newFactory(service,
- maxMessagesInMemory, partitionStoreFactory);
- }
+ Class<? extends MessageStoreFactory> messageStoreFactoryClass =
+ MESSAGE_STORE_FACTORY_CLASS.get(conf);
+
+ MessageStoreFactory messageStoreFactoryInstance =
+ ReflectionUtils.newInstance(messageStoreFactoryClass);
+ messageStoreFactoryInstance.initialize(service, conf);
+
+ return messageStoreFactoryInstance;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index e257b4a..e791d62 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -20,6 +20,8 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.factories.ComputationFactory;
@@ -92,6 +94,13 @@ public interface GiraphConstants {
TypesHolder.class,
"TypesHolder, used if Computation not set - optional");
+ /**
+ * Class of the {@link MessageStoreFactory} workers use to create message
+ * stores. It is instantiated via reflection and then initialized with the
+ * worker service and configuration, so it needs a no-argument constructor.
+ * Defaults to {@link InMemoryMessageStoreFactory}. For out-of-core
+ * messages (previously enabled with giraph.useOutOfCoreMessages), set it to
+ * org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStoreFactory.
+ */
+ ClassConfOption<MessageStoreFactory> MESSAGE_STORE_FACTORY_CLASS =
+ ClassConfOption.create("giraph.messageStoreFactoryClass",
+ InMemoryMessageStoreFactory.class,
+ MessageStoreFactory.class,
+ "Message Store Factory Class that is to be used");
+
/** Language user's graph types are implemented in */
PerGraphTypeEnumConfOption<Language> GRAPH_TYPE_LANGUAGES =
PerGraphTypeEnumConfOption.create("giraph.types.language",
@@ -833,10 +842,6 @@ public interface GiraphConstants {
"Comma-separated list of directories in the local file system for " +
"out-of-core messages.");
- /** Whether or not to use out-of-core messages */
- BooleanConfOption USE_OUT_OF_CORE_MESSAGES =
- new BooleanConfOption("giraph.useOutOfCoreMessages", false,
- "Whether or not to use out-of-core messages");
/**
* If using out-of-core messaging, it tells how much messages do we keep
* in memory.
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index 1609846..de2ffd4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -18,15 +18,8 @@
package org.apache.giraph.partition;
-import com.google.common.collect.Maps;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
-import javax.annotation.concurrent.ThreadSafe;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -34,7 +27,18 @@ import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.collect.Maps;
/**
* A simple map-based container that stores vertices. Vertex ids will map to
@@ -60,7 +64,7 @@ public class SimplePartition<I extends WritableComparable,
@Override
public void initialize(int partitionId, Progressable progressable) {
super.initialize(partitionId, progressable);
- if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
+ if (shouldTraverseMessageInOrder()) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>();
} else {
vertexMap = Maps.newConcurrentMap();
@@ -141,7 +145,7 @@ public class SimplePartition<I extends WritableComparable,
@Override
public void readFields(DataInput input) throws IOException {
super.readFields(input);
- if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
+ if (shouldTraverseMessageInOrder()) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>();
} else {
vertexMap = Maps.newConcurrentMap();
@@ -173,4 +177,20 @@ public class SimplePartition<I extends WritableComparable,
public Iterator<Vertex<I, V, E>> iterator() {
return vertexMap.values().iterator();
}
+
+ /**
+ * Reports whether the message store factory configured through
+ * MESSAGE_STORE_FACTORY_CLASS requires messages to be traversed in order.
+ * NOTE(review): a new factory instance is created reflectively per call.
+ * @return true if the configured message store factory requires traversing
+ * messages in order, false otherwise.
+ */
+ private boolean shouldTraverseMessageInOrder() {
+ Class<? extends MessageStoreFactory> messageStoreFactoryClass =
+ MESSAGE_STORE_FACTORY_CLASS.get(getConf());
+
+ MessageStoreFactory messageStoreFactoryInstance =
+ ReflectionUtils.newInstance(messageStoreFactoryClass);
+ return messageStoreFactoryInstance.shouldTraverseMessagesInOrder();
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
index b404646..86d75a3 100644
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
@@ -19,6 +19,8 @@ package org.apache.giraph.graph;
import com.google.common.collect.Lists;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStoreFactory;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.ArrayListEdges;
http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java b/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java
index 5feabaf..49a338c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java
+++ b/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java
@@ -17,7 +17,11 @@
*/
package org.apache.giraph.jython;
+import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStoreFactory;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.GiraphTypes;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.graph.Language;
@@ -37,8 +41,19 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
public class TestJythonComputation {
+
+ /**
+ * Runs testCountEdges with MESSAGE_STORE_FACTORY_CLASS set to
+ * DiskBackedMessageStoreFactory.
+ */
+ @Test
+ public void testCountEdgesDiskBackedMessageStoreFactory() throws Exception {
+ testCountEdges(DiskBackedMessageStoreFactory.class);
+ }
+
+ /**
+ * Runs testCountEdges with MESSAGE_STORE_FACTORY_CLASS set to
+ * InMemoryMessageStoreFactory.
+ */
@Test
- public void testCountEdges() throws Exception {
+ public void testCountEdgesInMemoryMessageStoreFactory() throws Exception {
+ testCountEdges(InMemoryMessageStoreFactory.class);
+ }
+
+ public void testCountEdges(Class<? extends MessageStoreFactory>
+ messageStoreFactoryClass) throws Exception {
String[] edges = new String[] {
"1 2",
"2 3",
@@ -57,6 +72,8 @@ public class TestJythonComputation {
conf.setOutEdgesClass(ByteArrayEdges.class);
conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+ GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.set(conf,
+ messageStoreFactoryClass);
Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
Map<Integer, Integer> values = parseResults(results);