You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:07 UTC
[11/12] GIRAPH-667: Decouple Vertex data and Computation,
make Computation and Combiner classes switchable (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 3cd1175..97c8a35 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -23,6 +23,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.RepresentativeByteArrayIterable;
import org.apache.giraph.utils.RepresentativeByteArrayIterator;
import org.apache.giraph.utils.VertexIdIterator;
@@ -47,13 +48,15 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
/**
* Constructor
*
+ * @param messageClass Message class held in the store
* @param service Service worker
* @param config Hadoop configuration
*/
public ByteArrayMessagesPerVertexStore(
- CentralizedServiceWorker<I, ?, ?, M> service,
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
- super(service, config);
+ Class<M> messageClass,
+ CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+ super(messageClass, service, config);
}
/**
@@ -142,7 +145,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
@Override
protected M createWritable() {
- return config.createMessageValue();
+ return ReflectionUtils.newInstance(messageClass);
}
}
@@ -175,7 +178,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
@Override
protected M createWritable() {
- return config.createMessageValue();
+ return ReflectionUtils.newInstance(messageClass);
}
}
@@ -222,8 +225,8 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
*/
public static <I extends WritableComparable, M extends Writable>
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
- CentralizedServiceWorker<I, ?, ?, M> service,
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+ CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
return new Factory<I, M>(service, config);
}
@@ -272,23 +275,23 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
private static class Factory<I extends WritableComparable, M extends Writable>
implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
/** Service worker */
- private final CentralizedServiceWorker<I, ?, ?, M> service;
+ private final CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
/**
* @param service Worker service
* @param config Hadoop configuration
*/
- public Factory(CentralizedServiceWorker<I, ?, ?, M> service,
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+ public Factory(CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
this.service = service;
this.config = config;
}
@Override
- public MessageStoreByPartition<I, M> newStore() {
- return new ByteArrayMessagesPerVertexStore(service, config);
+ public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+ return new ByteArrayMessagesPerVertexStore(messageClass, service, config);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
index 26abb94..2712edd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.RepresentativeByteArrayIterable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -48,6 +49,8 @@ import org.apache.hadoop.io.WritableComparable;
*/
public class DiskBackedMessageStore<I extends WritableComparable,
M extends Writable> implements FlushableMessageStore<I, M> {
+ /** Message class */
+ private final Class<M> messageClass;
/**
* In-memory message map (must be sorted to ensure that the ids are
* ordered)
@@ -55,7 +58,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
inMemoryMessages;
/** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
/** Counter for number of messages in-memory */
private final AtomicInteger numberOfMessagesInMemory;
/** To keep vertex ids which we have messages for */
@@ -71,13 +74,16 @@ public class DiskBackedMessageStore<I extends WritableComparable,
/**
* Constructor.
*
+ * @param messageClass Message class held in the store
* @param config Hadoop configuration
* @param fileStoreFactory Factory for creating file stores when flushing
*/
public DiskBackedMessageStore(
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
+ Class<M> messageClass,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config,
MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+ this.messageClass = messageClass;
this.config = config;
numberOfMessagesInMemory = new AtomicInteger(0);
destinationVertices =
@@ -154,7 +160,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
@Override
protected M createWritable() {
- return config.createMessageValue();
+ return ReflectionUtils.newInstance(messageClass);
}
}
@@ -289,7 +295,8 @@ public class DiskBackedMessageStore<I extends WritableComparable,
} finally {
rwLock.writeLock().unlock();
}
- BasicMessageStore<I, M> fileStore = fileStoreFactory.newStore();
+ BasicMessageStore<I, M> fileStore =
+ fileStoreFactory.newStore(messageClass);
fileStore.addMessages(new TemporaryMessageStore(messagesToFlush));
synchronized (fileStores) {
@@ -351,7 +358,8 @@ public class DiskBackedMessageStore<I extends WritableComparable,
// read file stores
int numFileStores = in.readInt();
for (int s = 0; s < numFileStores; s++) {
- BasicMessageStore<I, M> fileStore = fileStoreFactory.newStore();
+ BasicMessageStore<I, M> fileStore =
+ fileStoreFactory.newStore(messageClass);
fileStore.readFields(in);
fileStores.add(fileStore);
}
@@ -370,7 +378,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
*/
public static <I extends WritableComparable, M extends Writable>
MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config,
MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
return new Factory<I, M>(config, fileStoreFactory);
}
@@ -402,8 +410,9 @@ public class DiskBackedMessageStore<I extends WritableComparable,
}
@Override
- public FlushableMessageStore<I, M> newStore() {
- return new DiskBackedMessageStore<I, M>(config, fileStoreFactory);
+ public FlushableMessageStore<I, M> newStore(Class<M> messageClass) {
+ return new DiskBackedMessageStore<I, M>(messageClass, config,
+ fileStoreFactory);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
index 02bfb1f..4a28949 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
@@ -47,8 +47,10 @@ import java.util.concurrent.ConcurrentMap;
public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> implements
MessageStoreByPartition<I, M> {
+ /** Message class */
+ private final Class<M> messageClass;
/** Service worker */
- private final CentralizedServiceWorker<I, V, E, M> service;
+ private final CentralizedServiceWorker<I, V, E> service;
/** Number of messages to keep in memory */
private final int maxNumberOfMessagesInMemory;
/** Factory for creating file stores when flushing */
@@ -57,18 +59,20 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
/** Map from partition id to its message store */
private final
ConcurrentMap<Integer, FlushableMessageStore<I, M>> partitionMessageStores;
-
/**
+ * @param messageClass Message class held in the store
* @param service Service worker
* @param maxNumberOfMessagesInMemory Number of messages to keep in memory
* @param fileStoreFactory Factory for creating file stores
* when flushing
*/
public DiskBackedMessageStoreByPartition(
- CentralizedServiceWorker<I, V, E, M> service,
+ Class<M> messageClass,
+ CentralizedServiceWorker<I, V, E> service,
int maxNumberOfMessagesInMemory,
MessageStoreFactory<I, M, FlushableMessageStore<I,
M>> fileStoreFactory) {
+ this.messageClass = messageClass;
this.service = service;
this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
this.fileStoreFactory = fileStoreFactory;
@@ -267,7 +271,7 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
if (messageStore != null) {
return messageStore;
}
- messageStore = fileStoreFactory.newStore();
+ messageStore = fileStoreFactory.newStore(messageClass);
FlushableMessageStore<I, M> store =
partitionMessageStores.putIfAbsent(partitionId, messageStore);
return (store == null) ? messageStore : store;
@@ -298,7 +302,8 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
public void readFieldsForPartition(DataInput in,
int partitionId) throws IOException {
if (in.readBoolean()) {
- FlushableMessageStore<I, M> messageStore = fileStoreFactory.newStore();
+ FlushableMessageStore<I, M> messageStore =
+ fileStoreFactory.newStore(messageClass);
messageStore.readFields(in);
partitionMessageStores.put(partitionId, messageStore);
}
@@ -309,7 +314,8 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
int numStores = in.readInt();
for (int s = 0; s < numStores; s++) {
int partitionId = in.readInt();
- FlushableMessageStore<I, M> messageStore = fileStoreFactory.newStore();
+ FlushableMessageStore<I, M> messageStore =
+ fileStoreFactory.newStore(messageClass);
messageStore.readFields(in);
partitionMessageStores.put(partitionId, messageStore);
}
@@ -332,7 +338,7 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
public static <I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable>
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
- CentralizedServiceWorker<I, V, E, M> service,
+ CentralizedServiceWorker<I, V, E> service,
int maxMessagesInMemory,
MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
fileStoreFactory) {
@@ -352,7 +358,7 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
/** Service worker */
- private final CentralizedServiceWorker<I, V, E, M> service;
+ private final CentralizedServiceWorker<I, V, E> service;
/** Number of messages to keep in memory */
private final int maxMessagesInMemory;
/** Factory for creating file stores when flushing */
@@ -365,7 +371,7 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
* @param fileStoreFactory Factory for creating file stores when
* flushing
*/
- public Factory(CentralizedServiceWorker<I, V, E, M> service,
+ public Factory(CentralizedServiceWorker<I, V, E> service,
int maxMessagesInMemory,
MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
fileStoreFactory) {
@@ -375,9 +381,9 @@ public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
}
@Override
- public MessageStoreByPartition<I, M> newStore() {
- return new DiskBackedMessageStoreByPartition<I, V, E, M>(service,
- maxMessagesInMemory, fileStoreFactory);
+ public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+ return new DiskBackedMessageStoreByPartition<I, V, E, M>(messageClass,
+ service, maxMessagesInMemory, fileStoreFactory);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
new file mode 100644
index 0000000..9086d78
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Message store factory which produces message stores that hold all
+ * messages in memory. Each {@link #newStore} call checks whether a combiner
+ * is currently in use and creates a {@link OneMessagePerVertexStore} if so,
+ * or a {@link ByteArrayMessagesPerVertexStore} otherwise.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class InMemoryMessageStoreFactory<I extends WritableComparable,
+ M extends Writable>
+ implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(InMemoryMessageStoreFactory.class);
+
+ /** Service worker, handed to every store this factory creates */
+ private final CentralizedServiceWorker<I, ?, ?> service;
+ /** Configuration, queried for the combiner on each newStore call */
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
+
+ /**
+ * @param service Worker service passed on to the created stores
+ * @param conf Configuration used to pick and configure the stores
+ */
+ public InMemoryMessageStoreFactory(CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+ this.service = service;
+ this.conf = conf;
+ }
+
+ @Override
+ public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+ if (conf.useCombiner()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("newStore: " +
+ "Using OneMessagePerVertexStore with " + conf.getCombinerClass());
+ }
+ return new OneMessagePerVertexStore<I, M>(
+ messageClass, service, conf.<M>createCombiner(), conf);
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("newStore: " +
+ "Using ByteArrayMessagesPerVertexStore since there is no combiner");
+ }
+ return new ByteArrayMessagesPerVertexStore<I, M>(
+ messageClass, service, conf);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index 3c13c47..dec9a92 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -31,9 +31,13 @@ import org.apache.hadoop.io.WritableComparable;
public interface MessageStoreFactory<I extends WritableComparable,
M extends Writable, S extends BasicMessageStore<I, M>> {
/**
- * Creates new message store
+ * Creates new message store.
*
+ * Note: the Combiner class in the Configuration can be changed, so this
+ * method should return a MessageStore which uses the current combiner
+ *
+ * @param messageClass Message class held in the store
* @return New message store
*/
- S newStore();
+ S newStore(Class<M> messageClass);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 7db0266..8710dac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -28,6 +28,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -45,14 +46,17 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
private final Combiner<I, M> combiner;
/**
+ * @param messageClass Message class held in the store
* @param service Service worker
* @param combiner Combiner for messages
* @param config Hadoop configuration
*/
- OneMessagePerVertexStore(CentralizedServiceWorker<I, ?, ?, M> service,
+ OneMessagePerVertexStore(
+ Class<M> messageClass,
+ CentralizedServiceWorker<I, ?, ?> service,
Combiner<I, M> combiner,
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
- super(service, config);
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+ super(messageClass, service, config);
this.combiner = combiner;
}
@@ -140,7 +144,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
@Override
protected M readFieldsForMessages(DataInput in) throws IOException {
- M message = config.createMessageValue();
+ M message = ReflectionUtils.newInstance(messageClass);
message.readFields(in);
return message;
}
@@ -177,8 +181,8 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
*/
public static <I extends WritableComparable, M extends Writable>
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
- CentralizedServiceWorker<I, ?, ?, M> service,
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+ CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
return new Factory<I, M>(service, config);
}
@@ -192,26 +196,24 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
M extends Writable>
implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
/** Service worker */
- private final CentralizedServiceWorker<I, ?, ?, M> service;
+ private final CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
- /** Combiner for messages */
- private final Combiner<I, M> combiner;
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
/**
* @param service Worker service
* @param config Hadoop configuration
*/
- public Factory(CentralizedServiceWorker<I, ?, ?, M> service,
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+ public Factory(CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
this.service = service;
this.config = config;
- combiner = config.createCombiner();
}
@Override
- public MessageStoreByPartition<I, M> newStore() {
- return new OneMessagePerVertexStore<I, M>(service, combiner, config);
+ public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+ return new OneMessagePerVertexStore<I, M>(messageClass, service,
+ config.<M>createCombiner(), config);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
index 3fe4430..f0a8f6d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.messages;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -63,10 +64,12 @@ public class SequentialFileMessageStore<I extends WritableComparable,
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SequentialFileMessageStore.class);
+ /** Message class */
+ private final Class<M> messageClass;
/** File in which we store data */
private final File file;
/** Configuration which we need for reading data */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
/** Buffer size to use when reading and writing files */
private final int bufferSize;
/** File input stream */
@@ -79,15 +82,18 @@ public class SequentialFileMessageStore<I extends WritableComparable,
/**
* Stores message on the disk.
*
- * @param config Configuration used later for reading
- * @param bufferSize Buffer size to use when reading and writing
- * @param fileName File in which we want to store messages
+ * @param messageClass Message class held in the store
+ * @param config Configuration used later for reading
+ * @param bufferSize Buffer size to use when reading and writing
+ * @param fileName File in which we want to store messages
* @throws IOException
*/
public SequentialFileMessageStore(
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
+ Class<M> messageClass,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config,
int bufferSize,
String fileName) {
+ this.messageClass = messageClass;
this.config = config;
this.bufferSize = bufferSize;
file = new File(fileName);
@@ -295,7 +301,7 @@ public class SequentialFileMessageStore<I extends WritableComparable,
int messagesSize = in.readInt();
List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
for (int i = 0; i < messagesSize; i++) {
- M message = config.createMessageValue();
+ M message = ReflectionUtils.newInstance(messageClass);
try {
message.readFields(in);
} catch (IOException e) {
@@ -394,12 +400,12 @@ public class SequentialFileMessageStore<I extends WritableComparable,
}
@Override
- public BasicMessageStore<I, M> newStore() {
+ public BasicMessageStore<I, M> newStore(Class<M> messageClass) {
int idx = Math.abs(storeCounter.getAndIncrement());
String fileName =
directories[idx % directories.length] + "messages-" + idx;
- return new SequentialFileMessageStore<I, M>(config, bufferSize,
- fileName);
+ return new SequentialFileMessageStore<I, M>(messageClass, config,
+ bufferSize, fileName);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index fc6560d..1a91dfb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -45,22 +45,27 @@ import org.apache.hadoop.io.WritableComparable;
*/
public abstract class SimpleMessageStore<I extends WritableComparable,
M extends Writable, T> implements MessageStoreByPartition<I, M> {
+ /** Message class */
+ protected final Class<M> messageClass;
/** Service worker */
- protected final CentralizedServiceWorker<I, ?, ?, M> service;
+ protected final CentralizedServiceWorker<I, ?, ?> service;
/** Map from partition id to map from vertex id to messages for that vertex */
protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map;
/** Giraph configuration */
- protected final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+ protected final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
/**
* Constructor
*
+ * @param messageClass Message class held in the store
* @param service Service worker
* @param config Giraph configuration
*/
public SimpleMessageStore(
- CentralizedServiceWorker<I, ?, ?, M> service,
- ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+ Class<M> messageClass,
+ CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+ this.messageClass = messageClass;
this.service = service;
this.config = config;
map = new MapMaker().concurrencyLevel(
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index 319f41a..c982209 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -39,7 +39,7 @@ public class NettyMasterClient implements MasterClient {
/** Netty client that does the actual I/O */
private final NettyClient nettyClient;
/** Worker information for current superstep */
- private CentralizedServiceMaster<?, ?, ?, ?> service;
+ private CentralizedServiceMaster<?, ?, ?> service;
/** Cached map of partition ids to serialized aggregator data */
private final SendAggregatorCache sendAggregatorCache =
new SendAggregatorCache();
@@ -57,7 +57,7 @@ public class NettyMasterClient implements MasterClient {
*/
public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration configuration,
- CentralizedServiceMaster<?, ?, ?, ?> service) {
+ CentralizedServiceMaster<?, ?, ?> service) {
this.nettyClient =
new NettyClient(context, configuration, service.getMasterInfo());
this.service = service;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
index 1f04bcf..cb36c3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
@@ -41,7 +41,7 @@ public class NettyMasterServer implements MasterServer {
* @param progressable Progressable for reporting progress
*/
public NettyMasterServer(ImmutableClassesGiraphConfiguration conf,
- CentralizedServiceMaster<?, ?, ?, ?> service,
+ CentralizedServiceMaster<?, ?, ?> service,
Progressable progressable) {
nettyServer = new NettyServer(conf,
new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
index d1cce64..798e09c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
@@ -41,9 +41,9 @@ public class NettyWorkerAggregatorRequestProcessor
/** Progressable used to report progress */
private final Progressable progressable;
/** NettyClient that could be shared among one or more instances */
- private final WorkerClient<?, ?, ?, ?> workerClient;
+ private final WorkerClient<?, ?, ?> workerClient;
/** Service worker */
- private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
+ private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
/** Cached map of partition ids to serialized aggregator data */
private final SendAggregatedValueCache sendAggregatedValueCache =
new SendAggregatedValueCache();
@@ -59,8 +59,8 @@ public class NettyWorkerAggregatorRequestProcessor
*/
public NettyWorkerAggregatorRequestProcessor(
Progressable progressable,
- ImmutableClassesGiraphConfiguration<?, ?, ?, ?> configuration,
- CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
+ ImmutableClassesGiraphConfiguration<?, ?, ?> configuration,
+ CentralizedServiceWorker<?, ?, ?> serviceWorker) {
this.serviceWorker = serviceWorker;
this.workerClient = serviceWorker.getWorkerClient();
this.progressable = progressable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index 9c09524..28f3656 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -50,20 +50,19 @@ import java.util.Map;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class NettyWorkerClient<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- WorkerClient<I, V, E, M>, ResetSuperstepMetricsObserver {
+ V extends Writable, E extends Writable> implements
+ WorkerClient<I, V, E>, ResetSuperstepMetricsObserver {
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
/** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Netty client that does that actual I/O */
private final NettyClient nettyClient;
/** Centralized service, needed to get vertex ranges */
- private final CentralizedServiceWorker<I, V, E, M> service;
+ private final CentralizedServiceWorker<I, V, E> service;
// Metrics
/** Per-superstep, per-request counters */
@@ -78,8 +77,8 @@ public class NettyWorkerClient<I extends WritableComparable,
*/
public NettyWorkerClient(
Mapper<?, ?, ?, ?>.Context context,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- CentralizedServiceWorker<I, V, E, M> service) {
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+ CentralizedServiceWorker<I, V, E> service) {
this.nettyClient =
new NettyClient(context, configuration, service.getWorkerInfo());
this.conf = configuration;
@@ -111,7 +110,7 @@ public class NettyWorkerClient<I extends WritableComparable,
metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_WORKER_REQUESTS));
}
- public CentralizedServiceWorker<I, V, E, M> getService() {
+ public CentralizedServiceWorker<I, V, E> getService() {
return service;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index db4ff5d..06965ca 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -71,26 +71,25 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_MUTATIONS_PER_REQUEST;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("unchecked")
public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- WorkerClientRequestProcessor<I, V, E, M> {
+ V extends Writable, E extends Writable> implements
+ WorkerClientRequestProcessor<I, V, E> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(NettyWorkerClientRequestProcessor.class);
/** Cached partitions of vertices to send */
- private final SendPartitionCache<I, V, E, M> sendPartitionCache;
+ private final SendPartitionCache<I, V, E> sendPartitionCache;
/** Cached map of partitions to vertex indices to messages */
- private final SendMessageCache<I, M> sendMessageCache;
+ private final SendMessageCache<I, Writable> sendMessageCache;
/** Cache of edges to be sent. */
private final SendEdgeCache<I, E> sendEdgeCache;
/** Cached map of partitions to vertex indices to mutations */
- private final SendMutationsCache<I, V, E, M> sendMutationsCache =
- new SendMutationsCache<I, V, E, M>();
+ private final SendMutationsCache<I, V, E> sendMutationsCache =
+ new SendMutationsCache<I, V, E>();
/** NettyClient that could be shared among one or more instances */
- private final WorkerClient<I, V, E, M> workerClient;
+ private final WorkerClient<I, V, E> workerClient;
/** Messages sent during the last superstep */
private long totalMsgsSentInSuperstep = 0;
/** Maximum size of messages per remote worker to cache before sending */
@@ -100,11 +99,11 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
/** Maximum number of mutations per partition before sending */
private final int maxMutationsPerPartition;
/** Giraph configuration */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** Service worker */
- private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
+ private final CentralizedServiceWorker<I, V, E> serviceWorker;
/** Server data from the server (used for local requests) */
- private final ServerData<I, V, E, M> serverData;
+ private final ServerData<I, V, E> serverData;
// Per-Superstep Metrics
/** Number of requests that went on the wire */
@@ -121,13 +120,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
*/
public NettyWorkerClientRequestProcessor(
Mapper<?, ?, ?, ?>.Context context,
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
- CentralizedServiceWorker<I, V, E, M> serviceWorker) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
+ CentralizedServiceWorker<I, V, E> serviceWorker) {
this.workerClient = serviceWorker.getWorkerClient();
this.configuration = conf;
- sendPartitionCache = new SendPartitionCache<I, V, E, M>(context, conf);
- sendMessageCache = new SendMessageCache<I, M>(conf, serviceWorker);
+ sendPartitionCache = new SendPartitionCache<I, V, E>(context, conf);
+ sendMessageCache = new SendMessageCache<I, Writable>(conf, serviceWorker);
sendEdgeCache = new SendEdgeCache<I, E>(conf, serviceWorker);
maxMessagesSizePerWorker = MAX_MSG_REQUEST_SIZE.get(conf);
maxEdgesSizePerWorker = MAX_EDGE_REQUEST_SIZE.get(conf);
@@ -160,7 +159,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
}
@Override
- public boolean sendMessageRequest(I destVertexId, M message) {
+ public boolean sendMessageRequest(I destVertexId, Writable message) {
PartitionOwner owner =
serviceWorker.getVertexPartitionOwner(destVertexId);
WorkerInfo workerInfo = owner.getWorkerInfo();
@@ -178,11 +177,11 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
// Send a request if the cache of outgoing message to
// the remote worker 'workerInfo' is full enough to be flushed
if (workerMessageSize >= maxMessagesSizePerWorker) {
- PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+ PairList<Integer, ByteArrayVertexIdMessages<I, Writable>>
workerMessages =
sendMessageCache.removeWorkerMessages(workerInfo);
WritableRequest writableRequest =
- new SendWorkerMessagesRequest<I, M>(workerMessages);
+ new SendWorkerMessagesRequest<I, Writable>(workerMessages);
doRequest(workerInfo, writableRequest);
return true;
}
@@ -192,14 +191,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
@Override
public void sendPartitionRequest(WorkerInfo workerInfo,
- Partition<I, V, E, M> partition) {
+ Partition<I, V, E> partition) {
if (LOG.isTraceEnabled()) {
LOG.trace("sendVertexRequest: Sending to " + workerInfo +
", with partition " + partition);
}
- WritableRequest vertexRequest =
- new SendVertexRequest<I, V, E, M>(partition);
+ WritableRequest vertexRequest = new SendVertexRequest<I, V, E>(partition);
doRequest(workerInfo, vertexRequest);
// Messages are stored separately
@@ -215,12 +213,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
* @param partition Partition whose messages to send
*/
private void sendPartitionMessages(WorkerInfo workerInfo,
- Partition<I, V, E, M> partition) {
+ Partition<I, V, E> partition) {
final int partitionId = partition.getId();
- MessageStoreByPartition<I, M> messageStore =
+ MessageStoreByPartition<I, Writable> messageStore =
serverData.getCurrentMessageStore();
- ByteArrayVertexIdMessages<I, M> vertexIdMessages =
- new ByteArrayVertexIdMessages<I, M>();
+ ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
+ new ByteArrayVertexIdMessages<I, Writable>(
+ configuration.getOutgoingMessageValueClass());
vertexIdMessages.setConf(configuration);
vertexIdMessages.initialize();
for (I vertexId :
@@ -228,8 +227,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
try {
// Messages cannot be re-used from this iterable, but add()
// serializes the message, making this safe
- Iterable<M> messages = messageStore.getVertexMessages(vertexId);
- for (M message : messages) {
+ Iterable<Writable> messages = messageStore.getVertexMessages(vertexId);
+ for (Writable message : messages) {
vertexIdMessages.add(vertexId, message);
}
} catch (IOException e) {
@@ -238,18 +237,19 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
}
if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
WritableRequest messagesRequest = new
- SendPartitionCurrentMessagesRequest<I, V, E, M>(
+ SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
partitionId, vertexIdMessages);
doRequest(workerInfo, messagesRequest);
vertexIdMessages =
- new ByteArrayVertexIdMessages<I, M>();
+ new ByteArrayVertexIdMessages<I, Writable>(
+ configuration.getOutgoingMessageValueClass());
vertexIdMessages.setConf(configuration);
vertexIdMessages.initialize();
}
}
if (!vertexIdMessages.isEmpty()) {
WritableRequest messagesRequest = new
- SendPartitionCurrentMessagesRequest<I, V, E, M>(
+ SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
partitionId, vertexIdMessages);
doRequest(workerInfo, messagesRequest);
}
@@ -257,8 +257,8 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
@Override
public void sendVertexRequest(PartitionOwner partitionOwner,
- Vertex<I, V, E, M> vertex) {
- Partition<I, V, E, M> partition =
+ Vertex<I, V, E> vertex) {
+ Partition<I, V, E> partition =
sendPartitionCache.addVertex(partitionOwner, vertex);
if (partition == null) {
return;
@@ -329,10 +329,10 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
int partitionMutationCount) {
// Send a request if enough mutations are there for a partition
if (partitionMutationCount >= maxMutationsPerPartition) {
- Map<I, VertexMutations<I, V, E, M>> partitionMutations =
+ Map<I, VertexMutations<I, V, E>> partitionMutations =
sendMutationsCache.removePartitionMutations(partitionId);
WritableRequest writableRequest =
- new SendPartitionMutationsRequest<I, V, E, M>(
+ new SendPartitionMutationsRequest<I, V, E>(
partitionId, partitionMutations);
doRequest(partitionOwner.getWorkerInfo(), writableRequest);
}
@@ -360,7 +360,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
}
@Override
- public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
+ public void addVertexRequest(Vertex<I, V, E> vertex) throws IOException {
PartitionOwner partitionOwner =
serviceWorker.getVertexPartitionOwner(vertex.getId());
int partitionId = partitionOwner.getPartitionId();
@@ -398,7 +398,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
@Override
public void flush() throws IOException {
// Execute the remaining send partitions (if any)
- for (Map.Entry<PartitionOwner, Partition<I, V, E, M>> entry :
+ for (Map.Entry<PartitionOwner, Partition<I, V, E>> entry :
sendPartitionCache.getOwnerPartitionMap().entrySet()) {
sendPartitionRequest(entry.getKey().getWorkerInfo(), entry.getValue());
}
@@ -406,15 +406,15 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
// Execute the remaining sends messages (if any)
PairList<WorkerInfo, PairList<Integer,
- ByteArrayVertexIdMessages<I, M>>>
+ ByteArrayVertexIdMessages<I, Writable>>>
remainingMessageCache = sendMessageCache.removeAllMessages();
PairList<WorkerInfo,
- PairList<Integer, ByteArrayVertexIdMessages<I, M>>>.Iterator
+ PairList<Integer, ByteArrayVertexIdMessages<I, Writable>>>.Iterator
iterator = remainingMessageCache.getIterator();
while (iterator.hasNext()) {
iterator.next();
WritableRequest writableRequest =
- new SendWorkerMessagesRequest<I, M>(
+ new SendWorkerMessagesRequest<I, Writable>(
iterator.getCurrentSecond());
doRequest(iterator.getCurrentFirst(), writableRequest);
}
@@ -435,12 +435,12 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
}
// Execute the remaining sends mutations (if any)
- Map<Integer, Map<I, VertexMutations<I, V, E, M>>> remainingMutationsCache =
+ Map<Integer, Map<I, VertexMutations<I, V, E>>> remainingMutationsCache =
sendMutationsCache.removeAllPartitionMutations();
- for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> entry :
+ for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E>>> entry :
remainingMutationsCache.entrySet()) {
WritableRequest writableRequest =
- new SendPartitionMutationsRequest<I, V, E, M>(
+ new SendPartitionMutationsRequest<I, V, E>(
entry.getKey(), entry.getValue());
PartitionOwner partitionOwner =
serviceWorker.getVertexPartitionOwner(
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index ed0861e..b457038 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -22,17 +22,15 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.comm.messages.BasicMessageStore;
-import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
import org.apache.giraph.comm.messages.DiskBackedMessageStore;
import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.OneMessagePerVertexStore;
import org.apache.giraph.comm.messages.SequentialFileMessageStore;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
@@ -60,23 +58,24 @@ import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class NettyWorkerServer<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements WorkerServer<I, V, E, M> {
+ V extends Writable, E extends Writable>
+ implements WorkerServer<I, V, E> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(NettyWorkerServer.class);
/** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Service worker */
- private final CentralizedServiceWorker<I, V, E, M> service;
+ private final CentralizedServiceWorker<I, V, E> service;
/** Netty server that does that actual I/O */
private final NettyServer nettyServer;
/** Server data storage */
- private final ServerData<I, V, E, M> serverData;
+ private final ServerData<I, V, E> serverData;
+ /** Mapper context */
+ private final Mapper<?, ?, ?, ?>.Context context;
/**
* Constructor to start the server.
@@ -85,18 +84,19 @@ public class NettyWorkerServer<I extends WritableComparable,
* @param service Service to get partition mappings
* @param context Mapper context
*/
- public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
- CentralizedServiceWorker<I, V, E, M> service,
+ public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E> conf,
+ CentralizedServiceWorker<I, V, E> service,
Mapper<?, ?, ?, ?>.Context context) {
this.conf = conf;
this.service = service;
+ this.context = context;
serverData =
- new ServerData<I, V, E, M>(service, conf, createMessageStoreFactory(),
+ new ServerData<I, V, E>(service, conf, createMessageStoreFactory(),
context);
nettyServer = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData),
+ new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
service.getWorkerInfo(), context);
nettyServer.start();
}
@@ -107,33 +107,20 @@ public class NettyWorkerServer<I extends WritableComparable,
*
* @return Message store factory
*/
- private MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+ private MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
createMessageStoreFactory() {
boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf);
if (!useOutOfCoreMessaging) {
- if (conf.useCombiner()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("createMessageStoreFactory: " +
- "Using OneMessagePerVertexStore since combiner enabled");
- }
- return OneMessagePerVertexStore.newFactory(service, conf);
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("createMessageStoreFactory: " +
- "Using ByteArrayMessagesPerVertexStore " +
- "since there is no combiner");
- }
- return ByteArrayMessagesPerVertexStore.newFactory(service, conf);
- }
+ return new InMemoryMessageStoreFactory<I, Writable>(service, conf);
} else {
int maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf);
if (LOG.isInfoEnabled()) {
LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
"maxMessagesInMemory = " + maxMessagesInMemory);
}
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory =
- SequentialFileMessageStore.newFactory(conf);
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
+ MessageStoreFactory<I, Writable, BasicMessageStore<I, Writable>>
+ fileStoreFactory = SequentialFileMessageStore.newFactory(conf);
+ MessageStoreFactory<I, Writable, FlushableMessageStore<I, Writable>>
partitionStoreFactory =
DiskBackedMessageStore.newFactory(conf, fileStoreFactory);
return DiskBackedMessageStoreByPartition.newFactory(service,
@@ -147,21 +134,19 @@ public class NettyWorkerServer<I extends WritableComparable,
}
@Override
- public void prepareSuperstep(GraphState<I, V, E, M> graphState) {
+ public void prepareSuperstep() {
serverData.prepareSuperstep();
- resolveMutations(graphState);
+ resolveMutations();
}
/**
* Resolve mutation requests.
- *
- * @param graphState Graph state
*/
- private void resolveMutations(GraphState<I, V, E, M> graphState) {
+ private void resolveMutations() {
Multimap<Integer, I> resolveVertexIndices = HashMultimap.create(
service.getPartitionStore().getNumPartitions(), 100);
// Add any mutated vertex indices to be resolved
- for (Entry<I, VertexMutations<I, V, E, M>> e :
+ for (Entry<I, VertexMutations<I, V, E>> e :
serverData.getVertexMutations().entrySet()) {
I vertexId = e.getKey();
Integer partitionId = service.getPartitionId(vertexId);
@@ -176,7 +161,7 @@ public class NettyWorkerServer<I extends WritableComparable,
Iterable<I> destinations = serverData.getCurrentMessageStore().
getPartitionDestinationVertices(partitionId);
if (!Iterables.isEmpty(destinations)) {
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
service.getPartitionStore().getPartition(partitionId);
for (I vertexId : destinations) {
if (partition.getVertex(vertexId) == null) {
@@ -191,18 +176,17 @@ public class NettyWorkerServer<I extends WritableComparable,
}
}
// Resolve all graph mutations
- VertexResolver<I, V, E, M> vertexResolver =
- conf.createVertexResolver(graphState);
+ VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
for (Entry<Integer, Collection<I>> e :
resolveVertexIndices.asMap().entrySet()) {
- Partition<I, V, E, M> partition =
+ Partition<I, V, E> partition =
service.getPartitionStore().getPartition(e.getKey());
for (I vertexIndex : e.getValue()) {
- Vertex<I, V, E, M> originalVertex =
+ Vertex<I, V, E> originalVertex =
partition.getVertex(vertexIndex);
- VertexMutations<I, V, E, M> mutations = null;
- VertexMutations<I, V, E, M> vertexMutations =
+ VertexMutations<I, V, E> mutations = null;
+ VertexMutations<I, V, E> vertexMutations =
serverData.getVertexMutations().get(vertexIndex);
if (vertexMutations != null) {
synchronized (vertexMutations) {
@@ -210,11 +194,11 @@ public class NettyWorkerServer<I extends WritableComparable,
}
serverData.getVertexMutations().remove(vertexIndex);
}
- Vertex<I, V, E, M> vertex = vertexResolver.resolve(
+ Vertex<I, V, E> vertex = vertexResolver.resolve(
vertexIndex, originalVertex, mutations,
serverData.getCurrentMessageStore().
hasMessagesForVertex(vertexIndex));
- graphState.getContext().progress();
+ context.progress();
if (LOG.isDebugEnabled()) {
LOG.debug("resolveMutations: Resolved vertex index " +
@@ -240,7 +224,7 @@ public class NettyWorkerServer<I extends WritableComparable,
}
@Override
- public ServerData<I, V, E, M> getServerData() {
+ public ServerData<I, V, E> getServerData() {
return serverData;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
index b4e7dda..f64c373 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.io.WritableComparable;
*/
public class WorkerRequestServerHandler<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> extends
- RequestServerHandler<WorkerRequest<I, V, E, M>> {
+ RequestServerHandler<WorkerRequest<I, V, E>> {
/** Data that can be accessed for handling requests */
- private final ServerData<I, V, E, M> serverData;
+ private final ServerData<I, V, E> serverData;
/**
* Constructor with external server data
@@ -47,7 +47,7 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
* @param conf Configuration
* @param myTaskInfo Current task info
*/
- public WorkerRequestServerHandler(ServerData<I, V, E, M> serverData,
+ public WorkerRequestServerHandler(ServerData<I, V, E> serverData,
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
TaskInfo myTaskInfo) {
@@ -56,23 +56,23 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
}
@Override
- public void processRequest(WorkerRequest<I, V, E, M> request) {
+ public void processRequest(WorkerRequest<I, V, E> request) {
request.doRequest(serverData);
}
/** Factory for {@link WorkerRequestServerHandler} */
public static class Factory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
+ V extends Writable, E extends Writable> implements
RequestServerHandler.Factory {
/** Data that can be accessed for handling requests */
- private final ServerData<I, V, E, M> serverData;
+ private final ServerData<I, V, E> serverData;
/**
* Constructor
*
* @param serverData Data held by the server
*/
- public Factory(ServerData<I, V, E, M> serverData) {
+ public Factory(ServerData<I, V, E> serverData) {
this.serverData = serverData;
}
@@ -81,8 +81,8 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
TaskInfo myTaskInfo) {
- return new WorkerRequestServerHandler<I, V, E,
- M>(serverData, workerRequestReservedMap, conf, myTaskInfo);
+ return new WorkerRequestServerHandler<I, V, E, Writable>(serverData,
+ workerRequestReservedMap, conf, myTaskInfo);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
index 037f4a0..88641c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.io.WritableComparable;
*/
public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> extends
- WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
+ WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
/** Destination partition for these vertices' messages*/
private int partitionId;
/** Map of destination vertex ID's to message lists */
@@ -67,7 +67,10 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
@Override
public void readFieldsRequest(DataInput input) throws IOException {
partitionId = input.readInt();
- vertexIdMessageMap = new ByteArrayVertexIdMessages<I, M>();
+ // At this moment the Computation class has already been replaced with
+ // the new one, and we deal with messages from previous superstep
+ vertexIdMessageMap = new ByteArrayVertexIdMessages<I, M>(
+ getConf().getIncomingMessageValueClass());
vertexIdMessageMap.setConf(getConf());
vertexIdMessageMap.initialize();
vertexIdMessageMap.readFields(input);
@@ -80,9 +83,9 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
}
@Override
- public void doRequest(ServerData<I, V, E, M> serverData) {
+ public void doRequest(ServerData<I, V, E> serverData) {
try {
- serverData.getCurrentMessageStore().addPartitionMessages(partitionId,
+ serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId,
vertexIdMessageMap);
} catch (IOException e) {
throw new RuntimeException("doRequest: Got IOException ", e);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
index a96842d..de0d098 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
@@ -42,19 +42,18 @@ import java.util.concurrent.ConcurrentHashMap;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class SendPartitionMutationsRequest<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> extends
- WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
+ V extends Writable, E extends Writable> extends
+ WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendPartitionMutationsRequest.class);
/** Partition id */
private int partitionId;
/** Mutations sent for a partition */
- private Map<I, VertexMutations<I, V, E, M>> vertexIdMutations;
+ private Map<I, VertexMutations<I, V, E>> vertexIdMutations;
/**
* Constructor used for reflection only
@@ -69,7 +68,7 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
*/
public SendPartitionMutationsRequest(
int partitionId,
- Map<I, VertexMutations<I, V, E, M>> vertexIdMutations) {
+ Map<I, VertexMutations<I, V, E>> vertexIdMutations) {
this.partitionId = partitionId;
this.vertexIdMutations = vertexIdMutations;
}
@@ -82,8 +81,8 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
for (int i = 0; i < vertexIdMutationsSize; ++i) {
I vertexId = getConf().createVertexId();
vertexId.readFields(input);
- VertexMutations<I, V, E, M> vertexMutations =
- new VertexMutations<I, V, E, M>();
+ VertexMutations<I, V, E> vertexMutations =
+ new VertexMutations<I, V, E>();
vertexMutations.setConf(getConf());
vertexMutations.readFields(input);
if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
@@ -97,7 +96,7 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
public void writeRequest(DataOutput output) throws IOException {
output.writeInt(partitionId);
output.writeInt(vertexIdMutations.size());
- for (Entry<I, VertexMutations<I, V, E, M>> entry :
+ for (Entry<I, VertexMutations<I, V, E>> entry :
vertexIdMutations.entrySet()) {
entry.getKey().write(output);
entry.getValue().write(output);
@@ -110,15 +109,15 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
}
@Override
- public void doRequest(ServerData<I, V, E, M> serverData) {
- ConcurrentHashMap<I, VertexMutations<I, V, E, M>> vertexMutations =
+ public void doRequest(ServerData<I, V, E> serverData) {
+ ConcurrentHashMap<I, VertexMutations<I, V, E>> vertexMutations =
serverData.getVertexMutations();
Histogram verticesInMutationHist = GiraphMetrics.get().perSuperstep()
.getUniformHistogram(MetricNames.VERTICES_IN_MUTATION_REQUEST);
verticesInMutationHist.update(vertexMutations.size());
- for (Entry<I, VertexMutations<I, V, E, M>> entry :
+ for (Entry<I, VertexMutations<I, V, E>> entry :
vertexIdMutations.entrySet()) {
- VertexMutations<I, V, E, M> mutations =
+ VertexMutations<I, V, E> mutations =
vertexMutations.get(entry.getKey());
if (mutations == null) {
mutations = vertexMutations.putIfAbsent(
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
index 1de3cbb..e0cb916 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
@@ -34,17 +34,16 @@ import java.io.IOException;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class SendVertexRequest<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> extends
- WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
+ V extends Writable, E extends Writable> extends
+ WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendVertexRequest.class);
/** Partition */
- private Partition<I, V, E, M> partition;
+ private Partition<I, V, E> partition;
/**
* Constructor used for reflection only
@@ -56,7 +55,7 @@ public class SendVertexRequest<I extends WritableComparable,
*
* @param partition Partition to send the request to
*/
- public SendVertexRequest(Partition<I, V, E, M> partition) {
+ public SendVertexRequest(Partition<I, V, E> partition) {
this.partition = partition;
}
@@ -77,7 +76,7 @@ public class SendVertexRequest<I extends WritableComparable,
}
@Override
- public void doRequest(ServerData<I, V, E, M> serverData) {
+ public void doRequest(ServerData<I, V, E> serverData) {
serverData.getPartitionStore().addPartition(partition);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index 04b633b..f6bf9bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -54,7 +54,8 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
@Override
public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
- return new ByteArrayVertexIdMessages<I, M>();
+ return new ByteArrayVertexIdMessages<I, M>(
+ getConf().getOutgoingMessageValueClass());
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
index 4d9382f..0ceb3eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WorkerRequest.java
@@ -28,14 +28,13 @@ import org.apache.hadoop.io.WritableComparable;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
public interface WorkerRequest<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
/**
* Execute the request
*
* @param serverData Accessible data that can be mutated per the request
*/
- void doRequest(ServerData<I, V, E, M> serverData);
+ void doRequest(ServerData<I, V, E> serverData);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
index fad20b0..181e681 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
@@ -32,12 +32,10 @@ import org.apache.hadoop.io.WritableComparable;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
public abstract class WritableRequest<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements Writable,
- ImmutableClassesGiraphConfigurable<I, V, E, M> {
+ V extends Writable, E extends Writable> implements Writable,
+ ImmutableClassesGiraphConfigurable<I, V, E> {
/**
* Value to use when size of the request in serialized form is not known
* or too expensive to calculate
@@ -45,7 +43,7 @@ public abstract class WritableRequest<I extends WritableComparable,
public static final int UNKNOWN_SIZE = -1;
/** Configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Client id */
private int clientId = -1;
/** Request id */
@@ -103,13 +101,12 @@ public abstract class WritableRequest<I extends WritableComparable,
abstract void writeRequest(DataOutput output) throws IOException;
@Override
- public final ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+ public final ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
return conf;
}
@Override
- public final void setConf(ImmutableClassesGiraphConfiguration<I, V,
- E, M> conf) {
+ public final void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
this.conf = conf;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
index cceaaef..5d150d0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
@@ -24,7 +24,7 @@ import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
/**
* Tracks all of the Giraph options
@@ -75,7 +75,7 @@ public class AllOptions {
public static void main(String[] args) {
// This is necessary to trigger the static constants in GiraphConstants to
// get loaded. Without it we get no output.
- VERTEX_CLASS.toString();
+ COMPUTATION_CLASS.toString();
LOG.info(allOptionsString());
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java b/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java
index 77564ee..c5096fa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/DefaultImmutableClassesGiraphConfigurable.java
@@ -27,22 +27,21 @@ import org.apache.hadoop.io.WritableComparable;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
public class DefaultImmutableClassesGiraphConfigurable<
I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> implements
- ImmutableClassesGiraphConfigurable<I, V, E, M> {
+ E extends Writable> implements
+ ImmutableClassesGiraphConfigurable<I, V, E> {
/** Configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E> conf;
@Override
- public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
this.conf = conf;
}
@Override
- public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+ public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
return conf;
}
}