You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/06/26 01:22:33 UTC
[1/2] GIRAPH-697: Clean up message stores (majakabiljo)
Updated Branches:
refs/heads/trunk f8a3c777e -> 89445670e
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
new file mode 100644
index 0000000..1cda1d9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Message store which separates data by partitions and delegates each
+ * partition's messages to a {@link PartitionDiskBackedMessageStore}.
+ * <p/>
+ * After every batch of added messages the total number of in-memory
+ * messages (summed over all partitions) is compared with a threshold;
+ * while it is exceeded, the partition holding the most in-memory messages
+ * is flushed to disk.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class DiskBackedMessageStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MessageStore<I, M> {
+  /** Message class */
+  private final Class<M> messageClass;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E> service;
+  /** Number of messages to keep in memory, summed over all partitions */
+  private final int maxNumberOfMessagesInMemory;
+  /** Factory for creating per-partition message stores */
+  private final MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+      partitionStoreFactory;
+  /** Map from partition id to its message store */
+  private final ConcurrentMap<Integer, PartitionDiskBackedMessageStore<I, M>>
+      partitionMessageStores;
+
+  /**
+   * Constructor.
+   *
+   * @param messageClass Message class held in the store
+   * @param service Service worker
+   * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
+   * @param partitionStoreFactory Factory for creating stores for a
+   *                              partition
+   */
+  public DiskBackedMessageStore(
+      Class<M> messageClass,
+      CentralizedServiceWorker<I, V, E> service,
+      int maxNumberOfMessagesInMemory,
+      MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+          partitionStoreFactory) {
+    this.messageClass = messageClass;
+    this.service = service;
+    this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
+    this.partitionStoreFactory = partitionStoreFactory;
+    partitionMessageStores = Maps.newConcurrentMap();
+  }
+
+  @Override
+  public void addPartitionMessages(
+      int partitionId,
+      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> partitionMessageStore =
+        getMessageStore(partitionId);
+    ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+        vertexIdMessageIterator = messages.getVertexIdMessageIterator();
+    while (vertexIdMessageIterator.hasNext()) {
+      vertexIdMessageIterator.next();
+      boolean ownsVertexId =
+          partitionMessageStore.addVertexMessages(
+              vertexIdMessageIterator.getCurrentVertexId(),
+              Collections.singleton(
+                  vertexIdMessageIterator.getCurrentMessage()));
+      // The partition store took ownership of the id object, so release it
+      // from the iterator instead of letting the iterator reuse it
+      if (ownsVertexId) {
+        vertexIdMessageIterator.releaseCurrentVertexId();
+      }
+    }
+    checkMemory();
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        getExistingMessageStore(vertexId);
+    if (messageStore != null && messageStore.hasMessagesForVertex(vertexId)) {
+      return messageStore.getVertexMessages(vertexId);
+    } else {
+      return EmptyIterable.get();
+    }
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    // Read-only query: do not create a store for a partition without one
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        getExistingMessageStore(vertexId);
+    return messageStore != null && messageStore.hasMessagesForVertex(vertexId);
+  }
+
+  @Override
+  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore == null) {
+      return Collections.emptyList();
+    } else {
+      return messageStore.getDestinationVertices();
+    }
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        getExistingMessageStore(vertexId);
+    if (messageStore != null && messageStore.hasMessagesForVertex(vertexId)) {
+      messageStore.clearVertexMessages(vertexId);
+    }
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore != null) {
+      messageStore.clearAll();
+    }
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    for (PartitionDiskBackedMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      messageStore.clearAll();
+    }
+    partitionMessageStores.clear();
+  }
+
+  /**
+   * Checks the memory status and flushes partitions while the in-memory
+   * message count is above the threshold.
+   *
+   * @throws IOException
+   */
+  private void checkMemory() throws IOException {
+    while (memoryFull()) {
+      // Stop when nothing could be flushed; otherwise a negative threshold
+      // (with every store already empty) would loop forever
+      if (!flushOnePartition()) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Check if memory is full
+   *
+   * @return True iff the number of in-memory messages over all partitions
+   *         exceeds the threshold
+   */
+  private boolean memoryFull() {
+    // long: the per-partition int counts are summed over many partitions
+    long totalMessages = 0;
+    for (PartitionDiskBackedMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      totalMessages += messageStore.getNumberOfMessages();
+    }
+    return totalMessages > maxNumberOfMessagesInMemory;
+  }
+
+  /**
+   * Finds the partition with the most in-memory messages and flushes it to
+   * the disk.
+   *
+   * @return True iff a partition was flushed
+   * @throws IOException
+   */
+  private boolean flushOnePartition() throws IOException {
+    int maxMessages = 0;
+    PartitionDiskBackedMessageStore<I, M> biggestStore = null;
+    for (PartitionDiskBackedMessageStore<I, M> messageStore :
+        partitionMessageStores.values()) {
+      int numMessages = messageStore.getNumberOfMessages();
+      if (numMessages > maxMessages) {
+        maxMessages = numMessages;
+        biggestStore = messageStore;
+      }
+    }
+    if (biggestStore == null) {
+      return false;
+    }
+    biggestStore.flush();
+    return true;
+  }
+
+  /**
+   * Get the already existing message store for the partition which holds
+   * the vertex with the required id, without creating one.
+   *
+   * @param vertexId Id of vertex for which we are asking for message store
+   * @return Message store, or null if the partition has none yet
+   */
+  private PartitionDiskBackedMessageStore<I, M> getExistingMessageStore(
+      I vertexId) {
+    int partitionId =
+        service.getVertexPartitionOwner(vertexId).getPartitionId();
+    return partitionMessageStores.get(partitionId);
+  }
+
+  /**
+   * Get message store for partition id. If it doesn't exist yet,
+   * creates a new one.
+   *
+   * @param partitionId Id of partition for which we are asking for message
+   *                    store
+   * @return Requested message store
+   */
+  private PartitionDiskBackedMessageStore<I, M> getMessageStore(
+      int partitionId) {
+    PartitionDiskBackedMessageStore<I, M> messageStore =
+        partitionMessageStores.get(partitionId);
+    if (messageStore != null) {
+      return messageStore;
+    }
+    messageStore = partitionStoreFactory.newStore(messageClass);
+    // Another thread may have registered a store in the meantime; keep theirs
+    PartitionDiskBackedMessageStore<I, M> store =
+        partitionMessageStores.putIfAbsent(partitionId, messageStore);
+    return (store == null) ? messageStore : store;
+  }
+
+  @Override
+  public void writePartition(DataOutput out,
+      int partitionId) throws IOException {
+    PartitionDiskBackedMessageStore<I, M> partitionStore =
+        partitionMessageStores.get(partitionId);
+    // Presence flag, then the store itself (mirrored by readFieldsForPartition)
+    out.writeBoolean(partitionStore != null);
+    if (partitionStore != null) {
+      partitionStore.write(out);
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+      int partitionId) throws IOException {
+    if (in.readBoolean()) {
+      PartitionDiskBackedMessageStore<I, M> messageStore =
+          partitionStoreFactory.newStore(messageClass);
+      messageStore.readFields(in);
+      partitionMessageStores.put(partitionId, messageStore);
+    }
+  }
+
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param service Service worker
+   * @param maxMessagesInMemory Number of messages to keep in memory
+   * @param fileStoreFactory Factory for creating file stores when
+   *                         flushing
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
+      CentralizedServiceWorker<I, V, E> service,
+      int maxMessagesInMemory,
+      MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+          fileStoreFactory) {
+    return new Factory<I, V, E, M>(service, maxMessagesInMemory,
+        fileStoreFactory);
+  }
+
+  /**
+   * Factory for {@link DiskBackedMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
+    /** Service worker */
+    private final CentralizedServiceWorker<I, V, E> service;
+    /** Number of messages to keep in memory */
+    private final int maxMessagesInMemory;
+    /** Factory for creating file stores when flushing */
+    private final
+    MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+    fileStoreFactory;
+
+    /**
+     * @param service Service worker
+     * @param maxMessagesInMemory Number of messages to keep in memory
+     * @param fileStoreFactory Factory for creating file stores when
+     *                         flushing
+     */
+    public Factory(CentralizedServiceWorker<I, V, E> service,
+        int maxMessagesInMemory,
+        MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>>
+            fileStoreFactory) {
+      this.service = service;
+      this.maxMessagesInMemory = maxMessagesInMemory;
+      this.fileStoreFactory = fileStoreFactory;
+    }
+
+    @Override
+    public MessageStore<I, M> newStore(Class<M> messageClass) {
+      return new DiskBackedMessageStore<I, V, E, M>(messageClass,
+          service, maxMessagesInMemory, fileStoreFactory);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
new file mode 100644
index 0000000..4ae805a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Message storage with in-memory map of messages and with support for
+ * flushing all the messages to the disk. Holds messages for a single partition.
+ * <p/>
+ * Adding messages holds the read lock. Appends to one vertex's buffer are
+ * additionally synchronized on that buffer. {@link #flush()} takes the
+ * write lock to swap out the in-memory map.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class PartitionDiskBackedMessageStore<I extends WritableComparable,
+    M extends Writable> implements Writable {
+  /** Message class */
+  private final Class<M> messageClass;
+  /**
+   * In-memory message map (must be sorted to ensure that the ids are
+   * ordered when flushed)
+   */
+  private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
+      inMemoryMessages;
+  /** Hadoop configuration */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+  /** Counter for number of messages in-memory */
+  private final AtomicInteger numberOfMessagesInMemory;
+  /** To keep vertex ids which we have messages for */
+  private final Set<I> destinationVertices;
+  /** File stores in which we keep flushed messages */
+  private final Collection<SequentialFileMessageStore<I, M>> fileStores;
+  /** Factory for creating file stores when flushing */
+  private final
+  MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> fileStoreFactory;
+  /** Lock for disk flushing: read side for adds, write side for flush */
+  private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
+  /**
+   * Constructor.
+   *
+   * @param messageClass Message class held in the store
+   * @param config Hadoop configuration
+   * @param fileStoreFactory Factory for creating file stores when flushing
+   */
+  public PartitionDiskBackedMessageStore(
+      Class<M> messageClass,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+      MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+          fileStoreFactory) {
+    inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+    this.messageClass = messageClass;
+    this.config = config;
+    numberOfMessagesInMemory = new AtomicInteger(0);
+    destinationVertices =
+        Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
+    fileStores = Lists.newArrayList();
+    this.fileStoreFactory = fileStoreFactory;
+  }
+
+  /**
+   * Add vertex messages
+   *
+   * @param vertexId Vertex id to use
+   * @param messages Messages to add (note that the lifetime of the messages
+   *                 is only until next() is called again)
+   * @return True if the vertex id ownership is taken by this method,
+   *         false otherwise
+   * @throws IOException
+   */
+  boolean addVertexMessages(I vertexId,
+      Iterable<M> messages) throws IOException {
+    boolean ownsVertexId = false;
+    rwLock.readLock().lock();
+    try {
+      ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+      if (extendedDataOutput == null) {
+        ExtendedDataOutput newExtendedDataOutput =
+            config.createExtendedDataOutput();
+        extendedDataOutput =
+            inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
+        if (extendedDataOutput == null) {
+          ownsVertexId = true;
+          extendedDataOutput = newExtendedDataOutput;
+          // Only retain the vertexId object once we own it. When false is
+          // returned the caller keeps (and may reuse) the object, so storing
+          // it would let a later mutation corrupt destinationVertices. The
+          // thread that won putIfAbsent records its own id object here.
+          destinationVertices.add(vertexId);
+        }
+      }
+
+      synchronized (extendedDataOutput) {
+        for (M message : messages) {
+          message.write(extendedDataOutput);
+          numberOfMessagesInMemory.getAndIncrement();
+        }
+      }
+    } finally {
+      rwLock.readLock().unlock();
+    }
+
+    return ownsVertexId;
+  }
+
+  /**
+   * Get the messages for a vertex: the in-memory ones followed by those
+   * read from each flushed file store.
+   *
+   * @param vertexId Vertex id for which we want to get messages
+   * @return Iterable of messages for a vertex id
+   * @throws IOException
+   */
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
+    if (extendedDataOutput == null) {
+      extendedDataOutput = config.createExtendedDataOutput();
+    }
+    Iterable<M> combinedIterable = new MessagesIterable<M>(
+        config, messageClass,
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+
+    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
+      combinedIterable = Iterables.concat(combinedIterable,
+          fileStore.getVertexMessages(vertexId));
+    }
+    return combinedIterable;
+  }
+
+  /**
+   * Get number of messages in memory
+   *
+   * @return Number of messages in memory
+   */
+  public int getNumberOfMessages() {
+    return numberOfMessagesInMemory.get();
+  }
+
+  /**
+   * Check if we have messages for some vertex
+   *
+   * @param vertexId Id of vertex which we want to check
+   * @return True iff we have messages for vertex with required id
+   */
+  public boolean hasMessagesForVertex(I vertexId) {
+    return destinationVertices.contains(vertexId);
+  }
+
+  /**
+   * Gets vertex ids which we have messages for
+   *
+   * @return Iterable over vertex ids which we have messages for
+   */
+  public Iterable<I> getDestinationVertices() {
+    return destinationVertices;
+  }
+
+  /**
+   * Clears messages for a vertex. Afterwards
+   * {@link #hasMessagesForVertex(WritableComparable)} returns false for it.
+   *
+   * @param vertexId Vertex id for which we want to clear messages
+   * @throws IOException
+   */
+  public void clearVertexMessages(I vertexId) throws IOException {
+    inMemoryMessages.remove(vertexId);
+    destinationVertices.remove(vertexId);
+  }
+
+  /**
+   * Clears all resources used by this store.
+   *
+   * @throws IOException
+   */
+  public void clearAll() throws IOException {
+    inMemoryMessages.clear();
+    destinationVertices.clear();
+    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
+      fileStore.clearAll();
+    }
+    fileStores.clear();
+  }
+
+  /**
+   * Flushes the in-memory messages to a new file store on the disk.
+   *
+   * @throws IOException
+   */
+  public void flush() throws IOException {
+    ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush;
+    rwLock.writeLock().lock();
+    try {
+      messagesToFlush = inMemoryMessages;
+      inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+      numberOfMessagesInMemory.set(0);
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+    // Nothing buffered (e.g. a concurrent flush already drained the map):
+    // don't create an empty file store
+    if (messagesToFlush.isEmpty()) {
+      return;
+    }
+    SequentialFileMessageStore<I, M> fileStore =
+        fileStoreFactory.newStore(messageClass);
+    fileStore.addMessages(messagesToFlush);
+
+    synchronized (fileStores) {
+      fileStores.add(fileStore);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write destination vertices
+    out.writeInt(destinationVertices.size());
+    for (I vertexId : destinationVertices) {
+      vertexId.write(out);
+    }
+
+    // write number of in-memory messages
+    out.writeInt(numberOfMessagesInMemory.get());
+
+    // write in-memory messages map: id, byte length, raw message bytes
+    out.writeInt(inMemoryMessages.size());
+    for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
+      entry.getKey().write(out);
+      out.writeInt(entry.getValue().getPos());
+      out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
+    }
+
+    // write file stores
+    out.writeInt(fileStores.size());
+    for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
+      fileStore.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // read destination vertices
+    int numVertices = in.readInt();
+    for (int v = 0; v < numVertices; v++) {
+      I vertexId = config.createVertexId();
+      vertexId.readFields(in);
+      destinationVertices.add(vertexId);
+    }
+
+    // read number of in-memory messages
+    numberOfMessagesInMemory.set(in.readInt());
+
+    // read in-memory map
+    int mapSize = in.readInt();
+    for (int m = 0; m < mapSize; m++) {
+      I vertexId = config.createVertexId();
+      vertexId.readFields(in);
+      int messageBytes = in.readInt();
+      byte[] buf = new byte[messageBytes];
+      // The raw message bytes follow the length written by write(); they
+      // must be consumed here or the rest of the stream is misaligned
+      in.readFully(buf);
+      ExtendedDataOutput extendedDataOutput =
+          config.createExtendedDataOutput(buf, messageBytes);
+      inMemoryMessages.put(vertexId, extendedDataOutput);
+    }
+
+    // read file stores
+    int numFileStores = in.readInt();
+    for (int s = 0; s < numFileStores; s++) {
+      SequentialFileMessageStore<I, M> fileStore =
+          fileStoreFactory.newStore(messageClass);
+      fileStore.readFields(in);
+      fileStores.add(fileStore);
+    }
+  }
+
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param config Hadoop configuration
+   * @param fileStoreFactory Factory for creating message stores for
+   *                         partitions
+   * @param <I> Vertex id
+   * @param <M> Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> newFactory(
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+      MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+          fileStoreFactory) {
+    return new Factory<I, M>(config, fileStoreFactory);
+  }
+
+  /**
+   * Factory for {@link PartitionDiskBackedMessageStore}
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      M extends Writable> implements MessageStoreFactory<I, M,
+      PartitionDiskBackedMessageStore<I, M>> {
+    /** Hadoop configuration */
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+    /** Factory for creating message stores for partitions */
+    private final MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+    fileStoreFactory;
+
+    /**
+     * @param config Hadoop configuration
+     * @param fileStoreFactory Factory for creating message stores for
+     *                         partitions
+     */
+    public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+        MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
+            fileStoreFactory) {
+      this.config = config;
+      this.fileStoreFactory = fileStoreFactory;
+    }
+
+    @Override
+    public PartitionDiskBackedMessageStore<I, M> newStore(
+        Class<M> messageClass) {
+      return new PartitionDiskBackedMessageStore<I, M>(messageClass, config,
+          fileStoreFactory);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
new file mode 100644
index 0000000..e5931c6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.out_of_core;
+
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
+
+/**
+ * Used for writing and reading collection of messages to the disk.
+ * {@link SequentialFileMessageStore#addMessages(NavigableMap)}
+ * should be called only once with the messages we want to store.
+ * <p/>
+ * It's optimized for retrieving messages in the natural order of vertex ids
+ * they are sent to.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class SequentialFileMessageStore<I extends WritableComparable,
+ M extends Writable> implements Writable {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SequentialFileMessageStore.class);
+  /** Class of the messages kept in this store */
+  private final Class<M> messageClass;
+  /** Configuration, used to create vertex ids and messages when reading */
+  private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+  /** Size of the stream buffers used for file reads and writes */
+  private final int bufferSize;
+  /** Backing file holding the serialized messages */
+  private final File file;
+  /** Stream over the backing file; null while not reading */
+  private DataInputStream in;
+  /** Number of vertices in the file that have not been consumed yet */
+  private int verticesLeft;
+  /** Id of the vertex whose messages come next in the stream, or null */
+  private I currentVertexId;
+
+  /**
+   * Creates a store backed by the given file. Nothing is written until
+   * messages are added.
+   *
+   * @param messageClass Message class held in the store
+   * @param config Configuration used later for reading
+   * @param bufferSize Buffer size to use when reading and writing
+   * @param fileName Path of the file in which messages will be stored
+   */
+  public SequentialFileMessageStore(
+      Class<M> messageClass,
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config,
+      int bufferSize,
+      String fileName) {
+    this.messageClass = messageClass;
+    this.config = config;
+    this.bufferSize = bufferSize;
+    this.file = new File(fileName);
+  }
+
+  /**
+   * Writes the given messages to this store's file, replacing any previous
+   * contents. File layout: the number of vertices, then for every vertex in
+   * the map's (ascending) key order its id, its message count and the
+   * messages themselves.
+   *
+   * @param messageMap Serialized messages keyed by destination vertex id
+   * @throws java.io.IOException
+   */
+  public void addMessages(NavigableMap<I, ExtendedDataOutput> messageMap)
+    throws IOException {
+    // Start over from an empty file
+    if (file.exists()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("addMessages: Deleting " + file);
+      }
+      file.delete();
+    }
+    file.createNewFile();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addMessages: Creating " + file);
+    }
+
+    DataOutputStream fileOut = new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(file), bufferSize));
+    try {
+      fileOut.writeInt(messageMap.size());
+      for (Map.Entry<I, ExtendedDataOutput> entry : messageMap.entrySet()) {
+        I targetId = entry.getKey();
+        ExtendedDataOutput serialized = entry.getValue();
+        targetId.write(fileOut);
+
+        Iterable<M> vertexMessages = new MessagesIterable<M>(config,
+            messageClass, serialized.getByteArray(), 0, serialized.getPos());
+        // The count precedes the messages so readers know how many follow
+        int count = Iterables.size(vertexMessages);
+        fileOut.writeInt(count);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("addMessages: For vertex id " + targetId +
+              ", messages = " + count + " to file " + file);
+        }
+        for (M message : vertexMessages) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("addMessages: Wrote " + message + " to " + file);
+          }
+          message.write(fileOut);
+        }
+      }
+    } finally {
+      fileOut.close();
+    }
+  }
+
+ /**
+ * Reads messages for a vertex. It will find the messages only if all
+ * previous reads used smaller vertex ids than this one - messages should
+ * be retrieved in increasing order of vertex ids.
+ *
+ * @param vertexId Vertex id for which we want to get messages
+ * @return Messages for the selected vertex, or empty list if not used
+ * correctly
+ * @throws IOException
+ */
+ public Iterable<M> getVertexMessages(I vertexId) throws
+ IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
+ " (currently " + currentVertexId + ") from " + file);
+ }
+ if (in == null) {
+ startReading();
+ }
+
+ I nextVertexId = getCurrentVertexId();
+ while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) {
+ nextVertexId = getNextVertexId();
+ }
+
+ if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
+ return EmptyIterable.get();
+ }
+
+ return readMessagesForCurrentVertex();
+ }
+
+  /**
+   * Clears all resources used by this store: closes the reader, if open,
+   * and deletes the backing file (best effort, the result is not checked).
+   *
+   * @throws IOException
+   */
+  public void clearAll() throws IOException {
+    endReading();
+    file.delete();
+  }
+
+  /**
+   * Serializes the store as the file length followed by the raw file bytes.
+   *
+   * @param out Output to write to
+   * @throws IOException
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(file.length());
+    FileInputStream input = new FileInputStream(file);
+    try {
+      byte[] buffer = new byte[bufferSize];
+      int length;
+      while ((length = input.read(buffer)) >= 0) {
+        out.write(buffer, 0, length);
+      }
+    } finally {
+      input.close();
+    }
+  }
+
+  /**
+   * Restores the backing file from data produced by {@link #write}.
+   *
+   * @param in Input to read from
+   * @throws IOException
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    FileOutputStream output = new FileOutputStream(file);
+    try {
+      long remaining = in.readLong();
+      byte[] buffer = new byte[bufferSize];
+      while (remaining > 0) {
+        int bytes = (int) Math.min(bufferSize, remaining);
+        in.readFully(buffer, 0, bytes);
+        // Write only the bytes just read; the tail of the buffer holds
+        // stale data whenever the last chunk is shorter than the buffer
+        output.write(buffer, 0, bytes);
+        remaining -= bytes;
+      }
+    } finally {
+      output.close();
+    }
+  }
+
+  /**
+   * Opens the backing file and reads the number of stored vertices.
+   *
+   * @throws IOException
+   */
+  private void startReading() throws IOException {
+    currentVertexId = null;
+    FileInputStream fileIn = new FileInputStream(file);
+    in = new DataInputStream(new BufferedInputStream(fileIn, bufferSize));
+    verticesLeft = in.readInt();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("startReading: File " + file + " with " +
+          verticesLeft + " vertices left");
+    }
+  }
+
+  /**
+   * Returns the id of the vertex whose messages are next in the stream:
+   * the already read, not yet consumed id if there is one, otherwise the
+   * next id read from the file.
+   *
+   * @return Current vertex id, or null when the file is exhausted
+   * @throws IOException
+   */
+  private I getCurrentVertexId() throws IOException {
+    return (currentVertexId != null) ? currentVertexId : getNextVertexId();
+  }
+
+  /**
+   * Advances to the next vertex id in the file. If the current vertex's
+   * messages were not consumed yet they are read and discarded first.
+   *
+   * @return Next vertex id, or null when no vertices are left
+   * @throws IOException
+   */
+  private I getNextVertexId() throws IOException {
+    if (currentVertexId != null) {
+      // Skip over the unread messages of the current vertex
+      readMessagesForCurrentVertex();
+    }
+    if (verticesLeft != 0) {
+      currentVertexId = config.createVertexId();
+      currentVertexId.readFields(in);
+      return currentVertexId;
+    }
+    return null;
+  }
+
+  /**
+   * Reads the message count of the current vertex, then that many messages,
+   * and finally releases the vertex via {@link #currentVertexDone()}.
+   *
+   * @return Messages for current vertex
+   * @throws IOException If the count cannot be read or releasing fails
+   * @throws IllegalStateException If an individual message cannot be read
+   */
+  private Collection<M> readMessagesForCurrentVertex() throws IOException {
+    int count = in.readInt();
+    List<M> result = Lists.newArrayListWithCapacity(count);
+    int index = 0;
+    while (index < count) {
+      M msg = ReflectionUtils.newInstance(messageClass);
+      try {
+        msg.readFields(in);
+      } catch (IOException e) {
+        throw new IllegalStateException("readMessagesForCurrentVertex: " +
+            "Failed to read message from " + index + " of " +
+            count + " for vertex id " + currentVertexId + " from " +
+            file, e);
+      }
+      result.add(msg);
+      index++;
+    }
+    currentVertexDone();
+    return result;
+  }
+
+  /**
+   * Releases the current vertex and decrements the count of remaining
+   * vertices; after the last one, the input is closed via endReading().
+   *
+   * @throws IOException If closing the input fails
+   */
+  private void currentVertexDone() throws IOException {
+    currentVertexId = null;
+    if (--verticesLeft == 0) {
+      endReading();
+    }
+  }
+
+  /**
+   * Finishes a read pass: closes the input stream if one is open and drops
+   * the reference to it.
+   *
+   * @throws IOException If closing the stream fails
+   */
+  private void endReading() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("endReading: Stopped reading " + file);
+    }
+    if (in == null) {
+      return;
+    }
+    in.close();
+    in = null;
+  }
+
+  /**
+   * Creates a factory producing {@link SequentialFileMessageStore} instances
+   * that share the given configuration.
+   *
+   * @param config Hadoop configuration
+   * @param <I> Vertex id
+   * @param <M> Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> newFactory(
+      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+    Factory<I, M> factory = new Factory<I, M>(config);
+    return factory;
+  }
+
+  /**
+   * Factory for {@link SequentialFileMessageStore}. Every store it creates
+   * gets its own file named {@code messages-<n>}, and the files are assigned
+   * round-robin to the configured message directories.
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      M extends Writable>
+      implements MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> {
+    /** Hadoop configuration */
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
+    /** Directories in which we'll keep necessary files */
+    private final String[] directories;
+    /** Buffer size to use when reading and writing */
+    private final int bufferSize;
+    /** Counter for created message stores */
+    private final AtomicInteger storeCounter;
+
+    /**
+     * Constructor. For every configured messages path, builds the directory
+     * {@code <path>/<jobId>/<taskPartition>/} and tries to create it.
+     *
+     * @param config Hadoop configuration
+     */
+    public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+      this.config = config;
+      String jobId = config.get("mapred.job.id", "Unknown Job");
+      int taskId = config.getTaskPartition();
+      List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
+      // Randomize the order the round-robin below walks the directories in
+      // (presumably to spread I/O across disks)
+      Collections.shuffle(userPaths);
+      directories = new String[userPaths.size()];
+      int i = 0;
+      for (String path : userPaths) {
+        String directory = path + File.separator + jobId + File.separator +
+            taskId + File.separator;
+        directories[i++] = directory;
+        // NOTE(review): the result of mkdirs() is ignored, so a failure to
+        // create the directory is not reported here
+        new File(directory).mkdirs();
+      }
+      this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
+      storeCounter = new AtomicInteger();
+    }
+
+    @Override
+    public SequentialFileMessageStore<I, M> newStore(Class<M> messageClass) {
+      // Clear the sign bit rather than using Math.abs: Math.abs of
+      // Integer.MIN_VALUE is still negative, which would yield a negative
+      // directory index once the counter wraps around
+      int idx = storeCounter.getAndIncrement() & Integer.MAX_VALUE;
+      String fileName =
+          directories[idx % directories.length] + "messages-" + idx;
+      return new SequentialFileMessageStore<I, M>(messageClass, config,
+          bufferSize, fileName);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
new file mode 100644
index 0000000..7039378
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to out-of-core messages.
+ */
+package org.apache.giraph.comm.messages.out_of_core;
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
index 3c798a9..8721756 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/package-info.java
@@ -16,6 +16,6 @@
* limitations under the License.
*/
/**
- * Package of communication related objects, IPC service.
+ * Package of classes for storing messages.
*/
package org.apache.giraph.comm.messages;
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 2adf19d..d786db5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -26,7 +26,7 @@ import org.apache.giraph.comm.SendPartitionCache;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
import org.apache.giraph.comm.requests.SendVertexRequest;
@@ -215,7 +215,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
private void sendPartitionMessages(WorkerInfo workerInfo,
Partition<I, V, E> partition) {
final int partitionId = partition.getId();
- MessageStoreByPartition<I, Writable> messageStore =
+ MessageStore<I, Writable> messageStore =
serverData.getCurrentMessageStore();
ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
new ByteArrayVertexIdMessages<I, Writable>(
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index b457038..3473de1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -21,14 +21,12 @@ package org.apache.giraph.comm.netty;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerServer;
-import org.apache.giraph.comm.messages.BasicMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
-import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
@@ -107,7 +105,7 @@ public class NettyWorkerServer<I extends WritableComparable,
*
* @return Message store factory
*/
- private MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+ private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
createMessageStoreFactory() {
boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf);
if (!useOutOfCoreMessaging) {
@@ -118,12 +116,13 @@ public class NettyWorkerServer<I extends WritableComparable,
LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
"maxMessagesInMemory = " + maxMessagesInMemory);
}
- MessageStoreFactory<I, Writable, BasicMessageStore<I, Writable>>
+ MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>>
fileStoreFactory = SequentialFileMessageStore.newFactory(conf);
- MessageStoreFactory<I, Writable, FlushableMessageStore<I, Writable>>
+ MessageStoreFactory<I, Writable,
+ PartitionDiskBackedMessageStore<I, Writable>>
partitionStoreFactory =
- DiskBackedMessageStore.newFactory(conf, fileStoreFactory);
- return DiskBackedMessageStoreByPartition.newFactory(service,
+ PartitionDiskBackedMessageStore.newFactory(conf, fileStoreFactory);
+ return DiskBackedMessageStore.newFactory(service,
maxMessagesInMemory, partitionStoreFactory);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 6fdcfb0..a9bf3fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -19,7 +19,7 @@ package org.apache.giraph.graph;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.SimpleVertexWriter;
@@ -81,7 +81,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
/** Thread-safe queue of all partition ids */
private final BlockingQueue<Integer> partitionIdQueue;
/** Message store */
- private final MessageStoreByPartition<I, M1> messageStore;
+ private final MessageStore<I, M1> messageStore;
/** Configuration */
private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** Worker (for NettyWorkerClientRequestProcessor) */
@@ -111,7 +111,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
*/
public ComputeCallable(
Mapper<?, ?, ?, ?>.Context context, GraphState graphState,
- MessageStoreByPartition<I, M1> messageStore,
+ MessageStore<I, M1> messageStore,
BlockingQueue<Integer> partitionIdQueue,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
CentralizedServiceWorker<I, V, E> serviceWorker) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 435dd87..b32c21b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -21,7 +21,7 @@ package org.apache.giraph.graph;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.BspServiceMaster;
@@ -253,7 +253,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
graphState = checkSuperstepRestarted(superstep, graphState);
prepareForSuperstep(graphState);
context.progress();
- MessageStoreByPartition<I, Writable> messageStore =
+ MessageStore<I, Writable> messageStore =
serviceWorker.getServerData().getCurrentMessageStore();
int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
int numThreads = Math.min(numComputeThreads, numPartitions);
@@ -691,7 +691,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
List<PartitionStats> partitionStatsList,
final GraphState graphState,
- final MessageStoreByPartition<I, Writable> messageStore,
+ final MessageStore<I, Writable> messageStore,
int numPartitions,
int numThreads) {
final BlockingQueue<Integer> computePartitionIdQueue =
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 5c69161..35e6362 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -102,7 +102,7 @@ public class RequestFailureTest {
private void checkResult(int numRequests) throws IOException {
// Check the output
Iterable<IntWritable> vertices =
- serverData.getIncomingMessageStore().getDestinationVertices();
+ serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
int keySum = 0;
int messageSum = 0;
for (IntWritable vertexId : vertices) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 7016572..c8c09df 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -168,7 +168,7 @@ public class RequestTest {
// Check the output
Iterable<IntWritable> vertices =
- serverData.getIncomingMessageStore().getDestinationVertices();
+ serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
int keySum = 0;
int messageSum = 0;
for (IntWritable vertexId : vertices) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
index 4e8041a..e270816 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java
@@ -24,14 +24,12 @@ import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.BasicMessageStore;
import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
-import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore;
+import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -138,42 +136,25 @@ public class TestMessageStores {
return allMessages;
}
- /**
- * Used for testing only
- */
- private static class InputMessageStore extends
- ByteArrayMessagesPerVertexStore<IntWritable, IntWritable> {
-
- /**
- * Constructor
- *
- * @param service Service worker
- * @param config Hadoop configuration
- */
- InputMessageStore(
- CentralizedServiceWorker<IntWritable, ?, ?> service,
- ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config,
- Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException {
- super(IntWritable.class, service, config);
- // Adds all the messages to the store
- for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
- inputMap.entrySet()) {
- int partitionId = getPartitionId(entry.getKey());
- ByteArrayVertexIdMessages<IntWritable, IntWritable>
- byteArrayVertexIdMessages =
- new ByteArrayVertexIdMessages<IntWritable,
- IntWritable>(IntWritable.class);
- byteArrayVertexIdMessages.setConf(config);
- byteArrayVertexIdMessages.initialize();
- for (IntWritable message : entry.getValue()) {
- byteArrayVertexIdMessages.add(entry.getKey(), message);
- }
- try {
- addPartitionMessages(partitionId, byteArrayVertexIdMessages);
- } catch (IOException e) {
- throw new IllegalStateException("Got IOException", e);
- }
+ private static void addMessages(
+ MessageStore<IntWritable, IntWritable> messageStore,
+ CentralizedServiceWorker<IntWritable, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config,
+ Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException {
+ for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
+ inputMap.entrySet()) {
+ int partitionId =
+ service.getVertexPartitionOwner(entry.getKey()).getPartitionId();
+ ByteArrayVertexIdMessages<IntWritable, IntWritable>
+ byteArrayVertexIdMessages =
+ new ByteArrayVertexIdMessages<IntWritable,
+ IntWritable>(IntWritable.class);
+ byteArrayVertexIdMessages.setConf(config);
+ byteArrayVertexIdMessages.initialize();
+ for (IntWritable message : entry.getValue()) {
+ byteArrayVertexIdMessages.add(entry.getKey(), message);
}
+ messageStore.addPartitionMessages(partitionId, byteArrayVertexIdMessages);
}
}
@@ -184,8 +165,7 @@ public class TestMessageStores {
for (int n = 0; n < testData.numTimes; n++) {
SortedMap<IntWritable, Collection<IntWritable>> batch =
createRandomMessages(testData);
- messageStore.addMessages(new InputMessageStore(service, config,
- batch));
+ addMessages(messageStore, service, config, batch);
for (Entry<IntWritable, Collection<IntWritable>> entry :
batch.entrySet()) {
if (messages.containsKey(entry.getKey())) {
@@ -200,19 +180,24 @@ public class TestMessageStores {
private <I extends WritableComparable, M extends Writable> boolean
equalMessages(
MessageStore<I, M> messageStore,
- Map<I, Collection<M>> expectedMessages) throws IOException {
- TreeSet<I> vertexIds = Sets.newTreeSet();
- Iterables.addAll(vertexIds, messageStore.getDestinationVertices());
- for (I vertexId : vertexIds) {
- Iterable<M> expected = expectedMessages.get(vertexId);
- if (expected == null) {
- return false;
- }
- Iterable<M> actual = messageStore.getVertexMessages(vertexId);
- if (!CollectionUtils.isEqual(expected, actual)) {
- System.err.println("equalMessages: For vertexId " + vertexId +
- " expected " + expected + ", but got " + actual);
- return false;
+ Map<I, Collection<M>> expectedMessages,
+ TestData testData) throws IOException {
+ for (int partitionId = 0; partitionId < testData.numOfPartitions;
+ partitionId++) {
+ TreeSet<I> vertexIds = Sets.newTreeSet();
+ Iterables.addAll(vertexIds,
+ messageStore.getPartitionDestinationVertices(partitionId));
+ for (I vertexId : vertexIds) {
+ Iterable<M> expected = expectedMessages.get(vertexId);
+ if (expected == null) {
+ return false;
+ }
+ Iterable<M> actual = messageStore.getVertexMessages(vertexId);
+ if (!CollectionUtils.isEqual(expected, actual)) {
+ System.err.println("equalMessages: For vertexId " + vertexId +
+ " expected " + expected + ", but got " + actual);
+ return false;
+ }
}
}
return true;
@@ -220,7 +205,7 @@ public class TestMessageStores {
private <S extends MessageStore<IntWritable, IntWritable>> S doCheckpoint(
MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
- S messageStore) throws IOException {
+ S messageStore, TestData testData) throws IOException {
File file = new File(directory, "messageStoreTest");
if (file.exists()) {
file.delete();
@@ -228,14 +213,20 @@ public class TestMessageStores {
file.createNewFile();
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
(new FileOutputStream(file))));
- messageStore.write(out);
+ for (int partitionId = 0; partitionId < testData.numOfPartitions;
+ partitionId++) {
+ messageStore.writePartition(out, partitionId);
+ }
out.close();
messageStore = messageStoreFactory.newStore(IntWritable.class);
DataInputStream in = new DataInputStream(new BufferedInputStream(
(new FileInputStream(file))));
- messageStore.readFields(in);
+ for (int partitionId = 0; partitionId < testData.numOfPartitions;
+ partitionId++) {
+ messageStore.readFieldsForPartition(in, partitionId);
+ }
in.close();
file.delete();
@@ -250,10 +241,10 @@ public class TestMessageStores {
new TreeMap<IntWritable, Collection<IntWritable>>();
S messageStore = messageStoreFactory.newStore(IntWritable.class);
putNTimes(messageStore, messages, testData);
- assertTrue(equalMessages(messageStore, messages));
+ assertTrue(equalMessages(messageStore, messages, testData));
messageStore.clearAll();
- messageStore = doCheckpoint(messageStoreFactory, messageStore);
- assertTrue(equalMessages(messageStore, messages));
+ messageStore = doCheckpoint(messageStoreFactory, messageStore, testData);
+ assertTrue(equalMessages(messageStore, messages, testData));
messageStore.clearAll();
}
@@ -270,31 +261,18 @@ public class TestMessageStores {
}
@Test
- public void testDiskBackedMessageStore() {
- try {
- MessageStoreFactory<IntWritable, IntWritable,
- BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
- SequentialFileMessageStore.newFactory(config);
- MessageStoreFactory<IntWritable, IntWritable,
- FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
- DiskBackedMessageStore.newFactory(config, fileStoreFactory);
- testMessageStore(diskStoreFactory, testData);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Test
public void testDiskBackedMessageStoreByPartition() {
try {
MessageStoreFactory<IntWritable, IntWritable,
- BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
+ SequentialFileMessageStore<IntWritable, IntWritable>>
+ fileStoreFactory =
SequentialFileMessageStore.newFactory(config);
MessageStoreFactory<IntWritable, IntWritable,
- FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
- DiskBackedMessageStore.newFactory(config, fileStoreFactory);
- testMessageStore(DiskBackedMessageStoreByPartition.newFactory(service,
- testData.maxMessagesInMemory, diskStoreFactory), testData);
+ PartitionDiskBackedMessageStore<IntWritable, IntWritable>>
+ partitionStoreFactory =
+ PartitionDiskBackedMessageStore.newFactory(config, fileStoreFactory);
+ testMessageStore(DiskBackedMessageStore.newFactory(service,
+ testData.maxMessagesInMemory, partitionStoreFactory), testData);
} catch (IOException e) {
e.printStackTrace();
}
[2/2] git commit: updated refs/heads/trunk to 8944567
Posted by ma...@apache.org.
GIRAPH-697: Clean up message stores (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/89445670
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/89445670
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/89445670
Branch: refs/heads/trunk
Commit: 89445670eee21e38c0dadde4753310adae14b964
Parents: f8a3c77
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Tue Jun 25 16:05:44 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Tue Jun 25 16:05:44 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/comm/ServerData.java | 23 +-
.../giraph/comm/messages/BasicMessageStore.java | 67 ---
.../ByteArrayMessagesPerVertexStore.java | 77 +---
.../comm/messages/DiskBackedMessageStore.java | 418 -------------------
.../DiskBackedMessageStoreByPartition.java | 390 -----------------
.../comm/messages/FlushableMessageStore.java | 40 --
.../messages/InMemoryMessageStoreFactory.java | 4 +-
.../giraph/comm/messages/MessageStore.java | 75 +++-
.../comm/messages/MessageStoreByPartition.java | 82 ----
.../comm/messages/MessageStoreFactory.java | 6 +-
.../giraph/comm/messages/MessagesIterable.java | 56 +++
.../comm/messages/OneMessagePerVertexStore.java | 65 +--
.../messages/SequentialFileMessageStore.java | 411 ------------------
.../comm/messages/SimpleMessageStore.java | 44 +-
.../out_of_core/DiskBackedMessageStore.java | 322 ++++++++++++++
.../PartitionDiskBackedMessageStore.java | 350 ++++++++++++++++
.../out_of_core/SequentialFileMessageStore.java | 407 ++++++++++++++++++
.../comm/messages/out_of_core/package-info.java | 21 +
.../giraph/comm/messages/package-info.java | 2 +-
.../NettyWorkerClientRequestProcessor.java | 4 +-
.../giraph/comm/netty/NettyWorkerServer.java | 21 +-
.../apache/giraph/graph/ComputeCallable.java | 6 +-
.../apache/giraph/graph/GraphTaskManager.java | 6 +-
.../apache/giraph/comm/RequestFailureTest.java | 2 +-
.../org/apache/giraph/comm/RequestTest.java | 2 +-
.../apache/giraph/comm/TestMessageStores.java | 140 +++----
27 files changed, 1335 insertions(+), 1708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 0f2758e..2a605d8 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-697: Clean up message stores (majakabiljo)
+
GIRAPH-696: Should be able to spill giraph metrics to a specified
directory on HDFS (claudio)
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 788be53..affc260 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -21,7 +21,7 @@ package org.apache.giraph.comm;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -54,21 +54,18 @@ public class ServerData<I extends WritableComparable,
/** Edge store for this worker. */
private final EdgeStore<I, V, E> edgeStore;
/** Message store factory */
- private final
- MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+ private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
messageStoreFactory;
/**
* Message store for incoming messages (messages which will be consumed
* in the next super step)
*/
- private volatile MessageStoreByPartition<I, Writable>
- incomingMessageStore;
+ private volatile MessageStore<I, Writable> incomingMessageStore;
/**
* Message store for current messages (messages which we received in
* previous super step and which will be consumed in current super step)
*/
- private volatile MessageStoreByPartition<I, Writable>
- currentMessageStore;
+ private volatile MessageStore<I, Writable> currentMessageStore;
/**
* Map of partition ids to incoming vertex mutations from other workers.
* (Synchronized access to values)
@@ -95,7 +92,7 @@ public class ServerData<I extends WritableComparable,
public ServerData(
CentralizedServiceWorker<I, V, E> service,
ImmutableClassesGiraphConfiguration<I, V, E> conf,
- MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+ MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
messageStoreFactory,
Mapper<?, ?, ?, ?>.Context context) {
this.conf = conf;
@@ -136,9 +133,8 @@ public class ServerData<I extends WritableComparable,
* @param <M> Message data
* @return Incoming message store
*/
- public <M extends Writable> MessageStoreByPartition<I, M>
- getIncomingMessageStore() {
- return (MessageStoreByPartition<I, M>) incomingMessageStore;
+ public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
+ return (MessageStore<I, M>) incomingMessageStore;
}
/**
@@ -148,9 +144,8 @@ public class ServerData<I extends WritableComparable,
* @param <M> Message data
* @return Current message store
*/
- public <M extends Writable> MessageStoreByPartition<I, M>
- getCurrentMessageStore() {
- return (MessageStoreByPartition<I, M>) currentMessageStore;
+ public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
+ return (MessageStore<I, M>) currentMessageStore;
}
/** Prepare for next super step */
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
deleted file mode 100644
index dcb6223..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Most basic message store with just add, get and clear operations
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface BasicMessageStore<I extends WritableComparable,
- M extends Writable> extends Writable {
- /**
- * Adds messages from one message store to another
- *
- * @param messageStore Add the messages from this message store to this
- * object
- * @throws java.io.IOException
- */
- void addMessages(MessageStore<I, M> messageStore) throws IOException;
-
- /**
- * Gets messages for a vertex. The lifetime of every message is only
- * guaranteed until the iterator's next() method is called. Do not re-use
- * the messages.
- *
- * @param vertexId Vertex id for which we want to get messages
- * @return Iterable of messages for a vertex id
- * @throws IOException
- */
- Iterable<M> getVertexMessages(I vertexId) throws IOException;
-
- /**
- * Clears messages for a vertex.
- *
- * @param vertexId Vertex id for which we want to clear messages
- * @throws IOException
- */
- void clearVertexMessages(I vertexId) throws IOException;
-
- /**
- * Clears all resources used by this store.
- *
- * @throws IOException
- */
- void clearAll() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 97c8a35..fecd7ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -24,7 +24,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
import org.apache.giraph.utils.RepresentativeByteArrayIterator;
import org.apache.giraph.utils.VertexIdIterator;
import org.apache.hadoop.io.Writable;
@@ -33,7 +32,6 @@ import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/**
@@ -128,33 +126,11 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
}
}
- /**
- * Special iterable that recycles the message
- */
- private class MessagesIterable extends RepresentativeByteArrayIterable<M> {
- /**
- * Constructor
- *
- * @param buf Buffer
- * @param off Offset to start in the buffer
- * @param length Length of the buffer
- */
- private MessagesIterable(byte[] buf, int off, int length) {
- super(config, buf, off, length);
- }
-
- @Override
- protected M createWritable() {
- return ReflectionUtils.newInstance(messageClass);
- }
- }
-
@Override
protected Iterable<M> getMessagesAsIterable(
ExtendedDataOutput extendedDataOutput) {
-
- return new MessagesIterable(extendedDataOutput.getByteArray(), 0,
- extendedDataOutput.getPos());
+ return new MessagesIterable<M>(config, messageClass,
+ extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
}
/**
@@ -209,9 +185,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
int byteArraySize = in.readInt();
byte[] messages = new byte[byteArraySize];
in.readFully(messages);
- ExtendedDataOutput extendedDataOutput =
- config.createExtendedDataOutput(messages, 0);
- return extendedDataOutput;
+ return config.createExtendedDataOutput(messages, 0);
}
/**
@@ -224,48 +198,12 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
* @return Factory
*/
public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+ MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
CentralizedServiceWorker<I, ?, ?> service,
ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
return new Factory<I, M>(service, config);
}
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws IOException {
- if (messageStore instanceof ByteArrayMessagesPerVertexStore) {
- ByteArrayMessagesPerVertexStore<I, M>
- byteArrayMessagesPerVertexStore =
- (ByteArrayMessagesPerVertexStore<I, M>) messageStore;
- for (Map.Entry<Integer, ConcurrentMap<I, ExtendedDataOutput>>
- partitionEntry : byteArrayMessagesPerVertexStore.map.entrySet()) {
- for (Map.Entry<I, ExtendedDataOutput> vertexEntry :
- partitionEntry.getValue().entrySet()) {
- ConcurrentMap<I, ExtendedDataOutput> partitionMap =
- getOrCreatePartitionMap(partitionEntry.getKey());
- ExtendedDataOutput extendedDataOutput =
- partitionMap.get(vertexEntry.getKey());
- if (extendedDataOutput == null) {
- ExtendedDataOutput newExtendedDataOutput =
- config.createExtendedDataOutput();
- extendedDataOutput =
- partitionMap.putIfAbsent(vertexEntry.getKey(),
- newExtendedDataOutput);
- if (extendedDataOutput == null) {
- extendedDataOutput = newExtendedDataOutput;
- }
- }
-
- // Add the messages
- extendedDataOutput.write(vertexEntry.getValue().getByteArray(), 0,
- vertexEntry.getValue().getPos());
- }
- }
- } else {
- throw new IllegalArgumentException("addMessages: Illegal argument " +
- messageStore.getClass());
- }
- }
-
/**
* Factory for {@link ByteArrayMessagesPerVertexStore}
*
@@ -273,7 +211,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
* @param <M> Message data
*/
private static class Factory<I extends WritableComparable, M extends Writable>
- implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+ implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Service worker */
private final CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
@@ -290,8 +228,9 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
}
@Override
- public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
- return new ByteArrayMessagesPerVertexStore(messageClass, service, config);
+ public MessageStore<I, M> newStore(Class<M> messageClass) {
+ return new ByteArrayMessagesPerVertexStore<I, M>(
+ messageClass, service, config);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
deleted file mode 100644
index 2712edd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message storage with in-memory map of messages and with support for
- * flushing all the messages to the disk.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class DiskBackedMessageStore<I extends WritableComparable,
- M extends Writable> implements FlushableMessageStore<I, M> {
- /** Message class */
- private final Class<M> messageClass;
- /**
- * In-memory message map (must be sorted to insure that the ids are
- * ordered)
- */
- private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
- inMemoryMessages;
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
- /** Counter for number of messages in-memory */
- private final AtomicInteger numberOfMessagesInMemory;
- /** To keep vertex ids which we have messages for */
- private final Set<I> destinationVertices;
- /** File stores in which we keep flushed messages */
- private final Collection<BasicMessageStore<I, M>> fileStores;
- /** Factory for creating file stores when flushing */
- private final
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
- /** Lock for disk flushing */
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
-
- /**
- * Constructor.
- *
- * @param messageClass Message class held in the store
- * @param config Hadoop configuration
- * @param fileStoreFactory Factory for creating file stores when flushing
- */
- public DiskBackedMessageStore(
- Class<M> messageClass,
- ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
- inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
- this.messageClass = messageClass;
- this.config = config;
- numberOfMessagesInMemory = new AtomicInteger(0);
- destinationVertices =
- Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
- fileStores = Lists.newArrayList();
- this.fileStoreFactory = fileStoreFactory;
- }
-
- /**
- * Add vertex messages
- *
- * @param vertexId Vertex id to use
- * @param messages Messages to add (note that the lifetime of the messages)
- * is only until next() is called again)
- * @return True if the vertex id ownership is taken by this method,
- * false otherwise
- * @throws IOException
- */
- boolean addVertexMessages(I vertexId,
- Iterable<M> messages) throws IOException {
- boolean ownsVertexId = false;
- destinationVertices.add(vertexId);
- rwLock.readLock().lock();
- try {
- ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
- if (extendedDataOutput == null) {
- ExtendedDataOutput newExtendedDataOutput =
- config.createExtendedDataOutput();
- extendedDataOutput =
- inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
- if (extendedDataOutput == null) {
- ownsVertexId = true;
- extendedDataOutput = newExtendedDataOutput;
- }
- }
-
- synchronized (extendedDataOutput) {
- for (M message : messages) {
- message.write(extendedDataOutput);
- numberOfMessagesInMemory.getAndIncrement();
- }
- }
- } finally {
- rwLock.readLock().unlock();
- }
-
- return ownsVertexId;
- }
-
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws
- IOException {
- for (I destinationVertex : messageStore.getDestinationVertices()) {
- addVertexMessages(destinationVertex,
- messageStore.getVertexMessages(destinationVertex));
- }
- }
-
- /**
- * Special iterable that recycles the message
- */
- private class MessageIterable extends RepresentativeByteArrayIterable<M> {
- /**
- * Constructor
- *
- * @param buf Buffer
- * @param off Offset to start in the buffer
- * @param length Length of the buffer
- */
- public MessageIterable(
- byte[] buf, int off, int length) {
- super(config, buf, off, length);
- }
-
- @Override
- protected M createWritable() {
- return ReflectionUtils.newInstance(messageClass);
- }
- }
-
- @Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
- ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
- if (extendedDataOutput == null) {
- extendedDataOutput = config.createExtendedDataOutput();
- }
- Iterable<M> combinedIterable = new MessageIterable(
- extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
-
- for (BasicMessageStore<I, M> fileStore : fileStores) {
- combinedIterable = Iterables.concat(combinedIterable,
- fileStore.getVertexMessages(vertexId));
- }
- return combinedIterable;
- }
-
- @Override
- public int getNumberOfMessages() {
- return numberOfMessagesInMemory.get();
- }
-
- @Override
- public boolean hasMessagesForVertex(I vertexId) {
- return destinationVertices.contains(vertexId);
- }
-
- @Override
- public Iterable<I> getDestinationVertices() {
- return destinationVertices;
- }
-
- @Override
- public void clearVertexMessages(I vertexId) throws IOException {
- inMemoryMessages.remove(vertexId);
- }
-
- @Override
- public void clearAll() throws IOException {
- inMemoryMessages.clear();
- destinationVertices.clear();
- for (BasicMessageStore<I, M> fileStore : fileStores) {
- fileStore.clearAll();
- }
- fileStores.clear();
- }
-
- /**
- * Special temporary message store for passing along in-memory messages
- */
- private class TemporaryMessageStore implements MessageStore<I, M> {
- /**
- * In-memory message map (must be sorted to insure that the ids are
- * ordered)
- */
- private final ConcurrentNavigableMap<I, ExtendedDataOutput>
- temporaryMessages;
-
- /**
- * Constructor.
- *
- * @param temporaryMessages Messages to be owned by this object
- */
- private TemporaryMessageStore(
- ConcurrentNavigableMap<I, ExtendedDataOutput>
- temporaryMessages) {
- this.temporaryMessages = temporaryMessages;
- }
-
- @Override
- public int getNumberOfMessages() {
- throw new IllegalAccessError("getNumberOfMessages: Not supported");
- }
-
- @Override
- public boolean hasMessagesForVertex(I vertexId) {
- return temporaryMessages.containsKey(vertexId);
- }
-
- @Override
- public Iterable<I> getDestinationVertices() {
- return temporaryMessages.keySet();
- }
-
- @Override
- public void addMessages(MessageStore<I, M> messageStore)
- throws IOException {
- throw new IllegalAccessError("addMessages: Not supported");
- }
-
- @Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
- ExtendedDataOutput extendedDataOutput = temporaryMessages.get(vertexId);
- if (extendedDataOutput == null) {
- extendedDataOutput = config.createExtendedDataOutput();
- }
- return new MessageIterable(extendedDataOutput.getByteArray(), 0,
- extendedDataOutput.getPos());
- }
-
- @Override
- public void clearVertexMessages(I vertexId) throws IOException {
- temporaryMessages.remove(vertexId);
- }
-
- @Override
- public void clearAll() throws IOException {
- temporaryMessages.clear();
- }
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- throw new IllegalAccessError("write: Not supported");
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- throw new IllegalAccessError("readFields: Not supported");
- }
- }
-
- @Override
- public void flush() throws IOException {
- ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush = null;
- rwLock.writeLock().lock();
- try {
- messagesToFlush = inMemoryMessages;
- inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
- numberOfMessagesInMemory.set(0);
- } finally {
- rwLock.writeLock().unlock();
- }
- BasicMessageStore<I, M> fileStore =
- fileStoreFactory.newStore(messageClass);
- fileStore.addMessages(new TemporaryMessageStore(messagesToFlush));
-
- synchronized (fileStores) {
- fileStores.add(fileStore);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // write destination vertices
- out.writeInt(destinationVertices.size());
- for (I vertexId : destinationVertices) {
- vertexId.write(out);
- }
-
- // write of in-memory messages
- out.writeInt(numberOfMessagesInMemory.get());
-
- // write in-memory messages map
- out.writeInt(inMemoryMessages.size());
- for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
- entry.getKey().write(out);
- out.writeInt(entry.getValue().getPos());
- out.write(entry.getValue().getByteArray(), 0, entry.getValue().getPos());
- }
-
- // write file stores
- out.writeInt(fileStores.size());
- for (BasicMessageStore<I, M> fileStore : fileStores) {
- fileStore.write(out);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- // read destination vertices
- int numVertices = in.readInt();
- for (int v = 0; v < numVertices; v++) {
- I vertexId = (I) config.createVertexId();
- vertexId.readFields(in);
- destinationVertices.add(vertexId);
- }
-
- // read in-memory messages
- numberOfMessagesInMemory.set(in.readInt());
-
- // read in-memory map
- int mapSize = in.readInt();
- for (int m = 0; m < mapSize; m++) {
- I vertexId = config.createVertexId();
- vertexId.readFields(in);
- int messageBytes = in.readInt();
- byte[] buf = new byte[messageBytes];
- ExtendedDataOutput extendedDataOutput =
- config.createExtendedDataOutput(buf, messageBytes);
- inMemoryMessages.put(vertexId, extendedDataOutput);
- }
-
- // read file stores
- int numFileStores = in.readInt();
- for (int s = 0; s < numFileStores; s++) {
- BasicMessageStore<I, M> fileStore =
- fileStoreFactory.newStore(messageClass);
- fileStore.readFields(in);
- fileStores.add(fileStore);
- }
- }
-
-
- /**
- * Create new factory for this message store
- *
- * @param config Hadoop configuration
- * @param fileStoreFactory Factory for creating message stores for
- * partitions
- * @param <I> Vertex id
- * @param <M> Message data
- * @return Factory
- */
- public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(
- ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
- return new Factory<I, M>(config, fileStoreFactory);
- }
-
- /**
- * Factory for {@link DiskBackedMessageStore}
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
- private static class Factory<I extends WritableComparable,
- M extends Writable> implements MessageStoreFactory<I, M,
- FlushableMessageStore<I, M>> {
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration config;
- /** Factory for creating message stores for partitions */
- private final
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
-
- /**
- * @param config Hadoop configuration
- * @param fileStoreFactory Factory for creating message stores for
- * partitions
- */
- public Factory(ImmutableClassesGiraphConfiguration config,
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
- this.config = config;
- this.fileStoreFactory = fileStoreFactory;
- }
-
- @Override
- public FlushableMessageStore<I, M> newStore(Class<M> messageClass) {
- return new DiskBackedMessageStore<I, M>(messageClass, config,
- fileStoreFactory);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
deleted file mode 100644
index 4a28949..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Message store which separates data by partitions,
- * and submits them to underlying message store.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class DiskBackedMessageStoreByPartition<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- MessageStoreByPartition<I, M> {
- /** Message class */
- private final Class<M> messageClass;
- /** Service worker */
- private final CentralizedServiceWorker<I, V, E> service;
- /** Number of messages to keep in memory */
- private final int maxNumberOfMessagesInMemory;
- /** Factory for creating file stores when flushing */
- private final
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
- /** Map from partition id to its message store */
- private final
- ConcurrentMap<Integer, FlushableMessageStore<I, M>> partitionMessageStores;
- /**
- * @param messageClass Message class held in the store
- * @param service Service worker
- * @param maxNumberOfMessagesInMemory Number of messages to keep in memory
- * @param fileStoreFactory Factory for creating file stores
- * when flushing
- */
- public DiskBackedMessageStoreByPartition(
- Class<M> messageClass,
- CentralizedServiceWorker<I, V, E> service,
- int maxNumberOfMessagesInMemory,
- MessageStoreFactory<I, M, FlushableMessageStore<I,
- M>> fileStoreFactory) {
- this.messageClass = messageClass;
- this.service = service;
- this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory;
- this.fileStoreFactory = fileStoreFactory;
- partitionMessageStores = Maps.newConcurrentMap();
- }
-
- @Override
- public void addPartitionMessages(
- int partitionId,
- ByteArrayVertexIdMessages<I, M> messages) throws IOException {
- FlushableMessageStore<I, M> flushableMessageStore =
- getMessageStore(partitionId);
- if (flushableMessageStore instanceof DiskBackedMessageStore) {
- DiskBackedMessageStore<I, M> diskBackedMessageStore =
- (DiskBackedMessageStore<I, M>) flushableMessageStore;
- ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
- vertexIdMessageIterator =
- messages.getVertexIdMessageIterator();
- while (vertexIdMessageIterator.hasNext()) {
- vertexIdMessageIterator.next();
- boolean ownsVertexId =
- diskBackedMessageStore.addVertexMessages(
- vertexIdMessageIterator.getCurrentVertexId(),
- Collections.singleton(
- vertexIdMessageIterator.getCurrentMessage()));
- if (ownsVertexId) {
- vertexIdMessageIterator.releaseCurrentVertexId();
- }
- }
- } else {
- throw new IllegalStateException("addPartitionMessages: Doesn't support " +
- "class " + flushableMessageStore.getClass());
- }
- checkMemory();
- }
-
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws IOException {
- for (I destinationVertex : messageStore.getDestinationVertices()) {
- FlushableMessageStore<I, M> flushableMessageStore =
- getMessageStore(destinationVertex);
- if (flushableMessageStore instanceof DiskBackedMessageStore) {
- DiskBackedMessageStore<I, M> diskBackedMessageStore =
- (DiskBackedMessageStore<I, M>) flushableMessageStore;
- Iterable<M> messages =
- messageStore.getVertexMessages(destinationVertex);
- diskBackedMessageStore.addVertexMessages(destinationVertex, messages);
- } else {
- throw new IllegalStateException("addMessages: Doesn't support " +
- "class " + flushableMessageStore.getClass());
- }
- }
- checkMemory();
- }
-
- @Override
- public Iterable<M> getVertexMessages(I vertexId) throws IOException {
- if (hasMessagesForVertex(vertexId)) {
- return getMessageStore(vertexId).getVertexMessages(vertexId);
- } else {
- return EmptyIterable.get();
- }
- }
-
- @Override
- public int getNumberOfMessages() {
- int numOfMessages = 0;
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- numOfMessages += messageStore.getNumberOfMessages();
- }
- return numOfMessages;
- }
-
- @Override
- public boolean hasMessagesForVertex(I vertexId) {
- return getMessageStore(vertexId).hasMessagesForVertex(vertexId);
- }
-
- @Override
- public Iterable<I> getDestinationVertices() {
- List<I> vertices = Lists.newArrayList();
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- Iterables.addAll(vertices, messageStore.getDestinationVertices());
- }
- return vertices;
- }
-
- @Override
- public Iterable<I> getPartitionDestinationVertices(int partitionId) {
- FlushableMessageStore<I, M> messageStore =
- partitionMessageStores.get(partitionId);
- if (messageStore == null) {
- return Collections.emptyList();
- } else {
- return messageStore.getDestinationVertices();
- }
- }
-
- @Override
- public void clearVertexMessages(I vertexId) throws IOException {
- if (hasMessagesForVertex(vertexId)) {
- getMessageStore(vertexId).clearVertexMessages(vertexId);
- }
- }
-
- @Override
- public void clearPartition(int partitionId) throws IOException {
- FlushableMessageStore<I, M> messageStore =
- partitionMessageStores.get(partitionId);
- if (messageStore != null) {
- messageStore.clearAll();
- }
- }
-
- @Override
- public void clearAll() throws IOException {
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- messageStore.clearAll();
- }
- partitionMessageStores.clear();
- }
-
- /**
- * Checks the memory status, flushes if necessary
- *
- * @throws IOException
- */
- private void checkMemory() throws IOException {
- while (memoryFull()) {
- flushOnePartition();
- }
- }
-
- /**
- * Check if memory is full
- *
- * @return True iff memory is full
- */
- private boolean memoryFull() {
- int totalMessages = 0;
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- totalMessages += messageStore.getNumberOfMessages();
- }
- return totalMessages > maxNumberOfMessagesInMemory;
- }
-
- /**
- * Finds biggest partition and flushes it to the disk
- *
- * @throws IOException
- */
- private void flushOnePartition() throws IOException {
- int maxMessages = 0;
- FlushableMessageStore<I, M> biggestStore = null;
- for (FlushableMessageStore<I, M> messageStore :
- partitionMessageStores.values()) {
- int numMessages = messageStore.getNumberOfMessages();
- if (numMessages > maxMessages) {
- maxMessages = numMessages;
- biggestStore = messageStore;
- }
- }
- if (biggestStore != null) {
- biggestStore.flush();
- }
- }
-
- /**
- * Get message store for partition which holds vertex with required vertex
- * id
- *
- * @param vertexId Id of vertex for which we are asking for message store
- * @return Requested message store
- */
- private FlushableMessageStore<I, M> getMessageStore(I vertexId) {
- int partitionId =
- service.getVertexPartitionOwner(vertexId).getPartitionId();
- return getMessageStore(partitionId);
- }
-
- /**
- * Get message store for partition id. It it doesn't exist yet,
- * creates a new one.
- *
- * @param partitionId Id of partition for which we are asking for message
- * store
- * @return Requested message store
- */
- private FlushableMessageStore<I, M> getMessageStore(int partitionId) {
- FlushableMessageStore<I, M> messageStore =
- partitionMessageStores.get(partitionId);
- if (messageStore != null) {
- return messageStore;
- }
- messageStore = fileStoreFactory.newStore(messageClass);
- FlushableMessageStore<I, M> store =
- partitionMessageStores.putIfAbsent(partitionId, messageStore);
- return (store == null) ? messageStore : store;
- }
-
- @Override
- public void writePartition(DataOutput out,
- int partitionId) throws IOException {
- FlushableMessageStore<I, M> partitionStore =
- partitionMessageStores.get(partitionId);
- out.writeBoolean(partitionStore != null);
- if (partitionStore != null) {
- partitionStore.write(out);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(partitionMessageStores.size());
- for (Entry<Integer, FlushableMessageStore<I, M>> entry :
- partitionMessageStores.entrySet()) {
- out.writeInt(entry.getKey());
- entry.getValue().write(out);
- }
- }
-
- @Override
- public void readFieldsForPartition(DataInput in,
- int partitionId) throws IOException {
- if (in.readBoolean()) {
- FlushableMessageStore<I, M> messageStore =
- fileStoreFactory.newStore(messageClass);
- messageStore.readFields(in);
- partitionMessageStores.put(partitionId, messageStore);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int numStores = in.readInt();
- for (int s = 0; s < numStores; s++) {
- int partitionId = in.readInt();
- FlushableMessageStore<I, M> messageStore =
- fileStoreFactory.newStore(messageClass);
- messageStore.readFields(in);
- partitionMessageStores.put(partitionId, messageStore);
- }
- }
-
-
- /**
- * Create new factory for this message store
- *
- * @param service Service worker
- * @param maxMessagesInMemory Number of messages to keep in memory
- * @param fileStoreFactory Factory for creating file stores when
- * flushing
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- * @return Factory
- */
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
- CentralizedServiceWorker<I, V, E> service,
- int maxMessagesInMemory,
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
- fileStoreFactory) {
- return new Factory<I, V, E, M>(service, maxMessagesInMemory,
- fileStoreFactory);
- }
-
- /**
- * Factory for {@link DiskBackedMessageStoreByPartition}
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
- private static class Factory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
- /** Service worker */
- private final CentralizedServiceWorker<I, V, E> service;
- /** Number of messages to keep in memory */
- private final int maxMessagesInMemory;
- /** Factory for creating file stores when flushing */
- private final
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
-
- /**
- * @param service Service worker
- * @param maxMessagesInMemory Number of messages to keep in memory
- * @param fileStoreFactory Factory for creating file stores when
- * flushing
- */
- public Factory(CentralizedServiceWorker<I, V, E> service,
- int maxMessagesInMemory,
- MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
- fileStoreFactory) {
- this.service = service;
- this.maxMessagesInMemory = maxMessagesInMemory;
- this.fileStoreFactory = fileStoreFactory;
- }
-
- @Override
- public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
- return new DiskBackedMessageStoreByPartition<I, V, E, M>(messageClass,
- service, maxMessagesInMemory, fileStoreFactory);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
deleted file mode 100644
index 6e7fe55..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-
-/**
- * Message stores which has flush operation
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface FlushableMessageStore<I extends WritableComparable,
- M extends Writable> extends MessageStore<I, M> {
- /**
- * Flushes messages to the disk.
- *
- * @throws IOException
- */
- void flush() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index 9086d78..ba8a005 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
*/
public class InMemoryMessageStoreFactory<I extends WritableComparable,
M extends Writable>
- implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+ implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(InMemoryMessageStoreFactory.class);
@@ -56,7 +56,7 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
}
@Override
- public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+ public MessageStore<I, M> newStore(Class<M> messageClass) {
if (conf.useCombiner()) {
if (LOG.isInfoEnabled()) {
LOG.info("newStore: " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index a6f174d..2af7642 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -18,6 +18,10 @@
package org.apache.giraph.comm.messages;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -28,13 +32,32 @@ import org.apache.hadoop.io.WritableComparable;
* @param <M> Message data
*/
public interface MessageStore<I extends WritableComparable,
- M extends Writable> extends BasicMessageStore<I, M> {
+ M extends Writable> {
/**
- * Get number of messages in memory
+ * Gets messages for a vertex. The lifetime of every message is only
+ * guaranteed until the iterator's next() method is called. Do not hold
+ * references to objects returned by this iterator.
*
- * @return Number of messages in memory
+ * @param vertexId Vertex id for which we want to get messages
+ * @return Iterable of messages for a vertex id
+ * @throws java.io.IOException
*/
- int getNumberOfMessages();
+ Iterable<M> getVertexMessages(I vertexId) throws IOException;
+
+ /**
+ * Clears messages for a vertex.
+ *
+ * @param vertexId Vertex id for which we want to clear messages
+ * @throws IOException
+ */
+ void clearVertexMessages(I vertexId) throws IOException;
+
+ /**
+ * Clears all resources used by this store.
+ *
+ * @throws IOException
+ */
+ void clearAll() throws IOException;
/**
* Check if we have messages for some vertex
@@ -45,9 +68,49 @@ public interface MessageStore<I extends WritableComparable,
boolean hasMessagesForVertex(I vertexId);
/**
- * Gets vertex ids which we have messages for
+ * Adds messages for partition
+ *
+ * @param partitionId Id of partition
+ * @param messages Collection of vertex ids and messages we want to add
+ * @throws IOException
+ */
+ void addPartitionMessages(
+ int partitionId, ByteArrayVertexIdMessages<I, M> messages)
+ throws IOException;
+
+ /**
+ * Gets vertex ids from selected partition which we have messages for
*
+ * @param partitionId Id of partition
* @return Iterable over vertex ids which we have messages for
*/
- Iterable<I> getDestinationVertices();
+ Iterable<I> getPartitionDestinationVertices(int partitionId);
+
+ /**
+ * Clears messages for a partition.
+ *
+ * @param partitionId Partition id for which we want to clear messages
+ * @throws IOException
+ */
+ void clearPartition(int partitionId) throws IOException;
+
+ /**
+ * Serialize messages for one partition.
+ *
+ * @param out {@link DataOutput} to serialize this object into
+ * @param partitionId Id of partition
+ * @throws IOException
+ */
+ void writePartition(DataOutput out, int partitionId) throws IOException;
+
+ /**
+ * Deserialize messages for one partition
+ *
+ * @param in {@link DataInput} to deserialize this object
+ * from.
+ * @param partitionId Id of partition
+ * @throws IOException
+ */
+ void readFieldsForPartition(DataInput in,
+ int partitionId) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
deleted file mode 100644
index d2143c6..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Message store which stores data by partition
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface MessageStoreByPartition<I extends WritableComparable,
- M extends Writable> extends MessageStore<I, M> {
- /**
- * Adds messages for partition
- *
- * @param partitionId Id of partition
- * @param messages Collection of vertex ids and messages we want to add
- * @throws IOException
- */
- void addPartitionMessages(
- int partitionId, ByteArrayVertexIdMessages<I, M> messages)
- throws IOException;
-
- /**
- * Gets vertex ids from selected partition which we have messages for
- *
- * @param partitionId Id of partition
- * @return Iterable over vertex ids which we have messages for
- */
- Iterable<I> getPartitionDestinationVertices(int partitionId);
-
- /**
- * Clears messages for a partition.
- *
- * @param partitionId Partition id for which we want to clear messages
- * @throws IOException
- */
- void clearPartition(int partitionId) throws IOException;
-
- /**
- * Serialize messages for one partition.
- *
- * @param out {@link DataOutput} to serialize this object into
- * @param partitionId Id of partition
- * @throws IOException
- */
- void writePartition(DataOutput out, int partitionId) throws IOException;
-
- /**
- * Deserialize messages for one partition
- *
- * @param in {@link DataInput} to deserialize this object
- * from.
- * @param partitionId Id of partition
- * @throws IOException
- */
- void readFieldsForPartition(DataInput in,
- int partitionId) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
index dec9a92..aa6a63e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.io.WritableComparable;
*
* @param <I> Vertex id
* @param <M> Message data
- * @param <S> Message store
+ * @param <MS> Message store
*/
public interface MessageStoreFactory<I extends WritableComparable,
- M extends Writable, S extends BasicMessageStore<I, M>> {
+ M extends Writable, MS> {
/**
* Creates new message store.
*
@@ -39,5 +39,5 @@ public interface MessageStoreFactory<I extends WritableComparable,
* @param messageClass Message class held in the store
* @return New message store
*/
- S newStore(Class<M> messageClass);
+ MS newStore(Class<M> messageClass);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
new file mode 100644
index 0000000..970d76d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Special iterable that recycles the message
+ *
+ * @param <M> Message data
+ */
+public class MessagesIterable<M extends Writable>
+ extends RepresentativeByteArrayIterable<M> {
+ /** Message class */
+ private final Class<M> messageClass;
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration
+ * @param messageClass Message class
+ * @param buf Buffer
+ * @param off Offset to start in the buffer
+ * @param length Length of the buffer
+ */
+ public MessagesIterable(
+ ImmutableClassesGiraphConfiguration conf, Class<M> messageClass,
+ byte[] buf, int off, int length) {
+ super(conf, buf, off, length);
+ this.messageClass = messageClass;
+ }
+
+ @Override
+ protected M createWritable() {
+ return ReflectionUtils.newInstance(messageClass);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 8710dac..f18af5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -22,7 +22,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
-import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -60,44 +59,6 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
this.combiner = combiner;
}
- /**
- * If there is already a message related to the vertex id in the
- * partition map return that message, otherwise create a new one,
- * put it in the map and return it
- *
- * @param vertexId Id of vertex
- * @param partitionMap Partition map
- * @return Message for this vertex
- */
- private M getOrCreateCurrentMessage(I vertexId,
- ConcurrentMap<I, M> partitionMap) {
- M currentMessage = partitionMap.get(vertexId);
- if (currentMessage == null) {
- M newMessage = combiner.createInitialMessage();
- currentMessage = partitionMap.putIfAbsent(vertexId, newMessage);
- if (currentMessage == null) {
- currentMessage = newMessage;
- }
- }
- return currentMessage;
- }
-
- /**
- * Add a single message for vertex to a partition map
- *
- * @param vertexId Id of vertex which received message
- * @param message Message to add
- * @param partitionMap Partition map to add the message to
- * @throws IOException
- */
- private void addVertexMessageToPartition(I vertexId, M message,
- ConcurrentMap<I, M> partitionMap) throws IOException {
- M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
- synchronized (currentMessage) {
- combiner.combine(vertexId, currentMessage, message);
- }
- }
-
@Override
public void addPartitionMessages(
int partitionId,
@@ -149,26 +110,6 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
return message;
}
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws IOException {
- if (messageStore instanceof OneMessagePerVertexStore) {
- OneMessagePerVertexStore<I, M> oneMessagePerVertexStore =
- (OneMessagePerVertexStore<I, M>) messageStore;
- for (Map.Entry<Integer, ConcurrentMap<I, M>>
- partitionEntry : oneMessagePerVertexStore.map.entrySet()) {
- ConcurrentMap<I, M> partitionMap =
- getOrCreatePartitionMap(partitionEntry.getKey());
- for (Map.Entry<I, M> vertexEntry :
- partitionEntry.getValue().entrySet()) {
- addVertexMessageToPartition(vertexEntry.getKey(),
- vertexEntry.getValue(), partitionMap);
- }
- }
- } else {
- throw new IllegalArgumentException("addMessages: Illegal argument " +
- messageStore.getClass());
- }
- }
/**
* Create new factory for this message store
@@ -180,7 +121,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
* @return Factory
*/
public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+ MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
CentralizedServiceWorker<I, ?, ?> service,
ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
return new Factory<I, M>(service, config);
@@ -194,7 +135,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
*/
private static class Factory<I extends WritableComparable,
M extends Writable>
- implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+ implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Service worker */
private final CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
@@ -211,7 +152,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
}
@Override
- public MessageStoreByPartition<I, M> newStore(Class<M> messageClass) {
+ public MessageStore<I, M> newStore(Class<M> messageClass) {
return new OneMessagePerVertexStore<I, M>(messageClass, service,
config.<M>createCombiner(), config);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
deleted file mode 100644
index 23bbbc5..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY;
-
-/**
- * Used for writing and reading collection of messages to the disk.
- * {@link SequentialFileMessageStore#addMessages(MessageStore)}
- * should be called only once with the messages we want to store.
- * <p/>
- * It's optimized for retrieving messages in the natural order of vertex ids
- * they are sent to.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class SequentialFileMessageStore<I extends WritableComparable,
- M extends Writable> implements BasicMessageStore<I, M> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SequentialFileMessageStore.class);
- /** Message class */
- private final Class<M> messageClass;
- /** File in which we store data */
- private final File file;
- /** Configuration which we need for reading data */
- private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
- /** Buffer size to use when reading and writing files */
- private final int bufferSize;
- /** File input stream */
- private DataInputStream in;
- /** How many vertices do we have left to read in the file */
- private int verticesLeft;
- /** Id of currently read vertex */
- private I currentVertexId;
-
- /**
- * Stores message on the disk.
- *
- * @param messageClass Message class held in the store
- * @param config Configuration used later for reading
- * @param bufferSize Buffer size to use when reading and writing
- * @param fileName File in which we want to store messages
- * @throws IOException
- */
- public SequentialFileMessageStore(
- Class<M> messageClass,
- ImmutableClassesGiraphConfiguration<I, ?, ?> config,
- int bufferSize,
- String fileName) {
- this.messageClass = messageClass;
- this.config = config;
- this.bufferSize = bufferSize;
- file = new File(fileName);
- }
-
- @Override
- public void addMessages(MessageStore<I, M> messageStore) throws IOException {
- // Writes messages to its file
- if (file.exists()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: Deleting " + file);
- }
- file.delete();
- }
- file.createNewFile();
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: Creating " + file);
- }
-
- DataOutputStream out = null;
-
- try {
- out = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file), bufferSize));
- int destinationVertexIdCount =
- Iterables.size(messageStore.getDestinationVertices());
- out.writeInt(destinationVertexIdCount);
-
- // Since the message store messages might not be sorted, sort them if
- // necessary
- SortedSet<I> sortedSet;
- if (messageStore.getDestinationVertices() instanceof SortedSet) {
- sortedSet = (SortedSet<I>) messageStore.getDestinationVertices();
- } else {
- sortedSet =
- Sets.newTreeSet(messageStore.getDestinationVertices());
- for (I destinationVertexId : messageStore.getDestinationVertices()) {
- sortedSet.add(destinationVertexId);
- }
- }
-
- // Dump the vertices and their messages in a sorted order
- for (I destinationVertexId : sortedSet) {
- destinationVertexId.write(out);
- Iterable<M> messages =
- messageStore.getVertexMessages(destinationVertexId);
- int messageCount = Iterables.size(messages);
- out.writeInt(messageCount);
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: For vertex id " + destinationVertexId +
- ", messages = " + messageCount + " to file " + file);
- }
- for (M message : messages) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addMessages: Wrote " + message + " to " + file);
- }
- message.write(out);
- }
- }
- } finally {
- if (out != null) {
- out.close();
- }
- }
- }
-
- /**
- * Reads messages for a vertex. It will find the messages only if all
- * previous reads used smaller vertex ids than this one - messages should
- * be retrieved in increasing order of vertex ids.
- *
- * @param vertexId Vertex id for which we want to get messages
- * @return Messages for the selected vertex, or empty list if not used
- * correctly
- * @throws IOException
- */
- @Override
- public Iterable<M> getVertexMessages(I vertexId) throws
- IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("getVertexMessages: Reading for vertex id " + vertexId +
- " (currently " + currentVertexId + ") from " + file);
- }
- if (in == null) {
- startReading();
- }
-
- I nextVertexId = getCurrentVertexId();
- while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) {
- nextVertexId = getNextVertexId();
- }
-
- if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) {
- return EmptyIterable.get();
- }
-
- return readMessagesForCurrentVertex();
- }
-
- @Override
- public void clearVertexMessages(I vertexId) throws IOException { }
-
- @Override
- public void clearAll() throws IOException {
- endReading();
- file.delete();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(file.length());
- FileInputStream input = new FileInputStream(file);
- try {
- byte[] buffer = new byte[bufferSize];
- while (true) {
- int length = input.read(buffer);
- if (length < 0) {
- break;
- }
- out.write(buffer, 0, length);
- }
- } finally {
- input.close();
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- FileOutputStream output = new FileOutputStream(file);
- try {
- long fileLength = in.readLong();
- byte[] buffer = new byte[bufferSize];
- for (long position = 0; position < fileLength; position += bufferSize) {
- int bytes = (int) Math.min(bufferSize, fileLength - position);
- in.readFully(buffer, 0, bytes);
- output.write(buffer);
- }
- } finally {
- output.close();
- }
- }
-
- /**
- * Prepare for reading
- *
- * @throws IOException
- */
- private void startReading() throws IOException {
- currentVertexId = null;
- in = new DataInputStream(
- new BufferedInputStream(new FileInputStream(file), bufferSize));
- verticesLeft = in.readInt();
- if (LOG.isDebugEnabled()) {
- LOG.debug("startReading: File " + file + " with " +
- verticesLeft + " vertices left");
- }
- }
-
- /**
- * Gets current vertex id.
- * <p/>
- * If there is a vertex id whose messages haven't been read yet it
- * will return that vertex id, otherwise it will read and return the next
- * one.
- *
- * @return Current vertex id
- * @throws IOException
- */
- private I getCurrentVertexId() throws IOException {
- if (currentVertexId != null) {
- return currentVertexId;
- } else {
- return getNextVertexId();
- }
- }
-
- /**
- * Gets next vertex id.
- * <p/>
- * If there is a vertex whose messages haven't been read yet it
- * will read and skip over its messages to get to the next vertex.
- *
- * @return Next vertex id
- * @throws IOException
- */
- private I getNextVertexId() throws IOException {
- if (currentVertexId != null) {
- readMessagesForCurrentVertex();
- }
- if (verticesLeft == 0) {
- return null;
- }
- currentVertexId = config.createVertexId();
- currentVertexId.readFields(in);
- return currentVertexId;
- }
-
- /**
- * Reads messages for current vertex.
- *
- * @return Messages for current vertex
- * @throws IOException
- */
- private Collection<M> readMessagesForCurrentVertex() throws IOException {
- int messagesSize = in.readInt();
- List<M> messages = Lists.newArrayListWithCapacity(messagesSize);
- for (int i = 0; i < messagesSize; i++) {
- M message = ReflectionUtils.newInstance(messageClass);
- try {
- message.readFields(in);
- } catch (IOException e) {
- throw new IllegalStateException("readMessagesForCurrentVertex: " +
- "Failed to read message from " + i + " of " +
- messagesSize + " for vertex id " + currentVertexId + " from " +
- file, e);
- }
- messages.add(message);
- }
- currentVertexDone();
- return messages;
- }
-
- /**
- * Release current vertex.
- *
- * @throws IOException
- */
- private void currentVertexDone() throws IOException {
- currentVertexId = null;
- verticesLeft--;
- if (verticesLeft == 0) {
- endReading();
- }
- }
-
- /**
- * Call when we are done reading, for closing files.
- *
- * @throws IOException
- */
- private void endReading() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("endReading: Stopped reading " + file);
- }
- if (in != null) {
- in.close();
- in = null;
- }
- }
-
- /**
- * Create new factory for this message store
- *
- * @param config Hadoop configuration
- * @param <I> Vertex id
- * @param <M> Message data
- * @return Factory
- */
- public static <I extends WritableComparable, M extends Writable>
- MessageStoreFactory<I, M, BasicMessageStore<I, M>> newFactory(
- ImmutableClassesGiraphConfiguration config) {
- return new Factory<I, M>(config);
- }
-
- /**
- * Factory for {@link SequentialFileMessageStore}
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
- private static class Factory<I extends WritableComparable,
- M extends Writable>
- implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
- /** Hadoop configuration */
- private final ImmutableClassesGiraphConfiguration config;
- /** Directories in which we'll keep necessary files */
- private final String[] directories;
- /** Buffer size to use when reading and writing */
- private final int bufferSize;
- /** Counter for created message stores */
- private final AtomicInteger storeCounter;
-
- /**
- * Constructor.
- *
- * @param config Hadoop configuration
- */
- public Factory(ImmutableClassesGiraphConfiguration config) {
- this.config = config;
- String jobId = config.get("mapred.job.id", "Unknown Job");
- int taskId = config.getTaskPartition();
- List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
- Collections.shuffle(userPaths);
- directories = new String[userPaths.size()];
- int i = 0;
- for (String path : userPaths) {
- String directory = path + File.separator + jobId + File.separator +
- taskId + File.separator;
- directories[i++] = directory;
- new File(directory).mkdirs();
- }
- this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
- storeCounter = new AtomicInteger();
- }
-
- @Override
- public BasicMessageStore<I, M> newStore(Class<M> messageClass) {
- int idx = Math.abs(storeCounter.getAndIncrement());
- String fileName =
- directories[idx % directories.length] + "messages-" + idx;
- return new SequentialFileMessageStore<I, M>(messageClass, config,
- bufferSize, fileName);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/89445670/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 1a91dfb..c9d869c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -18,14 +18,12 @@
package org.apache.giraph.comm.messages;
-import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -34,7 +32,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
- * Abstract class for {@link MessageStoreByPartition} which allows any kind
+ * Abstract class for {@link MessageStore} which allows any kind
* of object to hold messages for one vertex.
* Simple in memory message store implemented with a two level concurrent
* hash map.
@@ -44,7 +42,7 @@ import org.apache.hadoop.io.WritableComparable;
* @param <T> Type of object which holds messages for one vertex
*/
public abstract class SimpleMessageStore<I extends WritableComparable,
- M extends Writable, T> implements MessageStoreByPartition<I, M> {
+ M extends Writable, T> implements MessageStore<I, M> {
/** Message class */
protected final Class<M> messageClass;
/** Service worker */
@@ -150,16 +148,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
public boolean hasMessagesForVertex(I vertexId) {
ConcurrentMap<I, ?> partitionMap =
map.get(getPartitionId(vertexId));
- return (partitionMap == null) ? false : partitionMap.containsKey(vertexId);
- }
-
- @Override
- public Iterable<I> getDestinationVertices() {
- List<I> vertices = Lists.newArrayList();
- for (ConcurrentMap<I, ?> partitionMap : map.values()) {
- vertices.addAll(partitionMap.keySet());
- }
- return vertices;
+ return partitionMap != null && partitionMap.containsKey(vertexId);
}
@Override
@@ -174,15 +163,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public int getNumberOfMessages() {
- int numberOfMessages = 0;
- for (ConcurrentMap<I, T> partitionMap : map.values()) {
- numberOfMessages += getNumberOfMessagesIn(partitionMap);
- }
- return numberOfMessages;
- }
-
- @Override
public void writePartition(DataOutput out,
int partitionId) throws IOException {
ConcurrentMap<I, T> partitionMap = map.get(partitionId);
@@ -197,15 +177,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(map.size());
- for (int partitionId : map.keySet()) {
- out.writeInt(partitionId);
- writePartition(out, partitionId);
- }
- }
-
- @Override
public void readFieldsForPartition(DataInput in,
int partitionId) throws IOException {
if (in.readBoolean()) {
@@ -221,15 +192,6 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
- public void readFields(DataInput in) throws IOException {
- int numPartitions = in.readInt();
- for (int p = 0; p < numPartitions; p++) {
- int partitionId = in.readInt();
- readFieldsForPartition(in, partitionId);
- }
- }
-
- @Override
public void clearVertexMessages(I vertexId) throws IOException {
ConcurrentMap<I, ?> partitionMap =
map.get(getPartitionId(vertexId));